Skip to content
Snippets Groups Projects
Commit 3b31faea authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

pgsql: Cosmetics.

parent e302b667
No related branches found
No related tags found
No related merge requests found
......@@ -66,7 +66,7 @@ final Thread.Mutex _unnamedportalmux;
private Thread.Mutex unnamedstatement;
final int _portalsinflight;
private .pgsql_util.PGassist c;
private .pgsql_util.conxion c;
private string cancelsecret;
private int backendpid, backendstatus;
final mapping(string:mixed) _options;
......@@ -303,8 +303,8 @@ int ping() {
? !!reconnected : -1;
}
private .pgsql_util.PGassist getsocket(void|int nossl) {
return .pgsql_util.PGassist(this,qportals,(int)nossl);
private .pgsql_util.conxion getsocket(void|int nossl) {
return .pgsql_util.conxion(this,qportals,(int)nossl);
}
//! Cancels all currently running queries in this session.
......@@ -317,7 +317,7 @@ private .pgsql_util.PGassist getsocket(void|int nossl) {
//! through the generic SQL-interface.
void cancelquery() {
PD("CancelRequest\n");
.pgsql_util.PGassist lcon=getsocket(1);
.pgsql_util.conxion lcon=getsocket(1);
lcon->add_int32(16)->add_int32(PG_PROTOCOL(1234,5678))
->add_int32(backendpid)->add(cancelsecret)->sendcmd(flushsend);
lcon->close();
......@@ -325,8 +325,8 @@ void cancelquery() {
PD("Closetrace %O\n",backtrace());
#endif
if(c) {
.pgsql_util.PGassist plugbuffer=c->start(1);
foreach(qportals->peek_array();;int|.pgsql_util.pgsql_result portal)
.pgsql_util.conxion plugbuffer=c->start(1);
foreach(qportals->peek_array();;int|.pgsql_util.sql_result portal)
if(objectp(portal))
portal->_closeportal(plugbuffer);
plugbuffer->sendcmd(sendout);
......@@ -554,7 +554,7 @@ private void reconnect_cb() {
runcallback(backendpid,"_reconnect","");
}
private array(string) showbindings(.pgsql_util.pgsql_result portal) {
private array(string) showbindings(.pgsql_util.sql_result portal) {
array(string) msgs=({});
array from;
if(portal && (from = portal._params)) {
......@@ -579,7 +579,7 @@ private void preplastmessage(mapping(string:string) msgresponse) {
msgresponse.L||"")});
}
private void storetiming(.pgsql_util.pgsql_result portal) {
private void storetiming(.pgsql_util.sql_result portal) {
mapping(string:mixed) tp=portal._tprepared;
tp.trun=gethrtime()-tp.trunstart;
m_delete(tp,"trunstart");
......@@ -601,9 +601,9 @@ private inline mixed callout(function(mixed ...:void) f,
return .pgsql_util.local_backend->call_out(f,delay,@args);
}
final void _processloop(.pgsql_util.PGassist ci) {
final void _processloop(.pgsql_util.conxion ci) {
int terminating=0;
int|.pgsql_util.pgsql_result portal;
int|.pgsql_util.sql_result portal;
mixed err;
{
Stdio.Buffer plugbuffer=Stdio.Buffer()->add_int32(PG_PROTOCOL(3,0));
......@@ -815,7 +815,7 @@ final void _processloop(.pgsql_util.PGassist ci) {
#endif
portal->_purgeportal();
}
foreach(qportals->peek_array();;.pgsql_util.pgsql_result qp) {
foreach(qportals->peek_array();;.pgsql_util.sql_result qp) {
if(objectp(qp) && qp._synctransact && qp._synctransact<=portal) {
PD("Checking portal %O %d<=%d\n",
qp._portalname,qp._synctransact,portal);
......@@ -1225,14 +1225,14 @@ final void _processloop(.pgsql_util.PGassist ci) {
ERROR(a2nls(lastmessage+=({msg})));
}
}; // We only get here if there is an error
if(err==MAGICTERMINATE) {
if(err==MAGICTERMINATE) { // Announce connection termination to server
ci->start()->add("X\0\0\0\4")->sendcmd(sendout);
terminating=1;
if(!sizeof(ci))
break;
}
if(stringp(err)) {
.pgsql_util.pgsql_result or;
.pgsql_util.sql_result or;
if(!objectp(or=portal))
or=this;
if(!or._delayederror)
......@@ -1584,7 +1584,7 @@ string server_info () {
//! If specified, list only those databases matching it.
array(string) list_dbs (void|string glob) {
array row,ret=({});
.pgsql_util.pgsql_result res=big_query("SELECT d.datname "
.pgsql_util.sql_result res=big_query("SELECT d.datname "
"FROM pg_database d "
"WHERE d.datname ILIKE :glob "
"ORDER BY d.datname",
......@@ -1602,7 +1602,7 @@ array(string) list_dbs (void|string glob) {
//! If specified, list only the tables with matching names.
array(string) list_tables (void|string glob) {
array row,ret=({}); // This query might not work on PostgreSQL 7.4
.pgsql_util.pgsql_result res=big_query( // due to missing schemasupport
.pgsql_util.sql_result res=big_query( // due to missing schemasupport
"SELECT CASE WHEN 'public'=n.nspname THEN '' ELSE n.nspname||'.' END "
" ||c.relname AS name "
"FROM pg_catalog.pg_class c "
......@@ -1663,7 +1663,7 @@ array(mapping(string:mixed)) list_fields(void|string table, void|string glob) {
sscanf(table||"*", "%s.%s", schema, table);
.pgsql_util.pgsql_result res = big_typed_query(
.pgsql_util.sql_result res = big_typed_query(
"SELECT a.attname, a.atttypid, t.typname, a.attlen, "
" c.relhasindex, c.relhaspkey, CAST(c.reltuples AS BIGINT) AS reltuples, "
" (c.relpages "
......@@ -1770,7 +1770,7 @@ final string status_commit() {
}
private inline void closestatement(
.pgsql_util.PGassist|.pgsql_util.PGplugbuffer plugbuffer,string oldprep) {
.pgsql_util.conxion|.pgsql_util.bufcon plugbuffer,string oldprep) {
.pgsql_util.closestatement(plugbuffer,oldprep);
}
......@@ -1782,8 +1782,8 @@ private inline void throwdelayederror(object parent) {
.pgsql_util.throwdelayederror(parent);
}
//! @decl Sql.pgsql_util.pgsql_result big_query(string query)
//! @decl Sql.pgsql_util.pgsql_result big_query(string query, mapping bindings)
//! @decl Sql.pgsql_util.sql_result big_query(string query)
//! @decl Sql.pgsql_util.sql_result big_query(string query, mapping bindings)
//!
//! This is the only provided interface which allows you to query the
//! database. If you wish to use the simpler @[Sql.Sql()->query()] function,
......@@ -1808,7 +1808,7 @@ private inline void throwdelayederror(object parent) {
//! @endmapping
//!
//! @returns
//! A @[Sql.pgsql_util.pgsql_result] object (which conforms to the
//! A @[Sql.pgsql_util.sql_result] object (which conforms to the
//! @[Sql.sql_result] standard interface for accessing data). It is
//! recommended to use @[Sql.Sql()->query()] for simpler queries (because
//! it is easier to handle, but stores all the result in memory), and
......@@ -1834,8 +1834,8 @@ private inline void throwdelayederror(object parent) {
//!
//! @seealso
//! @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result],
//! @[Sql.Sql()->query()], @[Sql.pgsql_util.pgsql_result]
.pgsql_util.pgsql_result big_query(string q,
//! @[Sql.Sql()->query()], @[Sql.pgsql_util.sql_result]
.pgsql_util.sql_result big_query(string q,
void|mapping(string|int:mixed) bindings,
void|int _alltyped) {
throwdelayederror(this);
......@@ -1952,10 +1952,10 @@ private inline void throwdelayederror(object parent) {
} else
plugbuffer->sendcmd(); // close start()
tstart=gethrtime();
} else // pgsql_result autoassigns to portal
} else // sql_result autoassigns to portal
tp=UNDEFINED;
.pgsql_util.pgsql_result portal;
portal=.pgsql_util.pgsql_result(this,c,q,
.pgsql_util.sql_result portal;
portal=.pgsql_util.sql_result(this,c,q,
portalbuffersize,_alltyped,from,forcetext);
portal._tprepared=tp;
#ifdef PG_STATS
......@@ -2018,7 +2018,7 @@ private inline void throwdelayederror(object parent) {
//!
//! @seealso
//! @[big_query()], @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result]
.pgsql_util.pgsql_result streaming_query(string q,
.pgsql_util.sql_result streaming_query(string q,
void|mapping(string|int:mixed) bindings) {
return big_query(q,bindings);
}
......@@ -2028,7 +2028,7 @@ private inline void throwdelayederror(object parent) {
//!
//! @seealso
//! @[big_query()], @[Sql.Sql], @[Sql.sql_result]
.pgsql_util.pgsql_result big_typed_query(string q,
.pgsql_util.sql_result big_typed_query(string q,
void|mapping(string|int:mixed) bindings) {
return big_query(q,bindings,1);
}
......@@ -4,7 +4,9 @@
*/
//! The pgsql backend, shared between all connection instances.
//! It runs even in non-callback mode in a separate thread.
//! It runs even in non-callback mode in a separate thread and makes sure
//! that communication with the database is real-time and event driven
//! at all times.
//!
//! @note
//! Callbacks running from this backend directly determine the latency
......@@ -34,7 +36,7 @@ private Regexp execfetchlimit
[Ii][Nn][Ss][Ee][Rr][Tt])[ \t\f\r\n]|\
[ \t\f\r\n][Ll][Ii][Mm][Ii][Tt][ \t\f\r\n]+[12][; \t\f\r\n]*$");
final void closestatement(PGplugbuffer|PGassist plugbuffer,string oldprep) {
final void closestatement(bufcon|conxion plugbuffer,string oldprep) {
if(oldprep) {
PD("Close statement %s\n",oldprep);
plugbuffer->add_int8('C')->add_hstring(({'S',oldprep,0}),4,4);
......@@ -100,7 +102,7 @@ final int oidformat(int oid) {
return 0; // text
}
private sctype mergemode(PGassist realbuffer,sctype mode) {
private sctype mergemode(conxion realbuffer,sctype mode) {
if(mode>realbuffer->stashflushmode)
realbuffer->stashflushmode=mode;
return realbuffer->stashflushmode;
......@@ -113,25 +115,25 @@ private inline mixed callout(function(mixed ...:void) f,
// Some pgsql utility functions
class PGplugbuffer {
class bufcon {
inherit Stdio.Buffer;
private PGassist realbuffer;
private conxion realbuffer;
protected void create(PGassist _realbuffer) {
protected void create(conxion _realbuffer) {
realbuffer=_realbuffer;
}
final PGplugbuffer start(void|int waitforreal) {
final bufcon start(void|int waitforreal) {
realbuffer->stashcount++;
#ifdef PG_DEBUG
if(waitforreal)
error("pgsql.PGplugbuffer not allowed here\n");
error("pgsql.bufcon not allowed here\n");
#endif
return this;
}
final void sendcmd(void|sctype mode,void|pgsql_result portal) {
final void sendcmd(void|sctype mode,void|sql_result portal) {
Thread.MutexKey lock=realbuffer->stashupdate->lock();
if(portal)
realbuffer->stashqueue->write(portal);
......@@ -149,7 +151,7 @@ class PGplugbuffer {
}
class PGassist {
class conxion {
inherit Stdio.Buffer:i;
inherit Stdio.Buffer:o;
......@@ -176,13 +178,13 @@ class PGassist {
final int queueinidx=-1;
#endif
private inline void queueup(pgsql_result portal) {
private inline void queueup(sql_result portal) {
qportals->write(portal); portal->_synctransact=synctransact;
PD(">%O %d %d Queue portal %d bytes\n",portal._portalname,++queueoutidx,
synctransact,sizeof(this));
}
final PGassist|PGplugbuffer start(void|int waitforreal) {
final conxion|bufcon start(void|int waitforreal) {
Thread.MutexKey lock;
if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) {
started=lock;
......@@ -190,13 +192,13 @@ class PGassist {
if(stashcount)
stashavail.wait(lock);
add(stash); stash->clear();
foreach(stashqueue->try_read_array();;pgsql_result portal)
foreach(stashqueue->try_read_array();;sql_result portal)
queueup(portal);
lock=0;
return this;
}
stashcount++;
return PGplugbuffer(this);
return bufcon(this);
}
protected bool range_error(int howmuch) {
......@@ -242,7 +244,7 @@ class PGassist {
final inline int read_int32() { return i::read_int32(); }
final inline string read_cstring() { return i::read_cstring(); }
final void sendcmd(void|sctype mode,void|pgsql_result portal) {
final void sendcmd(void|sctype mode,void|sql_result portal) {
if(portal)
queueup(portal);
nosync:
......@@ -266,7 +268,7 @@ nosync:
Thread.MutexKey lock=stashupdate->lock();
if(sizeof(stash)) {
add(stash); stash->clear();
foreach(stashqueue->try_read_array();;pgsql_result portal)
foreach(stashqueue->try_read_array();;sql_result portal)
queueup(portal);
}
mode=mergemode(this,mode);
......@@ -360,7 +362,7 @@ outer:
string res=UNDEFINED;
switch(type) {
case 'O':
res=predef::sprintf("PGassist fd: %d input queue: %d/%d "
res=predef::sprintf("conxion fd: %d input queue: %d/%d "
"queued portals: %d output queue: %d/%d\n",
socket&&socket->query_fd(),
sizeof(i::this),i::_size_object(),
......@@ -394,12 +396,12 @@ outer:
//!
//! @seealso
//! @[Sql.sql_result], @[Sql.pgsql], @[Sql.Sql], @[Sql.pgsql()->big_query()]
class pgsql_result {
class sql_result {
private object pgsqlsess;
private int numrows;
private int eoffound;
private PGassist c;
private conxion c;
final mixed _delayederror;
final portalstate _state;
final int _fetchlimit;
......@@ -433,7 +435,7 @@ class pgsql_result {
string res=UNDEFINED;
switch(type) {
case 'O':
res=sprintf("pgsql_result numrows: %d eof: %d inflight: %d\n"
res=sprintf("sql_result numrows: %d eof: %d inflight: %d\n"
"query: %O\n"
"portalname: %O datarows: %d"
" laststatus: %s\n",
......@@ -446,7 +448,7 @@ class pgsql_result {
return res;
}
protected void create(object _pgsqlsess,PGassist _c,string query,
protected void create(object _pgsqlsess,conxion _c,string query,
int portalbuffersize,int alltyped,array params,int forcetext) {
pgsqlsess = _pgsqlsess;
c = _c;
......@@ -722,7 +724,7 @@ class pgsql_result {
PD("Bind portal %O statement %O\n",_portalname,_preparedname);
_fetchlimit=pgsqlsess->_fetchlimit;
_openportal();
PGassist bindbuffer=c->start(1);
conxion bindbuffer=c->start(1);
_unnamedstatementkey=0;
bindbuffer->add_int8('B')->add_hstring(plugbuffer,4,4);
if(!_tprepared)
......@@ -757,7 +759,7 @@ class pgsql_result {
releaseconditions();
}
final sctype _closeportal(PGplugbuffer plugbuffer) {
final sctype _closeportal(bufcon plugbuffer) {
sctype retval=keep;
PD("%O Try Closeportal %d\n",_portalname,_state);
Thread.MutexKey lock=closemux->lock();
......@@ -838,7 +840,7 @@ class pgsql_result {
final void _releasesession() {
_inflight=0;
_datarows->write(1); // Signal EOF
PGassist plugbuffer=c->start(1);
conxion plugbuffer=c->start(1);
plugbuffer->sendcmd(_closeportal(plugbuffer));
releaseconditions();
}
......@@ -849,7 +851,7 @@ class pgsql_result {
};
}
final void _sendexecute(int fetchlimit,void|PGplugbuffer plugbuffer) {
final void _sendexecute(int fetchlimit,void|bufcon plugbuffer) {
int flushmode;
PD("Execute portal %O fetchlimit %d\n",_portalname,fetchlimit);
if(!plugbuffer)
......@@ -931,7 +933,7 @@ class pgsql_result {
}
private void run_result_cb(
function(pgsql_result, array(mixed), mixed ...:void) callback,
function(sql_result, array(mixed), mixed ...:void) callback,
array(mixed) args) {
int|array datarow;
while(arrayp(datarow=_datarows->read_array()))
......@@ -948,14 +950,14 @@ class pgsql_result {
//! @seealso
//! @[fetch_row()]
void set_result_callback(
function(pgsql_result, array(mixed), mixed ...:void) callback,
function(sql_result, array(mixed), mixed ...:void) callback,
mixed ... args) {
if(callback)
Thread.Thread(run_result_cb,callback,args);
}
private void run_result_array_cb(
function(pgsql_result, array(array(mixed)), mixed ...:void) callback,
function(sql_result, array(array(mixed)), mixed ...:void) callback,
array(mixed) args) {
array(array|int) datarow;
while((datarow=_datarows->read_array()) && arrayp(datarow[-1]))
......@@ -974,7 +976,7 @@ class pgsql_result {
//! @seealso
//! @[fetch_row()]
void set_result_array_callback(
function(pgsql_result, array(array(mixed)), mixed ...:void) callback,
function(sql_result, array(array(mixed)), mixed ...:void) callback,
mixed ... args) {
if(callback)
Thread.Thread(run_result_array_cb,callback,args);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment