pgsql: Avoid a closemux mutex race in parallel queries.

parent 2e4af17b
......@@ -139,7 +139,8 @@ final Regexp transendprefix
* tradeoff.
*/
private Regexp execfetchlimit
= iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$");
= iregexp("^\a*((UPDA|DELE)TE|INSERT"
"|RESET|CLOSE|DISCARD)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$");
private void default_backend_runs() { // Runs as soon as the
cb_backend = Pike.DefaultBackend; // DefaultBackend has started
......@@ -348,8 +349,12 @@ class bufcon {
return realbuffer->stashcount;
}
final bufcon start(void|int waitforreal) {
dirty = realbuffer->stashcount->acquire();
final MUTEX `shortmux() {
return realbuffer->shortmux;
}
final bufcon start(void|int|array(Thread.MutexKey) waitforreal) {
dirty = stashcount->acquire();
#ifdef PG_DEBUG
if (waitforreal)
error("pgsql.bufcon not allowed here\n");
......@@ -358,7 +363,7 @@ class bufcon {
}
final void sendcmd(int mode, void|Result portal) {
Thread.MutexKey lock = realbuffer->shortmux->lock();
Thread.MutexKey lock = shortmux->lock();
if (portal)
realbuffer->stashqueue->write(portal);
if (mode == SYNCSEND) {
......@@ -371,8 +376,8 @@ class bufcon {
realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode);
if (mode > realbuffer->stashflushmode)
realbuffer->stashflushmode = mode;
dirty = 0; // Countdown before releasing the lock
lock = 0;
dirty = 0;
this->clear();
if (lock = realbuffer->nostash->trylock(1)) {
#ifdef PG_DEBUGRACE
......@@ -510,20 +515,25 @@ class conxion {
portal._portalname, ++queueoutidx, synctransact, sizeof(this));
}
final bufcon|conxsess start(void|int waitforreal) {
final bufcon|conxsess start(void|int|array(Thread.MutexKey) waitforreal) {
Thread.MutexKey lock;
#ifdef PG_DEBUGRACE
if (nostash->current_locking_thread())
PD("Nostash locked by %s\n",
describe_backtrace(nostash->current_locking_thread()->backtrace()));
#endif
while (lock = (waitforreal > 0 ? nostash->lock : nostash->trylock)(1)) {
while (lock = (intp(waitforreal) && waitforreal > 0
? nostash->lock : nostash->trylock)(1)) {
int mode;
if (sizeof(stash) && (mode = getstash(KEEP)) > KEEP)
sendcmd(mode); // Force out stash to the server
if (!stashcount->drained()) {
lock = 0; // Unlock while we wait
if (arrayp(waitforreal))
waitforreal[0] = 0;
stashcount->wait_till_drained();
if (arrayp(waitforreal))
return 0;
continue; // Try again
}
#ifdef PG_DEBUGRACE
......@@ -536,6 +546,8 @@ class conxion {
return this;
#endif
}
if (arrayp(waitforreal))
waitforreal[0] = 0;
return !waitforreal && bufcon(this)->start();
}
......@@ -611,8 +623,7 @@ outer:
}
Thread.MutexKey lock = shortmux->trylock();
if (lock && sizeof(this)) {
PD("%d>Sendcmd %O\n",
socket->query_fd(), (string)this);
PD("%d>Sendcmd %O\n", socket->query_fd(), (string)this);
output_to(socket);
}
} while (0);
......@@ -936,7 +947,7 @@ class Result {
{
Thread.MutexKey lock = closemux->lock();
if (_fetchlimit) {
array reflock = ({ _fetchlimit = 0 });
array(Thread.MutexKey) reflock = ({ _fetchlimit = 0 });
for (;;) {
reflock[0] = lock;
lock = 0;
......@@ -945,8 +956,8 @@ class Result {
continue;
}
}
} else
lock = 0; // Force release before acquiring next
}
lock = 0; // Force release before acquiring next
lock = _ddescribemux->lock();
if (!statuscmdcomplete)
PT(_ddescribe->wait(lock));
......@@ -1539,7 +1550,7 @@ class Result {
lock = 0;
} else if (syncparse < 0 && !pgsqlsess->wasparallelisable
&& !pgsqlsess->statementsinflight->drained(1)) {
lock = 0;
lock = 0; // Unlock while we wait
PD("Commit waiting for statements to finish\n");
catch(PT(pgsqlsess->statementsinflight->wait_till_drained(1)));
}
......@@ -1571,14 +1582,18 @@ class Result {
}
final void _parseportal() {
{
for (;;) {
Thread.MutexKey lock = closemux->lock();
_state = PARSING;
if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) {
if ((syncparse || syncparse < 0 && pgsqlsess->wasparallelisable)
&& !pgsqlsess->statementsinflight->drained()) {
lock = 0; // Unlock while we wait
PD("Commit waiting for statements to finish\n");
catch(PT(pgsqlsess->statementsinflight->wait_till_drained()));
continue;
}
_state = PARSING;
stmtifkey = pgsqlsess->statementsinflight->acquire();
break;
}
statuscmdcomplete = 0;
pgsqlsess->wasparallelisable = paralleliseprefix->match(_query);
......@@ -1639,13 +1654,15 @@ class Result {
_unnamedportalkey = 0;
portalsifkey = 0;
if (pgsqlsess->portalsinflight->drained()) {
if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN)
if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN) {
/*
* stashcount will be non-zero if a parse request has been queued
* before the close was initiated.
* It's a bit of a tricky race, but this check should be sufficient.
*/
pgsqlsess->readyforquerycount++, retval = SYNCSEND;
Thread.MutexKey lock = plugbuffer->shortmux->lock();
if (plugbuffer->stashcount->drained())
pgsqlsess->readyforquerycount++, retval = SYNCSEND;
}
pgsqlsess->pportalcount = 0;
}
}
......@@ -1665,7 +1682,7 @@ class Result {
* index / bytesreceived || 1, _fetchlimit);
if (_fetchlimit)
if (inflight <= (_fetchlimit - 1) >> 1) {
array reflock = ({ lock });
array(Thread.MutexKey) reflock = ({ lock });
lock = 0;
if (!_sendexecute(_fetchlimit, reflock))
continue;
......@@ -1712,9 +1729,17 @@ class Result {
}
inflight = 0;
conxsess plugbuffer;
array(Thread.MutexKey) reflock = ({closemux->lock()});
if (!catch(plugbuffer = c->start()))
plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
array(Thread.MutexKey) reflock = ({ 0 });
for (;;) {
reflock[0] = closemux->lock();
if (!catch(plugbuffer = c->start(reflock))) {
if (plugbuffer)
plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
else
continue;
}
break;
}
reflock[0] = 0;
if (_state < CLOSED) {
stmtifkey = 0;
......@@ -1738,10 +1763,8 @@ class Result {
fetchlimit, transtype);
if (arrayp(plugbuffer)) {
reflock = plugbuffer;
if (!(plugbuffer = c->start(-1))) {
reflock[0] = 0;
if (!(plugbuffer = c->start(reflock)))
return 0; // Found potential deadlock, release and try again
}
}
CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8)
->add_int32(fetchlimit);
......@@ -2413,9 +2436,9 @@ class proxy {
#endif
readyforquerycount--;
function (:void) cb;
destruct(waitforauthready);
if (cb = readyforquery_cb)
readyforquery_cb = 0, cb();
destruct(waitforauthready);
break;
}
case '1':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment