diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index fc121e5de445b9d873a7b750b2c13df51a4fbc3e..61ad4cb41046160f6273900d215563fff22db77c 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -580,13 +580,6 @@ private void preplastmessage(mapping(string:string) msgresponse) { msgresponse.L||"")}); } -private void storetiming(.pgsql_util.sql_result portal) { - mapping(string:mixed) tp=portal._tprepared; - tp.trun=gethrtime()-tp.trunstart; - m_delete(tp,"trunstart"); - portal._tprepared = UNDEFINED; -} - private void waitauthready() { if(waitforauthready) { PD("%d Wait for auth ready %O\n",c?->socket?->query_fd(),backtrace()[-2]); @@ -915,8 +908,7 @@ final void _processloop(.pgsql_util.conxion ci) { } case 'H': portal->_processrowdesc(getcols()); - PD("<CopyOutResponse %d %O\n", - sizeof(portal._datarowdesc),portal._query); + PD("<CopyOutResponse %O\n",portal._query); break; case '2': { mapping tp; @@ -944,93 +936,22 @@ final void _processloop(.pgsql_util.conxion ci) { } break; } - case 'D': { + case 'D': msglen-=4; - string serror; - if(portal._tprepared) - storetiming(portal); - portal._bytesreceived+=msglen; - array datarowdesc=portal._datarowdesc; - int cols=ci->read_int16(); #ifdef PG_DEBUG #ifdef PG_DEBUGMORE - PD("<%O DataRow %d cols %d bytes\n",portal._portalname,cols,msglen); + PD("<%O DataRow %d bytes\n",portal._portalname,msglen); #endif datarowdebugcount++; if(!datarowdebug) datarowdebug=sprintf( - "<%O DataRow %d cols %d bytes",portal._portalname,cols,msglen); + "<%O DataRow %d bytes",portal._portalname,msglen); #endif - int atext = portal._alltext; // cache locally for speed - int forcetext = portal._forcetext; // cache locally for speed - string cenc=_runtimeparameter[CLIENT_ENCODING]; - array a=allocate(cols,!atext&&Val.null); - msglen-=2+4*cols; - foreach(datarowdesc;int i;mapping m) { - int collen=ci->read_sint(4); - if(collen>0) { - msglen-=collen; - mixed value; - switch(int typ=m.type) { - case FLOAT4OID: -#if SIZEOF_FLOAT>=8 - case FLOAT8OID: +#ifdef PG_DEBUG + msglen= #endif - if(!atext) { - value=(float)ci->read(collen); - break; - } - default:value=ci->read(collen); - break; - case CHAROID: - value=atext?ci->read(1):ci->read_int8(); - break; - case BOOLOID:value=ci->read_int8(); - switch(value) { - case 'f':value=0; - break; - case 't':value=1; - } - if(atext) - value=value?"t":"f"; - break; - case TEXTOID: - case BPCHAROID: - case VARCHAROID: - value=ci->read(collen); - if(cenc==UTF8CHARSET && catch(value=utf8_to_string(value)) - && !serror) - serror=SERROR("%O contains non-%s characters\n", - value,UTF8CHARSET); - break; - case INT8OID:case INT2OID: - case OIDOID:case INT4OID: - if(forcetext) { - value=ci->read(collen); - if(!atext) - value=(int)value; - } else { - switch(typ) { - case INT8OID:value=ci->read_sint(8); - break; - case INT2OID:value=ci->read_sint(2); - break; - case OIDOID: - case INT4OID:value=ci->read_sint(4); - } - if(atext) - value=(string)value; - } - } - a[i]=value; - } else if(!collen) - a[i]=""; - } - portal->_processdataready(a); - if(serror) - ERROR(serror); + portal->_decodedata(msglen,_runtimeparameter[CLIENT_ENCODING]); break; - } case 's': #ifdef PG_DEBUG PD("<%O PortalSuspended\n",portal._portalname); @@ -1045,8 +966,7 @@ final void _processloop(.pgsql_util.conxion ci) { errtype=protocolerror; #endif string s=ci->read(msglen-1); - if(portal._tprepared) - storetiming(portal); + portal->_storetiming(); PD("<%O CommandComplete %O\n",portal._portalname,s); #ifdef PG_DEBUG if(ci->read_int8()) @@ -1069,23 +989,20 @@ final void _processloop(.pgsql_util.conxion ci) { break; case 'd': PD("<%O CopyData\n",portal._portalname); - if(portal._tprepared) - storetiming(portal); + portal->_storetiming(); msglen-=4; #ifdef PG_DEBUG if(msglen<0) errtype=protocolerror; #endif - portal._bytesreceived+=msglen; - portal->_processdataready(({ci->read(msglen)})); + portal->_processdataready(({ci->read(msglen)}),msglen); #ifdef PG_DEBUG msglen=0; #endif break; case 'G': portal->_setrowdesc(getcols()); - PD("<%O CopyInResponse %d columns\n", - portal._portalname,sizeof(portal._datarowdesc)); + PD("<%O CopyInResponse\n",portal._portalname); portal._state=COPYINPROGRESS; { Thread.MutexKey resultlock=portal._resultmux->lock(); diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index cd3a5324e41de3403325d821dbc539a720081e6a..980ca3c1bf35f7aef28e8c245fa14adb88dff392 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -415,7 +415,7 @@ class sql_result { final mixed _delayederror; final int _state; final int _fetchlimit; - final int _alltext; + private int alltext; final int _forcetext; final string _portalname; @@ -428,15 +428,15 @@ class sql_result { private Thread.Mutex prepbuffermux; private Thread.Mutex closemux; private Thread.Queue datarows; + private array(mapping(string:mixed)) datarowdesc; private string statuscmdcomplete; - final int _bytesreceived; + private int bytesreceived; final int _synctransact; final Thread.Condition _ddescribe; final Thread.Mutex _ddescribemux; final Thread.MutexKey _unnamedportalkey,_unnamedstatementkey; final array _params; final string _query; - final array(mapping(string:mixed)) _datarowdesc; final string _preparedname; final mapping(string:mixed) _tprepared; @@ -450,7 +450,7 @@ class sql_result { " laststatus: %s\n", _state,rowsreceived,eoffound,inflight, _query, - _portalname,_datarowdesc&&sizeof(_datarowdesc), + _portalname,datarowdesc&&sizeof(datarowdesc), statuscmdcomplete||(_unnamedstatementkey?"*parsing*":"")); break; } @@ -469,7 +469,7 @@ class sql_result { prepbufferready=Thread.Condition(); prepbuffermux=Thread.Mutex(); portalbuffersize=_portalbuffersize; - _alltext = !alltyped; + alltext = !alltyped; _params = params; _forcetext = forcetext; _state = PORTALINIT; @@ -502,9 +502,17 @@ class sql_result { return rows; } + final void _storetiming() { + if(_tprepared) { + _tprepared.trun=gethrtime()-_tprepared.trunstart; + m_delete(_tprepared,"trunstart"); + _tprepared = UNDEFINED; + } + } + private void waitfordescribe() { Thread.MutexKey lock=_ddescribemux->lock(); - if(!_datarowdesc) + if(!datarowdesc) _ddescribe->wait(lock); lock=0; } @@ -512,10 +520,10 @@ class sql_result { //! @seealso //! @[Sql.sql_result()->num_fields()] int num_fields() { - if(!_datarowdesc) + if(!datarowdesc) waitfordescribe(); trydelayederror(); - return sizeof(_datarowdesc); + return sizeof(datarowdesc); } //! @seealso @@ -540,15 +548,99 @@ class sql_result { //! @seealso //! @[Sql.sql_result()->fetch_fields()] array(mapping(string:mixed)) fetch_fields() { - if(!_datarowdesc) + if(!datarowdesc) waitfordescribe(); trydelayederror(); - return _datarowdesc+({}); + return datarowdesc+({}); + } + +#ifdef PG_DEBUG + final int +#else + final void +#endif + _decodedata(int msglen,string cenc) { + _storetiming(); + string serror; + bytesreceived+=msglen; + int cols=c->read_int16(); + array a=allocate(cols,!alltext&&Val.null); +#ifdef PG_DEBUG + msglen-=2+4*cols; +#endif + foreach(datarowdesc;int i;mapping m) { + int collen=c->read_sint(4); + if(collen>0) { +#ifdef PG_DEBUG + msglen-=collen; +#endif + mixed value; + switch(int typ=m.type) { + case FLOAT4OID: +#if SIZEOF_FLOAT>=8 + case FLOAT8OID: +#endif + if(!alltext) { + value=(float)c->read(collen); + break; + } + default:value=c->read(collen); + break; + case CHAROID: + value=alltext?c->read(1):c->read_int8(); + break; + case BOOLOID:value=c->read_int8(); + switch(value) { + case 'f':value=0; + break; + case 't':value=1; + } + if(alltext) + value=value?"t":"f"; + break; + case TEXTOID: + case BPCHAROID: + case VARCHAROID: + value=c->read(collen); + if(cenc==UTF8CHARSET && catch(value=utf8_to_string(value)) + && !serror) + serror=SERROR("%O contains non-%s characters\n", + value,UTF8CHARSET); + break; + case INT8OID:case INT2OID: + case OIDOID:case INT4OID: + if(_forcetext) { + value=c->read(collen); + if(!alltext) + value=(int)value; + } else { + switch(typ) { + case INT8OID:value=c->read_sint(8); + break; + case INT2OID:value=c->read_sint(2); + break; + case OIDOID: + case INT4OID:value=c->read_sint(4); + } + if(alltext) + value=(string)value; + } + } + a[i]=value; + } else if(!collen) + a[i]=""; + } + _processdataready(a); + if(serror) + error(serror); +#ifdef PG_DEBUG + return msglen; +#endif } - final void _setrowdesc(array(mapping(string:mixed)) datarowdesc) { + final void _setrowdesc(array(mapping(string:mixed)) drowdesc) { Thread.MutexKey lock=_ddescribemux->lock(); - _datarowdesc=datarowdesc; + datarowdesc=drowdesc; _ddescribe->broadcast(); lock=0; } @@ -716,9 +808,9 @@ class sql_result { else { destruct(prepbufferready); // Make sure we do this exactly once lock=0; - plugbuffer->add_int16(sizeof(_datarowdesc)); - if(sizeof(_datarowdesc)) - foreach(_datarowdesc;;mapping col) + plugbuffer->add_int16(sizeof(datarowdesc)); + if(sizeof(datarowdesc)) + foreach(datarowdesc;;mapping col) plugbuffer->add_int16(oidformat(col.type)); else if(commitprefix->match(_query)) { lock=pgsqlsess->_commitmux->lock(); @@ -812,17 +904,17 @@ class sql_result { return retval; } - final void _processdataready(array datarow) { + final void _processdataready(array datarow,void|int msglen) { + bytesreceived+=msglen; inflight--; datarows->write(datarow); - rowsreceived++; - if(rowsreceived==1) + if(++rowsreceived==1) PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n",_portalname, - _fetchlimit,(portalbuffersize>>1)*rowsreceived/_bytesreceived, + _fetchlimit,(portalbuffersize>>1)*rowsreceived/bytesreceived, pgsqlsess._fetchlimit,inflight); if(_fetchlimit) { _fetchlimit= - min((portalbuffersize>>1)*rowsreceived/_bytesreceived||1, + min((portalbuffersize>>1)*rowsreceived/bytesreceived||1, pgsqlsess._fetchlimit); Thread.MutexKey lock=closemux->lock(); if(_fetchlimit && inflight<=_fetchlimit-1) @@ -841,9 +933,9 @@ class sql_result { Thread.MutexKey lock=prepbuffermux->lock(); catch(prepbufferready->signal()); } - if(!_datarowdesc) { + if(!datarowdesc) { lock=_ddescribemux->lock(); - _datarowdesc=({}); + datarowdesc=({}); _ddescribe->broadcast(); } lock=0;