diff --git a/lib/modules/Sql.pmod/pgsql.h b/lib/modules/Sql.pmod/pgsql.h index d93a51d6b3f8addc568e9094af29e1c0e0e22277..57b6d8f00541cad5b041a5f43f01a0d95c8f3514 100644 --- a/lib/modules/Sql.pmod/pgsql.h +++ b/lib/modules/Sql.pmod/pgsql.h @@ -94,6 +94,7 @@ #define COPYINPROGRESS 4 #define CLOSING 5 #define CLOSED 6 +#define PURGED 7 #define KEEP 0 // Sendcmd subcommands #define SENDOUT 1 diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index c746b796245747ecf79f6a75200323cc0b51dc84..537e2b32a10bb4b892ca17db3550e1c317624a70 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -88,8 +88,6 @@ private int totalhits; private int cachedepth=STATEMENTCACHEDEPTH; private int timeout=QUERYTIMEOUT; private int portalbuffersize=PORTALBUFFERSIZE; -private int reconnected; // Number of times the connection was reset -private int reconnectdelay; // Time to next reconnect #ifdef PG_STATS private int skippeddescribe; // Number of times we skipped Describe phase private int portalsopened; // Number of portals opened @@ -148,12 +146,6 @@ private string _sprintf(int type, void|mapping flags) { //! @param options //! Currently supports at least the following: //! @mapping -//! @member int "reconnect" -//! Set it to zero to disable automatic reconnects upon losing -//! the connection to the database. Not setting it, or setting -//! it to one, will cause one timed reconnect to take place. -//! Setting it to -1 will cause the system to try and reconnect -//! indefinitely. //! @member int "use_ssl" //! If the database supports and allows SSL connections, the session //! will be SSL encrypted, if not, the connection will fallback @@ -223,7 +215,20 @@ protected void create(void|string host, void|string database, _port = PGSQL_DEFAULT_PORT; .pgsql_util.register_backend(); _shortmux=Thread.Mutex(); - reconnect(); + PD("Connect\n"); + waitforauthready = Thread.Condition(); + qportals=Thread.Queue(); + _readyforquerycount = 1; + qportals->write(1); + if (!(c = .pgsql_util.conxion(this, qportals, 0))) + ERROR("Couldn't connect to database on %s:%d\n",_host,_port); + _runtimeparameter = ([]); + _unnamedportalmux = Thread.Mutex(); + unnamedstatement = Thread.Mutex(); + readyforquery_cb = connect_cb; + _portalsinflight = 0; + _statementsinflight = 0; + _wasparallelisable = 0; } //! @returns @@ -290,8 +295,6 @@ protected void create(void|string host, void|string database, //! @int //! @value 0 //! Everything ok. -//! @value 1 -//! The connection has reconnected automatically. //! @value -1 //! The server has gone away, and the connection is dead. //! @endint @@ -299,8 +302,8 @@ protected void create(void|string host, void|string database, //! @seealso //! @[is_open()] /*semi*/final int ping() { - return is_open() && !catch(c->start()->sendcmd(FLUSHSEND)) - ? !!reconnected : -1; + waitauthready(); + return is_open() && !catch(c->start()->sendcmd(FLUSHSEND)) ? 0 : -1; } //! Cancels all currently running queries in this session. @@ -421,8 +424,6 @@ protected void create(void|string host, void|string database, //! @member int "messages_received" //! Total number of messages received from the database (one SQL-statement //! requires multiple messages to be exchanged). -//! @member int "reconnect_count" -//! Number of times the connection to the database has been lost. //! @member int "portals_in_flight" //! Currently still open portals, i.e. running statements. //! @endmapping @@ -443,9 +444,6 @@ protected void create(void|string host, void|string database, #endif "messages_received":_msgsreceived, "bytes_received":_bytesreceived, - "reconnect_count":reconnected, - "portals_in_flight":_portalsinflight, - "statements_in_flight":_statementsinflight, ]); return stats; } @@ -537,11 +535,6 @@ private void connect_cb() { PD("%O\n",_runtimeparameter); } -private void reconnect_cb() { - lastmessage+=({sprintf("Reconnected to database %s",host_info())}); - runcallback(backendpid,"_reconnect",""); -} - private array(string) showbindings(.pgsql_util.sql_result portal) { array(string) msgs=({}); array from; @@ -599,17 +592,13 @@ final void _processloop(.pgsql_util.conxion ci) { plugbuffer->add("user\0",user,0); if(database) plugbuffer->add("database\0",database,0); - _options.reconnect=undefinedp(_options.reconnect) || _options.reconnect; foreach(_options-.pgsql_util.censoroptions; string name; mixed value) plugbuffer->add(name,0,(string)value,0); plugbuffer->add_int8(0); PD("%O\n",(string)plugbuffer); object cs; if (catch(cs = ci->start())) { - if(_options.reconnect) - _connectfail(); - else - destruct(waitforauthready); + destruct(waitforauthready); unnamedstatement=0; termlock=0; return; @@ -647,6 +636,13 @@ private void procmessage() { ci->socket->query_fd(),++ci->queueinidx,portal,msgtype); }; #endif + int msgisfatal(mapping(string:string) msgresponse) { + if (!terminating) // Run the callback once per lost connection + runcallback(backendpid,"_lost",""); + return (has_prefix(msgresponse.C, "53") + || has_prefix(msgresponse.C, "3D") + || has_prefix(msgresponse.C, "57P")) && MAGICTERMINATE; + }; for(;;) { err=catch { #ifdef PG_DEBUG @@ -733,11 +729,8 @@ private void procmessage() { if (cnonce) { PD("Authentication validation still in progress\n"); errtype = PROTOCOLUNSUPPORTED; - } else { - .pgsql_util.local_backend->remove_call_out(reconnect); - reconnectdelay=0; + } else cancelsecret=""; - } break; case 2: PD("KerberosV5\n"); @@ -1175,7 +1168,7 @@ private void procmessage() { case "53000":case "53100":case "53200":case "53300":case "53400": case "57P01":case "57P02":case "57P03":case "57P04":case "3D000": preplastmessage(msgresponse); - PD(a2nls(lastmessage));throw(0); + PD(a2nls(lastmessage)); throw(msgisfatal(msgresponse)); case "08P01":case "42P05": errtype=PROTOCOLERROR; case "XX000":case "42883":case "42P01": @@ -1213,9 +1206,10 @@ private void procmessage() { warningscollected++; lastmessage=({sprintf("%s %s: %s", msgresponse.S,msgresponse.C,msgresponse.M)}); - if(has_prefix(msgresponse.C,"53")||has_prefix(msgresponse.C,"57P")) { + int val; + if (val = msgisfatal(msgresponse)) { // Some warnings are fatal preplastmessage(msgresponse); - PD(a2nls(lastmessage));throw(0); // Some warnings are fatal + PD(a2nls(lastmessage));throw(val); } break; } @@ -1315,19 +1309,19 @@ private void procmessage() { #endif portal->_purgeportal(); } - if(!terminating && _options.reconnect) - _connectfail(); - else - destruct(waitforauthready); + destruct(waitforauthready); termlock=0; if(err && !stringp(err)) throw(err); }; + catch { + unnamedstatement = 0; + termlock = 0; + }; if (err) { - unnamedstatement=0; - termlock = 0; PD("Terminating processloop due to %s\n", describe_backtrace(err)); } + catch(_connectfail(err)); } //! Closes the connection to the database, any running queries are @@ -1362,7 +1356,8 @@ protected void destroy() { * inside a destructor, throwing an error will not work anymore. * Warnings will be silently discarded at this point. */ - lastmessage = filter(lastmessage, has_prefix, "ERROR "); + lastmessage = filter(lastmessage, lambda(string val) { + return has_prefix(val, "ERROR ") || has_prefix(val, "FATAL "); }); if (err || (err = catch(errstring = error(1)))) werror(describe_backtrace(err)); else if (errstring && sizeof(errstring)) @@ -1370,78 +1365,12 @@ protected void destroy() { } final void _connectfail(void|mixed err) { - PD("Connect failed %O reconnectdelay %d\n",err,reconnectdelay); - if(!err || reconnectdelay) { - int tdelay; - switch(tdelay=reconnectdelay) { - case 0: - reconnectdelay=RECONNECTDELAY; - break; - default: - if(err) - _delayederror=err; - if (_options.reconnect!=-1) { - destruct(waitforauthready); - destruct(c); - return; - } - reconnectdelay=RECONNECTBACKOFF; - break; - } - Thread.MutexKey lock=_shortmux->lock(); - if(!waitforauthready) - waitforauthready=Thread.Condition(); - lock=0; - PD("Schedule reconnect in %ds\n",tdelay); - _delayederror=0; - callout(reconnect,tdelay,1); - } else if(err) - _delayederror=err; -} - -private int reconnect() { - int recon=0; - PD("(Re)connect\n"); - { - Thread.MutexKey lock=_shortmux->lock(); - if (!waitforauthready) - waitforauthready=Thread.Condition(); - lock=0; - } - if(c) { - PD("Close old connection\n"); - reconnected++;recon=1; -#ifdef PG_STATS - prepstmtused=0; -#endif - if (unnamedstatement) - termlock = unnamedstatement->lock(1); - catch(c->close()); - unnamedstatement = 0; - termlock = 0; - catch(destruct(c)); - PD("Flushing old cache\n"); - foreach(_prepareds;;mapping tp) - m_delete(tp,"preparedname"); - if(!_options.reconnect) - ERROR("Lost connection to database %s:%d\n",_host,_port); + if (err) { + PD("Connect failed %O\n", err); + _delayederror = err; } - PD("Actually start to connect\n"); - qportals=Thread.Queue(); - _readyforcommit=Thread.Condition(); - _readyforquerycount=1; - _waittocommit=0; - qportals->write(1); - if (!(c = .pgsql_util.conxion(this, qportals, 0))) - ERROR("Couldn't connect to database on %s:%d\n",_host,_port); - _runtimeparameter=([]); - _unnamedportalmux=Thread.Mutex(); - unnamedstatement=Thread.Mutex(); - readyforquery_cb=recon?reconnect_cb:connect_cb; - _portalsinflight=0; - _statementsinflight = 0; - _wasparallelisable = 0; - return 1; + destruct(waitforauthready); + destruct(c); } //! For PostgreSQL this function performs the same function as @[resync()]. @@ -1510,25 +1439,23 @@ private void sendsync() { }; PD("%O\n",err); } - if(!reconnect()&&sizeof(lastmessage)) + if(sizeof(lastmessage)) ERROR(a2nls(lastmessage)); } -//! This function allows you to connect to a database. Due to -//! restrictions of the Postgres frontend-backend protocol, you always -//! have to be connected to a database, so in fact this function just -//! allows you to connect to a different database on the same server. -//! -//! @note -//! This function @b{can@} raise exceptions if something goes wrong -//! (backend process not running, insufficient privileges...) +//! Due to restrictions of the Postgres frontend-backend protocol, you always +//! already have to be connected to a database. +//! To connect to a different database you have to select the right +//! database while connecting instead. This function is a no-op when +//! specifying the same database, and throws an error otherwise. //! //! @seealso //! @[create()] /*semi*/final void select_db(string dbname) { - database=dbname; - reconnect(); - reconnected=0; + if (database != dbname) + ERROR("Cannot switch databases from %O to %O" + " in an already established connection\n", + database, dbname); } //! With PostgreSQL you can LISTEN to NOTIFY events. @@ -1539,8 +1466,8 @@ private void sendsync() { //! to. A special case is the empty string, which matches all events, //! and can be used as fallback function which is called only when the //! specific condition is not handled. Another special case is -//! @expr{"_reconnect"@} which gets called whenever the connection -//! unexpectedly drops and reconnects to the database. +//! @expr{"_lost"@} which gets called whenever the connection +//! to the database unexpectedly drops. //! //! @param notify_cb //! Function to be called on receiving a notification-event of @@ -2012,9 +1939,7 @@ private inline void throwdelayederror(object parent) { tp = .pgsql_util.describenodata; // Description already known else if (!forcetext && forcecache == 1 || forcecache && sizeof(q) >= MINPREPARELENGTH) { - object plugbuffer; - while(catch(plugbuffer=c->start())) - reconnect(); + object plugbuffer = c->start(); if(tp=_prepareds[q]) { if(tp.preparedname) { #ifdef PG_STATS diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index fd1071b8501ac2279b9d9670032fb7fe4a52fd1a..958b38c1a3abba518515c346acef675af636f5e9 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -127,7 +127,8 @@ private void run_local_backend() { looponce=0; if(lock=backendmux->trylock()) { PD("Starting local backend\n"); - while (clientsregistered) { // Autoterminate when not needed + while (clientsregistered // Autoterminate when not needed + || sizeof(local_backend->call_out_info())) { mixed err; if (err = catch(local_backend(4096.0))) werror(describe_backtrace(err)); @@ -494,7 +495,7 @@ outer: socket->close(); }; } - connectfail=0; + catch(connectfail = 0); } final void connectloop(object pgsqlsess, int nossl) { @@ -673,7 +674,8 @@ class sql_result { int _portalbuffersize,int alltyped,array params,int forcetext, int _timeout, int _syncparse, int _transtype) { pgsqlsess = _pgsqlsess; - cr = (c = _c)->i; + if (catch(cr = (c = _c)->i)) + losterror(); _query = query; datarows = Thread.Queue(); _ddescribe=Thread.Condition(); @@ -755,9 +757,18 @@ class sql_result { return rowsreceived; } - private inline void trydelayederror() { + private void losterror() { + string err; + if (pgsqlsess) + err = pgsqlsess->error(1); + error("%s\n", err || "Database connection lost"); + } + + private void trydelayederror() { if(_delayederror) throwdelayederror(this); + else if (_state == PURGED) + losterror(); } //! @seealso @@ -1133,7 +1144,7 @@ class sql_result { case PARSING: --pgsqlsess->_statementsinflight; } - _state=CLOSED; + _state = PURGED; lock=0; releaseconditions(); } @@ -1214,8 +1225,7 @@ class sql_result { pgsqlsess=0; if(!datarowtypes) { Thread.MutexKey lock=_ddescribemux->lock(); - datarowtypes=emptyarray; - datarowdesc=emptyarray; + datarowdesc = datarowtypes = emptyarray; _ddescribe->broadcast(); lock=0; } @@ -1229,7 +1239,8 @@ class sql_result { conxsess plugbuffer; if (!catch(plugbuffer = c->start())) plugbuffer->sendcmd(_closeportal(plugbuffer)); - _state=CLOSED; + if (_state < CLOSED) + _state = CLOSED; datarows->write(1); // Signal EOF releaseconditions(); }