/*
 * Some pgsql utility functions.
 * They are kept here to avoid circular references.
 */

//! The pgsql backend, shared between all connection instances.
//! It runs even in non-callback mode in a separate thread and makes sure
//! that communication with the database is real-time and event driven
//! at all times.
//!
//! @note
//! Callbacks running from this backend directly determine the latency
//! in reacting to communication with the database server; so it
//! would be prudent not to block in these callbacks.

#pike __REAL_VERSION__
#require constant(Thread.Thread)

#include "pgsql.h"

//! The instance of the pgsql dedicated backend.
final Pike.Backend local_backend = Pike.SmallBackend();

// Held by the one thread that is currently running local_backend.
private Thread.Mutex backendmux = Thread.Mutex();
// Number of users registered through register_backend().
private int clientsregistered;

constant emptyarray = ({});
constant describenodata
 = (["datarowdesc":emptyarray, "datarowtypes":emptyarray,
     "datatypeoid":emptyarray]);
final multiset censoroptions=(<"use_ssl","force_ssl",
 "cache_autoprepared_statements","reconnect","text_query","is_superuser",
 "server_encoding","server_version","integer_datetimes",
 "session_authorization">);
constant stdiobuftype = typeof(Stdio.Buffer());

 /* Statements matching createprefix cause the prepared statement cache
  * to be flushed to prevent stale references to (temporary) tables.
  */
final Regexp createprefix=iregexp("^\a*(CREATE|DROP)\a");

 /* Statements matching dontcacheprefix never enter the cache.
  */
private Regexp dontcacheprefix=iregexp("^\a*(FETCH|COPY)\a");

 /* Statements not matching paralleliseprefix will cause the driver
  * to stall submission until all previously started statements have
  * run to completion.
  */
private Regexp paralleliseprefix
 =iregexp("^\a*((SELEC|INSER)T|(UPDA|DELE)TE|(FETC|WIT)H)\a");

 /* Statements matching transbeginprefix will cause the driver
  * to insert a sync after the statement.
  * Failure to do so will result in portal synchronisation errors
  * in the event of an ErrorResponse.
  */
final Regexp transbeginprefix
 =iregexp("^\a*(BEGIN|START)([; \t\f\r\n]|$)");

 /* Statements matching transendprefix will cause the driver
  * to insert a sync after the statement.
  * Failure to do so will result in portal synchronisation errors
  * in the event of an ErrorResponse.
  */
final Regexp transendprefix
 =iregexp("^\a*(COMMIT|ROLLBACK|END)([; \t\f\r\n]|$)");

 /* For statements matching execfetchlimit the resultrows will not be
  * fetched in pieces.
  */
private Regexp execfetchlimit
 =iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$");

// Compiles expr into a Regexp after two rewrites:
//  - every uppercase letter A-Z becomes a two-character class holding the
//    upper- and lowercase form, so keywords match regardless of case;
//  - every '\a' character becomes the whitespace class [ \t\f\r\n].
// All other characters are copied through unchanged.
private Regexp iregexp(string expr) {
  Stdio.Buffer ret=Stdio.Buffer();
  foreach(expr;;int c)
    if(c>='A'&&c<='Z')
      ret->add('[',c,c+'a'-'A',']');
    else if(c=='\a')			// Replace with generic whitespace
      ret->add("[ \t\f\r\n]");
    else
      ret->add_int8(c);
  return Regexp(ret->read());
}

// Appends a 'C' message with payload ({'S', oldprep, 0}) to plugbuffer,
// i.e. a request to close the prepared statement named oldprep.
// Does nothing when oldprep is zero.
final void closestatement(bufcon|conxsess plugbuffer,string oldprep) {
  if(oldprep) {
    PD("Close statement %s\n",oldprep);
    CHAIN(plugbuffer)->add_int8('C')->add_hstring(({'S', oldprep, 0}), 4, 4);
  }
}

// Thread body that drives local_backend.
// Only the thread that obtains backendmux (non-blocking trylock) runs the
// backend; it keeps calling local_backend() for as long as clientsregistered
// is non-zero and reports any error thrown by a backend round on stderr.
private void run_local_backend() {
  Thread.MutexKey lock;
  int looponce;
  do {
    looponce=0;
    if(lock=backendmux->trylock()) {
      PD("Starting local backend\n");
      while (clientsregistered) {	// Autoterminate when not needed
        mixed err;
        if (err = catch(local_backend(4096.0)))
          werror(describe_backtrace(err));
      }
      PD("Terminating local backend\n");
      lock=0;
      // A client may have registered after the loop above ended but before
      // the mutex was released; if so, go around once more.
      looponce=clientsregistered;
    }
  } while(looponce);
}

//! Registers yourself as a user of this backend.  If the backend
//! has not been started yet, it will be spawned automatically.
final void register_backend() {
  if(!clientsregistered++)
    Thread.Thread(run_local_backend);
}

//! Unregisters yourself as a user of this backend.  If there are
//! no longer any registered users, the backend will be terminated.
final void unregister_backend() {
  --clientsregistered;
}

// Throws the error stored in parent._delayederror (if any) and clears it.
// A plain string is wrapped into an ({message, backtrace}) pair first, with
// the innermost two frames trimmed off the backtrace.
final void throwdelayederror(object parent) {
  if(mixed err=parent._delayederror) {
    parent._delayederror=UNDEFINED;
    if(stringp(err))
      err=({err,backtrace()[..<2]});
    throw(err);
  }
}

// Returns the format code to use for a column/parameter of type oid:
// 1 (binary) for the types listed below, 0 (text) for everything else.
final int oidformat(int oid) {
  switch(oid) {
    case BOOLOID:
    case BYTEAOID:
    case CHAROID:
    case INT8OID:
    case INT2OID:
    case INT4OID:
    case TEXTOID:
    case OIDOID:
    case XMLOID:
    case MACADDROID:
    case BPCHAROID:
    case VARCHAROID:
    case CTIDOID:
    case UUIDOID:
      return 1;	// binary
  }
  return 0;	// text
}

// Schedules f(@args) on local_backend after delay seconds and returns the
// call_out identifier (usable with local_backend->remove_call_out()).
private inline mixed callout(function(mixed ...:void) f,
 float|int delay,mixed ... args) {
  return local_backend->call_out(f,delay,@args);
}

// Some pgsql utility functions

// Side buffer handed out by conxion->start() when the connection's
// nostash lock could not be obtained.  Whatever is written into it is
// appended to the connection's stash by sendcmd() and gets merged into
// the real output stream by conxion->getstash().
class bufcon {
  inherit Stdio.Buffer;

#ifdef PG_DEBUGRACE
  final bufcon `chain() {
    return this;
  }
#endif

  private conxion realbuffer;

  protected void create(conxion _realbuffer) {
    realbuffer=_realbuffer;
  }

  final int `stashcount() {
    return realbuffer->stashcount;
  }

  // Marks one more stash writer as busy on the real connection.
  // waitforreal is not supported on a bufcon (checked in PG_DEBUG builds).
  final bufcon start(void|int waitforreal) {
    realbuffer->stashcount++;
#ifdef PG_DEBUG
    if(waitforreal)
      error("pgsql.bufcon not allowed here\n");
#endif
    return this;
  }

  // Moves the contents of this buffer onto the connection's stash, records
  // the portal (and, for SYNCSEND, a sync marker) in stashqueue, and raises
  // stashflushmode if mode is higher.  If the nostash lock happens to be
  // free afterwards, the stash is pushed out right away.
  final void sendcmd(int mode,void|sql_result portal) {
    Thread.MutexKey lock=realbuffer->shortmux->lock();
    if (portal)
      realbuffer->stashqueue->write(portal);
    if (mode == SYNCSEND) {
      add(PGSYNC);
      realbuffer->stashqueue->write(1);
      mode = SENDOUT;	    // Demote it to prevent an extra SYNC upon stashflush
    }
    realbuffer->stash->add(this);
    PD("%d>Stashed mode %d > %d\n",
     realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode);
    if (mode > realbuffer->stashflushmode)
      realbuffer->stashflushmode = mode;
    if(!--realbuffer->stashcount)
      realbuffer->stashavail.signal();	// Last pending writer is done
    lock=0;
    this->clear();
    if(lock=realbuffer->nostash->trylock(1)) {
#ifdef PG_DEBUGRACE
      conxsess sess = conxsess(realbuffer);
      realbuffer->started = lock;
      lock = 0;
      sess->sendcmd(SENDOUT);
#else
      realbuffer->started = lock;
      lock = 0;
      realbuffer->sendcmd(SENDOUT);
#endif
    }
  }
};

// Input buffer of a connection.  Readers that run out of buffered data
// block in range_error() until read_cb() signals that more has arrived.
class conxiin {
  inherit Stdio.Buffer:i;

  final Thread.Condition fillread;
  final Thread.Mutex fillreadmux;
  final int procmsg;
  private int didreadcb;

  // Out-of-data hook: waits on fillread until read_cb() has run, then
  // returns true.  Once fillread has been cleared (see conxion->close())
  // it throws MAGICTERMINATE instead.
  protected bool range_error(int howmuch) {
#ifdef PG_DEBUG
    if(howmuch<=0)
      error("Out of range %d\n",howmuch);
#endif
    if(fillread) {
      Thread.MutexKey lock=fillreadmux->lock();
      if(!didreadcb)
        fillread.wait(lock);
      didreadcb=0;
      lock=0;
    } else
      throw(MAGICTERMINATE);
    return true;
  }

  // Read callback.  If procmsg is set and id is non-zero, id is started
  // as a new thread and procmsg is cleared; otherwise a reader blocked in
  // range_error() is woken up.
  final int read_cb(mixed id,mixed b) {
    PD("Read callback %O\n",
     b && ((string)b)
#ifndef PG_DEBUGMORE
      [..255]
#endif
     );
    Thread.MutexKey lock=fillreadmux->lock();
    if(procmsg&&id)
      procmsg=0,lock=0,Thread.Thread(id);
    else if(fillread)
      didreadcb=1, fillread.signal();
    lock=0;
    return 0;
  }

  protected void create() {
    i::create();
    fillreadmux=Thread.Mutex();
    fillread=Thread.Condition();
  }
};

// A connection to the database server.  The object itself is the output
// buffer (inherited as o); i is the matching input buffer.
class conxion {
  inherit Stdio.Buffer:o;
  final conxiin i;

  private Thread.Queue qportals;
  final Thread.Mutex shortmux;
  private int closenext;

  final Stdio.File socket;
  private function(void|mixed:void) connectfail;
  private int towrite;
  final multiset(function(void|mixed:void)) closecallbacks=(<>);

  final Thread.Mutex nostash;
  final Thread.MutexKey started;
  final Thread.Queue stashqueue;
  final Thread.Condition stashavail;
  final Stdio.Buffer stash;
  final int stashflushmode;
  final int stashcount;
  final int synctransact;
#ifdef PG_DEBUGRACE
  final mixed nostrack;
#endif
#ifdef PG_DEBUG
  final int queueoutidx;
  final int queueinidx=-1;
#endif

  // Registers portal in qportals and tags it with the current
  // synctransact number.
  private inline void queueup(sql_result portal) {
    qportals->write(portal);
    portal->_synctransact=synctransact;
    PD("%d>%O %d %d Queue portal %d bytes\n",socket->query_fd(),
     portal._portalname,++queueoutidx,synctransact,sizeof(this));
  }

  // Obtains something to write protocol messages into.
  // If the nostash lock can be taken (waiting for it when waitforreal is
  // set), pending stash contents are merged in first and the connection
  // itself is returned.  Otherwise a bufcon side buffer is returned.
  final bufcon|conxsess start(void|int waitforreal) {
    Thread.MutexKey lock;
    if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) {
      int mode;
#ifdef PG_DEBUGRACE
      conxsess sess = conxsess(this);
#endif
      started = lock;
      lock=shortmux->lock();
      if(stashcount)			// Wait for stash writers to finish
        PT(stashavail.wait(lock));
      mode = getstash(KEEP);
      lock=0;
      if (mode > KEEP)
        sendcmd(mode);			// Force out stash to the server
#ifdef PG_DEBUGRACE
      return sess;
#else
      return this;
#endif
    }
    stashcount++;
    return bufcon(this);
  }

  // Write callback: continues sending the remaining towrite bytes and
  // closes the connection once everything is out and the input side has
  // been shut down (i->fillread cleared).
  private int write_cb() {
    Thread.MutexKey lock = shortmux->lock();
    if (this) {				// Guard against async destructs
      towrite -= output_to(socket, towrite);
      lock = 0;
      if (!i->fillread && !sizeof(this))
        close();
    }
    return 0;
  }

  // Merges the stash (if non-empty) into the output buffer, queues up the
  // stashed portals/sync markers, and returns the higher of mode and the
  // stashed flush mode.
  private int getstash(int mode) {
    if (sizeof(stash)) {
      add(stash);
      stash->clear();
      foreach (stashqueue->try_read_array();; int|sql_result portal)
        if (intp(portal))
          qportals->write(synctransact++);
        else
          queueup(portal);
      PD("%d>Got stash mode %d > %d\n",
       socket->query_fd(), stashflushmode, mode);
      if (stashflushmode > mode)
        mode = stashflushmode;
      stashflushmode = KEEP;
    }
    return mode;
  }

  // Finalises the messages collected so far according to mode, writes
  // them to the socket and releases the started lock.  If sending throws,
  // connectfail is invoked instead.
  final void sendcmd(void|int mode, void|sql_result portal) {
    Thread.MutexKey lock;
    if (portal)
      queueup(portal);
unfinalised:
    do {
      switch (mode) {
        default:
          break unfinalised;
        case SYNCSEND:
          PD("%d>Sync %d %d Queue\n",
           socket->query_fd(), synctransact, ++queueoutidx);
          add(PGSYNC);
          mode = SENDOUT;
          break;
        case FLUSHLOGSEND:
          PD("%d>%O %d Queue simplequery %d bytes\n",
           socket->query_fd(), portal._portalname, ++queueoutidx, sizeof(this));
          mode = FLUSHSEND;
      }
      // Only reached for SYNCSEND and FLUSHLOGSEND
      qportals->write(synctransact++);
    } while(0);
    lock = shortmux->lock();
    mode = getstash(mode);
    catch {
outer:
      do {
        switch(mode) {
          default:
            PD("%d>Skip flush %d Queue %O\n",
             socket->query_fd(), mode, (string)this);
            break outer;
          case FLUSHSEND:
            PD("Flush\n");
            add(PGFLUSH);
            // Fall through
          case SENDOUT:;
        }
        if(towrite=sizeof(this)) {
          PD("%d>Sendcmd %O\n",socket->query_fd(),((string)this)[..towrite-1]);
          towrite-=output_to(socket,towrite);
        }
      } while(0);
      lock=started=0;
      return;
    };
    lock=0;
    catch(connectfail());
  }

  // Initiates a delayed close: wakes up and disables the blocked reader
  // and triggers the read callback once more.  Returns 0 the first time,
  // -1 if a close was already requested or the connection was torn down.
  final int close() {
    if(!closenext && nostash) {
      closenext=1;
      Thread.MutexKey lock=i->fillreadmux->lock();
      if(i->fillread) {	  // Delayed close() after flushing the output buffer
        i->fillread.signal();
        i->fillread=0;
      }
      lock=0;
      PD("%d>Delayed close, flush write\n",socket->query_fd());
      i->read_cb(socket->query_id(),0);
      return 0;
    } else
      return -1;
  }

  protected void destroy() {
    PD("%d>Close conxion %d\n", socket ? socket->query_fd() : -1, !!nostash);
    int|.pgsql_util.sql_result portal;
    if (qportals)			// CancelRequest does not use qportals
      while (portal = qportals->try_read())
        if (objectp(portal))
          portal->_purgeportal();
    if(nostash) {
      catch {
        // The callbacks are expected to unregister themselves when called
        // (sql_result->_releasesession() does so).
        while(sizeof(closecallbacks))
          foreach(closecallbacks;function(void|mixed:void) closecb;)
            closecb();
        destruct(nostash);
        socket->set_nonblocking();			// Drop all callbacks
        PD("%d>Close socket\n",socket->query_fd());
        socket->close();
      };
    }
    connectfail=0;
  }

  // Thread body started from create(): connects the socket, optionally
  // negotiates SSL, switches the socket to callback mode on local_backend
  // and (unless nossl is 2) starts the session's _processloop thread.
  // Any error is reported through the session's _connectfail callback.
  final void connectloop(object pgsqlsess, int nossl) {
    mixed err=catch {
      for(;;clear()) {
        socket->connect(pgsqlsess._host,pgsqlsess._port);
#if constant(SSL.File)
        if(!nossl && !pgsqlsess->nossl
         && (pgsqlsess._options.use_ssl || pgsqlsess._options.force_ssl)) {
          PD("SSLRequest\n");
          start()->add_int32(8)->add_int32(PG_PROTOCOL(1234,5679))
           ->sendcmd(SENDOUT);
          // NOTE(review): read_int8() reads from this object's own buffer
          // (the inherited o), not from i or the socket -- confirm that this
          // is where the server's one-byte reply ends up.
          switch(read_int8()) {
            case 'S':
              object fcon=SSL.File(socket,SSL.Context());
              if(fcon->connect()) {
                socket=fcon;
                break;
              }
              // SSL handshake failed: fall through and retry without SSL
            default:
              PD("%d>Close socket short\n", socket->query_fd());
              socket->close();
              pgsqlsess.nossl=1;
              continue;
            case 'N':
              if(pgsqlsess._options.force_ssl)
                error("Encryption not supported on connection to %s:%d\n",
                      pgsqlsess._host,pgsqlsess._port);
          }
        }
#else
        if(pgsqlsess._options.force_ssl)
          error("Encryption library missing,"
                " cannot establish connection to %s:%d\n",
                pgsqlsess._host,pgsqlsess._port);
#endif
        break;
      }
      connectfail=pgsqlsess->_connectfail;
      if(!socket->is_open())
        error(strerror(socket->errno())+".\n");
      socket->set_backend(local_backend);
      socket->set_buffer_mode(i,0);
      socket->set_nonblocking(i->read_cb,write_cb,close);
      if (nossl != 2)
        Thread.Thread(pgsqlsess->_processloop,this);
      return;
    };
    // Errors raised inside the loop above happen before connectfail has
    // been assigned; fall back to the session's handler so that they are
    // reported instead of being dropped.
    catch((connectfail || pgsqlsess->_connectfail)(err));
  }

  private string _sprintf(int type, void|mapping flags) {
    string res=UNDEFINED;
    switch(type) {
      case 'O':
        int fd=-1;
        if(socket)
          catch(fd=socket->query_fd());
        res=predef::sprintf("conxion fd: %d input queue: %d/%d "
                    "queued portals: %d output queue: %d/%d\n"
                    "started: %d\n",
                    fd,sizeof(i),i->_size_object(),
                    qportals && qportals->size(), sizeof(this), _size_object(),
                    !!started);
        break;
    }
    return res;
  }

  // Sets up buffers, locks and queues, then starts connectloop() in a
  // separate thread.
  protected void create(object pgsqlsess,Thread.Queue _qportals,int nossl) {
    o::create();
    qportals = _qportals;
    synctransact = 1;
    socket=Stdio.File();
    i=conxiin();
    shortmux=Thread.Mutex();
    nostash=Thread.Mutex();
    closenext = 0;
    stashavail=Thread.Condition();
    stashqueue=Thread.Queue();
    stash=Stdio.Buffer();
    Thread.Thread(connectloop,pgsqlsess,nossl);
  }
};

#ifdef PG_DEBUGRACE
// Debug-only wrapper around a conxion that complains on stderr when a
// session is started while another one is still marked as started, or
// when a session is destructed without sendcmd() having been called.
class conxsess {
  final conxion chain;

  void create(conxion parent) {
    if (parent->started)
      werror("Overwriting conxsess %s %s\n",
       describe_backtrace(({"new ", backtrace()[..<1]})),
       describe_backtrace(({"old ", parent->nostrack})));
    parent->nostrack = backtrace();
    chain = parent;
  }

  final void sendcmd(int mode,void|sql_result portal) {
    chain->sendcmd(mode, portal);
    chain = 0;
  }

  void destroy() {
    if (chain)
      werror("Untransmitted conxsess %s\n",
       describe_backtrace(({"", backtrace()[..<1]})));
  }
};
#endif

//! The result object returned by @[Sql.pgsql()->big_query()], except for
//! the noted differences it behaves the same as @[Sql.sql_result].
//!
//! @seealso
//!   @[Sql.sql_result], @[Sql.pgsql], @[Sql.Sql], @[Sql.pgsql()->big_query()]
class sql_result {

  private object pgsqlsess;
  private int eoffound;
  private conxion c;
  private conxiin cr;
  final mixed _delayederror;
  final int _state;
  final int _fetchlimit;
  private int alltext;
  final int _forcetext;
  private int syncparse;
  private int transtype;

  final string _portalname;

  private int rowsreceived;
  private int inflight;
  private int portalbuffersize;
  private Thread.Mutex closemux;
  private Thread.Queue datarows;	// rows; an integer 1 marks EOF
  private array(mapping(string:mixed)) datarowdesc;
  private array(int) datarowtypes;	// types from datarowdesc
  private string statuscmdcomplete;
  private int bytesreceived;
  final int _synctransact;
  final Thread.Condition _ddescribe;
  final Thread.Mutex _ddescribemux;
  final Thread.MutexKey _unnamedportalkey,_unnamedstatementkey;
  final array _params;
  final string _query;
  final string _preparedname;
  final mapping(string:mixed) _tprepared;
  private function(:void) gottimeout;
  private int timeout;

  private string _sprintf(int type, void|mapping flags) {
    string res=UNDEFINED;
    switch(type) {
      case 'O':
        int fd=-1;
        if(c&&c->socket)
          catch(fd=c->socket->query_fd());
        res=sprintf("sql_result state: %d numrows: %d eof: %d inflight: %d\n"
                    "query: %O\n"
                    "fd: %O portalname: %O datarows: %d"
                    " synctransact: %d laststatus: %s\n",
                    _state,rowsreceived,eoffound,inflight,
                    _query,fd,_portalname,datarowtypes&&sizeof(datarowtypes),
                    _synctransact,
                    statuscmdcomplete||(_unnamedstatementkey?"*parsing*":""));
        break;
    }
    return res;
  }

  protected void create(object _pgsqlsess,conxion _c,string query,
   int _portalbuffersize,int alltyped,array params,int forcetext,
   int _timeout, int _syncparse, int _transtype) {
    pgsqlsess = _pgsqlsess;
    cr = (c = _c)->i;
    _query = query;
    datarows = Thread.Queue();
    _ddescribe=Thread.Condition();
    _ddescribemux=Thread.Mutex();
    closemux=Thread.Mutex();
    portalbuffersize=_portalbuffersize;
    alltext = !alltyped;
    _params = params;
    _forcetext = forcetext;
    _state = PORTALINIT;
    timeout = _timeout;
    syncparse = _syncparse;
    gottimeout = _pgsqlsess->cancelquery;
    // Make sure we get released when the connection goes away
    c->closecallbacks+=(<destroy>);
    transtype = _transtype;
  }

  //! Returns the command-complete status for this query.
  //!
  //! @seealso
  //!  @[affected_rows()]
  //!
  //! @note
  //! This function is PostgreSQL-specific, and thus it is not available
  //! through the generic SQL-interface.
  /*semi*/final string status_command_complete() {
    return statuscmdcomplete;
  }

  //! Returns the number of affected rows by this query.
  //!
  //! @seealso
  //!  @[status_command_complete()]
  //!
  //! @note
  //! This function is PostgreSQL-specific, and thus it is not available
  //! through the generic SQL-interface.
  /*semi*/final int affected_rows() {
    // The status is either "<command> <rows>" or, for INSERT,
    // "INSERT <oid> <rows>"; the row count is always the last integer.
    int first, second = -1;
    if(statuscmdcomplete)
      sscanf(statuscmdcomplete,"%*s %d %d",first,second);
    return second >= 0 ? second : first;
  }

  // Records the elapsed run time in the prepared-statement statistics
  // (once per statement; _tprepared is dropped afterwards).
  final void _storetiming() {
    if(_tprepared) {
      _tprepared.trun=gethrtime()-_tprepared.trunstart;
      m_delete(_tprepared,"trunstart");
      _tprepared = UNDEFINED;
    }
  }

  // Blocks until the row description has been set (see _setrowdesc()
  // and releaseconditions()).
  private void waitfordescribe() {
    Thread.MutexKey lock=_ddescribemux->lock();
    if(!datarowtypes)
      PT(_ddescribe->wait(lock));
    lock=0;
  }

  //! @seealso
  //!  @[Sql.sql_result()->num_fields()]
  /*semi*/final int num_fields() {
    if(!datarowtypes)
      waitfordescribe();
    trydelayederror();
    return sizeof(datarowtypes);
  }

  //! @note
  //! This method returns the number of rows already received from
  //! the database for the current query.  Note that this number
  //! can still increase between subsequent calls if the results from
  //! the query are not complete yet.  This function is only guaranteed to
  //! return the correct count after EOF has been reached.
  //! @seealso
  //!  @[Sql.sql_result()->num_rows()]
  /*semi*/final int num_rows() {
    trydelayederror();
    return rowsreceived;
  }

  private inline void trydelayederror() {
    if(_delayederror)
      throwdelayederror(this);
  }

  //! @seealso
  //!  @[Sql.sql_result()->eof()]
  /*semi*/final int eof() {
    trydelayederror();
    return eoffound;
  }

  //! @seealso
  //!  @[Sql.sql_result()->fetch_fields()]
  /*semi*/final array(mapping(string:mixed)) fetch_fields() {
    if(!datarowtypes)
      waitfordescribe();
    trydelayederror();
    return datarowdesc+emptyarray;	// Hand out a copy
  }

#ifdef PG_DEBUG
#define INTVOID int
#else
#define INTVOID void
#endif
  // Decodes one data row from the input buffer cr according to
  // datarowtypes and queues it via _processdataready().
  // cenc is the client encoding; text columns are UTF-8 decoded when it
  // equals UTF8CHARSET.  A decoding failure is thrown only after the row
  // has been queued.  In PG_DEBUG builds the number of unaccounted
  // message bytes is returned.
  final INTVOID _decodedata(int msglen,string cenc) {
    _storetiming();
    _releasestatement();
    string serror;
    bytesreceived+=msglen;
    int cols=cr->read_int16();
    array a=allocate(cols,!alltext&&Val.null);
#ifdef PG_DEBUG
    msglen-=2+4*cols;
#endif
    foreach(datarowtypes;int i;int typ) {
      int collen=cr->read_sint(4);
      if(collen>0) {
#ifdef PG_DEBUG
        msglen-=collen;
#endif
        mixed value;
        switch(typ) {
          case FLOAT4OID:
#if SIZEOF_FLOAT>=8
          case FLOAT8OID:
#endif
            if(!alltext) {
              value=(float)cr->read(collen);
              break;
            }
            // Fall through
          default:value=cr->read(collen);
            break;
          case CHAROID:
            value=alltext?cr->read(1):cr->read_int8();
            break;
          case BOOLOID:value=cr->read_int8();
            switch(value) {
              case 'f':value=0;
                break;
              case 't':value=1;
            }
            if(alltext)
              value=value?"t":"f";
            break;
          case TEXTOID:
          case BPCHAROID:
          case VARCHAROID:
            value=cr->read(collen);
            if(cenc==UTF8CHARSET && catch(value=utf8_to_string(value))
             && !serror)
              serror=SERROR("%O contains non-%s characters\n",
                                                     value,UTF8CHARSET);
            break;
          case INT8OID:case INT2OID:
          case OIDOID:case INT4OID:
            if(_forcetext) {
              value=cr->read(collen);
              if(!alltext)
                value=(int)value;
            } else {
              switch(typ) {
                case INT8OID:value=cr->read_sint(8);
                  break;
                case INT2OID:value=cr->read_sint(2);
                  break;
                case OIDOID:
                case INT4OID:value=cr->read_sint(4);
              }
              if(alltext)
                value=(string)value;
            }
        }
        a[i]=value;
      } else if(!collen)
        a[i]="";
      // A negative length leaves the column at its allocate() default
    }
    _processdataready(a);
    if(serror)
      error(serror);
#ifdef PG_DEBUG
    return msglen;
#endif
  }

  // Stores the row description and wakes up everybody waiting for it.
  final void _setrowdesc(array(mapping(string:mixed)) drowdesc,
   array(int) drowtypes) {
    Thread.MutexKey lock=_ddescribemux->lock();
    datarowdesc=drowdesc;
    datarowtypes=drowtypes;
    _ddescribe->broadcast();
    lock=0;
  }

