From 6476a44d80c5441ca29ae042d79c295ef8f5f373 Mon Sep 17 00:00:00 2001 From: "Stephen R. van den Berg" <srb@cuci.nl> Date: Tue, 12 May 2020 15:55:42 +0200 Subject: [PATCH] pgsql: Synchronous resync() and fix portalstack for text-multiqueries. --- lib/modules/Sql.pmod/pgsql.pike | 39 +++++++++++++++++++++++----- lib/modules/Sql.pmod/pgsql_util.pmod | 35 +++++++++++++++---------- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 93f068e97b..73cce7cf28 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -84,6 +84,8 @@ private int timeout = QUERYTIMEOUT; private array connparmcache; private int reconnected; private int lastping = time(1); +private Thread.Condition resynced; +private Thread.Mutex resyncmux; protected string _sprintf(int type) { string res; @@ -294,7 +296,7 @@ protected void create(void|string host, void|string database, //! @url{https://www.postgresql.org/docs/current/static/multibyte.html@} /*semi*/final void set_charset(string charset) { if(charset) - big_query(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset))); + textquery(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset))); } //! @returns @@ -502,17 +504,32 @@ protected void _destruct() { resync(); } +private void textquery(string q) { +#if 1 + foreach (q / ";"; ; string sq) + big_query(sq); +#else // textqueries and portals do not mix well + big_query(q, (["_text":1])); +#endif +} + +private void resyncdone() { + Thread.MutexKey lock = resyncmux->lock(); + resynced.signal(); // Allow resync() to continue +} + private void reset_dbsession() { proxy.statementsinflight->wait_till_drained(); error(1); - big_query("ROLLBACK"); - big_query("RESET ALL"); - big_query("CLOSE ALL"); - big_query("DISCARD TEMP"); + textquery("ROLLBACK;RESET ALL;CLOSE ALL;DISCARD TEMP"); + resyncdone(); } private void resync_cb() { switch (proxy.backendstatus) { + default: + resyncdone(); + break; case 'T':case 'E': foreach (proxy.prepareds; ; mapping tp) { m_delete(tp, "datatypeoid"); @@ -549,8 +566,16 @@ private void resync_cb() { PD("Statementsinflight: %d Portalsinflight: %d\n", proxy.statementsinflight, proxy.portalsinflight); if(!proxy.waitforauthready) { + if (!resynced) { + resynced = Thread.Condition(); + resyncmux = Thread.Mutex(); + } proxy.readyforquery_cb = resync_cb; proxy.sendsync(); + if (proxy.readyforquery_cb) { + Thread.MutexKey lock = resyncmux->lock(); + resynced.wait(lock); // Wait for the db to finish + } } return; }; @@ -668,7 +693,7 @@ private void resync_cb() { //! @seealso //! @[drop_db()] /*semi*/final void create_db(string db) { - big_query(sprintf("CREATE DATABASE %s", db)); + textquery(sprintf("CREATE DATABASE %s", db)); } //! This function destroys a database and all the data it contains (assuming @@ -682,7 +707,7 @@ private void resync_cb() { //! @seealso //! @[create_db()] /*semi*/final void drop_db(string db) { - big_query(sprintf("DROP DATABASE %s", db)); + textquery(sprintf("DROP DATABASE %s", db)); } //! @returns diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index e8173b7dfd..7e58c68a39 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -478,7 +478,6 @@ class conxion { final MUTEX nostash; final Thread.MutexKey started; final Thread.Queue stashqueue; - final Thread.Condition stashavail; final Stdio.Buffer stash; //! @ignore final int(KEEP..SYNCSEND) stashflushmode; @@ -772,7 +771,6 @@ outer: shortmux = MUTEX(); nostash = MUTEX(); closenext = 0; - stashavail = Thread.Condition(); stashqueue = Thread.Queue(); stash = Stdio.Buffer(); stashcount = Thread.ResourceCount(); @@ -2414,8 +2412,9 @@ class proxy { showportalstack("AFTER READYFORQUERY"); #endif readyforquerycount--; - if (readyforquery_cb) - readyforquery_cb(), readyforquery_cb = 0; + function (:void) cb; + if (cb = readyforquery_cb) + readyforquery_cb = 0, cb(); destruct(waitforauthready); break; } @@ -2495,9 +2494,11 @@ class proxy { msglen -= 4; PD("NoData %O\n", portal._query); #endif - portal._fetchlimit = 0; // disables subsequent Executes - portal->_processrowdesc(({}), ({})); - portal = 0; + if (!portal._forcetext) { + portal._fetchlimit = 0; // disables subsequent Executes + portal->_processrowdesc(({}), ({})); + portal = 0; + } break; } case 'H': @@ -2563,8 +2564,8 @@ class proxy { errtype = PROTOCOLERROR; #endif string s = cr->read(msglen - 1); - portal->_storetiming(); - PD("%O CommandComplete %O\n", portal._portalname, s); + PD("%O CommandComplete %O\n", + objectp(portal) && portal._portalname, s); #ifdef PG_DEBUG if (cr->read_int8()) errtype = PROTOCOLERROR; @@ -2575,8 +2576,11 @@ class proxy { #ifdef PG_DEBUGMORE showportalstack("COMMANDCOMPLETE"); #endif - portal->_releasesession(s); - portal = 0; + if (!portal._forcetext) { + portal->_storetiming(); + portal->_releasesession(s); + portal = 0; + } break; } case 'I': @@ -2587,8 +2591,10 @@ class proxy { #ifdef PG_DEBUGMORE showportalstack("EMPTYQUERYRESPONSE"); #endif - portal->_releasesession(); - portal = 0; + if (!portal._forcetext) { + portal->_releasesession(); + portal = 0; + } break; case 'd': PD("%O CopyData\n", portal._portalname); @@ -2614,7 +2620,8 @@ class proxy { PD("%O CopyDone\n", portal._portalname); msglen -= 4; #endif - portal = 0; + if (!portal._forcetext) + portal = 0; break; case 'E': { #ifdef PG_DEBUGMORE -- GitLab