diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 9c09529772c6317298d21db403382f360c34dedd..065a91a29685b348926a09cfd0e06b04634663a7 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -81,6 +81,8 @@ private int timeout = QUERYTIMEOUT; private array connparmcache; private int reconnected; private int lastping = time(1); +private Thread.Condition resynced; +private Thread.Mutex resyncmux; protected string _sprintf(int type) { string res; @@ -291,7 +293,7 @@ protected void create(void|string host, void|string database, //! @url{https://www.postgresql.org/docs/current/static/multibyte.html@} /*semi*/final void set_charset(string charset) { if(charset) - big_query(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset))); + textquery(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset))); } //! @returns @@ -499,17 +501,32 @@ protected void destroy() { resync(); } +private void textquery(string q) { +#if 1 + foreach (q / ";"; ; string sq) + big_query(sq); +#else // textqueries and portals do not mix well + big_query(q, (["_text":1])); +#endif +} + +private void resyncdone() { + Thread.MutexKey lock = resyncmux->lock(); + resynced.signal(); // Allow resync() to continue +} + private void reset_dbsession() { proxy.statementsinflight->wait_till_drained(); error(1); - big_query("ROLLBACK"); - big_query("RESET ALL"); - big_query("CLOSE ALL"); - big_query("DISCARD TEMP"); + textquery("ROLLBACK;RESET ALL;CLOSE ALL;DISCARD TEMP"); + resyncdone(); } private void resync_cb() { switch (proxy.backendstatus) { + default: + resyncdone(); + break; case 'T':case 'E': foreach (proxy.prepareds; ; mapping tp) { m_delete(tp, "datatypeoid"); @@ -546,8 +563,16 @@ private void resync_cb() { PD("Statementsinflight: %d Portalsinflight: %d\n", proxy.statementsinflight, proxy.portalsinflight); if(!proxy.waitforauthready) { + if (!resynced) { + resynced = Thread.Condition(); + resyncmux = Thread.Mutex(); + } proxy.readyforquery_cb = resync_cb; proxy.sendsync(); + if (proxy.readyforquery_cb) { + Thread.MutexKey lock = resyncmux->lock(); + resynced.wait(lock); // Wait for the db to finish + } } return; }; @@ -665,7 +690,7 @@ private void resync_cb() { //! @seealso //! @[drop_db()] /*semi*/final void create_db(string db) { - big_query(sprintf("CREATE DATABASE %s", db)); + textquery(sprintf("CREATE DATABASE %s", db)); } //! This function destroys a database and all the data it contains (assuming @@ -679,7 +704,7 @@ private void resync_cb() { //! @seealso //! @[create_db()] /*semi*/final void drop_db(string db) { - big_query(sprintf("DROP DATABASE %s", db)); + textquery(sprintf("DROP DATABASE %s", db)); } //! @returns diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 9796877dc3528e89e1b2db677d0bcd443ee7558c..e52286dac9a26b64b5ae627ffbc7c5e47f6a195a 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -430,7 +430,6 @@ class conxion { final MUTEX nostash; final Thread.MutexKey started; final Thread.Queue stashqueue; - final Thread.Condition stashavail; final Stdio.Buffer stash; //! @ignore final int(KEEP..SYNCSEND) stashflushmode; @@ -724,7 +723,6 @@ outer: shortmux = MUTEX(); nostash = MUTEX(); closenext = 0; - stashavail = Thread.Condition(); stashqueue = Thread.Queue(); stash = Stdio.Buffer(); stashcount = Thread.ResourceCount(); @@ -2232,8 +2230,9 @@ class proxy { showportalstack("AFTER READYFORQUERY"); #endif readyforquerycount--; - if (readyforquery_cb) - readyforquery_cb(), readyforquery_cb = 0; + function (:void) cb; + if (cb = readyforquery_cb) + readyforquery_cb = 0, cb(); destruct(waitforauthready); break; } @@ -2313,9 +2312,11 @@ class proxy { msglen -= 4; PD("NoData %O\n", portal._query); #endif - portal._fetchlimit = 0; // disables subsequent Executes - portal->_processrowdesc(({}), ({})); - portal = 0; + if (!portal._forcetext) { + portal._fetchlimit = 0; // disables subsequent Executes + portal->_processrowdesc(({}), ({})); + portal = 0; + } break; } case 'H': @@ -2381,8 +2382,8 @@ class proxy { errtype = PROTOCOLERROR; #endif string s = cr->read(msglen - 1); - portal->_storetiming(); - PD("%O CommandComplete %O\n", portal._portalname, s); + PD("%O CommandComplete %O\n", + objectp(portal) && portal._portalname, s); #ifdef PG_DEBUG if (cr->read_int8()) errtype = PROTOCOLERROR; @@ -2393,8 +2394,11 @@ class proxy { #ifdef PG_DEBUGMORE showportalstack("COMMANDCOMPLETE"); #endif - portal->_releasesession(s); - portal = 0; + if (!portal._forcetext) { + portal->_storetiming(); + portal->_releasesession(s); + portal = 0; + } break; } case 'I': @@ -2405,8 +2409,10 @@ class proxy { #ifdef PG_DEBUGMORE showportalstack("EMPTYQUERYRESPONSE"); #endif - portal->_releasesession(); - portal = 0; + if (!portal._forcetext) { + portal->_releasesession(); + portal = 0; + } break; case 'd': PD("%O CopyData\n", portal._portalname); @@ -2432,7 +2438,8 @@ class proxy { PD("%O CopyDone\n", portal._portalname); msglen -= 4; #endif - portal = 0; + if (!portal._forcetext) + portal = 0; break; case 'E': { #ifdef PG_DEBUGMORE