From c6d49707c58b2025ff2bcdc49c718911b2c9f735 Mon Sep 17 00:00:00 2001
From: "Stephen R. van den Berg" <srb@cuci.nl>
Date: Tue, 8 May 2018 17:09:24 +0200
Subject: [PATCH] pgsql: Eliminate rare deadlock on heavy interleaved queries
 tuned.

---
 lib/modules/Sql.pmod/pgsql.h         |   3 +
 lib/modules/Sql.pmod/pgsql_util.pmod | 160 ++++++++++++++++++---------
 lib/modules/Thread.pmod              |  15 ++-
 3 files changed, 121 insertions(+), 57 deletions(-)

diff --git a/lib/modules/Sql.pmod/pgsql.h b/lib/modules/Sql.pmod/pgsql.h
index 77d3759feb..95f2b662d9 100644
--- a/lib/modules/Sql.pmod/pgsql.h
+++ b/lib/modules/Sql.pmod/pgsql.h
@@ -10,6 +10,9 @@
 //#define PG_DEBUGRACE	1
 
 //#define PG_STATS	1	    // Collect extra usage statistics
+#define PG_DEADLOCK_SENTINEL 0	    // If >0, defines the number of seconds
+				    // a lock() may block before a deadlock
+				    // report is dumped to stderr
 
 #define FETCHLIMIT	     1024   // Initial upper limit on the
 				    // number of rows to fetch across the
diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod
index 49e0d80648..6a462f4b59 100644
--- a/lib/modules/Sql.pmod/pgsql_util.pmod
+++ b/lib/modules/Sql.pmod/pgsql_util.pmod
@@ -33,10 +33,56 @@
 
 #define LOSTERROR	"Database connection lost"
 
+#if PG_DEADLOCK_SENTINEL
+private multiset mutexes = set_weak_flag((<>), Pike.WEAK);
+private int deadlockseq;
+
+// Thread.Mutex variant that reports all lock holders when lock() stalls.
+private class MUTEX {
+  inherit Thread.Mutex;
+
+  // Sentinel thread body: unless killed first, dumps the holders to stderr.
+  private void dump_lockgraph(Thread.Thread curthread) {
+    sleep(PG_DEADLOCK_SENTINEL);
+    String.Buffer buf = String.Buffer();
+    buf.add("\n{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{{\n");
+    buf.sprintf("Deadlock detected at\n%s\n",
+     describe_backtrace(curthread.backtrace(), -1));
+    foreach(mutexes; this_program mu; )
+      if (mu) {
+        Thread.Thread ct;
+        if (ct = mu.current_locking_thread())
+        buf.sprintf("====================================== Mutex: %O\n%s\n",
+         mu, describe_backtrace(ct.backtrace(), -1));
+      }
+    buf.add("}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}");
+    werror(replace(buf.get(), "\n", sprintf("\n%d> ", deadlockseq++)) + "\n");
+  }
+
+  Thread.MutexKey lock(void|int type) {
+    Thread.MutexKey key;
+    if (!(key = trylock(type))) {
+      // Per-call sentinel: several threads may block here concurrently.
+      Thread.Thread sentinel = Thread.Thread(dump_lockgraph, this_thread());
+      key = ::lock(type);
+      sentinel->kill();
+    }
+    return key;
+  }
+
+  protected void create() {
+    mutexes[this] = 1;		// Register in the weak set dump_lockgraph scans
+  }
+};
+#else
+#define MUTEX			Thread.Mutex
+#endif
+
 //! The instance of the pgsql dedicated backend.
 final Pike.Backend local_backend;
 
 private Pike.Backend cb_backend;
+private sql_result qalreadyprinted;
 private Thread.Mutex backendmux = Thread.Mutex();
 private Thread.ResourceCount clientsregistered = Thread.ResourceCount();
 
@@ -295,7 +341,7 @@ class conxiin {
   inherit Stdio.Buffer:i;
 
   final Thread.Condition fillread;
-  final Thread.Mutex fillreadmux;
+  final MUTEX fillreadmux;
   final int procmsg;
   private int didreadcb;
 
@@ -330,7 +376,7 @@ class conxiin {
 
   private void create() {
     i::create();
-    fillreadmux = Thread.Mutex();
+    fillreadmux = MUTEX();
     fillread = Thread.Condition();
   }
 };
@@ -347,7 +393,7 @@ class conxion {
   final conxiin i;
 
   private Thread.Queue qportals;
-  final Thread.Mutex shortmux;
+  final MUTEX shortmux;
   private int closenext;
 
   final sfile
@@ -358,7 +404,7 @@ class conxion {
   private int towrite;
   final multiset(sql_result) runningportals = (<>);
 
-  final Thread.Mutex nostash;
+  final MUTEX nostash;
   final Thread.MutexKey started;
   final Thread.Queue stashqueue;
   final Thread.Condition stashavail;
@@ -389,14 +435,19 @@ class conxion {
       PD("Nostash locked by %s\n",
        describe_backtrace(nostash->current_locking_thread()->backtrace()));
 #endif
-    if (lock = (waitforreal ? nostash->lock : nostash->trylock)(1)) {
+    while (lock = (waitforreal ? nostash->lock : nostash->trylock)(1)) {
       int mode;
+      if (!stashcount->drained()) {
+        lock = 0;			// Force release before acquiring next
+        stashcount->wait_till_drained();
+        continue;
+      }
 #ifdef PG_DEBUGRACE
       conxsess sess = conxsess(this);
 #endif
       started = lock;
+      lock = 0;				// Force release before acquiring next
       lock = shortmux->lock();
-      stashcount->wait_till_drained(lock);
       mode = getstash(KEEP);
       lock = 0;
       if (mode > KEEP)
@@ -439,7 +490,6 @@ class conxion {
   }
 
   final void sendcmd(void|int mode, void|sql_result portal) {
-    Thread.MutexKey lock;
     if (portal)
       queueup(portal);
 unfinalised:
@@ -460,7 +510,7 @@ unfinalised:
       }
       qportals->write(synctransact++);
     } while (0);
-    lock = shortmux->lock();
+    Thread.MutexKey lock = shortmux->lock();
     mode = getstash(mode);
 #ifdef PG_DEBUG
     mixed err =
@@ -624,8 +674,8 @@ outer:
     synctransact = 1;
     socket = sfile();
     i = conxiin();
-    shortmux = Thread.Mutex();
-    nostash = Thread.Mutex();
+    shortmux = MUTEX();
+    nostash = MUTEX();
     closenext = 0;
     stashavail = Thread.Condition();
     stashqueue = Thread.Queue();
@@ -688,7 +738,7 @@ class sql_result {
   private int index;
   private int inflight;
   int portalbuffersize;
-  private Thread.Mutex closemux;
+  private MUTEX closemux;
   private Thread.Queue datarows;
   private Thread.ResourceCountKey stmtifkey, portalsifkey;
   private array(mapping(string:mixed)) datarowdesc;
@@ -697,7 +747,7 @@ class sql_result {
   private int bytesreceived;
   final int _synctransact;
   final Thread.Condition _ddescribe;
-  final Thread.Mutex _ddescribemux;
+  final MUTEX _ddescribemux;
   final Thread.MutexKey _unnamedportalkey, _unnamedstatementkey;
   final array _params;
   final string _query;
@@ -718,10 +768,11 @@ class sql_result {
                     "fd: %O portalname: %O  datarows: %d"
                     "  synctransact: %d laststatus: %s\n",
                     _state, index, eoffound, inflight,
-                    _query, fd, _portalname,
+                    qalreadyprinted == this ? "..." : _query, fd, _portalname,
                     datarowtypes && sizeof(datarowtypes), _synctransact,
                     statuscmdcomplete
                     || (_unnamedstatementkey ? "*parsing*" : ""));
+        qalreadyprinted = this;
         break;
     }
     return res;
@@ -738,8 +789,8 @@ class sql_result {
     _query = query;
     datarows = Thread.Queue();
     _ddescribe = Thread.Condition();
-    _ddescribemux = Thread.Mutex();
-    closemux = Thread.Mutex();
+    _ddescribemux = MUTEX();
+    closemux = MUTEX();
     portalbuffersize = _portalbuffersize;
     alltext = !alltyped;
     _params = params;
@@ -765,8 +816,12 @@ class sql_result {
         waitfordescribe();
       {
         Thread.MutexKey lock = closemux->lock();
-        if (_fetchlimit)
-          _sendexecute(_fetchlimit = 0);
+        if (_fetchlimit) {
+          array reflock = ({ lock });
+          lock = 0;
+          _sendexecute(_fetchlimit = 0, reflock);
+        } else
+          lock = 0;			// Force release before acquiring next
         lock = _ddescribemux->lock();
         if (!statuscmdcomplete)
           PT(_ddescribe->wait(lock));
@@ -1214,15 +1269,15 @@ class sql_result {
         lock = _unnamedstatementkey = 0;
       else {
         plugbuffer->add_int16(sizeof(datarowtypes));
-        if (sizeof(datarowtypes))
+        if (sizeof(datarowtypes)) {
           plugbuffer->add_ints(map(datarowtypes, readoidformat), 2);
-        else if (syncparse < 0 && !pgsqlsess->wasparallelisable
+          lock = 0;
+        } else if (syncparse < 0 && !pgsqlsess->wasparallelisable
          && !pgsqlsess->statementsinflight->drained(1)) {
-          lock = pgsqlsess->shortmux->lock();
+          lock = 0;
           PD("Commit waiting for statements to finish\n");
-          catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lock, 1)));
+          catch(PT(pgsqlsess->statementsinflight->wait_till_drained(1)));
         }
-        lock = 0;
         PD("Bind portal %O statement %O\n", _portalname, _preparedname);
         _fetchlimit = pgsqlsess->_fetchlimit;
         _bindportal();
@@ -1254,23 +1309,18 @@ class sql_result {
     {
       Thread.MutexKey lock = closemux->lock();
       _state = PARSING;
-      {
-        Thread.MutexKey lockc = pgsqlsess->shortmux->lock();
-        if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) {
-          PD("Commit waiting for statements to finish\n");
-          catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lockc)));
-        }
-        stmtifkey = pgsqlsess->statementsinflight->acquire();
+      if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) {
+        PD("Commit waiting for statements to finish\n");
+        catch(PT(pgsqlsess->statementsinflight->wait_till_drained()));
       }
+      stmtifkey = pgsqlsess->statementsinflight->acquire();
     }
     statuscmdcomplete = 0;
     pgsqlsess->wasparallelisable = paralleliseprefix->match(_query);
   }
 
-  final void _releasestatement(void|int nolock) {
-    Thread.MutexKey lock;
-    if (!nolock)
-      lock = closemux->lock();
+  final void _releasestatement() {
+    Thread.MutexKey lock = closemux->lock(2);
     if (_state <= BOUND) {
       _state = COMMITTED;
       stmtifkey = 0;
@@ -1305,16 +1355,15 @@ class sql_result {
     releaseconditions();
   }
 
-  final int _closeportal(conxsess cs) {
+  final int _closeportal(conxsess cs, array(Thread.MutexKey) reflock) {
     void|bufcon|conxsess plugbuffer = CHAIN(cs);
     int retval = KEEP;
     PD("%O Try Closeportal %d\n", _portalname, _state);
-    Thread.MutexKey lock = closemux->lock(2);	   // When called from _sendexecute(), it is already locked
     _fetchlimit = 0;				   // disables further Executes
     switch (_state) {
       case PARSING:
       case BOUND:
-        _releasestatement(1);
+        _releasestatement();
     }
     switch (_state) {
       case PORTALINIT:
@@ -1328,7 +1377,8 @@ class sql_result {
       case COMMITTED:
       case BOUND:
         _state = CLOSING;
-        lock = 0;
+        if (reflock)
+          reflock[0] = 0;
         PD("Close portal %O\n", _portalname);
         if (_portalname && sizeof(_portalname)) {
           plugbuffer->add_int8('C')
@@ -1352,7 +1402,7 @@ class sql_result {
   }
 
   private void replenishrows() {
-    if (_fetchlimit && sizeof(datarows) <= _fetchlimit >> 1) {
+   if (_fetchlimit && sizeof(datarows) <= _fetchlimit >> 1) {
       Thread.MutexKey lock = closemux->lock();
       if (_fetchlimit) {
         _fetchlimit = pgsqlsess._fetchlimit;
@@ -1360,9 +1410,11 @@ class sql_result {
           _fetchlimit =
            min((portalbuffersize >> 1) * index / bytesreceived || 1, _fetchlimit);
         if (_fetchlimit)
-          if (inflight <= (_fetchlimit - 1) >> 1)
-            _sendexecute(_fetchlimit);
-          else
+          if (inflight <= (_fetchlimit - 1) >> 1) {
+            array reflock = ({ lock });
+            lock = 0;
+            _sendexecute(_fetchlimit, reflock);
+          } else
             PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
              _portalname, _fetchlimit, inflight);
       }
@@ -1403,8 +1455,10 @@ class sql_result {
     }
     inflight = 0;
     conxsess plugbuffer;
+    array(Thread.MutexKey) reflock = ({closemux->lock()});
     if (!catch(plugbuffer = c->start()))
-      plugbuffer->sendcmd(_closeportal(plugbuffer));
+      plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
+    reflock = 0;
     if (_state < CLOSED)
       _state = CLOSED;
     datarows->write(1);				// Signal EOF
@@ -1417,18 +1471,22 @@ class sql_result {
     };
   }
 
-  final void _sendexecute(int fetchlimit, void|bufcon|conxsess plugbuffer) {
+  final void _sendexecute(int fetchlimit,
+   void|array(Thread.MutexKey)|bufcon|conxsess plugbuffer) {
     int flushmode;
+    array(Thread.MutexKey) reflock;
     PD("Execute portal %O fetchlimit %d transtype %d\n", _portalname,
      fetchlimit, transtype);
-    if (!plugbuffer)
+    if (arrayp(plugbuffer)) {
+      reflock = plugbuffer;
       plugbuffer = c->start(1);
+    }
     CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8)
      ->add_int32(fetchlimit);
     if (!fetchlimit) {
       if (transtype != NOTRANS)
         pgsqlsess.intransaction = transtype == TRANSBEGIN;
-      flushmode = _closeportal(plugbuffer) == SYNCSEND
+      flushmode = _closeportal(plugbuffer, reflock) == SYNCSEND
        || transtype == TRANSEND ? SYNCSEND : FLUSHSEND;
     } else
       inflight += fetchlimit, flushmode = FLUSHSEND;
@@ -1584,8 +1642,8 @@ class sql_result {
 
 class proxy {
   final int _fetchlimit = FETCHLIMIT;
-  final Thread.Mutex unnamedportalmux;
-  final Thread.Mutex unnamedstatement;
+  final MUTEX unnamedportalmux;
+  final MUTEX unnamedstatement;
   private Thread.MutexKey termlock;
   private Thread.ResourceCountKey backendreg;
   final Thread.ResourceCount portalsinflight, statementsinflight;
@@ -1619,7 +1677,7 @@ class proxy {
   private string database, user, pass;
   private Crypto.Hash.SCRAM SASLcontext;
   final Thread.Condition waitforauthready;
-  final Thread.Mutex shortmux;
+  final MUTEX shortmux;
   final int readyforquerycount;
 
   private string _sprintf(int type) {
@@ -1653,7 +1711,7 @@ class proxy {
     if (!port)
       port = PGSQL_DEFAULT_PORT;
     backendreg = register_backend();
-    shortmux = Thread.Mutex();
+    shortmux = MUTEX();
     PD("Connect\n");
     waitforauthready = Thread.Condition();
     qportals = Thread.Queue();
@@ -1662,8 +1720,8 @@ class proxy {
     if (!(c = conxion(this, qportals, 0)))
       error("Couldn't connect to database on %s:%d\n", host, port);
     runtimeparameter = ([]);
-    unnamedportalmux = Thread.Mutex();
-    unnamedstatement = Thread.Mutex();
+    unnamedportalmux = MUTEX();
+    unnamedstatement = MUTEX();
     readyforquery_cb = connect_cb;
     portalsinflight = Thread.ResourceCount();
     statementsinflight = Thread.ResourceCount();
diff --git a/lib/modules/Thread.pmod b/lib/modules/Thread.pmod
index 40f4dcf7f1..532be317ba 100644
--- a/lib/modules/Thread.pmod
+++ b/lib/modules/Thread.pmod
@@ -864,6 +864,7 @@ optional class ResourceCountKey {
 
   /*semi*/private void destroy() {
     --parent->_count;
+    MutexKey lock = parent->_mutex->lock();	// Mutex the drain-waiters use
     parent->_cond->signal();
   }
 }
@@ -877,6 +878,7 @@ optional class ResourceCountKey {
 optional class ResourceCount {
   /*semi*/final int _count;
   /*semi*/final Condition _cond = Condition();
+  /*semi*/final Mutex _mutex = Mutex();
 
   //! @param level
   //!   The maximum level that is considered drained.
@@ -889,15 +891,16 @@ optional class ResourceCount {
 
   //! Blocks until the resource-counter dips to max @ref{level@}.
   //!
-  //! @param lock
-  //!   A previously acquired @[MutexKey].
-  //!
   //! @param level
   //!   The maximum level that is considered drained.
-  /*semi*/final void wait_till_drained(MutexKey lock, void|int level) {
+  variant /*semi*/final void wait_till_drained(void|int level) {
+    MutexKey lock = _mutex->lock();	// Held across both check and wait
     while (_count > level)		// Recheck before allowing further
       _cond->wait(lock);
-    lock = 0;				// Eliminate references
+  }
+
+  variant final /*__deprecated__*/ void wait_till_drained(MutexKey lock, void|int level) {
+    wait_till_drained(level);		// FIXME deprecated: lock is ignored
   }
 
   //! Increments the resource-counter.
-- 
GitLab