From c41dcdd39e14b65c4a54bf810325e45e1f0986a9 Mon Sep 17 00:00:00 2001
From: "Stephen R. van den Berg" <srb@cuci.nl>
Date: Tue, 12 May 2020 15:55:42 +0200
Subject: [PATCH] pgsql: Synchronous resync() and fix portalstack for
 text-multiqueries.

---
 lib/modules/Sql.pmod/pgsql.pike      | 39 +++++++++++++++++++++++-----
 lib/modules/Sql.pmod/pgsql_util.pmod | 35 +++++++++++++++----------
 2 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike
index 9c09529772..065a91a296 100644
--- a/lib/modules/Sql.pmod/pgsql.pike
+++ b/lib/modules/Sql.pmod/pgsql.pike
@@ -81,6 +81,8 @@ private int timeout = QUERYTIMEOUT;
 private array connparmcache;
 private int reconnected;
 private int lastping = time(1);
+private Thread.Condition resynced;
+private Thread.Mutex resyncmux;
 
 protected string _sprintf(int type) {
   string res;
@@ -291,7 +293,7 @@ protected void create(void|string host, void|string database,
 //!   @url{https://www.postgresql.org/docs/current/static/multibyte.html@}
 /*semi*/final void set_charset(string charset) {
   if(charset)
-    big_query(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset)));
+    textquery(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset)));
 }
 
 //! @returns
@@ -499,17 +501,32 @@ protected void destroy() {
   resync();
 }
 
+private void textquery(string q) {
+#if 1
+  foreach (q / ";"; ; string sq)
+    big_query(sq);
+#else				// textqueries and portals do not mix well
+  big_query(q, (["_text":1]));
+#endif
+}
+
+private void resyncdone() {
+  Thread.MutexKey lock = resyncmux->lock();
+  resynced.signal();			// Allow resync() to continue
+}
+
 private void reset_dbsession() {
   proxy.statementsinflight->wait_till_drained();
   error(1);
-  big_query("ROLLBACK");
-  big_query("RESET ALL");
-  big_query("CLOSE ALL");
-  big_query("DISCARD TEMP");
+  textquery("ROLLBACK;RESET ALL;CLOSE ALL;DISCARD TEMP");
+  resyncdone();
 }
 
 private void resync_cb() {
   switch (proxy.backendstatus) {
+    default:
+      resyncdone();
+      break;
     case 'T':case 'E':
       foreach (proxy.prepareds; ; mapping tp) {
         m_delete(tp, "datatypeoid");
@@ -546,8 +563,16 @@ private void resync_cb() {
       PD("Statementsinflight: %d  Portalsinflight: %d\n",
        proxy.statementsinflight, proxy.portalsinflight);
       if(!proxy.waitforauthready) {
+        if (!resynced) {
+          resynced = Thread.Condition();
+          resyncmux = Thread.Mutex();
+        }
         proxy.readyforquery_cb = resync_cb;
         proxy.sendsync();
+        if (proxy.readyforquery_cb) {
+          Thread.MutexKey lock = resyncmux->lock();
+          resynced.wait(lock);	      // Wait for the db to finish
+        }
       }
       return;
     };
@@ -665,7 +690,7 @@ private void resync_cb() {
 //! @seealso
 //!   @[drop_db()]
 /*semi*/final void create_db(string db) {
-  big_query(sprintf("CREATE DATABASE %s", db));
+  textquery(sprintf("CREATE DATABASE %s", db));
 }
 
 //! This function destroys a database and all the data it contains (assuming
@@ -679,7 +704,7 @@ private void resync_cb() {
 //! @seealso
 //!   @[create_db()]
 /*semi*/final void drop_db(string db) {
-  big_query(sprintf("DROP DATABASE %s", db));
+  textquery(sprintf("DROP DATABASE %s", db));
 }
 
 //! @returns
diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod
index 9796877dc3..e52286dac9 100644
--- a/lib/modules/Sql.pmod/pgsql_util.pmod
+++ b/lib/modules/Sql.pmod/pgsql_util.pmod
@@ -430,7 +430,6 @@ class conxion {
   final MUTEX nostash;
   final Thread.MutexKey started;
   final Thread.Queue stashqueue;
-  final Thread.Condition stashavail;
   final Stdio.Buffer stash;
   //! @ignore
   final int(KEEP..SYNCSEND) stashflushmode;
@@ -724,7 +723,6 @@ outer:
     shortmux = MUTEX();
     nostash = MUTEX();
     closenext = 0;
-    stashavail = Thread.Condition();
     stashqueue = Thread.Queue();
     stash = Stdio.Buffer();
     stashcount = Thread.ResourceCount();
@@ -2232,8 +2230,9 @@ class proxy {
             showportalstack("AFTER READYFORQUERY");
 #endif
             readyforquerycount--;
-            if (readyforquery_cb)
-              readyforquery_cb(), readyforquery_cb = 0;
+            function (:void) cb;
+            if (cb = readyforquery_cb)
+              readyforquery_cb = 0, cb();
             destruct(waitforauthready);
             break;
           }
@@ -2313,9 +2312,11 @@ class proxy {
             msglen -= 4;
             PD("NoData %O\n", portal._query);
 #endif
-            portal._fetchlimit = 0;		// disables subsequent Executes
-            portal->_processrowdesc(({}), ({}));
-            portal = 0;
+            if (!portal._forcetext) {
+              portal._fetchlimit = 0;		// disables subsequent Executes
+              portal->_processrowdesc(({}), ({}));
+              portal = 0;
+            }
             break;
           }
           case 'H':
@@ -2381,8 +2382,8 @@ class proxy {
               errtype = PROTOCOLERROR;
 #endif
             string s = cr->read(msglen - 1);
-            portal->_storetiming();
-            PD("%O CommandComplete %O\n", portal._portalname, s);
+            PD("%O CommandComplete %O\n",
+             objectp(portal) && portal._portalname, s);
 #ifdef PG_DEBUG
             if (cr->read_int8())
               errtype = PROTOCOLERROR;
@@ -2393,8 +2394,11 @@ class proxy {
 #ifdef PG_DEBUGMORE
             showportalstack("COMMANDCOMPLETE");
 #endif
-            portal->_releasesession(s);
-            portal = 0;
+            if (!portal._forcetext) {
+              portal->_storetiming();
+              portal->_releasesession(s);
+              portal = 0;
+            }
             break;
           }
           case 'I':
@@ -2405,8 +2409,10 @@ class proxy {
 #ifdef PG_DEBUGMORE
             showportalstack("EMPTYQUERYRESPONSE");
 #endif
-            portal->_releasesession();
-            portal = 0;
+            if (!portal._forcetext) {
+              portal->_releasesession();
+              portal = 0;
+            }
             break;
           case 'd':
             PD("%O CopyData\n", portal._portalname);
@@ -2432,7 +2438,8 @@ class proxy {
             PD("%O CopyDone\n", portal._portalname);
             msglen -= 4;
 #endif
-            portal = 0;
+            if (!portal._forcetext)
+              portal = 0;
             break;
           case 'E': {
 #ifdef PG_DEBUGMORE
-- 
GitLab