diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 7faa241ba632dc3d30cbd80ad87fcf4168e59a18..6846f67795f1bfba24c27ab83e40648e958a6e87 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -25,6 +25,8 @@ private Thread.Mutex backendmux = Thread.Mutex(); private int clientsregistered; constant emptyarray = ({}); +constant describenodata + = (["datarowdesc":emptyarray,"datarowtypes":emptyarray]); final multiset censoroptions=(<"use_ssl","force_ssl", "cache_autoprepared_statements","reconnect","text_query","is_superuser", "server_encoding","server_version","integer_datetimes", @@ -192,10 +194,16 @@ class bufcon { final void sendcmd(int mode,void|sql_result portal) { Thread.MutexKey lock=realbuffer->shortmux->lock(); - if(portal) + if(portal) { realbuffer->stashqueue->write(portal); + if (mode == SYNCSEND) { + add(PGSYNC); + realbuffer->stashqueue->write(1); + mode = SENDOUT; + } + } realbuffer->stash->add(this); - mode=mergemode(realbuffer,mode); + mergemode(realbuffer, mode); if(!--realbuffer->stashcount) realbuffer->stashavail.signal(); lock=0; @@ -299,6 +307,7 @@ class conxion { final bufcon|conxsess start(void|int waitforreal) { Thread.MutexKey lock; if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) { + int mode; #ifdef PG_DEBUGRACE conxsess sess = conxsess(this); #endif @@ -306,10 +315,10 @@ class conxion { lock=shortmux->lock(); if(stashcount) PT(stashavail.wait(lock)); - add(stash); stash->clear(); - foreach(stashqueue->try_read_array();;sql_result portal) - queueup(portal); + mode = getstash(KEEP); lock=0; + if (mode > KEEP) + sendcmd(mode); // Force out stash to the server #ifdef PG_DEBUGRACE return sess; #else @@ -331,33 +340,29 @@ class conxion { return 0; } + private int getstash(int mode) { + if (sizeof(stash)) { + add(stash); stash->clear(); + foreach (stashqueue->try_read_array();; int|sql_result portal) + if (intp(portal)) + qportals->write(synctransact++); + else + queueup(portal); + mode = mergemode(this, mode); + stashflushmode = KEEP; + } + return mode; + } + final void sendcmd(void|int mode, void|sql_result portal) { Thread.MutexKey lock; - - int emptystash() { - int ret = 0; - if (started) { - lock = shortmux->lock(); - if (sizeof(stash)) { - add(stash); - stash->clear(); - foreach (stashqueue->try_read_array();; sql_result portal) - queueup(portal); - ret = 1; - } - mode = mergemode(this, mode); - stashflushmode = KEEP; - } - return ret; - }; - - emptystash(); + if (portal) + queueup(portal); +unfinalised: do { - if (portal) - queueup(portal); switch (mode) { default: - continue; + break unfinalised; case SYNCSEND: PD("%d>Sync %d %d Queue\n", socket->query_fd(), synctransact, ++queueoutidx); @@ -365,12 +370,14 @@ class conxion { mode = SENDOUT; break; case FLUSHLOGSEND: - PD("%d>%O %d Queue simplequery %d bytes\n", - socket->query_fd(), portal._portalname, ++queueoutidx, sizeof(this)); + PD("%d>%O %d Queue simplequery %d bytes\n", socket->query_fd(), + portal._portalname, ++queueoutidx, sizeof(this)); mode = FLUSHSEND; } qportals->write(synctransact++); - } while (!lock && emptystash()); + } while(0); + lock = shortmux->lock(); + mode = getstash(mode); catch { outer: do { @@ -379,13 +386,13 @@ outer: PD("%d>Skip flush %d Queue %O\n", socket->query_fd(), mode, (string)this); break outer; + case FLUSHLOGSEND: case FLUSHSEND: PD("Flush\n"); add(PGFLUSH); + case SYNCSEND: case SENDOUT:; } - if(!lock) - lock=shortmux->lock(); if(towrite=sizeof(this)) { PD("%d>Sendcmd %O\n",socket->query_fd(),((string)this)[..towrite-1]); towrite-=output_to(socket,towrite); @@ -1164,7 +1171,8 @@ class sql_result { final void _sendexecute(int fetchlimit,void|bufcon|conxsess plugbuffer) { int flushmode; - PD("Execute portal %O fetchlimit %d\n",_portalname,fetchlimit); + PD("Execute portal %O fetchlimit %d transtype %d\n", _portalname, + fetchlimit, transtype); if(!plugbuffer) plugbuffer=c->start(1); CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname,0}), 4, 8)