pgsql: Synchronous resync() and fix portalstack for text-multiqueries.

parent 49171bd2
......@@ -84,6 +84,8 @@ private int timeout = QUERYTIMEOUT;
private array connparmcache;
private int reconnected;
private int lastping = time(1);
private Thread.Condition resynced;
private Thread.Mutex resyncmux;
protected string _sprintf(int type) {
string res;
......@@ -294,7 +296,7 @@ protected void create(void|string host, void|string database,
//! @url{https://www.postgresql.org/docs/current/static/multibyte.html@}
/*semi*/final void set_charset(string charset) {
if(charset)
big_query(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset)));
textquery(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset)));
}
//! @returns
......@@ -502,17 +504,32 @@ protected void _destruct() {
resync();
}
private void textquery(string q) {
#if 1
foreach (q / ";"; ; string sq)
big_query(sq);
#else // textqueries and portals do not mix well
big_query(q, (["_text":1]));
#endif
}
private void resyncdone() {
Thread.MutexKey lock = resyncmux->lock();
resynced.signal(); // Allow resync() to continue
}
private void reset_dbsession() {
proxy.statementsinflight->wait_till_drained();
error(1);
big_query("ROLLBACK");
big_query("RESET ALL");
big_query("CLOSE ALL");
big_query("DISCARD TEMP");
textquery("ROLLBACK;RESET ALL;CLOSE ALL;DISCARD TEMP");
resyncdone();
}
private void resync_cb() {
switch (proxy.backendstatus) {
default:
resyncdone();
break;
case 'T':case 'E':
foreach (proxy.prepareds; ; mapping tp) {
m_delete(tp, "datatypeoid");
......@@ -549,8 +566,16 @@ private void resync_cb() {
PD("Statementsinflight: %d Portalsinflight: %d\n",
proxy.statementsinflight, proxy.portalsinflight);
if(!proxy.waitforauthready) {
if (!resynced) {
resynced = Thread.Condition();
resyncmux = Thread.Mutex();
}
proxy.readyforquery_cb = resync_cb;
proxy.sendsync();
if (proxy.readyforquery_cb) {
Thread.MutexKey lock = resyncmux->lock();
resynced.wait(lock); // Wait for the db to finish
}
}
return;
};
......@@ -668,7 +693,7 @@ private void resync_cb() {
//! @seealso
//! @[drop_db()]
/*semi*/final void create_db(string db) {
big_query(sprintf("CREATE DATABASE %s", db));
textquery(sprintf("CREATE DATABASE %s", db));
}
//! This function destroys a database and all the data it contains (assuming
......@@ -682,7 +707,7 @@ private void resync_cb() {
//! @seealso
//! @[create_db()]
/*semi*/final void drop_db(string db) {
big_query(sprintf("DROP DATABASE %s", db));
textquery(sprintf("DROP DATABASE %s", db));
}
//! @returns
......
......@@ -478,7 +478,6 @@ class conxion {
final MUTEX nostash;
final Thread.MutexKey started;
final Thread.Queue stashqueue;
final Thread.Condition stashavail;
final Stdio.Buffer stash;
//! @ignore
final int(KEEP..SYNCSEND) stashflushmode;
......@@ -772,7 +771,6 @@ outer:
shortmux = MUTEX();
nostash = MUTEX();
closenext = 0;
stashavail = Thread.Condition();
stashqueue = Thread.Queue();
stash = Stdio.Buffer();
stashcount = Thread.ResourceCount();
......@@ -2414,8 +2412,9 @@ class proxy {
showportalstack("AFTER READYFORQUERY");
#endif
readyforquerycount--;
if (readyforquery_cb)
readyforquery_cb(), readyforquery_cb = 0;
function (:void) cb;
if (cb = readyforquery_cb)
readyforquery_cb = 0, cb();
destruct(waitforauthready);
break;
}
......@@ -2495,9 +2494,11 @@ class proxy {
msglen -= 4;
PD("NoData %O\n", portal._query);
#endif
portal._fetchlimit = 0; // disables subsequent Executes
portal->_processrowdesc(({}), ({}));
portal = 0;
if (!portal._forcetext) {
portal._fetchlimit = 0; // disables subsequent Executes
portal->_processrowdesc(({}), ({}));
portal = 0;
}
break;
}
case 'H':
......@@ -2563,8 +2564,8 @@ class proxy {
errtype = PROTOCOLERROR;
#endif
string s = cr->read(msglen - 1);
portal->_storetiming();
PD("%O CommandComplete %O\n", portal._portalname, s);
PD("%O CommandComplete %O\n",
objectp(portal) && portal._portalname, s);
#ifdef PG_DEBUG
if (cr->read_int8())
errtype = PROTOCOLERROR;
......@@ -2575,8 +2576,11 @@ class proxy {
#ifdef PG_DEBUGMORE
showportalstack("COMMANDCOMPLETE");
#endif
portal->_releasesession(s);
portal = 0;
if (!portal._forcetext) {
portal->_storetiming();
portal->_releasesession(s);
portal = 0;
}
break;
}
case 'I':
......@@ -2587,8 +2591,10 @@ class proxy {
#ifdef PG_DEBUGMORE
showportalstack("EMPTYQUERYRESPONSE");
#endif
portal->_releasesession();
portal = 0;
if (!portal._forcetext) {
portal->_releasesession();
portal = 0;
}
break;
case 'd':
PD("%O CopyData\n", portal._portalname);
......@@ -2614,7 +2620,8 @@ class proxy {
PD("%O CopyDone\n", portal._portalname);
msglen -= 4;
#endif
portal = 0;
if (!portal._forcetext)
portal = 0;
break;
case 'E': {
#ifdef PG_DEBUGMORE
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment