diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 4b4b740543ec51e16f35a3e3e742f29f2f7f92bd..7bec4ffb4e2ec3718af8e410f7125f2f0b4992b4 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -36,6 +36,7 @@ private int backendstatus; private mapping(string:mixed) options; private string lastmessage; private int clearmessage; +private int earlyclose; private mapping(string:array(mixed)) notifylist=([]); private mapping(string:string) msgresponse; private mapping(string:string) runtimeparameter; @@ -706,7 +707,7 @@ private void reconnect(void|int force) { foreach(plugbuf;;string s) len+=sizeof(s); plugbuf[0]=_c.plugint32(len); - _c.sendcmd(plugbuf); + _c.write(plugbuf); PD("%O\n",plugbuf); _decodemsg(readyforquery); PD("%O\n",runtimeparameter); @@ -720,11 +721,13 @@ void reload(void|int special) { mixed err; int didsync; if(err = catch { - sendclose(); + sendclose(1); PD("Portalsinflight: %d\n",portalsinflight); if(!portalsinflight) { - PD("Sync\n"); - _c.sendcmd(({"S",_c.plugint32(4)})); + if(!earlyclose) { + PD("Sync\n"); + _c.sendcmd(({"S",_c.plugint32(4)}),2); + } didsync=1; if(!special) { _decodemsg(readyforquery); @@ -734,7 +737,9 @@ void reload(void|int special) { } } } + earlyclose=0; }) { + earlyclose=0; PD("%O\n",err); reconnect(1); } @@ -1032,11 +1037,20 @@ final void _sendexecute(int fetchlimit) { string portalname=_c.portal->_portalname; PD("Execute portal %s fetchlimit %d\n",portalname,fetchlimit); _c.sendcmd(({"E",_c.plugint32(4+sizeof(portalname)+1+4),portalname, - "\0",_c.plugint32(fetchlimit)}),1); + "\0",_c.plugint32(fetchlimit)}),!!fetchlimit); _c.portal->_inflight+=fetchlimit; + if(!fetchlimit) { + earlyclose=1; + if(sizeof(portalname)) { + PD("Close portal %s & Sync\n",portalname); + _c.sendcmd(({"C",_c.plugint32(4+1+sizeof(portalname)+1), + "P",portalname,"\0"})); + } + _c.sendcmd(({"S",_c.plugint32(4)}),2); + } } -final private void sendclose() { +final private void sendclose(void|int hold) { string portalname; portalsinflight--; if(_c.portal && (portalname=_c.portal->_portalname)) { @@ -1047,10 +1061,14 @@ final private void sendclose() { #endif if(!sizeof(portalname)) unnamedportalinuse--; - PD("Close portal %s\n",portalname); - _c.sendcmd(({"C",_c.plugint32(4+1+sizeof(portalname)+1), - "P",portalname,"\0"}),1); - _closesent=1; + if(sizeof(portalname)) { + if(!earlyclose) { + PD("Close portal %s\n",portalname); + _c.sendcmd(({"C",_c.plugint32(4+1+sizeof(portalname)+1), + "P",portalname,"\0"}),!hold||portalsinflight?1:0); + } + _closesent=1; + } } } @@ -1185,6 +1203,7 @@ object big_query(string q,void|mapping(string|int:mixed) bindings) { clearmessage=1; mixed err; if(err = catch { + _c.set_read_callback(0); if(!sizeof(preparedname) || !tprepared || !tprepared->preparedname) { PD("Parse statement %s\n",preparedname); // Even though the protocol doesn't require the Parse command to be @@ -1192,10 +1211,9 @@ object big_query(string q,void|mapping(string|int:mixed) bindings) { // performance if it is omitted; seems like a flaw in the PostgreSQL // server _c.sendcmd(({"P",_c.plugint32(4+sizeof(preparedname)+1+sizeof(q)+1+2), - preparedname,"\0",q,"\0",_c.plugint16(0)}),1); + preparedname,"\0",q,"\0",_c.plugint16(0)}),3); PD("Query: %O\n",q); - } // sends Parameter- and RowDescription for 'S' - _c.set_read_callback(0); + } // sends Parameter- and RowDescription for 'S' if(!tprepared || !tprepared->datatypeoid) { PD("Describe statement %s\n",preparedname); _c.sendcmd(({"D",_c.plugint32(4+1+sizeof(preparedname)+1), @@ -1212,8 +1230,8 @@ object big_query(string q,void|mapping(string|int:mixed) bindings) { _c.plugint16(sizeof(paramValues))}); if(!tprepared || !tprepared->datatypeoid) { _decodemsg(gotparameterdescription); - if(tprepared) - tprepared->datatypeoid=_c.portal->_datatypeoid; + if(tprepared) + tprepared->datatypeoid=_c.portal->_datatypeoid; } array dtoid=_c.portal->_datatypeoid; foreach(dtoid;;int textbin) @@ -1227,15 +1245,15 @@ object big_query(string q,void|mapping(string|int:mixed) bindings) { default: { int k; len+=k=sizeof(value=(string)value); - plugbuf+=({_c.plugint32(k),value}); - break; + plugbuf+=({_c.plugint32(k),value}); + break; } case BOOLOID:plugbuf+=({_c.plugint32(1)});len++; switch(stringp(value)?value[0]:value) { - case 'o':case 'O': - _c.plugbyte(stringp(value)&&sizeof(value)>1 + case 'o':case 'O': + _c.plugbyte(stringp(value)&&sizeof(value)>1 &&(value[1]=='n'||value[1]=='N')); - break; + break; case 0:case 'f':case 'F':case 'n':case 'N': plugbuf+=({_c.plugbyte(0)}); break; @@ -1245,15 +1263,15 @@ object big_query(string q,void|mapping(string|int:mixed) bindings) { } break; case CHAROID:plugbuf+=({_c.plugint32(1)});len++; - if(intp(value)) - plugbuf+=({_c.plugbyte(value)}); - else { - value=(string)value; - if(sizeof(value)!=1) - ERROR("\"char\" types must be 1 byte wide, got %d\n", - sizeof(value)); - plugbuf+=({value}); - } + if(intp(value)) + plugbuf+=({_c.plugbyte(value)}); + else { + value=(string)value; + if(sizeof(value)!=1) + ERROR("\"char\" types must be 1 byte wide, got %d\n", + sizeof(value)); + plugbuf+=({value}); + } break; case INT8OID:len+=8; plugbuf+=({_c.plugint32(8),_c.plugint64((int)value)}); @@ -1268,8 +1286,8 @@ object big_query(string q,void|mapping(string|int:mixed) bindings) { } if(!tprepared || !tprepared->datarowdesc) { _decodemsg(gotrowdescription); - if(tprepared) - tprepared->datarowdesc=_c.portal->_datarowdesc; + if(tprepared) + tprepared->datarowdesc=_c.portal->_datarowdesc; } { array a;int i; len+=(i=sizeof(a=_c.portal->_datarowdesc))*2; diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 29081c1f3370b613aca85261a0a0655146c46abd..b025889b7577ef91b55abbe005f9c6057fd061fa 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -29,6 +29,8 @@ class PGassist { int close() { } + private final array(string) cmdbuf=({}); + #ifdef USEPGsql inherit _PGsql.PGsql; #else @@ -112,23 +114,32 @@ class PGassist { sendcmd(({}),1); } - final int sendcmd(string|array(string) data,void|int flush) { - if(flush) { - if(stringp(data)) - data=({data,FLUSH}); - else - data+=({FLUSH}); - PD("Flush\n"); - flushed=1; + final void sendcmd(string|array(string) data,void|int flush) { + if(arrayp(data)) + cmdbuf+=data; + else + cmdbuf+=({data}); + switch(flush) { + case 3: + cmdbuf+=({FLUSH}); + flushed=1; + break; + default: + flushed=0; + break; + case 1: + cmdbuf+=({FLUSH}); + PD("Flush\n"); + case 2: + flushed=1; + write(cmdbuf); + cmdbuf=({}); } - else if(flushed!=-1) - flushed=0; - return write(data); } final void sendterminate() { PD("Terminate\n"); - sendcmd(({"X",plugint32(4)})); + sendcmd(({"X",plugint32(4)}),2); close(); } } @@ -372,7 +383,7 @@ int|array(string|int) fetch_row(void|int|string buffer) { if(stringp(buffer)) { PD("CopyData\n"); _pgsqlsess._c.sendcmd(({"d",_pgsqlsess._c.plugint32(4+sizeof(buffer)), - buffer})); + buffer}),2); } else releasesession();