diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 61b059e3c36098418aa4aa78c2bc982cb365dd6e..0cc914194a05905363f954f2e54afb63ee3e964c 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -545,8 +545,8 @@ unfinalised: mode = SENDOUT; break; case FLUSHLOGSEND: - PD("%d>%O %d Queue simplequery %d bytes\n", socket->query_fd(), - portal._portalname, ++queueoutidx, sizeof(this)); + PD("%d>%d Queue simplequery %d bytes\n", + socket->query_fd(), ++queueoutidx, sizeof(this)); mode = FLUSHSEND; } qportals->write(synctransact++); @@ -906,6 +906,7 @@ class sql_result { reflock[0] = lock; lock = 0; if (!_sendexecute(0, reflock)) { + PD("Status_command_complete retry closemux %O\n", _portalname); lock = closemux->lock(); continue; } @@ -1155,6 +1156,7 @@ class sql_result { if (sizeof(dtoid) != sizeof(paramValues)) SUSERERROR("Invalid number of bindings, expected %d, got %d\n", sizeof(dtoid), sizeof(paramValues)); + PD("PrepareBind\n"); Thread.MutexKey lock = _ddescribemux->lock(); if (!_portalname) { _portalname @@ -1357,8 +1359,10 @@ class sql_result { if (!datarowtypes) { if (_tprepared && dontcacheprefix->match(_query)) m_delete(pgsqlsess->prepareds, _query), _tprepared = 0; + PD("WaitForDescribe\n"); waitfordescribe(); } + PD("About to bind %d\n", _state); if (_state >= CLOSING) lock = _unnamedstatementkey = 0; else { @@ -1405,8 +1409,10 @@ class sql_result { if ((syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) && !pgsqlsess->statementsinflight->drained()) { lock = 0; // Unlock while we wait - PD("Commit waiting for statements to finish\n"); + PD("Commit waiting for statements to finish %d\n", + pgsqlsess->statementsinflight->_count); catch(PT(pgsqlsess->statementsinflight->wait_till_drained())); + PD("Parseportal retry closemux %O\n", _portalname); continue; } _state = PARSING; @@ -1502,8 +1508,10 @@ class sql_result { if (inflight <= (_fetchlimit - 1) >> 1) { array(Thread.MutexKey) reflock = ({ lock }); lock = 0; - if (!_sendexecute(_fetchlimit, reflock)) + if (!_sendexecute(_fetchlimit, reflock)) { + PD("Replenishrows retry closemux %O\n", _portalname); continue; + } } else PD("<%O _fetchlimit %d, inflight %d, skip execute\n", _portalname, _fetchlimit, inflight); @@ -1539,6 +1547,7 @@ class sql_result { } final void _releasesession(void|string statusccomplete) { + int aborted = statusccomplete == "ABORT" ? 2 : 0; c->runningportals[this] = 0; if (statusccomplete && !statuscmdcomplete) { Thread.MutexKey lock = _ddescribemux->lock(); @@ -1549,13 +1558,14 @@ class sql_result { conxsess plugbuffer; array(Thread.MutexKey) reflock = ({ 0 }); for (;;) { - reflock[0] = closemux->lock(); - if (!catch(plugbuffer = c->start(reflock))) { + reflock[0] = closemux->lock(aborted); + if (!catch(plugbuffer = c->start(reflock))) if (plugbuffer) plugbuffer->sendcmd(_closeportal(plugbuffer, reflock)); - else + else { + PD("Releasesession retry closemux %O\n", _portalname); continue; - } + } break; } reflock[0] = 0; @@ -1564,13 +1574,14 @@ class sql_result { _state = CLOSED; } datarows->write(1); // Signal EOF - releaseconditions(statusccomplete == "ABORT"); + releaseconditions(aborted); } protected void destroy() { - catch { // inside destructors, exceptions don't work - _releasesession("ABORT"); - }; + if (_state < CLOSED) + catch { // inside destructors, exceptions don't work + _releasesession("ABORT"); + }; } final int _sendexecute(int fetchlimit, @@ -1877,7 +1888,7 @@ class proxy { ->add_int32(backendpid)->add(cancelsecret)->sendcmd(FLUSHSEND)); #ifdef PG_DEBUG if (err) - PD("CancelRequest failed to connect %O\n", describe_backtrace(err)); + PD("CancelRequest failed to connect %s\n", describe_backtrace(err)); #endif destruct(lcon); // Destruct explicitly to avoid delayed close #ifdef PG_DEBUGMORE @@ -2510,7 +2521,7 @@ class proxy { switch (msgresponse.S) { case "PANIC":werror(a2nls(lastmessage)); } - case "25P02": // Preserve last error message + case "25P02": // Preserve last error message USERERROR(a2nls(lastmessage)); // Implicitly closed portal } break; @@ -2518,18 +2529,23 @@ class proxy { case 'N': { PD("NoticeResponse\n"); mapping(string:string) msgresponse = getresponse(); - if (clearmessage) { - warningsdropcount += warningscollected; - clearmessage = warningscollected = 0; - lastmessage = ({}); - } - warningscollected++; - lastmessage = ({sprintf("%s %s: %s", - msgresponse.S, msgresponse.C, msgresponse.M)}); - int val; - if (val = msgisfatal(msgresponse)) { // Some warnings are fatal - preplastmessage(msgresponse); - PD(a2nls(lastmessage)); throw(val); + switch (msgresponse.C) { + default: + if (clearmessage) { + warningsdropcount += warningscollected; + clearmessage = warningscollected = 0; + lastmessage = ({}); + } + warningscollected++; + lastmessage = ({sprintf("%s %s: %s", + msgresponse.S, msgresponse.C, msgresponse.M)}); + int val; + if (val = msgisfatal(msgresponse)) { // Some warnings are fatal + preplastmessage(msgresponse); + PD(a2nls(lastmessage)); throw(val); + } + case "25P01": // Suppress some warnings + break; } break; }