From 4b46bbfa6c2cf7eb2c7ca47baca2d4297e556af8 Mon Sep 17 00:00:00 2001 From: "Stephen R. van den Berg" <srb@cuci.nl> Date: Tue, 22 May 2018 08:50:00 +0200 Subject: [PATCH] pgsql: Disentangle concurrent stash flushes explicitly. - Solves the last remaining one-deadlock-per-month problem. - Speeds up the critical path with regard to stash-flushing. --- lib/modules/Sql.pmod/pgsql_util.pmod | 91 +++++++++++++++------------- 1 file changed, 50 insertions(+), 41 deletions(-) diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index ae53380246..50aba734ea 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -320,6 +320,7 @@ class bufcon { realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode); if (mode > realbuffer->stashflushmode) realbuffer->stashflushmode = mode; + lock = 0; dirty = 0; this->clear(); if (lock = realbuffer->nostash->trylock(1)) { @@ -475,10 +476,7 @@ class conxion { #endif started = lock; lock = 0; // Force release before acquiring next - lock = shortmux->lock(); - mode = getstash(KEEP); - lock = 0; - if (mode > KEEP) + if (sizeof(stash) && getstash(KEEP) > KEEP) sendcmd(mode); // Force out stash to the server #ifdef PG_DEBUGRACE return sess; @@ -501,19 +499,18 @@ class conxion { } private int getstash(int mode) { - if (sizeof(stash)) { - add(stash); stash->clear(); - foreach (stashqueue->try_read_array(); ; int|sql_result portal) - if (intp(portal)) - qportals->write(synctransact++); - else - queueup(portal); - PD("%d>Got stash mode %d > %d\n", - socket->query_fd(), stashflushmode, mode); - if (stashflushmode > mode) - mode = stashflushmode; - stashflushmode = KEEP; - } + Thread.MutexKey lock = shortmux->lock(); + add(stash); stash->clear(); + foreach (stashqueue->try_read_array(); ; int|sql_result portal) + if (intp(portal)) + qportals->write(synctransact++); + else + queueup(portal); + PD("%d>Got stash mode %d > %d\n", + socket->query_fd(), stashflushmode, mode); + if (stashflushmode > mode) + mode = stashflushmode; + stashflushmode = KEEP; return mode; } @@ -538,34 +535,46 @@ unfinalised: } qportals->write(synctransact++); } while (0); - Thread.MutexKey lock = shortmux->lock(); - mode = getstash(mode); + if (sizeof(stash)) + mode = getstash(mode); + for(;;) { #ifdef PG_DEBUG - mixed err = + mixed err = #endif - catch { + catch { outer: - do { - switch (mode) { - default: - PD("%d>Skip flush %d Queue %O\n", - socket->query_fd(), mode, (string)this); - break outer; - case FLUSHSEND: - PD("Flush\n"); - add(PGFLUSH); - case SENDOUT:; - } - if (towrite = sizeof(this)) { - PD("%d>Sendcmd %O\n", - socket->query_fd(), ((string)this)[..towrite-1]); - towrite -= output_to(socket, towrite); + do { + switch (mode) { + default: + PD("%d>Skip flush %d Queue %O\n", + socket->query_fd(), mode, (string)this); + break outer; + case FLUSHSEND: + PD("Flush\n"); + add(PGFLUSH); + case SENDOUT:; + } + if (towrite = sizeof(this)) { + PD("%d>Sendcmd %O\n", + socket->query_fd(), ((string)this)[..towrite-1]); + towrite -= output_to(socket, towrite); + } + } while (0); + started = 0; + if (sizeof(stash) && (started = nostash->trylock(1))) { +#ifdef PG_DEBUGRACE + conxsess sess = conxsess(this); + sess->sendcmd(SENDOUT); +#else + mode = getstash(SENDOUT); + continue; +#endif } - } while (0); - started = 0; - return; - }; - lock = 0; + return; + }; + break; + } + started = 0; PD("Sendcmd failed %s\n", describe_backtrace(err)); destruct(this); } -- GitLab