Skip to content
Snippets Groups Projects
Commit d3e71488 authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

pgsql: Avoid a closemux mutex race in parallel queries.

parent b22c04c6
Branches
Tags
No related merge requests found
...@@ -138,7 +138,8 @@ final Regexp transendprefix ...@@ -138,7 +138,8 @@ final Regexp transendprefix
* tradeoff. * tradeoff.
*/ */
private Regexp execfetchlimit private Regexp execfetchlimit
= iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$"); = iregexp("^\a*((UPDA|DELE)TE|INSERT"
"|RESET|CLOSE|DISCARD)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$");
private void default_backend_runs() { // Runs as soon as the private void default_backend_runs() { // Runs as soon as the
cb_backend = Pike.DefaultBackend; // DefaultBackend has started cb_backend = Pike.DefaultBackend; // DefaultBackend has started
...@@ -300,8 +301,12 @@ class bufcon { ...@@ -300,8 +301,12 @@ class bufcon {
return realbuffer->stashcount; return realbuffer->stashcount;
} }
final bufcon start(void|int waitforreal) { final MUTEX `shortmux() {
dirty = realbuffer->stashcount->acquire(); return realbuffer->shortmux;
}
final bufcon start(void|int|array(Thread.MutexKey) waitforreal) {
dirty = stashcount->acquire();
#ifdef PG_DEBUG #ifdef PG_DEBUG
if (waitforreal) if (waitforreal)
error("pgsql.bufcon not allowed here\n"); error("pgsql.bufcon not allowed here\n");
...@@ -310,7 +315,7 @@ class bufcon { ...@@ -310,7 +315,7 @@ class bufcon {
} }
final void sendcmd(int mode, void|sql_result portal) { final void sendcmd(int mode, void|sql_result portal) {
Thread.MutexKey lock = realbuffer->shortmux->lock(); Thread.MutexKey lock = shortmux->lock();
if (portal) if (portal)
realbuffer->stashqueue->write(portal); realbuffer->stashqueue->write(portal);
if (mode == SYNCSEND) { if (mode == SYNCSEND) {
...@@ -323,8 +328,8 @@ class bufcon { ...@@ -323,8 +328,8 @@ class bufcon {
realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode); realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode);
if (mode > realbuffer->stashflushmode) if (mode > realbuffer->stashflushmode)
realbuffer->stashflushmode = mode; realbuffer->stashflushmode = mode;
dirty = 0; // Countdown before releasing the lock
lock = 0; lock = 0;
dirty = 0;
this->clear(); this->clear();
if (lock = realbuffer->nostash->trylock(1)) { if (lock = realbuffer->nostash->trylock(1)) {
#ifdef PG_DEBUGRACE #ifdef PG_DEBUGRACE
...@@ -462,20 +467,25 @@ class conxion { ...@@ -462,20 +467,25 @@ class conxion {
portal._portalname, ++queueoutidx, synctransact, sizeof(this)); portal._portalname, ++queueoutidx, synctransact, sizeof(this));
} }
final bufcon|conxsess start(void|int waitforreal) { final bufcon|conxsess start(void|int|array(Thread.MutexKey) waitforreal) {
Thread.MutexKey lock; Thread.MutexKey lock;
#ifdef PG_DEBUGRACE #ifdef PG_DEBUGRACE
if (nostash->current_locking_thread()) if (nostash->current_locking_thread())
PD("Nostash locked by %s\n", PD("Nostash locked by %s\n",
describe_backtrace(nostash->current_locking_thread()->backtrace())); describe_backtrace(nostash->current_locking_thread()->backtrace()));
#endif #endif
while (lock = (waitforreal > 0 ? nostash->lock : nostash->trylock)(1)) { while (lock = (intp(waitforreal) && waitforreal > 0
? nostash->lock : nostash->trylock)(1)) {
int mode; int mode;
if (sizeof(stash) && (mode = getstash(KEEP)) > KEEP) if (sizeof(stash) && (mode = getstash(KEEP)) > KEEP)
sendcmd(mode); // Force out stash to the server sendcmd(mode); // Force out stash to the server
if (!stashcount->drained()) { if (!stashcount->drained()) {
lock = 0; // Unlock while we wait lock = 0; // Unlock while we wait
if (arrayp(waitforreal))
waitforreal[0] = 0;
stashcount->wait_till_drained(); stashcount->wait_till_drained();
if (arrayp(waitforreal))
return 0;
continue; // Try again continue; // Try again
} }
#ifdef PG_DEBUGRACE #ifdef PG_DEBUGRACE
...@@ -488,6 +498,8 @@ class conxion { ...@@ -488,6 +498,8 @@ class conxion {
return this; return this;
#endif #endif
} }
if (arrayp(waitforreal))
waitforreal[0] = 0;
return !waitforreal && bufcon(this)->start(); return !waitforreal && bufcon(this)->start();
} }
...@@ -563,8 +575,7 @@ outer: ...@@ -563,8 +575,7 @@ outer:
} }
Thread.MutexKey lock = shortmux->trylock(); Thread.MutexKey lock = shortmux->trylock();
if (lock && sizeof(this)) { if (lock && sizeof(this)) {
PD("%d>Sendcmd %O\n", PD("%d>Sendcmd %O\n", socket->query_fd(), (string)this);
socket->query_fd(), (string)this);
output_to(socket); output_to(socket);
} }
} while (0); } while (0);
...@@ -890,7 +901,7 @@ class sql_result { ...@@ -890,7 +901,7 @@ class sql_result {
{ {
Thread.MutexKey lock = closemux->lock(); Thread.MutexKey lock = closemux->lock();
if (_fetchlimit) { if (_fetchlimit) {
array reflock = ({ _fetchlimit = 0 }); array(Thread.MutexKey) reflock = ({ _fetchlimit = 0 });
for (;;) { for (;;) {
reflock[0] = lock; reflock[0] = lock;
lock = 0; lock = 0;
...@@ -899,7 +910,7 @@ class sql_result { ...@@ -899,7 +910,7 @@ class sql_result {
continue; continue;
} }
} }
} else }
lock = 0; // Force release before acquiring next lock = 0; // Force release before acquiring next
lock = _ddescribemux->lock(); lock = _ddescribemux->lock();
if (!statuscmdcomplete) if (!statuscmdcomplete)
...@@ -1357,7 +1368,7 @@ class sql_result { ...@@ -1357,7 +1368,7 @@ class sql_result {
lock = 0; lock = 0;
} else if (syncparse < 0 && !pgsqlsess->wasparallelisable } else if (syncparse < 0 && !pgsqlsess->wasparallelisable
&& !pgsqlsess->statementsinflight->drained(1)) { && !pgsqlsess->statementsinflight->drained(1)) {
lock = 0; lock = 0; // Unlock while we wait
PD("Commit waiting for statements to finish\n"); PD("Commit waiting for statements to finish\n");
catch(PT(pgsqlsess->statementsinflight->wait_till_drained(1))); catch(PT(pgsqlsess->statementsinflight->wait_till_drained(1)));
} }
...@@ -1389,14 +1400,18 @@ class sql_result { ...@@ -1389,14 +1400,18 @@ class sql_result {
} }
final void _parseportal() { final void _parseportal() {
{ for (;;) {
Thread.MutexKey lock = closemux->lock(); Thread.MutexKey lock = closemux->lock();
_state = PARSING; if ((syncparse || syncparse < 0 && pgsqlsess->wasparallelisable)
if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) { && !pgsqlsess->statementsinflight->drained()) {
lock = 0; // Unlock while we wait
PD("Commit waiting for statements to finish\n"); PD("Commit waiting for statements to finish\n");
catch(PT(pgsqlsess->statementsinflight->wait_till_drained())); catch(PT(pgsqlsess->statementsinflight->wait_till_drained()));
continue;
} }
_state = PARSING;
stmtifkey = pgsqlsess->statementsinflight->acquire(); stmtifkey = pgsqlsess->statementsinflight->acquire();
break;
} }
statuscmdcomplete = 0; statuscmdcomplete = 0;
pgsqlsess->wasparallelisable = paralleliseprefix->match(_query); pgsqlsess->wasparallelisable = paralleliseprefix->match(_query);
...@@ -1457,13 +1472,15 @@ class sql_result { ...@@ -1457,13 +1472,15 @@ class sql_result {
_unnamedportalkey = 0; _unnamedportalkey = 0;
portalsifkey = 0; portalsifkey = 0;
if (pgsqlsess->portalsinflight->drained()) { if (pgsqlsess->portalsinflight->drained()) {
if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN) if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN) {
/* /*
* stashcount will be non-zero if a parse request has been queued * stashcount will be non-zero if a parse request has been queued
* before the close was initiated. * before the close was initiated.
* It's a bit of a tricky race, but this check should be sufficient.
*/ */
Thread.MutexKey lock = plugbuffer->shortmux->lock();
if (plugbuffer->stashcount->drained())
pgsqlsess->readyforquerycount++, retval = SYNCSEND; pgsqlsess->readyforquerycount++, retval = SYNCSEND;
}
pgsqlsess->pportalcount = 0; pgsqlsess->pportalcount = 0;
} }
} }
...@@ -1483,7 +1500,7 @@ class sql_result { ...@@ -1483,7 +1500,7 @@ class sql_result {
* index / bytesreceived || 1, _fetchlimit); * index / bytesreceived || 1, _fetchlimit);
if (_fetchlimit) if (_fetchlimit)
if (inflight <= (_fetchlimit - 1) >> 1) { if (inflight <= (_fetchlimit - 1) >> 1) {
array reflock = ({ lock }); array(Thread.MutexKey) reflock = ({ lock });
lock = 0; lock = 0;
if (!_sendexecute(_fetchlimit, reflock)) if (!_sendexecute(_fetchlimit, reflock))
continue; continue;
...@@ -1530,9 +1547,17 @@ class sql_result { ...@@ -1530,9 +1547,17 @@ class sql_result {
} }
inflight = 0; inflight = 0;
conxsess plugbuffer; conxsess plugbuffer;
array(Thread.MutexKey) reflock = ({closemux->lock()}); array(Thread.MutexKey) reflock = ({ 0 });
if (!catch(plugbuffer = c->start())) for (;;) {
reflock[0] = closemux->lock();
if (!catch(plugbuffer = c->start(reflock))) {
if (plugbuffer)
plugbuffer->sendcmd(_closeportal(plugbuffer, reflock)); plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
else
continue;
}
break;
}
reflock[0] = 0; reflock[0] = 0;
if (_state < CLOSED) { if (_state < CLOSED) {
stmtifkey = 0; stmtifkey = 0;
...@@ -1556,11 +1581,9 @@ class sql_result { ...@@ -1556,11 +1581,9 @@ class sql_result {
fetchlimit, transtype); fetchlimit, transtype);
if (arrayp(plugbuffer)) { if (arrayp(plugbuffer)) {
reflock = plugbuffer; reflock = plugbuffer;
if (!(plugbuffer = c->start(-1))) { if (!(plugbuffer = c->start(reflock)))
reflock[0] = 0;
return 0; // Found potential deadlock, release and try again return 0; // Found potential deadlock, release and try again
} }
}
CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8) CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8)
->add_int32(fetchlimit); ->add_int32(fetchlimit);
if (!fetchlimit) { if (!fetchlimit) {
...@@ -2231,9 +2254,9 @@ class proxy { ...@@ -2231,9 +2254,9 @@ class proxy {
#endif #endif
readyforquerycount--; readyforquerycount--;
function (:void) cb; function (:void) cb;
destruct(waitforauthready);
if (cb = readyforquery_cb) if (cb = readyforquery_cb)
readyforquery_cb = 0, cb(); readyforquery_cb = 0, cb();
destruct(waitforauthready);
break; break;
} }
case '1': case '1':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment