From 4d69e0677bf757bcd6c9c1373032a7e7db294ef6 Mon Sep 17 00:00:00 2001 From: "Stephen R. van den Berg" <srb@cuci.nl> Date: Tue, 28 Nov 2017 13:39:22 +0100 Subject: [PATCH] pgsql: Rely on block boundaries to release locks. --- lib/modules/Sql.pmod/pgsql.pike | 12 +-- lib/modules/Sql.pmod/pgsql_util.pmod | 118 +++++++++++++-------------- 2 files changed, 63 insertions(+), 67 deletions(-) diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 964733a8d3..dce5a04358 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -452,7 +452,6 @@ private void waitauthready() { proxy.c?->socket && proxy.c->socket->query_fd(), backtrace()[-2]); Thread.MutexKey lock = proxy.shortmux->lock(); catch(PT(proxy.waitforauthready->wait(lock))); - lock = 0; PD("%d Wait for auth ready released.\n", proxy.c?->socket && proxy.c->socket->query_fd()); } @@ -1082,11 +1081,12 @@ private inline void throwdelayederror(object parent) { portal._portalname = ""; portal->_parseportal(); portal->_bindportal(); proxy.readyforquerycount++; - Thread.MutexKey lock = proxy.unnamedstatement->lock(1); - .pgsql_util.conxsess cs = c->start(1); - CHAIN(cs)->add_int8('Q')->add_hstring(({q, 0}), 4, 4); - cs->sendcmd(FLUSHLOGSEND, portal); - lock = 0; + { + Thread.MutexKey lock = proxy.unnamedstatement->lock(1); + .pgsql_util.conxsess cs = c->start(1); + CHAIN(cs)->add_int8('Q')->add_hstring(({q, 0}), 4, 4); + cs->sendcmd(FLUSHLOGSEND, portal); + } PD("Simple query: %O\n", q); } else { object plugbuffer; diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index d63975ff88..dd12ab5e8c 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -264,7 +264,6 @@ class conxiin { if (!didreadcb) fillread.wait(lock); didreadcb = 0; - lock = 0; } else throw(MAGICTERMINATE); return true; @@ -281,7 +280,6 @@ class conxiin { procmsg = 0, lock = 0, Thread.Thread(id); else if (fillread) didreadcb = 1, fillread.signal(); - lock = 0; return 0; } @@ -438,7 +436,7 @@ outer: towrite -= output_to(socket, towrite); } } while (0); - lock = started = 0; + started = 0; return; }; lock = 0; @@ -449,12 +447,13 @@ outer: final int close() { if (!closenext && nostash) { closenext = 1; - Thread.MutexKey lock = i->fillreadmux->lock(); - if (i->fillread) { // Delayed close() after flushing the output buffer - i->fillread.signal(); - i->fillread = 0; + { + Thread.MutexKey lock = i->fillreadmux->lock(); + if (i->fillread) { // Delayed close() after flushing the output buffer + i->fillread.signal(); + i->fillread = 0; + } } - lock = 0; PD("%d>Delayed close, flush write\n", socket->query_fd()); i->read_cb(socket->query_id(), 0); return 0; @@ -682,7 +681,9 @@ class sql_result { int _portalbuffersize, int alltyped, array params, int forcetext, int _timeout, int _syncparse, int _transtype) { pgsqlsess = _pgsqlsess; - if (catch(cr = (c = _c)->i)) + if (c = _c) + cr = c->i; + else losterror(); _query = query; datarows = Thread.Queue(); @@ -735,10 +736,11 @@ class sql_result { } private void waitfordescribe() { - Thread.MutexKey lock = _ddescribemux->lock(); - if (!datarowtypes) - PT(_ddescribe->wait(lock)); - lock = 0; + { + Thread.MutexKey lock = _ddescribemux->lock(); + if (!datarowtypes) + PT(_ddescribe->wait(lock)); + } if (this) // If object already destructed, skip the next call trydelayederror(); // since you cannot call functions anymore else @@ -887,7 +889,6 @@ class sql_result { datarowdesc = drowdesc; datarowtypes = drowtypes; _ddescribe->broadcast(); - lock = 0; } final void _preparebind(array dtoid) { @@ -1073,8 +1074,7 @@ class sql_result { execfetchlimit->match(_query)) && _fetchlimit, bindbuffer); } - } else - lock = 0; + } } final void _processrowdesc(array(mapping(string:mixed)) datarowdesc, @@ -1087,16 +1087,18 @@ class sql_result { } final void _parseportal() { - Thread.MutexKey lock = closemux->lock(); - _state = PARSING; - Thread.MutexKey lockc = pgsqlsess->shortmux->lock(); - if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) { - PD("Commit waiting for statements to finish\n"); - catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lockc))); + { + Thread.MutexKey lock = closemux->lock(); + _state = PARSING; + { + Thread.MutexKey lockc = pgsqlsess->shortmux->lock(); + if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) { + PD("Commit waiting for statements to finish\n"); + catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lockc))); + } + stmtifkey = pgsqlsess->statementsinflight->acquire(); + } } - stmtifkey = pgsqlsess->statementsinflight->acquire(); - lockc = 0; - lock = 0; statuscmdcomplete = 0; pgsqlsess->wasparallelisable = paralleliseprefix->match(_query); } @@ -1109,34 +1111,33 @@ class sql_result { _state = COMMITTED; stmtifkey = 0; } - lock = 0; } final void _bindportal() { Thread.MutexKey lock = closemux->lock(); _state = BOUND; portalsifkey = pgsqlsess->portalsinflight->acquire(); - lock = 0; } final void _purgeportal() { PD("Purge portal\n"); datarows->write(1); // Signal EOF - Thread.MutexKey lock = closemux->lock(); - _fetchlimit = 0; // disables further Executes - switch (_state) { - case COPYINPROGRESS: - case COMMITTED: - case BOUND: - portalsifkey = 0; - } - switch (_state) { - case BOUND: - case PARSING: - stmtifkey = 0; + { + Thread.MutexKey lock = closemux->lock(); + _fetchlimit = 0; // disables further Executes + switch (_state) { + case COPYINPROGRESS: + case COMMITTED: + case BOUND: + portalsifkey = 0; + } + switch (_state) { + case BOUND: + case PARSING: + stmtifkey = 0; + } + _state = PURGED; } - _state = PURGED; - lock = 0; releaseconditions(); } @@ -1183,7 +1184,6 @@ class sql_result { pgsqlsess->pportalcount = 0; } } - lock = 0; return retval; } @@ -1206,7 +1206,6 @@ class sql_result { else if (!_fetchlimit) PD("<%O _fetchlimit %d, inflight %d, skip execute\n", _portalname, _fetchlimit, inflight); - lock = 0; } } @@ -1442,7 +1441,7 @@ class proxy { switch (type) { case 'O': res = sprintf(DRIVERNAME".proxy(%s@%s:%d/%s,%d,%d)", - user, host, port, database, c?->socket && c->socket->query_fd(), + user, host, port, database, c && c->socket && c->socket->query_fd(), backendpid); break; } @@ -1486,10 +1485,7 @@ class proxy { } final int is_open() { - catch { - return c->socket->is_open(); - }; - return 0; + return c && c->socket && c->socket->is_open(); } final string geterror(void|int clear) { @@ -1657,10 +1653,8 @@ class proxy { throw(MAGICTERMINATE); // Force proper termination } cr->procmsg = 1; - lock = 0; return; // Terminate thread, wait for callback } - lock = 0; } int msgtype = cr->read_int8(); if (!portal) { @@ -2295,17 +2289,19 @@ class proxy { final void close() { throwdelayederror(this); - Thread.MutexKey lock; - if (qportals && qportals->size()) - catch(cancelquery()); - if (unnamedstatement) - termlock = unnamedstatement->lock(1); - if (c) // Prevent trivial backtraces - c->close(); - if (unnamedstatement) - lock = unnamedstatement->lock(1); - c->purge(); - lock = 0; + { + Thread.MutexKey lock; + if (qportals && qportals->size()) + catch(cancelquery()); + if (unnamedstatement) + termlock = unnamedstatement->lock(1); + if (c) // Prevent trivial backtraces + c->close(); + if (unnamedstatement) + lock = unnamedstatement->lock(1); + if (c) + c->purge(); + } destruct(waitforauthready); } -- GitLab