  // Encodes the query parameters according to the parameter type OIDs in
  // dtoid, then sends the Bind ('B') message followed by an Execute.
  // Does nothing if the portal already has a name.
  final void _preparebind(array dtoid) {
    array(string|int) paramValues=_params?_params[2]:({});
    if(sizeof(dtoid)!=sizeof(paramValues))
      SUSERERROR("Invalid number of bindings, expected %d, got %d\n",
                 sizeof(dtoid),sizeof(paramValues));
    Thread.MutexKey lock=_ddescribemux->lock();
    if(!_portalname) {
      // Use the unnamed portal if it is free, a generated name otherwise
      _portalname
        =(_unnamedportalkey=pgsqlsess._unnamedportalmux->trylock(1))
         ? "" : PORTALPREFIX
#ifdef PG_DEBUG
          +(string)(c->socket->query_fd())+"_"
#endif
          +int2hex(pgsqlsess._pportalcount++);
      lock=0;
#ifdef PG_DEBUGMORE
      PD("ParamValues to bind: %O\n",paramValues);
#endif
      Stdio.Buffer plugbuffer=Stdio.Buffer();
      { array dta=({sizeof(dtoid)});
        plugbuffer->add(_portalname,0,_preparedname,0)
         ->add_ints(dta+map(dtoid,oidformat)+dta,2);
      }
      string cenc=pgsqlsess._runtimeparameter[CLIENT_ENCODING];
      foreach(paramValues;int i;mixed value) {
        if(undefinedp(value) || objectp(value)&&value->is_val_null)
          plugbuffer->add_int32(-1);				// NULL
        else if(stringp(value) && !sizeof(value)) {
          int k=0;
          switch(dtoid[i]) {
            default:
              k=-1;	     // cast empty strings to NULL for non-string types
            case BYTEAOID:
            case TEXTOID:
            case XMLOID:
            case BPCHAROID:
            case VARCHAROID:;
          }
          plugbuffer->add_int32(k);
        } else
          switch(dtoid[i]) {
            case TEXTOID:
            case BPCHAROID:
            case VARCHAROID: {
              if(!value) {
                plugbuffer->add_int32(-1);
                break;
              }
              value=(string)value;
              switch(cenc) {
                case UTF8CHARSET:
                  value=string_to_utf8(value);
                  break;
                default:
                  if(String.width(value)>8) {
                    SUSERERROR("Don't know how to convert %O to %s encoding\n",
                               value,cenc);
                    value="";
                  }
              }
              plugbuffer->add_hstring(value,4);
              break;
            }
            default: {
              if(!value) {
                plugbuffer->add_int32(-1);
                break;
              }
              if (!objectp(value) || typeof(value) != stdiobuftype)
                /*
                 *  Like Oracle and SQLite, we accept literal binary values
                 *  from single-valued multisets.
                 */
                if (multisetp(value) && sizeof(value) == 1)
                  value = indices(value)[0];
                else {
                  value = (string)value;
                  if (String.width(value) > 8)
                    if (dtoid[i] == BYTEAOID)
                      /*
                       *  FIXME We should throw an error here, it would
                       *  have been correct, but for historical reasons and
                       *  as a DWIM convenience we autoconvert to UTF8 here.
                       */
                      value = string_to_utf8(value);
                    else {
                      SUSERERROR(
                       "Wide string %O not supported for type OID %d\n",
                       value,dtoid[i]);
                      value="";
                    }
                }
              plugbuffer->add_hstring(value,4);
              break;
            }
            case BOOLOID:
              do {
                int tval;
                if(stringp(value))
                  tval=value[0];
                else if(!intp(value)) {
                  value=!!value;			// cast to boolean
                  break;
                } else
                  tval=value;
                switch(tval) {
                  case 'o':case 'O':
                    // "on" is true; anything else starting with 'o' is not
                    catch {
                      tval=value[1];
                      value=tval=='n'||tval=='N';
                    };
                    break;
                  default:
                    value=1;
                    break;
                  case 0:case '0':case 'f':case 'F':case 'n':case 'N':
                    value=0;
                    break;
                }
              } while(0);
              plugbuffer->add_int32(1)->add_int8(value);
              break;
            case CHAROID:
              if(intp(value))
                plugbuffer->add_hstring(value,4);
              else {
                value=(string)value;
                switch(sizeof(value)) {
                  default:
                    SUSERERROR(
                     "\"char\" types must be 1 byte wide, got %O\n",value);
                  case 0:
                    plugbuffer->add_int32(-1);			// NULL
                    break;
                  case 1:
                    plugbuffer->add_hstring(value[0],4);
                }
              }
              break;
            case INT8OID:
              plugbuffer->add_int32(8)->add_int((int)value,8);
              break;
            case OIDOID:
            case INT4OID:
              plugbuffer->add_int32(4)->add_int32((int)value);
              break;
            case INT2OID:
              plugbuffer->add_int32(2)->add_int16((int)value);
              break;
          }
      }
      if(!datarowtypes) {
        if(_tprepared && dontcacheprefix->match(_query))
          m_delete(pgsqlsess->_prepareds,_query),_tprepared=0;
        waitfordescribe();
      }
      if(_state>=CLOSING)
        lock=_unnamedstatementkey=0;
      else {
        plugbuffer->add_int16(sizeof(datarowtypes));
        if(sizeof(datarowtypes))
          plugbuffer->add_ints(map(datarowtypes,oidformat),2);
        else if (syncparse < 0 && !pgsqlsess->_wasparallelisable
              && pgsqlsess->_statementsinflight > 1) {
          lock=pgsqlsess->_shortmux->lock();
          // Decrement temporarily to account for ourselves
          if(--pgsqlsess->_statementsinflight) {
            pgsqlsess->_waittocommit++;
            PD("Commit waiting for statements to finish\n");
            catch(PT(pgsqlsess->_readyforcommit->wait(lock)));
            pgsqlsess->_waittocommit--;
          }
          // Increment again to account for ourselves
          pgsqlsess->_statementsinflight++;
        }
        lock=0;
        PD("Bind portal %O statement %O\n",_portalname,_preparedname);
        _fetchlimit=pgsqlsess->_fetchlimit;
        _bindportal();
        conxsess bindbuffer = c->start();
        _unnamedstatementkey=0;
        CHAIN(bindbuffer)->add_int8('B')->add_hstring(plugbuffer, 4, 4);
        if(!_tprepared && sizeof(_preparedname))
          closestatement(CHAIN(bindbuffer), _preparedname);
        _sendexecute(_fetchlimit
                             && !(transtype != NOTRANS
                                || sizeof(_query)>=MINPREPARELENGTH &&
                                execfetchlimit->match(_query))
                             && _fetchlimit,bindbuffer);
      }
    } else
      lock=0;
  }

  // Stores the row description, also in the prepared-statement cache
  // entry if there is one.
  final void _processrowdesc(array(mapping(string:mixed)) datarowdesc,
   array(int) datarowtypes) {
    _setrowdesc(datarowdesc,datarowtypes);
    if(_tprepared) {
      _tprepared.datarowdesc=datarowdesc;
      _tprepared.datarowtypes=datarowtypes;
    }
  }

  // Moves the portal to PARSING and counts it as a statement in flight,
  // waiting for earlier statements to finish first when required.
  final void _parseportal() {
    Thread.MutexKey lock = closemux->lock();
    _state=PARSING;
    Thread.MutexKey lockc = pgsqlsess->_shortmux->lock();
    // NOTE(review): a negative syncparse already satisfies the first
    // operand, so the second operand never changes the outcome -- confirm
    // whether "syncparse > 0 || ..." was intended.
    if (syncparse || syncparse < 0 && pgsqlsess->_wasparallelisable) {
      if(pgsqlsess->_statementsinflight) {
        pgsqlsess->_waittocommit++;
        PD("Commit waiting for statements to finish\n");
        // Do NOT put this in a function, it would require passing a lock
        // variable on the argumentstack to the function, which will cause
        // unpredictable lock release issues due to the extra copy on the stack
        catch(PT(pgsqlsess->_readyforcommit->wait(lockc)));
        pgsqlsess->_waittocommit--;
      }
    }
    pgsqlsess->_statementsinflight++;
    lockc = 0;
    lock=0;
    statuscmdcomplete=UNDEFINED;
    pgsqlsess->_wasparallelisable = paralleliseprefix->match(_query);
  }

  // Marks the statement as COMMITTED (if it was not beyond BOUND yet) and
  // signals waiters once no statements remain in flight.
  // Pass nolock when closemux is already held by the caller.
  final void _releasestatement(void|int nolock) {
    Thread.MutexKey lock;
    if (!nolock)
      lock = closemux->lock();
    if (_state <= BOUND) {
      _state = COMMITTED;
      lock = pgsqlsess->_shortmux->lock();
      if (!--pgsqlsess->_statementsinflight && pgsqlsess->_waittocommit) {
        PD("Signal no statements in flight\n");
        catch(pgsqlsess->_readyforcommit->signal());
      }
    }
    lock = 0;
  }

  // Moves the portal to BOUND and counts it as a portal in flight.
  final void _bindportal() {
    Thread.MutexKey lock = closemux->lock();
    _state=BOUND;
    Thread.MutexKey lockc = pgsqlsess->_shortmux->lock();
    pgsqlsess->_portalsinflight++;
    lockc = 0;
    lock=0;
  }

  // Called when the connection is destructed: signals EOF to readers,
  // corrects the session's in-flight counters and closes the portal
  // locally without talking to the server.
  final void _purgeportal() {
    PD("Purge portal\n");
    datarows->write(1);				   // Signal EOF
    Thread.MutexKey lock=closemux->lock();
    _fetchlimit=0;				   // disables further Executes
    switch(_state) {
      case COPYINPROGRESS:
      case COMMITTED:
      case BOUND:
        --pgsqlsess->_portalsinflight;
    }
    switch(_state) {
      case BOUND:
      case PARSING:
        --pgsqlsess->_statementsinflight;
    }
    _state=CLOSED;
    lock=0;
    releaseconditions();
  }

  // Adds the messages needed to close this portal to cs and returns the
  // flush mode the caller should send them with (KEEP, FLUSHSEND or
  // SYNCSEND).
  final int _closeportal(conxsess cs) {
    object plugbuffer = CHAIN(cs);
    int retval=KEEP;
    PD("%O Try Closeportal %d\n",_portalname,_state);
    Thread.MutexKey lock=closemux->lock();
    _fetchlimit=0;				   // disables further Executes
    switch(_state) {
      case PARSING:
      case BOUND:
        _releasestatement(1);
    }
    switch(_state) {
      case PORTALINIT:
      case PARSING:
        _unnamedstatementkey=0;
        _state=CLOSING;
        break;
      case COPYINPROGRESS:
        PD("CopyDone\n");
        plugbuffer->add("c\0\0\0\4");
        // Fall through
      case COMMITTED:
      case BOUND:
        _state=CLOSING;
        lock=0;
        PD("Close portal %O\n",_portalname);
        if (_portalname && sizeof(_portalname)) {
          plugbuffer->add_int8('C')->add_hstring(({'P',_portalname,0}),4,4);
          retval=FLUSHSEND;
        } else
          _unnamedportalkey=0;
        Thread.MutexKey lockc=pgsqlsess->_shortmux->lock();
        if(!--pgsqlsess->_portalsinflight) {
          if(!pgsqlsess->_waittocommit && !plugbuffer->stashcount
             && transtype != TRANSBEGIN)
            /*
             * stashcount will be non-zero if a parse request has been queued
             * before the close was initiated.
             * It's a bit of a tricky race, but this check should be sufficient.
             */
            pgsqlsess->_readyforquerycount++, retval=SYNCSEND;
          pgsqlsess->_pportalcount=0;
        }
        lockc=0;
    }
    lock=0;
    return retval;
  }

