diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 91058968dcb07399185818c220e926d79957f489..af1ce1487bb0f2b0a55434f53e12301bf56898c5 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -656,7 +656,7 @@ final void _processloop(.pgsql_util.conxion ci) { _msgsreceived++; _bytesreceived+=1+msglen; int errtype=NOERROR; - PD("%d>",ci->socket->query_fd()); + PD("%d<",ci->socket->query_fd()); switch(msgtype) { array(mapping) getcols() { int bintext=cr->read_int8(); @@ -845,7 +845,7 @@ final void _processloop(.pgsql_util.conxion ci) { #endif if(portal._tprepared) portal._tprepared.datatypeoid=a; - callout(portal->_preparebind,0,a); + Thread.Thread(portal->_preparebind,a); break; } case 'T': { diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 4e93e8bfcd3bf81d3d344f9bd288a9af95dc5fb2..dc07d11c40d3f2548c41e520f64211173378eb35 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -430,12 +430,10 @@ class sql_result { private int rowsreceived; private int inflight; private int portalbuffersize; - private Stdio.Buffer prepbuffer; - private Thread.Condition prepbufferready; - private Thread.Mutex prepbuffermux; private Thread.Mutex closemux; private Thread.Queue datarows; private array(mapping(string:mixed)) datarowdesc; + private array(int) datarowtypes; // cached types from datarowdesc private string statuscmdcomplete; private int bytesreceived; final int _synctransact; @@ -457,7 +455,7 @@ class sql_result { " laststatus: %s\n", _state,rowsreceived,eoffound,inflight, _query,c?->socket?->query_fd(), - _portalname,datarowdesc&&sizeof(datarowdesc), + _portalname,datarowtypes&&sizeof(datarowtypes), statuscmdcomplete||(_unnamedstatementkey?"*parsing*":"")); break; } @@ -473,8 +471,6 @@ class sql_result { _ddescribe=Thread.Condition(); _ddescribemux=Thread.Mutex(); closemux=Thread.Mutex(); - prepbufferready=Thread.Condition(); - prepbuffermux=Thread.Mutex(); portalbuffersize=_portalbuffersize; alltext = !alltyped; _params = params; @@ -519,7 +515,7 @@ class sql_result { private void waitfordescribe() { Thread.MutexKey lock=_ddescribemux->lock(); - if(!datarowdesc) + if(!datarowtypes) _ddescribe->wait(lock); lock=0; } @@ -527,10 +523,10 @@ class sql_result { //! @seealso //! @[Sql.sql_result()->num_fields()] int num_fields() { - if(!datarowdesc) + if(!datarowtypes) waitfordescribe(); trydelayederror(); - return sizeof(datarowdesc); + return sizeof(datarowtypes); } //! @seealso @@ -555,7 +551,7 @@ class sql_result { //! @seealso //! @[Sql.sql_result()->fetch_fields()] array(mapping(string:mixed)) fetch_fields() { - if(!datarowdesc) + if(!datarowtypes) waitfordescribe(); trydelayederror(); return datarowdesc+({}); @@ -575,14 +571,14 @@ class sql_result { #ifdef PG_DEBUG msglen-=2+4*cols; #endif - foreach(datarowdesc;int i;mapping m) { + foreach(datarowtypes;int i;int typ) { int collen=cr->read_sint(4); if(collen>0) { #ifdef PG_DEBUG msglen-=collen; #endif mixed value; - switch(int typ=m.type) { + switch(typ) { case FLOAT4OID: #if SIZEOF_FLOAT>=8 case FLOAT8OID: @@ -647,7 +643,7 @@ class sql_result { final void _setrowdesc(array(mapping(string:mixed)) drowdesc) { Thread.MutexKey lock=_ddescribemux->lock(); - datarowdesc=drowdesc; + datarowtypes=map(datarowdesc=drowdesc,lambda(mapping m){return m.type;}); _ddescribe->broadcast(); lock=0; } @@ -657,7 +653,7 @@ class sql_result { if(sizeof(dtoid)!=sizeof(paramValues)) SUSERERROR("Invalid number of bindings, expected %d, got %d\n", sizeof(dtoid),sizeof(paramValues)); - Thread.MutexKey lock=prepbuffermux->lock(); + Thread.MutexKey lock=_ddescribemux->lock(); if(!_portalname) { _portalname=(_unnamedportalkey=pgsqlsess._unnamedportalmux->trylock(1)) ? "" : PORTALPREFIX @@ -788,71 +784,51 @@ class sql_result { break; } } - if(_tprepared) - if(_tprepared.datarowdesc) - gotdatarowdesc(plugbuffer); - else if(dontcacheprefix->match(_query)) // Don't cache FETCH/COPY + if(!datarowtypes) { + if(_tprepared && dontcacheprefix->match(_query)) m_delete(pgsqlsess->_prepareds,_query),_tprepared=0; - if(prepbufferready) { - lock=prepbuffermux->lock(); - prepbuffer=plugbuffer; - if(prepbufferready) - prepbufferready->signal(); + waitfordescribe(); } - } - lock=0; + if(_state>=CLOSING) + lock=_unnamedstatementkey=0; + else { + plugbuffer->add_int16(sizeof(datarowtypes)); + if(sizeof(datarowtypes)) + foreach(datarowtypes;;int typ) + plugbuffer->add_int16(oidformat(typ)); + else if(commitprefix->match(_query)) { + + lock=pgsqlsess->_shortmux->lock(); + if(pgsqlsess->_portalsinflight) { + pgsqlsess->_waittocommit++; + PD("Commit waiting for portals to finish\n"); + pgsqlsess->_readyforcommit->wait(lock); + pgsqlsess->_waittocommit--; + } + } + lock=0; + PD("Bind portal %O statement %O\n",_portalname,_preparedname); + _fetchlimit=pgsqlsess->_fetchlimit; + _openportal(); + conxion bindbuffer=c->start(1); + _unnamedstatementkey=0; + bindbuffer->add_int8('B')->add_hstring(plugbuffer,4,4); + if(!_tprepared && sizeof(_preparedname)) + closestatement(bindbuffer,_preparedname); + _sendexecute(_fetchlimit + && !(cachealways[_query] + || sizeof(_query)>=MINPREPARELENGTH && + execfetchlimit->match(_query)) + && _fetchlimit,bindbuffer); + } + } else + lock=0; } final void _processrowdesc(array(mapping(string:mixed)) datarowdesc) { _setrowdesc(datarowdesc); if(_tprepared) _tprepared.datarowdesc=datarowdesc; - if(prepbufferready) - Thread.Thread(gotdatarowdesc); // Do not use callout, it deadlocks - } - - private void gotdatarowdesc(void|Stdio.Buffer plugbuffer) { - Thread.MutexKey lock=prepbuffermux->lock(); - if(!plugbuffer) { - if(!prepbuffer && prepbufferready) - prepbufferready->wait(lock); - plugbuffer=prepbuffer; - prepbuffer=0; // Free memory when plugbuffer leaves scope - } - if(!prepbufferready || _state>=CLOSING) - lock=_unnamedstatementkey=0; - else { - prepbufferready->signal(); - prepbufferready=0; // Make sure we do this exactly once - lock=0; - plugbuffer->add_int16(sizeof(datarowdesc)); - if(sizeof(datarowdesc)) - foreach(datarowdesc;;mapping col) - plugbuffer->add_int16(oidformat(col.type)); - else if(commitprefix->match(_query)) { - lock=pgsqlsess->_shortmux->lock(); - if(pgsqlsess->_portalsinflight) { - pgsqlsess->_waittocommit++; - PD("Commit waiting for portals to finish\n"); - pgsqlsess->_readyforcommit->wait(lock); - pgsqlsess->_waittocommit--; - } - lock=0; - } - PD("Bind portal %O statement %O\n",_portalname,_preparedname); - _fetchlimit=pgsqlsess->_fetchlimit; - _openportal(); - conxion bindbuffer=c->start(1); - _unnamedstatementkey=0; - bindbuffer->add_int8('B')->add_hstring(plugbuffer,4,4); - if(!_tprepared && sizeof(_preparedname)) - closestatement(bindbuffer,_preparedname); - _sendexecute(_fetchlimit - && !(cachealways[_query] - || sizeof(_query)>=MINPREPARELENGTH && - execfetchlimit->match(_query)) - && _fetchlimit,bindbuffer); - } } final void _openportal() { @@ -946,18 +922,12 @@ class sql_result { private void releaseconditions() { pgsqlsess=0; - Thread.MutexKey lock; - if(prepbufferready) { - lock=prepbuffermux->lock(); - if(prepbufferready) - prepbufferready->signal(); - } - if(!datarowdesc) { - lock=_ddescribemux->lock(); - datarowdesc=({}); + if(!datarowtypes) { + Thread.MutexKey lock=_ddescribemux->lock(); + datarowtypes=datarowdesc=({}); _ddescribe->broadcast(); + lock=0; } - lock=0; } final void _releasesession(void|string statusccomplete) {