diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index e52286dac9a26b64b5ae627ffbc7c5e47f6a195a..61b059e3c36098418aa4aa78c2bc982cb365dd6e 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -138,7 +138,8 @@ final Regexp transendprefix * tradeoff. */ private Regexp execfetchlimit - = iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$"); + = iregexp("^\a*((UPDA|DELE)TE|INSERT" + "|RESET|CLOSE|DISCARD)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$"); private void default_backend_runs() { // Runs as soon as the cb_backend = Pike.DefaultBackend; // DefaultBackend has started @@ -300,8 +301,12 @@ class bufcon { return realbuffer->stashcount; } - final bufcon start(void|int waitforreal) { - dirty = realbuffer->stashcount->acquire(); + final MUTEX `shortmux() { + return realbuffer->shortmux; + } + + final bufcon start(void|int|array(Thread.MutexKey) waitforreal) { + dirty = stashcount->acquire(); #ifdef PG_DEBUG if (waitforreal) error("pgsql.bufcon not allowed here\n"); @@ -310,7 +315,7 @@ class bufcon { } final void sendcmd(int mode, void|sql_result portal) { - Thread.MutexKey lock = realbuffer->shortmux->lock(); + Thread.MutexKey lock = shortmux->lock(); if (portal) realbuffer->stashqueue->write(portal); if (mode == SYNCSEND) { @@ -323,8 +328,8 @@ class bufcon { realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode); if (mode > realbuffer->stashflushmode) realbuffer->stashflushmode = mode; + dirty = 0; // Countdown before releasing the lock lock = 0; - dirty = 0; this->clear(); if (lock = realbuffer->nostash->trylock(1)) { #ifdef PG_DEBUGRACE @@ -462,20 +467,25 @@ class conxion { portal._portalname, ++queueoutidx, synctransact, sizeof(this)); } - final bufcon|conxsess start(void|int waitforreal) { + final bufcon|conxsess start(void|int|array(Thread.MutexKey) waitforreal) { Thread.MutexKey lock; #ifdef PG_DEBUGRACE if (nostash->current_locking_thread()) PD("Nostash locked by %s\n", describe_backtrace(nostash->current_locking_thread()->backtrace())); #endif - while (lock = (waitforreal > 0 ? nostash->lock : nostash->trylock)(1)) { + while (lock = (intp(waitforreal) && waitforreal > 0 + ? nostash->lock : nostash->trylock)(1)) { int mode; if (sizeof(stash) && (mode = getstash(KEEP)) > KEEP) sendcmd(mode); // Force out stash to the server if (!stashcount->drained()) { lock = 0; // Unlock while we wait + if (arrayp(waitforreal)) + waitforreal[0] = 0; stashcount->wait_till_drained(); + if (arrayp(waitforreal)) + return 0; continue; // Try again } #ifdef PG_DEBUGRACE @@ -488,6 +498,8 @@ class conxion { return this; #endif } + if (arrayp(waitforreal)) + waitforreal[0] = 0; return !waitforreal && bufcon(this)->start(); } @@ -563,8 +575,7 @@ outer: } Thread.MutexKey lock = shortmux->trylock(); if (lock && sizeof(this)) { - PD("%d>Sendcmd %O\n", - socket->query_fd(), (string)this); + PD("%d>Sendcmd %O\n", socket->query_fd(), (string)this); output_to(socket); } } while (0); @@ -890,7 +901,7 @@ class sql_result { { Thread.MutexKey lock = closemux->lock(); if (_fetchlimit) { - array reflock = ({ _fetchlimit = 0 }); + array(Thread.MutexKey) reflock = ({ _fetchlimit = 0 }); for (;;) { reflock[0] = lock; lock = 0; @@ -899,8 +910,8 @@ class sql_result { continue; } } - } else - lock = 0; // Force release before acquiring next + } + lock = 0; // Force release before acquiring next lock = _ddescribemux->lock(); if (!statuscmdcomplete) PT(_ddescribe->wait(lock)); @@ -1357,7 +1368,7 @@ class sql_result { lock = 0; } else if (syncparse < 0 && !pgsqlsess->wasparallelisable && !pgsqlsess->statementsinflight->drained(1)) { - lock = 0; + lock = 0; // Unlock while we wait PD("Commit waiting for statements to finish\n"); catch(PT(pgsqlsess->statementsinflight->wait_till_drained(1))); } @@ -1389,14 +1400,18 @@ class sql_result { } final void _parseportal() { - { + for (;;) { Thread.MutexKey lock = closemux->lock(); - _state = PARSING; - if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) { + if ((syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) + && !pgsqlsess->statementsinflight->drained()) { + lock = 0; // Unlock while we wait PD("Commit waiting for statements to finish\n"); catch(PT(pgsqlsess->statementsinflight->wait_till_drained())); + continue; } + _state = PARSING; stmtifkey = pgsqlsess->statementsinflight->acquire(); + break; } statuscmdcomplete = 0; pgsqlsess->wasparallelisable = paralleliseprefix->match(_query); @@ -1457,13 +1472,15 @@ class sql_result { _unnamedportalkey = 0; portalsifkey = 0; if (pgsqlsess->portalsinflight->drained()) { - if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN) + if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN) { /* * stashcount will be non-zero if a parse request has been queued * before the close was initiated. - * It's a bit of a tricky race, but this check should be sufficient. */ - pgsqlsess->readyforquerycount++, retval = SYNCSEND; + Thread.MutexKey lock = plugbuffer->shortmux->lock(); + if (plugbuffer->stashcount->drained()) + pgsqlsess->readyforquerycount++, retval = SYNCSEND; + } pgsqlsess->pportalcount = 0; } } @@ -1483,7 +1500,7 @@ class sql_result { * index / bytesreceived || 1, _fetchlimit); if (_fetchlimit) if (inflight <= (_fetchlimit - 1) >> 1) { - array reflock = ({ lock }); + array(Thread.MutexKey) reflock = ({ lock }); lock = 0; if (!_sendexecute(_fetchlimit, reflock)) continue; @@ -1530,9 +1547,17 @@ class sql_result { } inflight = 0; conxsess plugbuffer; - array(Thread.MutexKey) reflock = ({closemux->lock()}); - if (!catch(plugbuffer = c->start())) - plugbuffer->sendcmd(_closeportal(plugbuffer, reflock)); + array(Thread.MutexKey) reflock = ({ 0 }); + for (;;) { + reflock[0] = closemux->lock(); + if (!catch(plugbuffer = c->start(reflock))) { + if (plugbuffer) + plugbuffer->sendcmd(_closeportal(plugbuffer, reflock)); + else + continue; + } + break; + } reflock[0] = 0; if (_state < CLOSED) { stmtifkey = 0; @@ -1556,10 +1581,8 @@ class sql_result { fetchlimit, transtype); if (arrayp(plugbuffer)) { reflock = plugbuffer; - if (!(plugbuffer = c->start(-1))) { - reflock[0] = 0; + if (!(plugbuffer = c->start(reflock))) return 0; // Found potential deadlock, release and try again - } } CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8) ->add_int32(fetchlimit); @@ -2231,9 +2254,9 @@ class proxy { #endif readyforquerycount--; function (:void) cb; + destruct(waitforauthready); if (cb = readyforquery_cb) readyforquery_cb = 0, cb(); - destruct(waitforauthready); break; } case '1':