pgsql: Release a portal only once (destructor becomes a no-op).

parent 50267017
......@@ -593,8 +593,8 @@ unfinalised:
mode = SENDOUT;
break;
case FLUSHLOGSEND:
PD("%d>%O %d Queue simplequery %d bytes\n", socket->query_fd(),
portal._portalname, ++queueoutidx, sizeof(this));
PD("%d>%d Queue simplequery %d bytes\n",
socket->query_fd(), ++queueoutidx, sizeof(this));
mode = FLUSHSEND;
}
qportals->write(synctransact++);
......@@ -952,6 +952,7 @@ class Result {
reflock[0] = lock;
lock = 0;
if (!_sendexecute(0, reflock)) {
PD("Status_command_complete retry closemux %O\n", _portalname);
lock = closemux->lock();
continue;
}
......@@ -1268,6 +1269,7 @@ class Result {
if (sizeof(dtoid) != sizeof(paramValues))
SUSERERROR("Invalid number of bindings, expected %d, got %d\n",
sizeof(dtoid), sizeof(paramValues));
PD("PrepareBind\n");
Thread.MutexKey lock = _ddescribemux->lock();
if (!_portalname) {
_portalname
......@@ -1539,8 +1541,10 @@ class Result {
if (!datarowtypes) {
if (_tprepared && dontcacheprefix->match(_query))
m_delete(pgsqlsess->prepareds, _query), _tprepared = 0;
PD("WaitForDescribe\n");
waitfordescribe();
}
PD("About to bind %d\n", _state);
if (_state >= CLOSING)
lock = _unnamedstatementkey = 0;
else {
......@@ -1587,8 +1591,10 @@ class Result {
if ((syncparse || syncparse < 0 && pgsqlsess->wasparallelisable)
&& !pgsqlsess->statementsinflight->drained()) {
lock = 0; // Unlock while we wait
PD("Commit waiting for statements to finish\n");
PD("Commit waiting for statements to finish %d\n",
pgsqlsess->statementsinflight->_count);
catch(PT(pgsqlsess->statementsinflight->wait_till_drained()));
PD("Parseportal retry closemux %O\n", _portalname);
continue;
}
_state = PARSING;
......@@ -1684,8 +1690,10 @@ class Result {
if (inflight <= (_fetchlimit - 1) >> 1) {
array(Thread.MutexKey) reflock = ({ lock });
lock = 0;
if (!_sendexecute(_fetchlimit, reflock))
if (!_sendexecute(_fetchlimit, reflock)) {
PD("Replenishrows retry closemux %O\n", _portalname);
continue;
}
} else
PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
_portalname, _fetchlimit, inflight);
......@@ -1721,6 +1729,7 @@ class Result {
}
final void _releasesession(void|string statusccomplete) {
int aborted = statusccomplete == "ABORT" ? 2 : 0;
c->runningportals[this] = 0;
if (statusccomplete && !statuscmdcomplete) {
Thread.MutexKey lock = _ddescribemux->lock();
......@@ -1731,13 +1740,14 @@ class Result {
conxsess plugbuffer;
array(Thread.MutexKey) reflock = ({ 0 });
for (;;) {
reflock[0] = closemux->lock();
if (!catch(plugbuffer = c->start(reflock))) {
reflock[0] = closemux->lock(aborted);
if (!catch(plugbuffer = c->start(reflock)))
if (plugbuffer)
plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
else
else {
PD("Releasesession retry closemux %O\n", _portalname);
continue;
}
}
break;
}
reflock[0] = 0;
......@@ -1746,13 +1756,14 @@ class Result {
_state = CLOSED;
}
datarows->write(1); // Signal EOF
releaseconditions(statusccomplete == "ABORT");
releaseconditions(aborted);
}
protected void _destruct() {
catch { // inside destructors, exceptions don't work
_releasesession("ABORT");
};
if (_state < CLOSED)
catch { // inside destructors, exceptions don't work
_releasesession("ABORT");
};
}
final int _sendexecute(int fetchlimit,
......@@ -2059,7 +2070,7 @@ class proxy {
->add_int32(backendpid)->add(cancelsecret)->sendcmd(FLUSHSEND));
#ifdef PG_DEBUG
if (err)
PD("CancelRequest failed to connect %O\n", describe_backtrace(err));
PD("CancelRequest failed to connect %s\n", describe_backtrace(err));
#endif
destruct(lcon); // Destruct explicitly to avoid delayed close
#ifdef PG_DEBUGMORE
......@@ -2692,7 +2703,7 @@ class proxy {
switch (msgresponse.S) {
case "PANIC":werror(a2nls(lastmessage));
}
case "25P02": // Preserve last error message
case "25P02": // Preserve last error message
USERERROR(a2nls(lastmessage)); // Implicitly closed portal
}
break;
......@@ -2700,18 +2711,23 @@ class proxy {
case 'N': {
PD("NoticeResponse\n");
mapping(string:string) msgresponse = getresponse();
if (clearmessage) {
warningsdropcount += warningscollected;
clearmessage = warningscollected = 0;
lastmessage = ({});
}
warningscollected++;
lastmessage = ({sprintf("%s %s: %s",
msgresponse.S, msgresponse.C, msgresponse.M)});
int val;
if (val = msgisfatal(msgresponse)) { // Some warnings are fatal
preplastmessage(msgresponse);
PD(a2nls(lastmessage)); throw(val);
switch (msgresponse.C) {
default:
if (clearmessage) {
warningsdropcount += warningscollected;
clearmessage = warningscollected = 0;
lastmessage = ({});
}
warningscollected++;
lastmessage = ({sprintf("%s %s: %s",
msgresponse.S, msgresponse.C, msgresponse.M)});
int val;
if (val = msgisfatal(msgresponse)) { // Some warnings are fatal
preplastmessage(msgresponse);
PD(a2nls(lastmessage)); throw(val);
}
case "25P01": // Suppress some warnings
break;
}
break;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment