diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 217da06db4f85d665b6d21a0eaeaea00789007ee..1f57f94452910b8e6011c09cdbc748801a3f0a79 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -66,7 +66,7 @@ final Thread.Mutex _unnamedportalmux; private Thread.Mutex unnamedstatement; final int _portalsinflight; -private .pgsql_util.PGassist c; +private .pgsql_util.conxion c; private string cancelsecret; private int backendpid, backendstatus; final mapping(string:mixed) _options; @@ -303,8 +303,8 @@ int ping() { ? !!reconnected : -1; } -private .pgsql_util.PGassist getsocket(void|int nossl) { - return .pgsql_util.PGassist(this,qportals,(int)nossl); +private .pgsql_util.conxion getsocket(void|int nossl) { + return .pgsql_util.conxion(this,qportals,(int)nossl); } //! Cancels all currently running queries in this session. @@ -317,7 +317,7 @@ private .pgsql_util.PGassist getsocket(void|int nossl) { //! through the generic SQL-interface. void cancelquery() { PD("CancelRequest\n"); - .pgsql_util.PGassist lcon=getsocket(1); + .pgsql_util.conxion lcon=getsocket(1); lcon->add_int32(16)->add_int32(PG_PROTOCOL(1234,5678)) ->add_int32(backendpid)->add(cancelsecret)->sendcmd(flushsend); lcon->close(); @@ -325,8 +325,8 @@ void cancelquery() { PD("Closetrace %O\n",backtrace()); #endif if(c) { - .pgsql_util.PGassist plugbuffer=c->start(1); - foreach(qportals->peek_array();;int|.pgsql_util.pgsql_result portal) + .pgsql_util.conxion plugbuffer=c->start(1); + foreach(qportals->peek_array();;int|.pgsql_util.sql_result portal) if(objectp(portal)) portal->_closeportal(plugbuffer); plugbuffer->sendcmd(sendout); @@ -554,7 +554,7 @@ private void reconnect_cb() { runcallback(backendpid,"_reconnect",""); } -private array(string) showbindings(.pgsql_util.pgsql_result portal) { +private array(string) showbindings(.pgsql_util.sql_result portal) { array(string) msgs=({}); array from; if(portal && (from = portal._params)) { @@ -579,7 +579,7 @@ private void preplastmessage(mapping(string:string) msgresponse) { msgresponse.L||"")}); } -private void storetiming(.pgsql_util.pgsql_result portal) { +private void storetiming(.pgsql_util.sql_result portal) { mapping(string:mixed) tp=portal._tprepared; tp.trun=gethrtime()-tp.trunstart; m_delete(tp,"trunstart"); @@ -601,9 +601,9 @@ private inline mixed callout(function(mixed ...:void) f, return .pgsql_util.local_backend->call_out(f,delay,@args); } -final void _processloop(.pgsql_util.PGassist ci) { +final void _processloop(.pgsql_util.conxion ci) { int terminating=0; - int|.pgsql_util.pgsql_result portal; + int|.pgsql_util.sql_result portal; mixed err; { Stdio.Buffer plugbuffer=Stdio.Buffer()->add_int32(PG_PROTOCOL(3,0)); @@ -815,7 +815,7 @@ final void _processloop(.pgsql_util.PGassist ci) { #endif portal->_purgeportal(); } - foreach(qportals->peek_array();;.pgsql_util.pgsql_result qp) { + foreach(qportals->peek_array();;.pgsql_util.sql_result qp) { if(objectp(qp) && qp._synctransact && qp._synctransact<=portal) { PD("Checking portal %O %d<=%d\n", qp._portalname,qp._synctransact,portal); @@ -1225,14 +1225,14 @@ final void _processloop(.pgsql_util.PGassist ci) { ERROR(a2nls(lastmessage+=({msg}))); } }; // We only get here if there is an error - if(err==MAGICTERMINATE) { + if(err==MAGICTERMINATE) { // Announce connection termination to server ci->start()->add("X\0\0\0\4")->sendcmd(sendout); terminating=1; if(!sizeof(ci)) break; } if(stringp(err)) { - .pgsql_util.pgsql_result or; + .pgsql_util.sql_result or; if(!objectp(or=portal)) or=this; if(!or._delayederror) @@ -1584,7 +1584,7 @@ string server_info () { //! If specified, list only those databases matching it. array(string) list_dbs (void|string glob) { array row,ret=({}); - .pgsql_util.pgsql_result res=big_query("SELECT d.datname " + .pgsql_util.sql_result res=big_query("SELECT d.datname " "FROM pg_database d " "WHERE d.datname ILIKE :glob " "ORDER BY d.datname", @@ -1602,7 +1602,7 @@ array(string) list_dbs (void|string glob) { //! If specified, list only the tables with matching names. array(string) list_tables (void|string glob) { array row,ret=({}); // This query might not work on PostgreSQL 7.4 - .pgsql_util.pgsql_result res=big_query( // due to missing schemasupport + .pgsql_util.sql_result res=big_query( // due to missing schemasupport "SELECT CASE WHEN 'public'=n.nspname THEN '' ELSE n.nspname||'.' END " " ||c.relname AS name " "FROM pg_catalog.pg_class c " @@ -1663,7 +1663,7 @@ array(mapping(string:mixed)) list_fields(void|string table, void|string glob) { sscanf(table||"*", "%s.%s", schema, table); - .pgsql_util.pgsql_result res = big_typed_query( + .pgsql_util.sql_result res = big_typed_query( "SELECT a.attname, a.atttypid, t.typname, a.attlen, " " c.relhasindex, c.relhaspkey, CAST(c.reltuples AS BIGINT) AS reltuples, " " (c.relpages " @@ -1770,7 +1770,7 @@ final string status_commit() { } private inline void closestatement( - .pgsql_util.PGassist|.pgsql_util.PGplugbuffer plugbuffer,string oldprep) { + .pgsql_util.conxion|.pgsql_util.bufcon plugbuffer,string oldprep) { .pgsql_util.closestatement(plugbuffer,oldprep); } @@ -1782,8 +1782,8 @@ private inline void throwdelayederror(object parent) { .pgsql_util.throwdelayederror(parent); } -//! @decl Sql.pgsql_util.pgsql_result big_query(string query) -//! @decl Sql.pgsql_util.pgsql_result big_query(string query, mapping bindings) +//! @decl Sql.pgsql_util.sql_result big_query(string query) +//! @decl Sql.pgsql_util.sql_result big_query(string query, mapping bindings) //! //! This is the only provided interface which allows you to query the //! database. If you wish to use the simpler @[Sql.Sql()->query()] function, @@ -1808,7 +1808,7 @@ private inline void throwdelayederror(object parent) { //! @endmapping //! //! @returns -//! A @[Sql.pgsql_util.pgsql_result] object (which conforms to the +//! A @[Sql.pgsql_util.sql_result] object (which conforms to the //! @[Sql.sql_result] standard interface for accessing data). It is //! recommended to use @[Sql.Sql()->query()] for simpler queries (because //! it is easier to handle, but stores all the result in memory), and @@ -1834,8 +1834,8 @@ private inline void throwdelayederror(object parent) { //! //! @seealso //! @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result], -//! @[Sql.Sql()->query()], @[Sql.pgsql_util.pgsql_result] -.pgsql_util.pgsql_result big_query(string q, +//! @[Sql.Sql()->query()], @[Sql.pgsql_util.sql_result] +.pgsql_util.sql_result big_query(string q, void|mapping(string|int:mixed) bindings, void|int _alltyped) { throwdelayederror(this); @@ -1952,10 +1952,10 @@ private inline void throwdelayederror(object parent) { } else plugbuffer->sendcmd(); // close start() tstart=gethrtime(); - } else // pgsql_result autoassigns to portal + } else // sql_result autoassigns to portal tp=UNDEFINED; - .pgsql_util.pgsql_result portal; - portal=.pgsql_util.pgsql_result(this,c,q, + .pgsql_util.sql_result portal; + portal=.pgsql_util.sql_result(this,c,q, portalbuffersize,_alltyped,from,forcetext); portal._tprepared=tp; #ifdef PG_STATS @@ -2018,7 +2018,7 @@ private inline void throwdelayederror(object parent) { //! //! @seealso //! @[big_query()], @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result] -.pgsql_util.pgsql_result streaming_query(string q, +.pgsql_util.sql_result streaming_query(string q, void|mapping(string|int:mixed) bindings) { return big_query(q,bindings); } @@ -2028,7 +2028,7 @@ private inline void throwdelayederror(object parent) { //! //! @seealso //! @[big_query()], @[Sql.Sql], @[Sql.sql_result] -.pgsql_util.pgsql_result big_typed_query(string q, +.pgsql_util.sql_result big_typed_query(string q, void|mapping(string|int:mixed) bindings) { return big_query(q,bindings,1); } diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 9ca7a5f176a1aaaf514fc3ca2e8a95ff36084bba..35cd8eaa86aa89101941c3a84259e887283c0957 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -4,7 +4,9 @@ */ //! The pgsql backend, shared between all connection instances. -//! It runs even in non-callback mode in a separate thread. +//! It runs even in non-callback mode in a separate thread and makes sure +//! that communication with the database is real-time and event driven +//! at all times. //! //! @note //! Callbacks running from this backend directly determine the latency @@ -34,7 +36,7 @@ private Regexp execfetchlimit [Ii][Nn][Ss][Ee][Rr][Tt])[ \t\f\r\n]|\ [ \t\f\r\n][Ll][Ii][Mm][Ii][Tt][ \t\f\r\n]+[12][; \t\f\r\n]*$"); -final void closestatement(PGplugbuffer|PGassist plugbuffer,string oldprep) { +final void closestatement(bufcon|conxion plugbuffer,string oldprep) { if(oldprep) { PD("Close statement %s\n",oldprep); plugbuffer->add_int8('C')->add_hstring(({'S',oldprep,0}),4,4); @@ -100,7 +102,7 @@ final int oidformat(int oid) { return 0; // text } -private sctype mergemode(PGassist realbuffer,sctype mode) { +private sctype mergemode(conxion realbuffer,sctype mode) { if(mode>realbuffer->stashflushmode) realbuffer->stashflushmode=mode; return realbuffer->stashflushmode; @@ -113,25 +115,25 @@ private inline mixed callout(function(mixed ...:void) f, // Some pgsql utility functions -class PGplugbuffer { +class bufcon { inherit Stdio.Buffer; - private PGassist realbuffer; + private conxion realbuffer; - protected void create(PGassist _realbuffer) { + protected void create(conxion _realbuffer) { realbuffer=_realbuffer; } - final PGplugbuffer start(void|int waitforreal) { + final bufcon start(void|int waitforreal) { realbuffer->stashcount++; #ifdef PG_DEBUG if(waitforreal) - error("pgsql.PGplugbuffer not allowed here\n"); + error("pgsql.bufcon not allowed here\n"); #endif return this; } - final void sendcmd(void|sctype mode,void|pgsql_result portal) { + final void sendcmd(void|sctype mode,void|sql_result portal) { Thread.MutexKey lock=realbuffer->stashupdate->lock(); if(portal) realbuffer->stashqueue->write(portal); @@ -149,7 +151,7 @@ class PGplugbuffer { } -class PGassist { +class conxion { inherit Stdio.Buffer:i; inherit Stdio.Buffer:o; @@ -176,13 +178,13 @@ class PGassist { final int queueinidx=-1; #endif - private inline void queueup(pgsql_result portal) { + private inline void queueup(sql_result portal) { qportals->write(portal); portal->_synctransact=synctransact; PD(">%O %d %d Queue portal %d bytes\n",portal._portalname,++queueoutidx, synctransact,sizeof(this)); } - final PGassist|PGplugbuffer start(void|int waitforreal) { + final conxion|bufcon start(void|int waitforreal) { Thread.MutexKey lock; if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) { started=lock; @@ -190,13 +192,13 @@ class PGassist { if(stashcount) stashavail.wait(lock); add(stash); stash->clear(); - foreach(stashqueue->try_read_array();;pgsql_result portal) + foreach(stashqueue->try_read_array();;sql_result portal) queueup(portal); lock=0; return this; } stashcount++; - return PGplugbuffer(this); + return bufcon(this); } protected bool range_error(int howmuch) { @@ -242,7 +244,7 @@ class PGassist { final inline int read_int32() { return i::read_int32(); } final inline string read_cstring() { return i::read_cstring(); } - final void sendcmd(void|sctype mode,void|pgsql_result portal) { + final void sendcmd(void|sctype mode,void|sql_result portal) { if(portal) queueup(portal); nosync: @@ -266,7 +268,7 @@ nosync: Thread.MutexKey lock=stashupdate->lock(); if(sizeof(stash)) { add(stash); stash->clear(); - foreach(stashqueue->try_read_array();;pgsql_result portal) + foreach(stashqueue->try_read_array();;sql_result portal) queueup(portal); } mode=mergemode(this,mode); @@ -360,7 +362,7 @@ outer: string res=UNDEFINED; switch(type) { case 'O': - res=predef::sprintf("PGassist fd: %d input queue: %d/%d " + res=predef::sprintf("conxion fd: %d input queue: %d/%d " "queued portals: %d output queue: %d/%d\n", socket&&socket->query_fd(), sizeof(i::this),i::_size_object(), @@ -394,12 +396,12 @@ outer: //! //! @seealso //! @[Sql.sql_result], @[Sql.pgsql], @[Sql.Sql], @[Sql.pgsql()->big_query()] -class pgsql_result { +class sql_result { private object pgsqlsess; private int numrows; private int eoffound; - private PGassist c; + private conxion c; final mixed _delayederror; final portalstate _state; final int _fetchlimit; @@ -433,7 +435,7 @@ class pgsql_result { string res=UNDEFINED; switch(type) { case 'O': - res=sprintf("pgsql_result numrows: %d eof: %d inflight: %d\n" + res=sprintf("sql_result numrows: %d eof: %d inflight: %d\n" "query: %O\n" "portalname: %O datarows: %d" " laststatus: %s\n", @@ -446,7 +448,7 @@ class pgsql_result { return res; } - protected void create(object _pgsqlsess,PGassist _c,string query, + protected void create(object _pgsqlsess,conxion _c,string query, int portalbuffersize,int alltyped,array params,int forcetext) { pgsqlsess = _pgsqlsess; c = _c; @@ -722,7 +724,7 @@ class pgsql_result { PD("Bind portal %O statement %O\n",_portalname,_preparedname); _fetchlimit=pgsqlsess->_fetchlimit; _openportal(); - PGassist bindbuffer=c->start(1); + conxion bindbuffer=c->start(1); _unnamedstatementkey=0; bindbuffer->add_int8('B')->add_hstring(plugbuffer,4,4); if(!_tprepared) @@ -757,7 +759,7 @@ class pgsql_result { releaseconditions(); } - final sctype _closeportal(PGplugbuffer plugbuffer) { + final sctype _closeportal(bufcon plugbuffer) { sctype retval=keep; PD("%O Try Closeportal %d\n",_portalname,_state); Thread.MutexKey lock=closemux->lock(); @@ -838,7 +840,7 @@ class pgsql_result { final void _releasesession() { _inflight=0; _datarows->write(1); // Signal EOF - PGassist plugbuffer=c->start(1); + conxion plugbuffer=c->start(1); plugbuffer->sendcmd(_closeportal(plugbuffer)); releaseconditions(); } @@ -849,7 +851,7 @@ class pgsql_result { }; } - final void _sendexecute(int fetchlimit,void|PGplugbuffer plugbuffer) { + final void _sendexecute(int fetchlimit,void|bufcon plugbuffer) { int flushmode; PD("Execute portal %O fetchlimit %d\n",_portalname,fetchlimit); if(!plugbuffer) @@ -931,7 +933,7 @@ class pgsql_result { } private void run_result_cb( - function(pgsql_result, array(mixed), mixed ...:void) callback, + function(sql_result, array(mixed), mixed ...:void) callback, array(mixed) args) { int|array datarow; while(arrayp(datarow=_datarows->read_array())) @@ -948,14 +950,14 @@ class pgsql_result { //! @seealso //! @[fetch_row()] void set_result_callback( - function(pgsql_result, array(mixed), mixed ...:void) callback, + function(sql_result, array(mixed), mixed ...:void) callback, mixed ... args) { if(callback) Thread.Thread(run_result_cb,callback,args); } private void run_result_array_cb( - function(pgsql_result, array(array(mixed)), mixed ...:void) callback, + function(sql_result, array(array(mixed)), mixed ...:void) callback, array(mixed) args) { array(array|int) datarow; while((datarow=_datarows->read_array()) && arrayp(datarow[-1])) @@ -974,7 +976,7 @@ class pgsql_result { //! @seealso //! @[fetch_row()] void set_result_array_callback( - function(pgsql_result, array(array(mixed)), mixed ...:void) callback, + function(sql_result, array(array(mixed)), mixed ...:void) callback, mixed ... args) { if(callback) Thread.Thread(run_result_array_cb,callback,args);