From 8675b0e6985e13baef20a5aa1a952588535a7e90 Mon Sep 17 00:00:00 2001
From: "Stephen R. van den Berg" <srb@cuci.nl>
Date: Sat, 3 Jun 2017 12:42:45 +0200
Subject: [PATCH] pgsql: Add instrumentation to aid in diagnosing
 race conditions.

---
 lib/modules/Sql.pmod/pgsql.h         |  8 +++
 lib/modules/Sql.pmod/pgsql.pike      | 44 +++++++++-----
 lib/modules/Sql.pmod/pgsql_util.pmod | 89 ++++++++++++++++++++++------
 3 files changed, 108 insertions(+), 33 deletions(-)

diff --git a/lib/modules/Sql.pmod/pgsql.h b/lib/modules/Sql.pmod/pgsql.h
index b8056f277e..a31a2770e4 100644
--- a/lib/modules/Sql.pmod/pgsql.h
+++ b/lib/modules/Sql.pmod/pgsql.h
@@ -7,6 +7,7 @@
 
 //#define PG_DEBUG  1
 //#define PG_DEBUGMORE  1
+//#define PG_DEBUGRACE	1
 
 //#define PG_STATS	1	    // Collect extra usage statistics
 
@@ -79,6 +80,13 @@
 #define PT(X ...)	     (X)
 #endif
 
+#ifdef PG_DEBUGRACE
+#define CHAIN(x)	((x)->chain)	// Go through the chain member of x
+#else
+#define CHAIN(x)	(x)		// No indirection: x is used as is
+#define conxsess	conxion		// conxsess becomes a plain alias
+#endif
+
 #define PORTALINIT	0		// Portal states
 #define BOUND		1
 #define COPYINPROGRESS	2
diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike
index 3a1da1cce3..db250c5290 100644
--- a/lib/modules/Sql.pmod/pgsql.pike
+++ b/lib/modules/Sql.pmod/pgsql.pike
@@ -321,7 +321,7 @@ private .pgsql_util.conxion getsocket(void|int nossl) {
   PD("Closetrace %O\n",backtrace());
 #endif
   if(c) {
-    .pgsql_util.conxion plugbuffer;
+    .pgsql_util.conxsess plugbuffer;
     if(!catch(plugbuffer=c->start(1))) {
       foreach(qportals->peek_array();;int|.pgsql_util.sql_result portal)
         if(objectp(portal))
@@ -612,7 +612,8 @@ final void _processloop(.pgsql_util.conxion ci) {
       plugbuffer->add(name,0,(string)value,0);
     plugbuffer->add_int8(0);
     PD("%O\n",(string)plugbuffer);
-    if(catch(ci->start()->add_hstring(plugbuffer,4,4)->sendcmd(SENDOUT))) {
+    object cs;
+    if (catch(cs = ci->start())) {
       if(_options.reconnect)
         _connectfail();
       else
@@ -620,6 +621,9 @@ final void _processloop(.pgsql_util.conxion ci) {
       unnamedstatement=0;
       termlock=0;
       return;
+    } else {
+      CHAIN(cs)->add_hstring(plugbuffer, 4, 4);
+      cs->sendcmd(SENDOUT);
     }
   }		      // Do not flush at this point, PostgreSQL 9.4 disapproves
   procmessage();
@@ -777,9 +781,11 @@ private void procmessage() {
           }
           switch(errtype) {
             case NOERROR:
-              if(cancelsecret!="")
-                ci->start()->add_int8('p')->add_hstring(({sendpass,0}),4,4)
-                 ->sendcmd(SENDOUT);
+              if(cancelsecret!="") {
+                object cs = ci->start();
+                CHAIN(cs)->add_int8('p')->add_hstring(({sendpass, 0}), 4, 4);
+                cs->sendcmd(SENDOUT);
+              }
               break;	// No flushing here, PostgreSQL 9.4 disapproves
             default:
             case PROTOCOLUNSUPPORTED:
@@ -1159,7 +1165,9 @@ private void procmessage() {
       }
     };				// We only get here if there is an error
     if(err==MAGICTERMINATE) {	// Announce connection termination to server
-      ci->start()->add("X\0\0\0\4")->sendcmd(SENDOUT);
+      object cs = ci->start();
+      CHAIN(cs)->add("X\0\0\0\4");
+      cs->sendcmd(SENDOUT);
       terminating=1;
       err=0;
     } else if(stringp(err)) {
@@ -1269,7 +1277,8 @@ private int reconnect() {
 #ifdef PG_STATS
     prepstmtused=0;
 #endif
-    termlock=unnamedstatement->lock(1);
+    if (unnamedstatement)
+      termlock = unnamedstatement->lock(1);
     catch(c->close());
     unnamedstatement = 0;
     termlock = 0;
@@ -1708,7 +1717,7 @@ final string status_commit() {
 }
 
 private inline void closestatement(
-  .pgsql_util.conxion|.pgsql_util.bufcon plugbuffer,string oldprep) {
+  .pgsql_util.bufcon|.pgsql_util.conxsess plugbuffer,string oldprep) {
   .pgsql_util.closestatement(plugbuffer,oldprep);
 }
 
@@ -1891,8 +1900,8 @@ private inline void throwdelayederror(object parent) {
 	m_delete(np,"preparedname");
       }
     }
-    if(sizeof(plugbuffer)) {
-      PD("%O\n",(string)plugbuffer);
+    if(sizeof(CHAIN(plugbuffer))) {
+      PD("%O\n",(string)CHAIN(plugbuffer));
       plugbuffer->sendcmd(FLUSHSEND);			      // close expireds
     } else
       plugbuffer->sendcmd(KEEP);			       // close start()
@@ -1913,8 +1922,9 @@ private inline void throwdelayederror(object parent) {
     portal->_openportal();
     _readyforquerycount++;
     Thread.MutexKey lock=unnamedstatement->lock(1);
-    c->start(1)->add_int8('Q')->add_hstring(({q,0}),4,4)
-     ->sendcmd(FLUSHLOGSEND,portal);
+    .pgsql_util.conxsess cs = c->start(1);
+    CHAIN(cs)->add_int8('Q')->add_hstring(({q, 0}), 4, 4);
+    cs->sendcmd(FLUSHLOGSEND,portal);
     lock=0;
     PD("Simple query: %O\n",q);
   } else {
@@ -1925,7 +1935,8 @@ private inline void throwdelayederror(object parent) {
           (portal._unnamedstatementkey = unnamedstatement->trylock(1))
            ? "" : PTSTMTPREFIX+int2hex(ptstmtcount++);
       PD("Parse statement %O=%O\n",preparedname,q);
-      plugbuffer=c->start()->add_int8('P')
+      plugbuffer = c->start();
+      CHAIN(plugbuffer)->add_int8('P')
        ->add_hstring(({preparedname,0,q,"\0\0\0"}),4,4)
 #if 0
       // Even though the protocol doesn't require the Parse command to be
@@ -1940,8 +1951,11 @@ private inline void throwdelayederror(object parent) {
     portal._preparedname=preparedname;
     if(!tp || !tp.datatypeoid) {
       PD("Describe statement %O\n",preparedname);
-      (plugbuffer||c->start())->add_int8('D')
-       ->add_hstring(({'S',preparedname,0}),4,4)->sendcmd(FLUSHSEND,portal);
+      if (!plugbuffer)
+        plugbuffer = c->start();
+      CHAIN(plugbuffer)->add_int8('D')
+       ->add_hstring(({'S', preparedname, 0}), 4, 4);
+      plugbuffer->sendcmd(FLUSHSEND,portal);
     } else {
       if(plugbuffer)
         plugbuffer->sendcmd(KEEP);
diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod
index d828c6ac77..115623a50a 100644
--- a/lib/modules/Sql.pmod/pgsql_util.pmod
+++ b/lib/modules/Sql.pmod/pgsql_util.pmod
@@ -65,10 +65,10 @@ private Regexp iregexp(string expr) {
   return Regexp(ret->read());
 }
 
-final void closestatement(bufcon|conxion plugbuffer,string oldprep) {
+final void closestatement(bufcon|conxsess plugbuffer,string oldprep) {
   if(oldprep) {
     PD("Close statement %s\n",oldprep);
-    plugbuffer->add_int8('C')->add_hstring(({'S',oldprep,0}),4,4);
+    CHAIN(plugbuffer)->add_int8('C')->add_hstring(({'S', oldprep, 0}), 4, 4);
   }
 }
 
@@ -150,6 +150,12 @@ private inline mixed callout(function(mixed ...:void) f,
 class bufcon {
   inherit Stdio.Buffer;
 
+#ifdef PG_DEBUGRACE
+  final bufcon `chain() {
+    return this;
+  }
+#endif
+
   private conxion realbuffer;
 
   protected void create(conxion _realbuffer) {
@@ -180,11 +186,18 @@ class bufcon {
     lock=0;
     this->clear();
     if(lock=realbuffer->nostash->trylock(1)) {
-      realbuffer->started=lock; lock=0;
+#ifdef PG_DEBUGRACE
+      conxsess sess = conxsess(realbuffer);
+      realbuffer->started = lock;
+      lock = 0;
+      sess->sendcmd(SENDOUT);
+#else
+      realbuffer->started = lock;
+      lock = 0;
       realbuffer->sendcmd(SENDOUT);
+#endif
     }
   }
-
 };
 
 class conxiin {
@@ -254,6 +267,9 @@ class conxion {
   final int stashflushmode;
   final int stashcount;
   final int synctransact;
+#ifdef PG_DEBUGRACE
+  final mixed nostrack;
+#endif
 #ifdef PG_DEBUG
   final int queueoutidx;
   final int queueinidx=-1;
@@ -265,10 +281,13 @@ class conxion {
      portal._portalname,++queueoutidx,synctransact,sizeof(this));
   }
 
-  final conxion|bufcon start(void|int waitforreal) {
+  final bufcon|conxsess start(void|int waitforreal) {
     Thread.MutexKey lock;
     if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) {
-      started=lock;
+#ifdef PG_DEBUGRACE
+      conxsess sess = conxsess(this);
+#endif
+      started = lock;
       lock=shortmux->lock();
       if(stashcount)
         PT(stashavail.wait(lock));
@@ -276,7 +295,11 @@ class conxion {
       foreach(stashqueue->try_read_array();;sql_result portal)
         queueup(portal);
       lock=0;
+#ifdef PG_DEBUGRACE
+      return sess;
+#else
       return this;
+#endif
     }
     stashcount++;
     return bufcon(this);
@@ -448,9 +471,11 @@ outer:
         if(socket)
           catch(fd=socket->query_fd());
         res=predef::sprintf("conxion  fd: %d input queue: %d/%d "
-                    "queued portals: %d  output queue: %d/%d\n",
+                    "queued portals: %d  output queue: %d/%d\n"
+                    "started: %d\n",
                     fd,sizeof(i),i->_size_object(),
-                    qportals->size(),sizeof(this),_size_object());
+                    qportals->size(),sizeof(this),_size_object(),
+                    !!started);
         break;
     }
     return res;
@@ -472,6 +497,32 @@ outer:
   }
 };
 
+#ifdef PG_DEBUGRACE
+class conxsess {		// Only compiled when PG_DEBUGRACE is defined
+  final conxion chain;		// Wrapped conxion; zeroed by sendcmd()
+
+  void create(conxion parent) {	// Wraps parent and records who created us
+    if (parent->started)	// Already set: report new and old backtrace
+      werror("Overwriting conxsess %s %s\n",
+        describe_backtrace(({"new ", backtrace()[..<1]})),
+        describe_backtrace(({"old ", parent->nostrack})));
+    parent->nostrack = backtrace();	// Kept for the "old" report above
+    chain = parent;
+  }
+
+  final void sendcmd(int mode,void|sql_result portal) {
+    chain->sendcmd(mode, portal);	// Forward to the wrapped conxion
+    chain = 0;			// Marks this session as transmitted
+  }
+
+  void destroy() {
+    if (chain)			// Still set: sendcmd() was never called
+      werror("Untransmitted conxsess %s\n",
+       describe_backtrace(({"", backtrace()[..<1]})));
+  }
+};
+#endif
+
 //! The result object returned by @[Sql.pgsql()->big_query()], except for
 //! the noted differences it behaves the same as @[Sql.sql_result].
 //!
@@ -889,11 +940,11 @@ class sql_result {
         PD("Bind portal %O statement %O\n",_portalname,_preparedname);
         _fetchlimit=pgsqlsess->_fetchlimit;
         _openportal();
-        conxion bindbuffer=c->start();
+        conxsess bindbuffer = c->start();
         _unnamedstatementkey=0;
-        bindbuffer->add_int8('B')->add_hstring(plugbuffer,4,4);
+        CHAIN(bindbuffer)->add_int8('B')->add_hstring(plugbuffer, 4, 4);
         if(!_tprepared && sizeof(_preparedname))
-          closestatement(bindbuffer,_preparedname);
+          closestatement(CHAIN(bindbuffer), _preparedname);
         _sendexecute(_fetchlimit
                              && !(cachealways[_query]
                                   || sizeof(_query)>=MINPREPARELENGTH &&
@@ -935,7 +986,8 @@ class sql_result {
     releaseconditions();
   }
 
-  final int _closeportal(bufcon plugbuffer) {
+  final int _closeportal(conxsess cs) {
+    object plugbuffer = CHAIN(cs);
     int retval=KEEP;
     PD("%O Try Closeportal %d\n",_portalname,_state);
     Thread.MutexKey lock=closemux->lock();
@@ -1018,10 +1070,9 @@ class sql_result {
     if(statusccomplete && !statuscmdcomplete)
       statuscmdcomplete=statusccomplete;
     inflight=0;
-    catch {
-      conxion plugbuffer=c->start();
+    conxsess plugbuffer;
+    if (!catch(plugbuffer = c->start()))
       plugbuffer->sendcmd(_closeportal(plugbuffer));
-    };
     _state=CLOSED;
     datarows->write(1);				// Signal EOF
     releaseconditions();
@@ -1033,12 +1084,12 @@ class sql_result {
     };
   }
 
-  final void _sendexecute(int fetchlimit,void|bufcon plugbuffer) {
+  final void _sendexecute(int fetchlimit,void|bufcon|conxsess plugbuffer) {
     int flushmode;
     PD("Execute portal %O fetchlimit %d\n",_portalname,fetchlimit);
     if(!plugbuffer)
       plugbuffer=c->start(1);
-    plugbuffer->add_int8('E')->add_hstring(({_portalname,0}),4,8)
+    CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname,0}), 4, 8)
      ->add_int32(fetchlimit);
     if(!fetchlimit)
       flushmode=_closeportal(plugbuffer)==SYNCSEND?SYNCSEND:FLUSHSEND;
@@ -1116,7 +1167,9 @@ class sql_result {
     trydelayederror();
     if(copydata) {
       PD("CopyData\n");
-      c->start()->add_int8('d')->add_hstring(copydata,4,4)->sendcmd(SENDOUT);
+      object cs = c->start();
+      CHAIN(cs)->add_int8('d')->add_hstring(copydata, 4, 4);
+      cs->sendcmd(SENDOUT);
     } else
       _releasesession();
   }
-- 
GitLab