  // Queues one received row for the consumer and, when fetching in
  // pieces, adapts _fetchlimit to the observed row size and requests the
  // next batch once at most half of the current batch is outstanding.
  final void _processdataready(array datarow,void|int msglen) {
    bytesreceived+=msglen;
    inflight--;
    if(_state<CLOSED)
      datarows->write(datarow);
    if(++rowsreceived==1)
      PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n",_portalname,
       _fetchlimit,(portalbuffersize>>1)*rowsreceived/bytesreceived,
       pgsqlsess._fetchlimit,inflight);
    if(_fetchlimit) {
      _fetchlimit=
       min((portalbuffersize>>1)*rowsreceived/bytesreceived||1,
        pgsqlsess._fetchlimit);
      Thread.MutexKey lock=closemux->lock();
      if(_fetchlimit && inflight<=(_fetchlimit-1)>>1)
        _sendexecute(_fetchlimit);
      else if(!_fetchlimit)
        PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
         _portalname,_fetchlimit,inflight);
      lock=0;
    }
  }

  // Drops the locks and the session reference held by this result and
  // unblocks anybody still waiting for a row description.
  private void releaseconditions() {
    _unnamedportalkey=_unnamedstatementkey=0;
    pgsqlsess=0;
    if(!datarowtypes) {
      Thread.MutexKey lock=_ddescribemux->lock();
      datarowtypes=emptyarray;
      datarowdesc=emptyarray;
      _ddescribe->broadcast();
      lock=0;
    }
  }

  // Closes the portal on the server (best effort), marks the result as
  // CLOSED and signals EOF to readers.
  final void _releasesession(void|string statusccomplete) {
    c->closecallbacks-=(<destroy>);
    if(statusccomplete && !statuscmdcomplete)
      statuscmdcomplete=statusccomplete;
    inflight=0;
    conxsess plugbuffer;
    if (!catch(plugbuffer = c->start()))
      plugbuffer->sendcmd(_closeportal(plugbuffer));
    _state=CLOSED;
    datarows->write(1);				// Signal EOF
    releaseconditions();
  }

  protected void destroy() {
    catch {			   // inside destructors, exceptions don't work
      _releasesession();
    };
  }

  // Sends an Execute ('E') for this portal.  A fetchlimit of zero fetches
  // everything and closes the portal in the same go.
  final void _sendexecute(int fetchlimit,void|bufcon|conxsess plugbuffer) {
    int flushmode;
    PD("Execute portal %O fetchlimit %d transtype %d\n", _portalname,
     fetchlimit, transtype);
    if(!plugbuffer)
      plugbuffer=c->start(1);
    CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname,0}), 4, 8)
     ->add_int32(fetchlimit);
    if (!fetchlimit) {
      if (transtype != NOTRANS)
        pgsqlsess._intransaction = transtype == TRANSBEGIN;
      flushmode = _closeportal(plugbuffer) == SYNCSEND
         || transtype == TRANSEND ? SYNCSEND : FLUSHSEND;
    } else
      inflight+=fetchlimit, flushmode=FLUSHSEND;
    plugbuffer->sendcmd(flushmode,this);
  }

  //! @returns
  //!  One result row at a time.
  //!
  //! When using COPY FROM STDOUT, this method returns one row at a time
  //! as a single string containing the entire row.
  //!
  //! @seealso
  //!  @[eof()], @[send_row()]
  /*semi*/final array(mixed) fetch_row() {
    int|array datarow;
    if(arrayp(datarow=datarows->try_read()))
      return datarow;
    if(!eoffound) {
      if(!datarow) {
        PD("%O Block for datarow\n",_portalname);
        array cid=callout(gottimeout,timeout);
        PT(datarow=datarows->read());
        local_backend->remove_call_out(cid);
        if(arrayp(datarow))
          return datarow;
      }
      eoffound=1;
      datarows->write(1);			// Signal EOF for other threads
    }
    trydelayederror();
    return 0;
  }

  //! @returns
  //!  Multiple result rows at a time (at least one), or zero once all
  //!  rows have been returned.
  //!
  //! When using COPY FROM STDOUT, this method returns one row at a time
  //! as a single string containing the entire row.
  //!
  //! @seealso
  //!  @[eof()], @[fetch_row()]
  /*semi*/final array(array(mixed)) fetch_row_array() {
    if(eoffound)
      return 0;
    array(array|int) datarow=datarows->try_read_array();
    // Nothing queued yet (either zero or an empty array): block for data
    if(!datarow || !sizeof(datarow)) {
      array cid=callout(gottimeout,timeout);
      PT(datarow=datarows->read_array());
      local_backend->remove_call_out(cid);
    }
    if(arrayp(datarow[-1]))
      return datarow;
    trydelayederror();
    eoffound=1;
    datarows->write(1);			// Signal EOF for other threads
    // More than one EOF mark can be queued (_purgeportal() and
    // _releasesession() each write one); swallow all of them so no mark
    // is handed out as if it were a row.
    do
      datarow=datarow[..<1];
    while(sizeof(datarow) && !arrayp(datarow[-1]));
    return sizeof(datarow) && datarow;
  }

  //! @param copydata
  //! When using COPY FROM STDIN, this method accepts a string or an
  //! array of strings to be processed by the COPY command; when sending
  //! the amount of data sent per call does not have to hit row or column
  //! boundaries.
  //!
  //! The COPY FROM STDIN sequence needs to be completed by either
  //! explicitly or implicitly destroying the result object, or by passing no
  //! argument to this method.
  //!
  //! @seealso
  //!  @[fetch_row()], @[eof()]
  /*semi*/final void send_row(void|string|array(string) copydata) {
    trydelayederror();
    if(copydata) {
      PD("CopyData\n");
      object cs = c->start();
      CHAIN(cs)->add_int8('d')->add_hstring(copydata, 4, 4);
      cs->sendcmd(SENDOUT);
    } else
      _releasesession();
  }

  // Thread body for set_result_callback(): hands every row to callback
  // through the local backend, followed by a final call with zero on EOF.
  private void run_result_cb(
   function(sql_result, array(mixed), mixed ...:void) callback,
   array(mixed) args) {
    int|array datarow;
    for(;;) {
      array cid=callout(gottimeout,timeout);
      PT(datarow=datarows->read());
      local_backend->remove_call_out(cid);
      if(!arrayp(datarow))
        break;
      callout(callback, 0, this, datarow, @args);
    }
    trydelayederror();
    eoffound=1;
    callout(callback, 0, this, 0, @args);
  }

  //! Sets up a callback for every row returned from the database.
  //! First argument passed is the resultobject itself, second argument
  //! is the result row (zero on EOF).
  //!
  //! @seealso
  //!  @[fetch_row()]
  /*semi*/final void set_result_callback(
   function(sql_result, array(mixed), mixed ...:void) callback,
   mixed ... args) {
    if(callback)
      Thread.Thread(run_result_cb,callback,args);
  }

  // Thread body for set_result_array_callback(): hands batches of rows to
  // callback through the local backend, followed by a final call with
  // zero on EOF.
  private void run_result_array_cb(
   function(sql_result, array(array(mixed)), mixed ...:void) callback,
   array(mixed) args) {
    array(array|int) datarow;
    for(;;) {
      array cid=callout(gottimeout,timeout);
      PT(datarow=datarows->read_array());
      local_backend->remove_call_out(cid);
      if(!datarow || !sizeof(datarow) || !arrayp(datarow[-1]))
        break;
      callout(callback, 0, this, datarow, @args);
    }
    trydelayederror();
    eoffound=1;
    if(datarow) {
      // Swallow all trailing EOF marks, then deliver the remaining rows
      while(sizeof(datarow) && !arrayp(datarow[-1]))
        datarow=datarow[..<1];
      if(sizeof(datarow))
        callout(callback, 0, this, datarow, @args);
    }
    callout(callback, 0, this, 0, @args);
  }

  //! Sets up a callback for sets of rows returned from the database.
  //! First argument passed is the resultobject itself, second argument
  //! is the array of result rows (zero on EOF).
  //!
  //! @seealso
  //!  @[fetch_row()]
  /*semi*/final void set_result_array_callback(
   function(sql_result, array(array(mixed)), mixed ...:void) callback,
   mixed ... args) {
    if(callback)
      Thread.Thread(run_result_array_cb,callback,args);
  }
};