diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index e9e19c448f2d84c260a84d64bf1f6f4722b5515e..b4b4d1697ca01e14203b83ebb698901ee7bdbfe9 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -63,7 +63,8 @@ final int _fetchlimit=FETCHLIMIT; final Thread.Mutex _unnamedportalmux; -private Thread.Mutex unnamedstatement; +private Thread.Mutex unnamedstatement,termthread; +private Thread.MutexKey termlock; final int _portalsinflight; private .pgsql_util.conxion c; @@ -597,8 +598,6 @@ private int datarowdebugcount; #endif final void _processloop(.pgsql_util.conxion ci) { - if(!this) // Oops, current object already destructed - return; if(c && (!ci || c!=ci)) // If we are switching or dropping connections c->close(); // force a close on the old socket (c=ci)->socket->set_id(procmessage); @@ -616,11 +615,11 @@ final void _processloop(.pgsql_util.conxion ci) { plugbuffer->add_int8(0); PD("%O\n",(string)plugbuffer); if(catch(ci->start()->add_hstring(plugbuffer,4,4)->sendcmd(SENDOUT))) { - if(this) // Only when not destructed yet - if(_options.reconnect) - _connectfail(); - else - destruct(waitforauthready); + if(_options.reconnect) + _connectfail(); + else + destruct(waitforauthready); + termlock=0; return; } } // Do not flush at this point, PostgreSQL 9.4 disapproves @@ -628,8 +627,6 @@ final void _processloop(.pgsql_util.conxion ci) { } private void procmessage() { - if(!this) // Oops, current object already destructed - return; int terminating=0; .pgsql_util.conxion ci=c; // cache value FIXME sensible? .pgsql_util.conxiin cr=ci->i; // cache value FIXME sensible? @@ -1179,25 +1176,20 @@ private void procmessage() { } break; } - if(!this) { // Already destructed - ci->close(); // So close descriptors only - return; - } PD("Closing database processloop %O\n",err); - catch { // Cater for destruct races - _delayederror=err; - for(;objectp(portal);portal=qportals->read()) - if(objectp(portal)) { + _delayederror=err; + for(;objectp(portal);portal=qportals->read()) + if(objectp(portal)) { #ifdef PG_DEBUG - showportal(0); + showportal(0); #endif - portal->_purgeportal(); - } - if(!ci->close() && !terminating && _options.reconnect) - _connectfail(); - else - destruct(waitforauthready); - }; + portal->_purgeportal(); + } + if(!ci->close() && !terminating && _options.reconnect) + _connectfail(); + else + destruct(waitforauthready); + termlock=0; if(err && !stringp(err)) throw(err); } @@ -1211,14 +1203,16 @@ private void procmessage() { /*semi*/final void close() { if(qportals && qportals->size()) catch(cancelquery()); - catch(c->close()); + c->close(); c=0; destruct(waitforauthready); } protected void destroy() { + termlock=(termthread=Thread.Mutex())->lock(); catch(close()); .pgsql_util.unregister_backend(); + termthread->lock(1); } final void _connectfail(void|mixed err) { diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 4fad1343759c22fcb5bead73d483216197ab2572..79fa3a920940a6a54164dcb09e8b47419a7e5bbc 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -228,6 +228,9 @@ class conxion { private Thread.Queue qportals; final Thread.Mutex shortmux; + private Thread.Mutex termthread; + private Thread.MutexKey termlock; + final Stdio.File socket; private function(void|mixed:void) connectfail; private int towrite; @@ -355,12 +358,15 @@ outer: foreach(closecallbacks;function(void|mixed:void) closecb;) closecb(); closecallbacks=(<>); + termlock=0; return ret; } protected void destroy() { + termlock=termthread->lock(); catch(close()); // Exceptions don't work inside destructors connectfail=0; + termthread->lock(1); } final void connectloop(object pgsqlsess, int nossl) { @@ -435,6 +441,7 @@ outer: i=conxiin(); shortmux=Thread.Mutex(); nostash=Thread.Mutex(); + termthread=Thread.Mutex(); stashavail=Thread.Condition(); stashqueue=Thread.Queue(); stash=Stdio.Buffer();