From c3317ce89f26bde330f331d82a0761c21928c5cb Mon Sep 17 00:00:00 2001
From: "Stephen R. van den Berg" <srb@cuci.nl>
Date: Fri, 1 Nov 2019 09:25:44 +0100
Subject: [PATCH] pgsql: Track back deadlock, explicitly release lock, speeds
 up code.

---
 lib/modules/Sql.pmod/pgsql_util.pmod | 63 ++++++++++++++++++----------
 1 file changed, 41 insertions(+), 22 deletions(-)

diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod
index b4d5f4d93f..ff8e03566b 100644
--- a/lib/modules/Sql.pmod/pgsql_util.pmod
+++ b/lib/modules/Sql.pmod/pgsql_util.pmod
@@ -473,7 +473,7 @@ class conxion {
       PD("Nostash locked by %s\n",
        describe_backtrace(nostash->current_locking_thread()->backtrace()));
 #endif
-    while (lock = (waitforreal ? nostash->lock : nostash->trylock)(1)) {
+    while (lock = (waitforreal > 0 ? nostash->lock : nostash->trylock)(1)) {
       int mode;
       if (sizeof(stash) && (mode = getstash(KEEP)) > KEEP)
         sendcmd(mode);		// Force out stash to the server
@@ -492,7 +492,7 @@ class conxion {
       return this;
 #endif
     }
-    return bufcon(this)->start();
+    return !waitforreal && bufcon(this)->start();
   }
 
   private int write_cb() {
@@ -894,9 +894,15 @@ class sql_result {
       {
         Thread.MutexKey lock = closemux->lock();
         if (_fetchlimit) {
-          array reflock = ({ lock });
-          lock = 0;
-          _sendexecute(_fetchlimit = 0, reflock);
+          array reflock = ({ _fetchlimit = 0 });
+          for (;;) {
+            reflock[0] = lock;
+            lock = 0;
+            if (!_sendexecute(0, reflock)) {
+              lock = closemux->lock();
+              continue;
+            }
+          }
         } else
           lock = 0;			// Force release before acquiring next
         lock = _ddescribemux->lock();
@@ -1467,20 +1473,25 @@ class sql_result {
   private void replenishrows() {
    if (_fetchlimit && datarows->size() <= _fetchlimit >> 1
     && _state >= COMMITTED) {
-      Thread.MutexKey lock = closemux->lock();
-      if (_fetchlimit) {
-        _fetchlimit = pgsqlsess._fetchlimit;
-        if (bytesreceived)
-          _fetchlimit =
-           min((portalbuffersize >> 1) * index / bytesreceived || 1, _fetchlimit);
-        if (_fetchlimit)
-          if (inflight <= (_fetchlimit - 1) >> 1) {
-            array reflock = ({ lock });
-            lock = 0;
-            _sendexecute(_fetchlimit, reflock);
-          } else
-            PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
-             _portalname, _fetchlimit, inflight);
+      Thread.MutexKey lock;
+      for (;;) {
+        lock = closemux->lock();
+        if (_fetchlimit) {
+          _fetchlimit = pgsqlsess._fetchlimit;
+          if (bytesreceived)
+            _fetchlimit = min((portalbuffersize >> 1)
+             * index / bytesreceived || 1, _fetchlimit);
+          if (_fetchlimit)
+            if (inflight <= (_fetchlimit - 1) >> 1) {
+              array reflock = ({ lock });
+              lock = 0;
+              if (!_sendexecute(_fetchlimit, reflock))
+                continue;
+            } else
+              PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
+               _portalname, _fetchlimit, inflight);
+        }
+        break;
       }
     }
   }
@@ -1522,7 +1533,7 @@ class sql_result {
     array(Thread.MutexKey) reflock = ({closemux->lock()});
     if (!catch(plugbuffer = c->start()))
       plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
-    reflock = 0;
+    reflock[0] = 0;
     if (_state < CLOSED) {
       stmtifkey = 0;
       _state = CLOSED;
@@ -1537,7 +1548,7 @@ class sql_result {
     };
   }
 
-  final void _sendexecute(int fetchlimit,
+  final int _sendexecute(int fetchlimit,
    void|array(Thread.MutexKey)|bufcon|conxsess plugbuffer) {
     int flushmode;
     array(Thread.MutexKey) reflock;
@@ -1545,7 +1556,10 @@ class sql_result {
      fetchlimit, transtype);
     if (arrayp(plugbuffer)) {
       reflock = plugbuffer;
-      plugbuffer = c->start(1);
+      if (!(plugbuffer = c->start(-1))) {
+        reflock[0] = 0;
+        return 0;	// Found potential deadlock, release and try again
+      }
     }
     CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8)
      ->add_int32(fetchlimit);
@@ -1557,6 +1571,9 @@ class sql_result {
     } else
       inflight += fetchlimit, flushmode = FLUSHSEND;
     plugbuffer->sendcmd(flushmode, this);
+    if (reflock)
+      reflock[0] = 0;
+    return 1;
   }
 
   inline private array setuptimeout() {
@@ -1578,6 +1595,8 @@ class sql_result {
   //!  @[eof()], @[send_row()]
   /*semi*/final array(mixed) fetch_row() {
     int|array datarow;
+    if (!this)			// If object already destructed, return fast
+      return 0;
     replenishrows();
     if (arrayp(datarow = datarows->try_read()))
       return datarow;
-- 
GitLab