diff --git a/lib/modules/Sql.pmod/pgsql.h b/lib/modules/Sql.pmod/pgsql.h index 57b6d8f00541cad5b041a5f43f01a0d95c8f3514..8d139382d6c972f16db4d65514dc7e25fe4b7b78 100644 --- a/lib/modules/Sql.pmod/pgsql.h +++ b/lib/modules/Sql.pmod/pgsql.h @@ -30,8 +30,6 @@ #define PREPSTMTPREFIX "pike_prep_" #define PTSTMTPREFIX "pike_tprep_" #define PORTALPREFIX "pike_portal_" -#define RECONNECTDELAY 1 // Initial delay for reconnects -#define RECONNECTBACKOFF 4 // Secondary delay for reconnect #define FACTORPLAN 8 // Determines criterium when caching plan // -> if parsingtime*FACTORPLAN >= runtime // cache the statement @@ -87,25 +85,12 @@ #define conxsess conxion #endif -#define PORTALINIT 0 // Portal states -#define PARSING 1 -#define BOUND 2 -#define COMMITTED 3 -#define COPYINPROGRESS 4 -#define CLOSING 5 -#define CLOSED 6 -#define PURGED 7 - #define KEEP 0 // Sendcmd subcommands #define SENDOUT 1 #define FLUSHSEND 2 #define FLUSHLOGSEND 3 #define SYNCSEND 4 -#define NOERROR 0 // Error states networkparser -#define PROTOCOLERROR 1 -#define PROTOCOLUNSUPPORTED 2 - #define NOTRANS 0 #define TRANSBEGIN 1 #define TRANSEND 2 diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index d480457fc035f8d62066dae702ba5e565bec2e8a..3cfcee54c66a76ead5f014b72b0ba5eb28e3edfb 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -17,6 +17,8 @@ //! PostgreSQL network protocol version 3, authentication methods //! currently supported are: cleartext, md5 and scram (recommended). //! @item +//! Optional asynchronous query interface through callbacks. +//! @item //! Streaming queries which do not buffer the whole resultset in memory. //! @item //! Automatic binary transfers to and from the database for most common @@ -52,7 +54,8 @@ //! difficult. //! //! @seealso -//! @[Sql.Sql], @[Sql.postgres], @url{http://www.postgresql.org/docs/@} +//! @[Sql.Sql], @[Sql.postgres], +//! @url{https://www.postgresql.org/docs/current/static/@} #pike __REAL_VERSION__ #require constant(Thread.Thread) @@ -61,68 +64,34 @@ #define ERROR(X ...) predef::error(X) -final int _fetchlimit=FETCHLIMIT; -final Thread.Mutex _unnamedportalmux; -private Thread.Mutex unnamedstatement; -private Thread.MutexKey termlock; -private Thread.ResourceCountKey backendreg; -final Thread.ResourceCount _portalsinflight, _statementsinflight; -final int _wasparallelisable; -final int _intransaction; +private .pgsql_util.proxy proxy; -private .pgsql_util.conxion c; -private string cancelsecret; -private int backendpid, backendstatus; -final mapping(string:mixed) _options; -private array(string) lastmessage=({}); -private int clearmessage; -private mapping(string:array(mixed)) notifylist=([]); -final mapping(string:string) _runtimeparameter; -final mapping(string:mapping(string:mixed)) _prepareds=([]); private int pstmtcount; private int ptstmtcount; // Periodically one would like to reset these // but checking when this is safe to do // probably is more costly than the gain -final int _pportalcount; -private int totalhits; -private int cachedepth=STATEMENTCACHEDEPTH; -private int timeout=QUERYTIMEOUT; -private int portalbuffersize=PORTALBUFFERSIZE; #ifdef PG_STATS private int skippeddescribe; // Number of times we skipped Describe phase private int portalsopened; // Number of portals opened private int prepstmtused; // Number of times we used prepared statements #endif -final int _msgsreceived; // Number of protocol messages received -final int _bytesreceived; // Number of bytes received -private int warningsdropcount; // Number of uncollected warnings -private int warningscollected; -private int invalidatecache; -private Thread.Queue qportals; -final mixed _delayederror; -private function (:void) readyforquery_cb; - -final string _host; -final int _port; -private string database, user, pass; -private Crypto.SCRAM SASLcontext; -private Thread.Condition waitforauthready; -final Thread.Mutex _shortmux; -final int _readyforquerycount; +private int cachedepth = STATEMENTCACHEDEPTH; +private int portalbuffersize = PORTALBUFFERSIZE; +private int timeout = QUERYTIMEOUT; protected string _sprintf(int type) { - string res=UNDEFINED; + string res; switch(type) { case 'O': - res=sprintf(DRIVERNAME"(%s@%s:%d/%s,%d,%d)", - user,_host,_port,database,c?->socket&&c->socket->query_fd(),backendpid); + res = sprintf(DRIVERNAME"(%s@%s:%d/%s,%d,%d)", + proxy.user, proxy.host, proxy.port, proxy.database, + proxy.c?->socket && proxy.c->socket->query_fd(), proxy.backendpid); break; } return res; } -//! With no arguments, this function initialises (reinitialises if a -//! connection has been set up previously) a connection to the +//! With no arguments, this function initialises a connection to the //! PostgreSQL backend. Since PostgreSQL requires a database to be //! selected, it will try to connect to the default database. The //! connection may fail however, for a variety of reasons; in this case @@ -156,7 +125,7 @@ protected string _sprintf(int type) { //! Send queries to and retrieve results from the database using text //! instead of the, generally more efficient, default native binary method. //! Turning this on will allow multiple statements per query separated -//! by semicolons. +//! by semicolons (not recommended). //! @member int "sync_parse" //! Set it to zero to turn synchronous parsing off for statements. //! Setting this to off can cause surprises because statements could @@ -184,50 +153,32 @@ protected string _sprintf(int type) { //! For the numerous other options please check the PostgreSQL manual. //! //! @note -//! You need to have a database selected before using the sql-object, +//! You need to have a database selected before using the SQL-object, //! otherwise you'll get exceptions when you try to query it. Also //! notice that this function @b{can@} raise exceptions if the db //! server doesn't respond, if the database doesn't exist or is not //! accessible to you. //! +//! @note +//! It is possible that the exception from a failed connect +//! will not be triggered on this call (because the connect +//! proceeds asynchronously in the background), but on the first +//! attempt to actually use the database instead. +//! //! @seealso //! @[Postgres.postgres], @[Sql.Sql], @[select_db()], -//! @url{http://www.postgresql.org/search/?u=%2Fdocs%2Fcurrent%2F&q=client+connection+search_path@} +//! @url{https://www.postgresql.org/docs/current/static/runtime-config-client.html@} protected void create(void|string host, void|string database, void|string user, void|string pass, void|mapping(string:mixed) options) { - this::pass = pass && pass != "" ? Standards.IDNA.to_ascii(pass) : pass; + string spass = pass && pass != "" ? Standards.IDNA.to_ascii(pass) : pass; if(pass) { String.secure(pass); pass = "CENSORED"; } - this::user = user && user != "" ? Standards.IDNA.to_ascii(user, 1) : user; - this::database = database; - _options = options || ([]); - - if(!host) host = PGSQL_DEFAULT_HOST; - if(has_value(host,":") && sscanf(host,"%s:%d",host,_port)!=2) - ERROR("Error in parsing the hostname argument\n"); - this::_host = host; - - if(!_port) - _port = PGSQL_DEFAULT_PORT; - backendreg = .pgsql_util.register_backend(); - _shortmux=Thread.Mutex(); - PD("Connect\n"); - waitforauthready = Thread.Condition(); - qportals=Thread.Queue(); - _readyforquerycount = 1; - qportals->write(1); - if (!(c = .pgsql_util.conxion(this, qportals, 0))) - ERROR("Couldn't connect to database on %s:%d\n",_host,_port); - _runtimeparameter = ([]); - _unnamedportalmux = Thread.Mutex(); - unnamedstatement = Thread.Mutex(); - readyforquery_cb = connect_cb; - _portalsinflight = Thread.ResourceCount(); - _statementsinflight = Thread.ResourceCount(); - _wasparallelisable = 0; + proxy = .pgsql_util.proxy(host, database, + user && user != "" ? Standards.IDNA.to_ascii(user, 1) : user, + spass, options || ([])); } //! @returns @@ -251,21 +202,16 @@ protected void create(void|string host, void|string database, //! @[big_query()] /*semi*/final string error(void|int clear) { throwdelayederror(this); - string s=lastmessage*"\n"; - if(clear) - lastmessage=({}); - warningscollected=0; - return sizeof(s) && s; + return proxy.geterror(clear); } //! This function returns a string describing what host are we talking to, -//! and how (TCP/IP or UNIX sockets). +//! and how (TCP/IP or UNIX socket). //! //! @seealso //! @[server_info()] /*semi*/final string host_info() { - return sprintf("fd:%d TCP/IP %s:%d PID %d", - c?c->socket->query_fd():-1,_host,_port,backendpid); + return proxy.host_info(); } //! Returns true if the connection seems to be open. @@ -276,15 +222,12 @@ protected void create(void|string host, void|string database, //! sent over the connection. //! //! For a more reliable check of whether the connection -//! is alive, please use @[ping()]. +//! is alive, please use @[ping()] instead. //! //! @seealso //! @[ping()] /*semi*/final int is_open() { - catch { - return c->socket->is_open(); - }; - return 0; + return proxy.is_open(); } //! Check whether the connection is alive. @@ -302,7 +245,7 @@ protected void create(void|string host, void|string database, //! @[is_open()] /*semi*/final int ping() { waitauthready(); - return is_open() && !catch(c->start()->sendcmd(FLUSHSEND)) ? 0 : -1; + return is_open() && !catch(proxy.c->start()->sendcmd(FLUSHSEND)) ? 0 : -1; } //! Cancels all currently running queries in this session. @@ -313,14 +256,7 @@ protected void create(void|string host, void|string database, //! @note //! This function is PostgreSQL-specific. /*semi*/final void cancelquery() { - PD("CancelRequest\n"); - .pgsql_util.conxion lcon = .pgsql_util.conxion(this, 0, 2); - lcon->add_int32(16)->add_int32(PG_PROTOCOL(1234,5678)) - ->add_int32(backendpid)->add(cancelsecret)->sendcmd(FLUSHSEND); - destruct(lcon); // Destruct explicitly to avoid delayed close -#ifdef PG_DEBUGMORE - PD("Closetrace %O\n",backtrace()); -#endif + proxy.cancelquery(); } //! Changes the connection charset. When set to @expr{"UTF8"@}, the query, @@ -331,10 +267,10 @@ protected void create(void|string host, void|string database, //! //! @seealso //! @[get_charset()], @[create()], -//! @url{http://www.postgresql.org/search/?u=%2Fdocs%2Fcurrent%2F&q=character+sets@} +//! @url{https://www.postgresql.org/docs/current/static/multibyte.html@} /*semi*/final void set_charset(string charset) { if(charset) - big_query(sprintf("SET CLIENT_ENCODING TO '%s'",quote(charset))); + big_query(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset))); } //! @returns @@ -342,10 +278,9 @@ protected void create(void|string host, void|string database, //! //! @seealso //! @[set_charset()], @[getruntimeparameters()], -//! @url{http://www.postgresql.org/search/?u=%2Fdocs%2Fcurrent%2F&q=character+sets@} +//! @url{https://www.postgresql.org/docs/current/static/multibyte.html@} /*semi*/final string get_charset() { - waitauthready(); - return _runtimeparameter[CLIENT_ENCODING]; + return getruntimeparameters()[CLIENT_ENCODING]; } //! @returns @@ -382,13 +317,13 @@ protected void create(void|string host, void|string database, //! For other runtimeparameters check the PostgreSQL documentation. //! //! @seealso -//! @url{http://www.postgresql.org/search/?u=%2Fdocs%2Fcurrent%2F&q=client+connection+search_path@} +//! @url{https://www.postgresql.org/docs/current/static/runtime-config-client.html@} //! //! @note //! This function is PostgreSQL-specific. /*semi*/final mapping(string:string) getruntimeparameters() { waitauthready(); - return _runtimeparameter+([]); + return proxy.runtimeparameter + ([]); } //! @returns @@ -428,18 +363,18 @@ protected void create(void|string host, void|string database, //! @note //! This function is PostgreSQL-specific. /*semi*/final mapping(string:mixed) getstatistics() { - mapping(string:mixed) stats=([ - "warnings_dropped":warningsdropcount, - "current_prepared_statements":sizeof(_prepareds), - "current_prepared_statement_hits":totalhits, + mapping(string:mixed) stats = ([ + "warnings_dropped":proxy.warningsdropcount, + "current_prepared_statements":sizeof(proxy.prepareds), + "current_prepared_statement_hits":proxy.totalhits, "prepared_statement_count":pstmtcount, #ifdef PG_STATS "used_prepared_statements":prepstmtused, "skipped_describe_count":skippeddescribe, "portals_opened_count":portalsopened, #endif - "messages_received":_msgsreceived, - "bytes_received":_bytesreceived, + "messages_received":proxy.msgsreceived, + "bytes_received":proxy.bytesreceived, ]); return stats; } @@ -453,9 +388,9 @@ protected void create(void|string host, void|string database, //! @note //! This function is PostgreSQL-specific. /*semi*/final int setcachedepth(void|int newdepth) { - int olddepth=cachedepth; - if(!undefinedp(newdepth) && newdepth>=0) - cachedepth=newdepth; + int olddepth = cachedepth; + if (!undefinedp(newdepth) && newdepth >= 0) + cachedepth = newdepth; return olddepth; } @@ -468,9 +403,9 @@ protected void create(void|string host, void|string database, //! @note //! This function is PostgreSQL-specific. /*semi*/final int settimeout(void|int newtimeout) { - int oldtimeout=timeout; - if(!undefinedp(newtimeout) && newtimeout>0) - timeout=newtimeout; + int oldtimeout = timeout; + if (!undefinedp(newtimeout) && newtimeout > 0) + timeout = newtimeout; return oldtimeout; } @@ -483,9 +418,9 @@ protected void create(void|string host, void|string database, //! @note //! This function is PostgreSQL-specific. /*semi*/final int setportalbuffersize(void|int newportalbuffersize) { - int oldportalbuffersize=portalbuffersize; - if(!undefinedp(newportalbuffersize) && newportalbuffersize>0) - portalbuffersize=newportalbuffersize; + int oldportalbuffersize = portalbuffersize; + if (!undefinedp(newportalbuffersize) && newportalbuffersize>0) + portalbuffersize = newportalbuffersize; return oldportalbuffersize; } @@ -498,792 +433,29 @@ protected void create(void|string host, void|string database, //! @note //! This function is PostgreSQL-specific. /*semi*/final int setfetchlimit(void|int newfetchlimit) { - int oldfetchlimit=_fetchlimit; - if(!undefinedp(newfetchlimit) && newfetchlimit>=0) - _fetchlimit=newfetchlimit; + int oldfetchlimit = proxy._fetchlimit; + if (!undefinedp(newfetchlimit) && newfetchlimit >= 0) + proxy._fetchlimit = newfetchlimit; return oldfetchlimit; } private string glob2reg(string glob) { - if(!glob||!sizeof(glob)) + if (!glob || !sizeof(glob)) return "%"; - return replace(glob,({"*","?","\\","%","_"}),({"%","_","\\\\","\\%","\\_"})); -} - -private string a2nls(array(string) msg) { - return msg*"\n"+"\n"; -} - -private string pinpointerror(void|string query,void|string offset) { - if(!query) - return ""; - int k=(int)offset; - if(k<=0) - return MARKSTART+query+MARKEND; - return MARKSTART+(k>1?query[..k-2]:"")+MARKERROR+query[k-1..]+MARKEND; -} - -private void connect_cb() { - PD("%O\n",_runtimeparameter); -} - -private array(string) showbindings(.pgsql_util.sql_result portal) { - array(string) msgs=({}); - array from; - if(portal && (from = portal._params)) { - array to,paramValues; - [from,to,paramValues] = from; - if(sizeof(paramValues)) { - string val; - int i; - string fmt=sprintf("%%%ds %%3s %%.61s",max(@map(from,sizeof))); - foreach(paramValues;i;val) - msgs+=({sprintf(fmt,from[i],to[i],sprintf("%O",val))}); - } - } - return msgs; -} - -private void preplastmessage(mapping(string:string) msgresponse) { - lastmessage=({ - sprintf("%s %s:%s %s\n (%s:%s:%s)", - msgresponse.S,msgresponse.C,msgresponse.P||"", - msgresponse.M,msgresponse.F||"",msgresponse.R||"", - msgresponse.L||"")}); + return replace(glob, ({"*", "?", "\\", "%", "_"}), + ({"%", "_", "\\\\", "\\%", "\\_"})); } private void waitauthready() { - if(waitforauthready) { + if (proxy.waitforauthready) { PD("%d Wait for auth ready %O\n", - c?->socket&&c->socket->query_fd(),backtrace()[-2]); - Thread.MutexKey lock=_shortmux->lock(); - catch(PT(waitforauthready->wait(lock))); - lock=0; - PD("%d Wait for auth ready released.\n",c?->socket&&c->socket->query_fd()); - } -} - -private inline mixed callout(function(mixed ...:void) f, - float|int delay,mixed ... args) { - return .pgsql_util.local_backend->call_out(f,delay,@args); -} - -private int|.pgsql_util.sql_result portal; // state information procmessage -#ifdef PG_DEBUG -private string datarowdebug; -private int datarowdebugcount; -#endif - -final void _processloop(.pgsql_util.conxion ci) { - (c=ci)->socket->set_id(procmessage); - cancelsecret=0; - portal=0; - { - Stdio.Buffer plugbuffer=Stdio.Buffer()->add_int32(PG_PROTOCOL(3,0)); - if(user) - plugbuffer->add("user\0",user,0); - if(database) - plugbuffer->add("database\0",database,0); - foreach(_options-.pgsql_util.censoroptions; string name; mixed value) - plugbuffer->add(name,0,(string)value,0); - plugbuffer->add_int8(0); - PD("%O\n",(string)plugbuffer); - object cs; - if (catch(cs = ci->start())) { - destruct(waitforauthready); - unnamedstatement=0; - termlock=0; - return; - } else { - CHAIN(cs)->add_hstring(plugbuffer, 4, 4); - cs->sendcmd(SENDOUT); - } - } // Do not flush at this point, PostgreSQL 9.4 disapproves - procmessage(); -} - -private void procmessage() { - mixed err; - int terminating=0; - err = catch { - .pgsql_util.conxion ci=c; // cache value FIXME sensible? - .pgsql_util.conxiin cr=ci->i; // cache value FIXME sensible? -#ifdef PG_DEBUG - PD("Processloop\n"); - -#ifdef PG_DEBUGMORE - void showportalstack(string label) { - PD(sprintf(">>>>>>>>>>> Portalstack %s: %O\n", label, portal)); - foreach(qportals->peek_array();;int|.pgsql_util.sql_result qp) - PD(" =========== Portal: %O\n",qp); - PD("<<<<<<<<<<<<<< Portalstack end\n"); - }; -#endif - void showportal(int msgtype) { - if(objectp(portal)) - PD("%d<%O %d %c switch portal\n", - ci->socket->query_fd(),portal._portalname,++ci->queueinidx,msgtype); - else if(portal>0) - PD("%d<Sync %d %d %c portal\n", - ci->socket->query_fd(),++ci->queueinidx,portal,msgtype); - }; -#endif - int msgisfatal(mapping(string:string) msgresponse) { - if (!terminating) // Run the callback once per lost connection - runcallback(backendpid,"_lost",""); - return (has_prefix(msgresponse.C, "53") - || has_prefix(msgresponse.C, "3D") - || has_prefix(msgresponse.C, "57P")) && MAGICTERMINATE; - }; - for(;;) { - err=catch { -#ifdef PG_DEBUG - if(!portal && datarowdebug) { - PD("%s rows %d\n",datarowdebug,datarowdebugcount); - datarowdebug=0; datarowdebugcount=0; - } -#endif -#ifdef PG_DEBUGMORE - showportalstack("LOOPTOP"); -#endif - if(!sizeof(cr)) { // Preliminary check, fast path - Thread.MutexKey lock=cr->fillreadmux->lock(); - if(!sizeof(cr)) { // Check for real - if(!cr->fillread) { - lock=0; - throw(MAGICTERMINATE); // Force proper termination - } - cr->procmsg=1; - lock=0; - return; // Terminate thread, wait for callback - } - lock=0; - } - int msgtype=cr->read_int8(); - if(!portal) { - portal=qportals->try_read(); -#ifdef PG_DEBUG - showportal(msgtype); -#endif - } - int msglen=cr->read_int32(); - _msgsreceived++; - _bytesreceived+=1+msglen; - int errtype=NOERROR; - PD("%d<",ci->socket->query_fd()); - switch(msgtype) { - array getcols() { - int bintext=cr->read_int8(); - int cols=cr->read_int16(); -#ifdef PG_DEBUG - array a; - msglen-=4+1+2+2*cols; - foreach(a=allocate(cols,([]));;mapping m) - m.type=cr->read_int16(); -#else - cr->consume(cols<<1); -#endif // Discard column info, and make it line oriented - return ({ ({(["name":"line"])}), ({bintext?BYTEAOID:TEXTOID}) }); - }; - array(string) reads() { -#ifdef PG_DEBUG - if(msglen<1) - errtype=PROTOCOLERROR; -#endif - array ret=({}),aw=({0}); - do { - string w=cr->read_cstring(); - msglen-=sizeof(w)+1; aw[0]=w; ret+=aw; - } while(msglen); - return ret; - }; - mapping(string:string) getresponse() { - mapping(string:string) msgresponse=([]); - msglen-=4; - foreach(reads();;string f) - if(sizeof(f)) - msgresponse[f[..0]]=f[1..]; - PD("%O\n",msgresponse); - return msgresponse; - }; - case 'R': { - void authresponse(string|array msg) { - object cs = ci->start(); - CHAIN(cs)->add_int8('p')->add_hstring(msg, 4, 4); - cs->sendcmd(SENDOUT); // No flushing, PostgreSQL 9.4 disapproves - }; - PD("Authentication "); - msglen-=4+4; - int authtype, k; - switch(authtype = cr->read_int32()) { - case 0: - PD("Ok\n"); - if (SASLcontext) { - PD("Authentication validation still in progress\n"); - errtype = PROTOCOLUNSUPPORTED; - } else - cancelsecret=""; - break; - case 2: - PD("KerberosV5\n"); - errtype=PROTOCOLUNSUPPORTED; - break; - case 3: - PD("ClearTextPassword\n"); - authresponse(({pass, 0})); - break; - case 4: - PD("CryptPassword\n"); - errtype=PROTOCOLUNSUPPORTED; - break; - case 5: - PD("MD5Password\n"); -#ifdef PG_DEBUG - if(msglen<4) - errtype=PROTOCOLERROR; -#endif -#define md5hex(x) String.string2hex(Crypto.MD5.hash(x)) - authresponse(({"md5", - md5hex(md5hex(pass + user) + cr->read(msglen)), 0})); -#ifdef PG_DEBUG - msglen=0; -#endif - break; - case 6: - PD("SCMCredential\n"); - errtype=PROTOCOLUNSUPPORTED; - break; - case 7: - PD("GSS\n"); - errtype=PROTOCOLUNSUPPORTED; - break; - case 9: - PD("SSPI\n"); - errtype=PROTOCOLUNSUPPORTED; - break; - case 8: - PD("GSSContinue\n"); - errtype=PROTOCOLUNSUPPORTED; - cr->read(msglen); // SSauthdata -#ifdef PG_DEBUG - if(msglen<1) - errtype=PROTOCOLERROR; - msglen=0; -#endif - break; - case 10: { - string word; - PD("AuthenticationSASL\n"); - k = 0; - while (sizeof(word = cr->read_cstring())) { - switch (word) { - case "SCRAM-SHA-256": - k = 1; - } -#ifdef PG_DEBUG - msglen -= sizeof(word) + 1; - if (msglen < 1) - break; -#endif - } - if (k) { - SASLcontext = Crypto.SCRAM(Crypto.SHA256); - word = SASLcontext.client_1(); - authresponse(({ - "SCRAM-SHA-256", 0, sprintf("%4c", sizeof(word)), word - })); - } else - errtype = PROTOCOLUNSUPPORTED; -#ifdef PG_DEBUG - if(msglen != 1) - errtype=PROTOCOLERROR; - msglen=0; -#endif - break; - } - case 11: { - PD("AuthenticationSASLContinue\n"); - string response; - if (response - = SASLcontext.client_2(cr->read(msglen), pass)) - authresponse(response); - else - errtype = PROTOCOLERROR; -#ifdef PG_DEBUG - msglen = 0; -#endif - break; - } - case 12: - PD("AuthenticationSASLFinal\n"); - if (SASLcontext.client_3(cr->read(msglen))) - SASLcontext = 0; // Clears context and approves server - else - errtype = PROTOCOLERROR; -#ifdef PG_DEBUG - msglen=0; -#endif - break; - default: - PD("Unknown Authentication Method %c\n",authtype); - errtype=PROTOCOLUNSUPPORTED; - break; - } - switch(errtype) { - default: - case PROTOCOLUNSUPPORTED: - ERROR("Unsupported authenticationmethod %c\n",authtype); - case NOERROR: - break; - } - break; - } - case 'K': - msglen-=4+4;backendpid=cr->read_int32(); - cancelsecret=cr->read(msglen); -#ifdef PG_DEBUG - PD("BackendKeyData %O\n",cancelsecret); - msglen=0; -#endif - break; - case 'S': { - PD("ParameterStatus "); - msglen-=4; - array(string) ts=reads(); -#ifdef PG_DEBUG - if(sizeof(ts)==2) { -#endif - _runtimeparameter[ts[0]]=ts[1]; -#ifdef PG_DEBUG - PD("%O=%O\n",ts[0],ts[1]); - } else - errtype=PROTOCOLERROR; -#endif - break; - } - case '3': -#ifdef PG_DEBUG - PD("CloseComplete\n"); - msglen-=4; -#endif - break; - case 'Z': { - backendstatus=cr->read_int8(); -#ifdef PG_DEBUG - msglen-=4+1; - PD("ReadyForQuery %c\n",backendstatus); -#endif -#ifdef PG_DEBUGMORE - showportalstack("READYFORQUERY"); -#endif - int keeplooking; - do - for(keeplooking = 0; objectp(portal); portal = qportals->read()) { -#ifdef PG_DEBUG - showportal(msgtype); -#endif - if (backendstatus == 'I' && _intransaction - && portal->transtype != TRANSEND) - keeplooking = 1; - portal->_purgeportal(); - } - while (keeplooking && (portal = qportals->read())); - if (backendstatus == 'I') - _intransaction = 0; - foreach(qportals->peek_array();;.pgsql_util.sql_result qp) { - if(objectp(qp) && qp._synctransact && qp._synctransact<=portal) { - PD("Checking portal %O %d<=%d\n", - qp._portalname,qp._synctransact,portal); - qp->_purgeportal(); - } - } - portal=0; -#ifdef PG_DEBUGMORE - showportalstack("AFTER READYFORQUERY"); -#endif - _readyforquerycount--; - if(readyforquery_cb) - readyforquery_cb(),readyforquery_cb=0; - destruct(waitforauthready); - break; - } - case '1': -#ifdef PG_DEBUG - PD("ParseComplete portal %O\n", portal); - msglen-=4; -#endif - break; - case 't': { - array a; -#ifdef PG_DEBUG - int cols=cr->read_int16(); - PD("%O ParameterDescription %d values\n",portal._query,cols); - msglen-=4+2+4*cols; - a=cr->read_ints(cols,4); -#else - a=cr->read_ints(cr->read_int16(),4); -#endif -#ifdef PG_DEBUGMORE - PD("%O\n",a); -#endif - if(portal._tprepared) - portal._tprepared.datatypeoid=a; - Thread.Thread(portal->_preparebind,a); - break; - } - case 'T': { - array a,at; - int cols=cr->read_int16(); -#ifdef PG_DEBUG - PD("RowDescription %d columns %O\n",cols,portal._query); - msglen-=4+2; -#endif - at=allocate(cols); - foreach(a=allocate(cols);int i;) - { - string s=cr->read_cstring(); - mapping(string:mixed) res=(["name":s]); -#ifdef PG_DEBUG - msglen-=sizeof(s)+1+4+2+4+2+4+2; - res.tableoid=cr->read_int32()||UNDEFINED; - res.tablecolattr=cr->read_int16()||UNDEFINED; -#else - cr->consume(6); -#endif - at[i]=cr->read_int32(); -#ifdef PG_DEBUG - res.type=at[i]; - { - int len=cr->read_sint(2); - res.length=len>=0?len:"variable"; - } - res.atttypmod=cr->read_int32(); - /* formatcode contains just a zero when Bind has not been issued - * yet, but the content is irrelevant because it's determined - * at query time - */ - res.formatcode=cr->read_int16(); -#else - cr->consume(8); -#endif - a[i]=res; - } -#ifdef PG_DEBUGMORE - PD("%O\n",a); -#endif - if(portal._forcetext) - portal->_setrowdesc(a,at); // Do not consume queued portal - else { - portal->_processrowdesc(a,at); - portal=0; - } - break; - } - case 'n': { -#ifdef PG_DEBUG - msglen-=4; - PD("NoData %O\n",portal._query); -#endif - portal._fetchlimit=0; // disables subsequent Executes - portal - ->_processrowdesc(.pgsql_util.emptyarray,.pgsql_util.emptyarray); - portal=0; - break; - } - case 'H': - portal->_processrowdesc(@getcols()); - PD("CopyOutResponse %O\n",portal._query); - break; - case '2': { - mapping tp; -#ifdef PG_DEBUG - msglen-=4; - PD("%O BindComplete\n",portal._portalname); -#endif - if (tp=portal._tprepared) { - int tend = gethrtime(); - int tstart = tp.trun; - if (tend == tstart) - m_delete(_prepareds,portal._query); - else { - tp.hits++; - totalhits++; - if (!tp.preparedname) { - if (sizeof(portal._preparedname)) { - PD("Finalising stored statement %s\n", portal._preparedname); - tp.preparedname = portal._preparedname; - } - tstart = tend - tstart; - if (!tp.tparse || tp.tparse>tstart) - tp.tparse = tstart; - } - tp.trunstart = tend; - } - } - break; - } - case 'D': - msglen-=4; -#ifdef PG_DEBUG -#ifdef PG_DEBUGMORE - PD("%O DataRow %d bytes\n",portal._portalname,msglen); -#endif - datarowdebugcount++; - if (!datarowdebug) - datarowdebug = sprintf( - "%O DataRow %d bytes",portal._portalname,msglen); -#endif -#ifdef PG_DEBUG - msglen= -#endif - portal->_decodedata(msglen,_runtimeparameter[CLIENT_ENCODING]); - break; - case 's': -#ifdef PG_DEBUG - PD("%O PortalSuspended\n",portal._portalname); - msglen-=4; -#endif - portal=0; - break; - case 'C': { - msglen-=4; -#ifdef PG_DEBUG - if(msglen<1) - errtype=PROTOCOLERROR; -#endif - string s=cr->read(msglen-1); - portal->_storetiming(); - PD("%O CommandComplete %O\n",portal._portalname,s); -#ifdef PG_DEBUG - if(cr->read_int8()) - errtype=PROTOCOLERROR; - msglen=0; -#else - cr->consume(1); -#endif -#ifdef PG_DEBUGMORE - showportalstack("COMMANDCOMPLETE"); -#endif - portal->_releasesession(s); - portal=0; - break; - } - case 'I': -#ifdef PG_DEBUG - PD("EmptyQueryResponse %O\n",portal._portalname); - msglen-=4; -#endif -#ifdef PG_DEBUGMORE - showportalstack("EMPTYQUERYRESPONSE"); -#endif - portal->_releasesession(); - portal=0; - break; - case 'd': - PD("%O CopyData\n",portal._portalname); - portal->_storetiming(); - msglen-=4; -#ifdef PG_DEBUG - if(msglen<0) - errtype=PROTOCOLERROR; -#endif - portal->_processdataready(({cr->read(msglen)}),msglen); -#ifdef PG_DEBUG - msglen=0; -#endif - break; - case 'G': - portal->_releasestatement(); - portal->_setrowdesc(@getcols()); - PD("%O CopyInResponse\n",portal._portalname); - portal._state=COPYINPROGRESS; - break; - case 'c': -#ifdef PG_DEBUG - PD("%O CopyDone\n",portal._portalname); - msglen-=4; -#endif - portal=0; - break; - case 'E': { -#ifdef PG_DEBUGMORE - showportalstack("ERRORRESPONSE"); -#endif - if (_portalsinflight->drained() && !_readyforquerycount) - sendsync(); - PD("%O ErrorResponse %O\n", - objectp(portal)&&(portal._portalname||portal._preparedname), - objectp(portal)&&portal._query); - mapping(string:string) msgresponse; - msgresponse=getresponse(); - warningsdropcount+=warningscollected; - warningscollected=0; - switch(msgresponse.C) { - case "P0001": - lastmessage=({sprintf("%s: %s",msgresponse.S,msgresponse.M)}); - USERERROR(a2nls(lastmessage - +({pinpointerror(portal._query,msgresponse.P)}) - +showbindings(portal))); - case "53000":case "53100":case "53200":case "53300":case "53400": - case "57P01":case "57P02":case "57P03":case "57P04":case "3D000": - preplastmessage(msgresponse); - PD(a2nls(lastmessage)); throw(msgisfatal(msgresponse)); - case "08P01":case "42P05": - errtype=PROTOCOLERROR; - case "XX000":case "42883":case "42P01": - invalidatecache=1; - default: - preplastmessage(msgresponse); - if(msgresponse.D) - lastmessage+=({msgresponse.D}); - if(msgresponse.H) - lastmessage+=({msgresponse.H}); - lastmessage+=({ - pinpointerror(objectp(portal)&&portal._query,msgresponse.P)+ - pinpointerror(msgresponse.q,msgresponse.p)}); - if(msgresponse.W) - lastmessage+=({msgresponse.W}); - if(objectp(portal)) - lastmessage+=showbindings(portal); - switch(msgresponse.S) { - case "PANIC":werror(a2nls(lastmessage)); - } - case "25P02": // Preserve last error message - USERERROR(a2nls(lastmessage)); // Implicitly closed portal - } - break; - } - case 'N': { - PD("NoticeResponse\n"); - mapping(string:string) msgresponse; - msgresponse=getresponse(); - if(clearmessage) { - warningsdropcount+=warningscollected; - clearmessage=warningscollected=0; - lastmessage=({}); - } - warningscollected++; - lastmessage=({sprintf("%s %s: %s", - msgresponse.S,msgresponse.C,msgresponse.M)}); - int val; - if (val = msgisfatal(msgresponse)) { // Some warnings are fatal - preplastmessage(msgresponse); - PD(a2nls(lastmessage));throw(val); - } - break; - } - case 'A': { - PD("NotificationResponse\n"); - msglen-=4+4; - int pid=cr->read_int32(); - string condition,extrainfo=UNDEFINED; - { - array(string) ts=reads(); - switch(sizeof(ts)) { -#if PG_DEBUG - case 0: - errtype=PROTOCOLERROR; - break; - default: - errtype=PROTOCOLERROR; -#endif - case 2: - extrainfo=ts[1]; - case 1: - condition=ts[0]; - } - } - PD("%d %s\n%s\n",pid,condition,extrainfo); - runcallback(pid,condition,extrainfo); - break; - } - default: - if(msgtype!=-1) { - string s; - PD("Unknown message received %c\n",msgtype); - s=cr->read(msglen-=4);PD("%O\n",s); -#ifdef PG_DEBUG - msglen=0; -#endif - errtype=PROTOCOLUNSUPPORTED; - } else { - lastmessage+=({ - sprintf("Connection lost to database %s@%s:%d/%s %d\n", - user,_host,_port,database,backendpid)}); - runcallback(backendpid, "_lost", ""); - if(!waitforauthready) - throw(0); - USERERROR(a2nls(lastmessage)); - } - break; - } -#ifdef PG_DEBUG - if(msglen) - errtype=PROTOCOLERROR; -#endif - { - string msg; - switch(errtype) { - case PROTOCOLUNSUPPORTED: - msg=sprintf("Unsupported servermessage received %c\n",msgtype); - break; - case PROTOCOLERROR: - msg=sprintf("Protocol error with database %s",host_info()); - break; - case NOERROR: - continue; // Normal production loop - } - ERROR(a2nls(lastmessage+=({msg}))); - } - }; // We only get here if there is an error - if(err==MAGICTERMINATE) { // Announce connection termination to server - catch { - object cs = ci->start(); - CHAIN(cs)->add("X\0\0\0\4"); - cs->sendcmd(SENDOUT); - }; - terminating=1; - err=0; - } else if(stringp(err)) { - .pgsql_util.sql_result or; - if(!objectp(or=portal)) - or=this; - if(!or._delayederror) - or._delayederror=err; -#ifdef PG_DEBUGMORE - showportalstack("THROWN"); -#endif - if(objectp(portal)) - portal->_releasesession(); - portal=0; - if(!waitforauthready) - continue; // Only continue if authentication did not fail - } - break; + proxy.c?->socket && proxy.c->socket->query_fd(), backtrace()[-2]); + Thread.MutexKey lock = proxy.shortmux->lock(); + catch(PT(proxy.waitforauthready->wait(lock))); + lock = 0; + PD("%d Wait for auth ready released.\n", + proxy.c?->socket && proxy.c->socket->query_fd()); } - PD("Closing database processloop %s\n", err ? describe_backtrace(err) : ""); - _delayederror=err; - if (objectp(portal)) { -#ifdef PG_DEBUG - showportal(0); -#endif - portal->_purgeportal(); - } - destruct(waitforauthready); - termlock=0; - if(err && !stringp(err)) - throw(err); - }; - catch { - unnamedstatement = 0; - termlock = 0; - }; - if (err) { - PD("Terminating processloop due to %s\n", describe_backtrace(err)); - _delayederror = err; - } - destruct(waitforauthready); - destruct(c); } //! Closes the connection to the database, any running queries are @@ -1292,37 +464,11 @@ private void procmessage() { //! @note //! This function is PostgreSQL-specific. /*semi*/final void close() { - throwdelayederror(this); - Thread.MutexKey lock; - if (qportals && qportals->size()) - catch(cancelquery()); - if (unnamedstatement) - termlock = unnamedstatement->lock(1); - if (c) // Prevent trivial backtraces - c->close(); - if (unnamedstatement) - lock = unnamedstatement->lock(1); - if (c) - destruct(c); - lock = 0; - destruct(waitforauthready); + proxy.close(); } protected void destroy() { - string errstring; - mixed err = catch(close()); - backendreg = 0; - /* - * Flush out any asynchronously reported errors to stderr; because we are - * inside a destructor, throwing an error will not work anymore. - * Warnings will be silently discarded at this point. - */ - lastmessage = filter(lastmessage, lambda(string val) { - return has_prefix(val, "ERROR ") || has_prefix(val, "FATAL "); }); - if (err || (err = catch(errstring = error(1)))) - werror(describe_backtrace(err)); - else if (errstring && sizeof(errstring)) - werror("%s\n", errstring); // Add missing terminating newline + destruct(proxy); } //! For PostgreSQL this function performs the same function as @[resync()]. @@ -1341,9 +487,9 @@ private void reset_dbsession() { } private void resync_cb() { - switch(backendstatus) { + switch (proxy.backendstatus) { case 'T':case 'E': - foreach(_prepareds;;mapping tp) { + foreach (proxy.prepareds; ; mapping tp) { m_delete(tp,"datatypeoid"); m_delete(tp,"datarowdesc"); m_delete(tp,"datarowtypes"); @@ -1352,11 +498,6 @@ private void resync_cb() { } } -private void sendsync() { - _readyforquerycount++; - c->start()->sendcmd(SYNCSEND); -} - //! Resyncs the database session; typically used to make sure the session is //! not still in a dangling transaction. //! @@ -1378,20 +519,20 @@ private void sendsync() { //! This function is PostgreSQL-specific. /*semi*/final void resync() { mixed err; - if(is_open()) { + if (is_open()) { err = catch { PD("Statementsinflight: %d Portalsinflight: %d\n", - _statementsinflight, _portalsinflight); - if(!waitforauthready) { - readyforquery_cb=resync_cb; - sendsync(); + proxy.statementsinflight, proxy.portalsinflight); + if(!proxy.waitforauthready) { + proxy.readyforquery_cb = resync_cb; + proxy.sendsync(); } return; }; - PD("%O\n",err); + PD("%O\n", err); } - if(sizeof(lastmessage)) - ERROR(a2nls(lastmessage)); + if (sizeof(proxy.lastmessage)) + ERROR(proxy.a2nls(proxy.lastmessage)); } //! Due to restrictions of the Postgres frontend-backend protocol, you always @@ -1403,10 +544,10 @@ private void sendsync() { //! @seealso //! @[create()] /*semi*/final void select_db(string dbname) { - if (database != dbname) + if (proxy.database != dbname) ERROR("Cannot switch databases from %O to %O" " in an already established connection\n", - database, dbname); + proxy.database, dbname); } //! With PostgreSQL you can LISTEN to NOTIFY events. @@ -1443,29 +584,22 @@ private void sendsync() { //! @note //! This function is PostgreSQL-specific. /*semi*/final void set_notify_callback(string condition, - void|function(int,string,string,mixed ...:void) notify_cb,void|int selfnotify, + void|function(int,string,string,mixed ...:void) notify_cb, void|int selfnotify, mixed ... args) { - if(!notify_cb) - m_delete(notifylist,condition); + if (!notify_cb) + m_delete(proxy.notifylist, condition); else { - array old=notifylist[condition]; - if(!old) - old=({notify_cb}); - if(selfnotify||args) - old+=({selfnotify}); - if(args) - old+=args; - notifylist[condition]=old; + array old = proxy.notifylist[condition]; + if (!old) + old = ({notify_cb}); + if (selfnotify || args) + old += ({selfnotify}); + if (args) + old += args; + proxy.notifylist[condition] = old; } } -private void runcallback(int pid,string condition,string extrainfo) { - array cb; - if((cb=notifylist[condition]||notifylist[""]) - && (pid!=backendpid || sizeof(cb)>1 && cb[1])) - callout(cb[0],0,pid,condition,extrainfo,@cb[2..]); -} - //! @returns //! The given string, but escapes/quotes all contained magic characters //! according to the quoting rules of the current session for non-binary @@ -1478,8 +612,8 @@ private void runcallback(int pid,string condition,string extrainfo) { //! @[big_query()], @[quotebinary()], @[create()] /*semi*/final string quote(string s) { waitauthready(); - string r=_runtimeparameter.standard_conforming_strings; - if(r && r=="on") + string r = proxy.runtimeparameter.standard_conforming_strings; + if (r && r == "on") return replace(s, "'", "''"); return replace(s, ({ "'", "\\" }), ({ "''", "\\\\" }) ); } @@ -1509,7 +643,7 @@ private void runcallback(int pid,string condition,string extrainfo) { //! @seealso //! @[drop_db()] /*semi*/final void create_db(string db) { - big_query(sprintf("CREATE DATABASE %s",db)); + big_query(sprintf("CREATE DATABASE %s", db)); } //! This function destroys a database and all the data it contains (assuming @@ -1523,7 +657,7 @@ private void runcallback(int pid,string condition,string extrainfo) { //! @seealso //! @[create_db()] /*semi*/final void drop_db(string db) { - big_query(sprintf("DROP DATABASE %s",db)); + big_query(sprintf("DROP DATABASE %s", db)); } //! @returns @@ -1536,7 +670,7 @@ private void runcallback(int pid,string condition,string extrainfo) { //! @[host_info()] /*semi*/final string server_info () { waitauthready(); - return DRIVERNAME"/"+(_runtimeparameter.server_version||"unknown"); + return DRIVERNAME"/" + (proxy.runtimeparameter.server_version || "unknown"); } //! @returns @@ -1545,14 +679,14 @@ private void runcallback(int pid,string condition,string extrainfo) { //! @param glob //! If specified, list only those databases matching it. /*semi*/final array(string) list_dbs (void|string glob) { - array row,ret=({}); + array row, ret = .pgsql_util.emptyarray; .pgsql_util.sql_result res=big_query("SELECT d.datname " "FROM pg_database d " "WHERE d.datname ILIKE :glob " "ORDER BY d.datname", ([":glob":glob2reg(glob)])); - while(row=res->fetch_row()) - ret+=({row[0]}); + while(row = res->fetch_row()) + ret += ({row[0]}); return ret; } @@ -1563,8 +697,9 @@ private void runcallback(int pid,string condition,string extrainfo) { //! @param glob //! If specified, list only the tables with matching names. /*semi*/final array(string) list_tables (void|string glob) { - array row,ret=({}); // This query might not work on PostgreSQL 7.4 - .pgsql_util.sql_result res=big_query( // due to missing schemasupport + array row, ret = .pgsql_util.emptyarray; + .pgsql_util.sql_result res = big_query( // due to missing schemasupport + // This query might not work on PostgreSQL 7.4 "SELECT CASE WHEN 'public'=n.nspname THEN '' ELSE n.nspname||'.' END " " ||c.relname AS name " "FROM pg_catalog.pg_class c " @@ -1574,8 +709,8 @@ private void runcallback(int pid,string condition,string extrainfo) { " AND c.relname ILIKE :glob " " ORDER BY 1", ([":glob":glob2reg(glob)])); - while(row=res->fetch_row()) - ret+=({row[0]}); + while(row = res->fetch_row()) + ret += ({row[0]}); return ret; } @@ -1621,8 +756,8 @@ private void runcallback(int pid,string condition,string extrainfo) { //! Setting it to @expr{*@} will include system columns in the list. /*semi*/final array(mapping(string:mixed)) list_fields(void|string table, void|string glob) { - array row, ret=({}); - string schema=UNDEFINED; + array row, ret = .pgsql_util.emptyarray; + string schema; sscanf(table||"*", "%s.%s", schema, table); @@ -1672,7 +807,7 @@ private void runcallback(int pid,string condition,string extrainfo) { array colnames=res->fetch_fields(); { - mapping(string:string) renames=([ + mapping(string:string) renames = ([ "attname":"name", "nspname":"schema", "relname":"table", @@ -1687,23 +822,23 @@ private void runcallback(int pid,string condition,string extrainfo) { "relhaspkey":"has_primarykey", "reltuples":"rowcount", ]); - foreach(colnames;int i;mapping m) { - string nf,field=m.name; + foreach(colnames; int i; mapping m) { + string nf, field=m.name; if(nf=renames[field]) - field=nf; + field=nf; colnames[i]=field; } } -#define delifzero(m,field) if(!(m)[field]) m_delete(m,field) +#define delifzero(m, field) if(!(m)[field]) m_delete(m, field) while(row=res->fetch_row()) { - mapping m=mkmapping(colnames,row); + mapping m=mkmapping(colnames, row); delifzero(m,"is_shared"); delifzero(m,"has_index"); delifzero(m,"has_primarykey"); delifzero(m,"default"); - ret+=({m}); + ret += ({m}); } return ret; } @@ -1728,12 +863,12 @@ private string trbackendst(int c) { //! @note //! This function is PostgreSQL-specific. final string status_commit() { - return trbackendst(backendstatus); + return trbackendst(proxy.backendstatus); } private inline void closestatement( - .pgsql_util.bufcon|.pgsql_util.conxsess plugbuffer,string oldprep) { - .pgsql_util.closestatement(plugbuffer,oldprep); + .pgsql_util.bufcon|.pgsql_util.conxsess plugbuffer, string oldprep) { + .pgsql_util.closestatement(plugbuffer, oldprep); } private inline string int2hex(int i) { @@ -1744,9 +879,8 @@ private inline void throwdelayederror(object parent) { .pgsql_util.throwdelayederror(parent); } -//! This is the only provided interface which allows you to query the -//! database. If you wish to use the simpler @[Sql.Sql()->query()] function, -//! you need to use the @[Sql.Sql] generic SQL-object. +//! This is the only provided direct interface which allows you to query the +//! database. A simpler synchronous interface can be used through @[query()]. //! //! Bindings are supported natively straight across the network. //! Special bindings supported are: @@ -1778,9 +912,9 @@ private inline void throwdelayederror(object parent) { //! @returns //! A @[Sql.pgsql_util.sql_result] object (which conforms to the //! @[Sql.sql_result] standard interface for accessing data). It is -//! recommended to use @[Sql.Sql()->query()] for simpler queries (because +//! recommended to use @[query()] for simpler queries (because //! it is easier to handle, but stores all the result in memory), and -//! @[Sql.Sql()->big_query()] for queries you expect to return huge amounts of +//! @[big_query()] for queries you expect to return huge amounts of //! data (it's harder to handle, but fetches results on demand). //! //! @note @@ -1798,81 +932,85 @@ private inline void throwdelayederror(object parent) { //! simply ignores any commands after the first unquoted semicolon. This can //! be viewed as a limited protection against SQL-injection attacks. //! To make it support multiple queries in one querystring, use the -//! @ref{:_text@} option. +//! @ref{:_text@} option (not recommended). //! //! @seealso //! @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result], -//! @[Sql.Sql()->query()], @[Sql.pgsql_util.sql_result] +//! @[query()], @[Sql.pgsql_util.sql_result] /*semi*/final .pgsql_util.sql_result big_query(string q, void|mapping(string|int:mixed) bindings, void|int _alltyped) { throwdelayederror(this); - string preparedname=""; - int forcecache=-1, forcetext=_options.text_query; - int syncparse=zero_type(_options.sync_parse)?-1:_options.sync_parse; - if(waitforauthready) + string preparedname = ""; + mapping(string:mixed) options = proxy.options; + .pgsql_util.conxion c = proxy.c; + int forcecache = -1, forcetext = options.text_query; + int syncparse = zero_type(options.sync_parse) + ? -1 : options.sync_parse; + if (proxy.waitforauthready) waitauthready(); - string cenc=_runtimeparameter[CLIENT_ENCODING]; + string cenc = proxy.runtimeparameter[CLIENT_ENCODING]; switch(cenc) { case UTF8CHARSET: - q=string_to_utf8(q); + q = string_to_utf8(q); break; default: - if(String.width(q)>8) - ERROR("Don't know how to convert %O to %s encoding\n",q,cenc); + if (String.width(q) > 8) + ERROR("Don't know how to convert %O to %s encoding\n", q, cenc); } array(string|int) paramValues; array from; - if(bindings) { - if(forcetext) - q = .sql_util.emulate_bindings(q, bindings, this), paramValues=({}); + if (bindings) { + if (forcetext) + q = .sql_util.emulate_bindings(q, bindings, this), + paramValues = .pgsql_util.emptyarray; else { - int pi=0,rep=0; - paramValues=allocate(sizeof(bindings)); - from=allocate(sizeof(bindings)); - array(string) to=allocate(sizeof(bindings)); - foreach(bindings; mixed name; mixed value) { - if(stringp(name)) { // Throws if mapping key is empty string - if(name[0]!=':') - name=":"+name; - if(name[1]=='_') { // Special option parameter + int pi = 0, rep = 0; + paramValues = allocate(sizeof(bindings)); + from = allocate(sizeof(bindings)); + array(string) to = allocate(sizeof(bindings)); + foreach (bindings; mixed name; mixed value) { + if (stringp(name)) { // Throws if mapping key is empty string + if (name[0] != ':') + name = ":" + name; + if (name[1] == '_') { // Special option parameter switch(name) { case ":_cache": - forcecache=(int)value; + forcecache = (int)value; break; case ":_text": - forcetext=(int)value; + forcetext = (int)value; break; case ":_sync": - syncparse=(int)value; + syncparse = (int)value; break; } continue; } - if(!has_value(q,name)) + if (!has_value(q, name)) continue; } - from[rep]=name; + from[rep] = name; string rval; - if(multisetp(value)) // multisets are taken literally - rval=indices(value)*","; // and bypass the encoding logic + if (multisetp(value)) // multisets are taken literally + rval = indices(value)*","; // and bypass the encoding logic else { - paramValues[pi++]=value; - rval=sprintf("$%d",pi); + paramValues[pi++] = value; + rval = sprintf("$%d", pi); } - to[rep++]=rval; + to[rep++] = rval; } if(rep--) - q=replace(q,from=from[..rep],to=to[..rep]); - paramValues= pi ? paramValues[..pi-1] : ({}); - from=({from,to,paramValues}); + q = replace(q, from = from[..rep], to = to[..rep]); + paramValues = pi ? paramValues[..pi-1] : .pgsql_util.emptyarray; + from = ({from, to, paramValues}); } } else - paramValues=({}); - if(String.width(q)>8) - ERROR("Wide string literals in %O not supported\n",q); - if(has_value(q,"\0")) - ERROR("Querystring %O contains invalid literal nul-characters\n",q); + paramValues = .pgsql_util.emptyarray; + if (String.width(q) > 8) + ERROR("Wide string literals in %O not supported\n", q); + if (has_value(q, "\0")) + ERROR("Querystring %O contains invalid literal nul-characters\n", q); mapping(string:mixed) tp; int tstart; /* @@ -1888,80 +1026,80 @@ private inline void throwdelayederror(object parent) { else if (!forcetext && forcecache == 1 || forcecache && sizeof(q) >= MINPREPARELENGTH) { object plugbuffer = c->start(); - if(tp=_prepareds[q]) { - if(tp.preparedname) { + if (tp = proxy.prepareds[q]) { + if (tp.preparedname) { #ifdef PG_STATS - prepstmtused++; + prepstmtused++; #endif - preparedname=tp.preparedname; - } else if((tstart=tp.trun) - && tp.tparse*FACTORPLAN>=tstart - && (undefinedp(_options.cache_autoprepared_statements) - || _options.cache_autoprepared_statements)) - preparedname=PREPSTMTPREFIX+int2hex(pstmtcount++); + preparedname = tp.preparedname; + } else if((tstart = tp.trun) + && tp.tparse*FACTORPLAN >= tstart + && (undefinedp(options.cache_autoprepared_statements) + || options.cache_autoprepared_statements)) + preparedname = PREPSTMTPREFIX + int2hex(pstmtcount++); } else { - if(totalhits>=cachedepth) - foreach(_prepareds;string ind;tp) { - int oldhits=tp.hits; - totalhits-=oldhits-(tp.hits=oldhits>>1); - if(oldhits<=1) { - closestatement(plugbuffer,tp.preparedname); - m_delete(_prepareds,ind); - } - } - if(forcecache!=1 && .pgsql_util.createprefix->match(q)) { + if (proxy.totalhits >= cachedepth) + foreach (proxy.prepareds; string ind; tp) { + int oldhits = tp.hits; + proxy.totalhits -= oldhits-(tp.hits = oldhits >> 1); + if (oldhits <= 1) { + closestatement(plugbuffer, tp.preparedname); + m_delete(proxy.prepareds, ind); + } + } + if (forcecache != 1 && .pgsql_util.createprefix->match(q)) { PD("Invalidate cache\n"); - invalidatecache=1; // Flush cache on CREATE + proxy.invalidatecache = 1; // Flush cache on CREATE tp = 0; } else - _prepareds[q]=tp=([]); + proxy.prepareds[q] = tp = ([]); } - if(invalidatecache) { - invalidatecache=0; - foreach(_prepareds;;mapping np) { - closestatement(plugbuffer,np.preparedname); - m_delete(np,"preparedname"); + if (proxy.invalidatecache) { + proxy.invalidatecache = 0; + foreach (proxy.prepareds; ; mapping np) { + closestatement(plugbuffer, np.preparedname); + m_delete(np, "preparedname"); } } - if(sizeof(CHAIN(plugbuffer))) { + if (sizeof(CHAIN(plugbuffer))) { PD("%O\n",(string)CHAIN(plugbuffer)); plugbuffer->sendcmd(FLUSHSEND); // close expireds } else plugbuffer->sendcmd(KEEP); // close start() - tstart=gethrtime(); + tstart = gethrtime(); } else // sql_result autoassigns to portal tp = 0; .pgsql_util.sql_result portal; - portal=.pgsql_util.sql_result(this,c,q, portalbuffersize, _alltyped, from, - forcetext, timeout, syncparse, transtype); - portal._tprepared=tp; + portal = .pgsql_util.sql_result(proxy, c, q, portalbuffersize, _alltyped, + from, forcetext, timeout, syncparse, transtype); + portal._tprepared = tp; #ifdef PG_STATS portalsopened++; #endif - clearmessage=1; - if(forcetext) { // FIXME What happens if portals are still open? - portal._unnamedportalkey=_unnamedportalmux->lock(1); - portal._portalname=""; + proxy.clearmessage = 1; + if (forcetext) { // FIXME What happens if portals are still open? + portal._unnamedportalkey = proxy.unnamedportalmux->lock(1); + portal._portalname = ""; portal->_parseportal(); portal->_bindportal(); - _readyforquerycount++; - Thread.MutexKey lock=unnamedstatement->lock(1); + proxy.readyforquerycount++; + Thread.MutexKey lock = proxy.unnamedstatement->lock(1); .pgsql_util.conxsess cs = c->start(1); CHAIN(cs)->add_int8('Q')->add_hstring(({q, 0}), 4, 4); - cs->sendcmd(FLUSHLOGSEND,portal); - lock=0; - PD("Simple query: %O\n",q); + cs->sendcmd(FLUSHLOGSEND, portal); + lock = 0; + PD("Simple query: %O\n", q); } else { object plugbuffer; portal->_parseportal(); - if(!sizeof(preparedname) || !tp || !tp.preparedname) { - if(!sizeof(preparedname)) + if (!sizeof(preparedname) || !tp || !tp.preparedname) { + if (!sizeof(preparedname)) preparedname= - (portal._unnamedstatementkey = unnamedstatement->trylock(1)) + (portal._unnamedstatementkey = proxy.unnamedstatement->trylock(1)) ? "" : PTSTMTPREFIX+int2hex(ptstmtcount++); - PD("Parse statement %O=%O\n",preparedname,q); + PD("Parse statement %O=%O\n", preparedname, q); plugbuffer = c->start(); CHAIN(plugbuffer)->add_int8('P') - ->add_hstring(({preparedname,0,q,"\0\0\0"}),4,4) + ->add_hstring(({preparedname, 0, q, "\0\0\0"}), 4, 4) #if 0 // Even though the protocol doesn't require the Parse command to be // followed by a flush, it makes a VERY noticeable difference in @@ -1975,30 +1113,30 @@ private inline void throwdelayederror(object parent) { preparedname = tp.preparedname; // to shortcut a potential race PD("Using prepared statement %s for %O\n", preparedname, q); } - portal._preparedname=preparedname; - if(!tp || !tp.datatypeoid) { - PD("Describe statement %O\n",preparedname); + portal._preparedname = preparedname; + if (!tp || !tp.datatypeoid) { + PD("Describe statement %O\n", preparedname); if (!plugbuffer) plugbuffer = c->start(); CHAIN(plugbuffer)->add_int8('D') ->add_hstring(({'S', preparedname, 0}), 4, 4); - plugbuffer->sendcmd(FLUSHSEND,portal); + plugbuffer->sendcmd(FLUSHSEND, portal); } else { - if(plugbuffer) + if (plugbuffer) plugbuffer->sendcmd(KEEP); #ifdef PG_STATS skippeddescribe++; #endif - portal->_setrowdesc(tp.datarowdesc,tp.datarowtypes); + portal->_setrowdesc(tp.datarowdesc, tp.datarowtypes); } - if((portal._tprepared=tp) && tp.datatypeoid) { - mixed e=catch(portal->_preparebind(tp.datatypeoid)); - if (e && !portal._delayederror) { + if ((portal._tprepared=tp) && tp.datatypeoid) { + mixed e = catch(portal->_preparebind(tp.datatypeoid)); + if (e && !portal.delayederror) { portal._unnamedstatementkey = 0; // Release early, release often throw(e); } } - if (!unnamedstatement) + if (!proxy.unnamedstatement) portal._unnamedstatementkey = 0; // Cover for a destruct race } throwdelayederror(portal); @@ -2012,7 +1150,7 @@ private inline void throwdelayederror(object parent) { //! @[big_query()], @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result] /*semi*/final inline .pgsql_util.sql_result streaming_query(string q, void|mapping(string|int:mixed) bindings) { - return big_query(q,bindings); + return big_query(q, bindings); } //! This function returns an object that allows streaming and typed @@ -2022,5 +1160,5 @@ private inline void throwdelayederror(object parent) { //! @[big_query()], @[Sql.Sql], @[Sql.sql_result] /*semi*/final inline .pgsql_util.sql_result big_typed_query(string q, void|mapping(string|int:mixed) bindings) { - return big_query(q,bindings,1); + return big_query(q, bindings, 1); } diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index e3b1a2f3d369bfb7e10d67bd2ba9732a3861cdc7..ee8fc9964a0f11aeb28946afa1fa9c78245b18f9 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -18,6 +18,21 @@ #include "pgsql.h" +#define PORTALINIT 0 // Portal states +#define PARSING 1 +#define BOUND 2 +#define COMMITTED 3 +#define COPYINPROGRESS 4 +#define CLOSING 5 +#define CLOSED 6 +#define PURGED 7 + +#define NOERROR 0 // Error states networkparser +#define PROTOCOLERROR 1 +#define PROTOCOLUNSUPPORTED 2 + +#define LOSTERROR "Database connection lost" + //! The instance of the pgsql dedicated backend. final Pike.Backend local_backend = Pike.SmallBackend(); @@ -28,27 +43,26 @@ constant emptyarray = ({}); constant describenodata = (["datarowdesc":emptyarray, "datarowtypes":emptyarray, "datatypeoid":emptyarray]); -final multiset censoroptions=(<"use_ssl","force_ssl", - "cache_autoprepared_statements","reconnect","text_query","is_superuser", - "server_encoding","server_version","integer_datetimes", +private constant censoroptions = (<"use_ssl", "force_ssl", + "cache_autoprepared_statements", "reconnect", "text_query", "is_superuser", + "server_encoding", "server_version", "integer_datetimes", "session_authorization">); -constant stdiobuftype = typeof(Stdio.Buffer()); /* Statements matching createprefix cause the prepared statement cache * to be flushed to prevent stale references to (temporary) tables */ -final Regexp createprefix=iregexp("^\a*(CREATE|DROP)\a"); +final Regexp createprefix = iregexp("^\a*(CREATE|DROP)\a"); /* Statements matching dontcacheprefix never enter the cache */ -private Regexp dontcacheprefix=iregexp("^\a*(FETCH|COPY)\a"); +private Regexp dontcacheprefix = iregexp("^\a*(FETCH|COPY)\a"); /* Statements not matching paralleliseprefix will cause the driver * to stall submission until all previously started statements have * run to completion */ private Regexp paralleliseprefix - =iregexp("^\a*((SELEC|INSER)T|(UPDA|DELE)TE|(FETC|WIT)H)\a"); + = iregexp("^\a*((SELEC|INSER)T|(UPDA|DELE)TE|(FETC|WIT)H)\a"); /* Statements matching transbeginprefix will cause the driver * insert a sync after the statement. @@ -56,7 +70,7 @@ private Regexp paralleliseprefix * in the event of an ErrorResponse. */ final Regexp transbeginprefix - =iregexp("^\a*(BEGIN|START)([; \t\f\r\n]|$)"); + = iregexp("^\a*(BEGIN|START)([; \t\f\r\n]|$)"); /* Statements matching transendprefix will cause the driver * insert a sync after the statement. @@ -64,7 +78,7 @@ final Regexp transbeginprefix * in the event of an ErrorResponse. */ final Regexp transendprefix - =iregexp("^\a*(COMMIT|ROLLBACK|END)([; \t\f\r\n]|$)"); + = iregexp("^\a*(COMMIT|ROLLBACK|END)([; \t\f\r\n]|$)"); /* For statements matching execfetchlimit the resultrows will not be * fetched in pieces. This heuristic will be sub-optimal whenever @@ -79,23 +93,23 @@ final Regexp transendprefix * tradeoff. */ private Regexp execfetchlimit - =iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$"); + = iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$"); private Regexp iregexp(string expr) { - Stdio.Buffer ret=Stdio.Buffer(); - foreach(expr;;int c) - if(c>='A'&&c<='Z') - ret->add('[',c,c+'a'-'A',']'); - else if(c=='\a') // Replace with generic whitespace + Stdio.Buffer ret = Stdio.Buffer(); + foreach (expr; ; int c) + if (c >= 'A' && c <= 'Z') + ret->add('[', c, c + 'a' - 'A', ']'); + else if (c == '\a') // Replace with generic whitespace ret->add("[ \t\f\r\n]"); else ret->add_int8(c); return Regexp(ret->read()); } -final void closestatement(bufcon|conxsess plugbuffer,string oldprep) { - if(oldprep) { - PD("Close statement %s\n",oldprep); +final void closestatement(bufcon|conxsess plugbuffer, string oldprep) { + if (oldprep) { + PD("Close statement %s\n", oldprep); CHAIN(plugbuffer)->add_int8('C')->add_hstring(({'S', oldprep, 0}), 4, 4); } } @@ -104,8 +118,8 @@ private void run_local_backend() { Thread.MutexKey lock; int looponce; do { - looponce=0; - if(lock=backendmux->trylock()) { + looponce = 0; + if (lock = backendmux->trylock()) { PD("Starting local backend\n"); while (!clientsregistered->drained() // Autoterminate when not needed || sizeof(local_backend->call_out_info())) { @@ -114,10 +128,10 @@ private void run_local_backend() { werror(describe_backtrace(err)); } PD("Terminating local backend\n"); - lock=0; + lock = 0; looponce = !clientsregistered->drained(); } - } while(looponce); + } while (looponce); } //! Registers yourself as a user of this backend. If the backend @@ -130,17 +144,21 @@ final Thread.ResourceCountKey register_backend() { return key; } -final void throwdelayederror(object parent) { - if(mixed err=parent._delayederror) { - parent._delayederror=UNDEFINED; - if(stringp(err)) - err=({err,backtrace()[..<2]}); +final void throwdelayederror(sql_result|proxy parent) { + if (mixed err = parent->delayederror) { + if (!objectp(parent->pgsqlsess)) + parent->untolderror = 0; + else if (parent->pgsqlsess) + parent->pgsqlsess->untolderror = 0; + parent.delayederror = 0; + if (stringp(err)) + err = ({err, backtrace()[..<2]}); throw(err); } } -final int oidformat(int oid) { - switch(oid) { +private int oidformat(int oid) { + switch (oid) { case BOOLOID: case BYTEAOID: case CHAROID: @@ -161,8 +179,8 @@ final int oidformat(int oid) { } private inline mixed callout(function(mixed ...:void) f, - float|int delay,mixed ... args) { - return local_backend->call_out(f,delay,@args); + float|int delay, mixed ... args) { + return local_backend->call_out(f, delay, @args); } // Some pgsql utility functions @@ -180,7 +198,7 @@ class bufcon { private conxion realbuffer; private void create(conxion _realbuffer) { - realbuffer=_realbuffer; + realbuffer = _realbuffer; } final Thread.ResourceCount `stashcount() { @@ -190,14 +208,14 @@ class bufcon { final bufcon start(void|int waitforreal) { dirty = realbuffer->stashcount->acquire(); #ifdef PG_DEBUG - if(waitforreal) + if (waitforreal) error("pgsql.bufcon not allowed here\n"); #endif return this; } - final void sendcmd(int mode,void|sql_result portal) { - Thread.MutexKey lock=realbuffer->shortmux->lock(); + final void sendcmd(int mode, void|sql_result portal) { + Thread.MutexKey lock = realbuffer->shortmux->lock(); if (portal) realbuffer->stashqueue->write(portal); if (mode == SYNCSEND) { @@ -212,7 +230,7 @@ class bufcon { realbuffer->stashflushmode = mode; dirty = 0; this->clear(); - if(lock=realbuffer->nostash->trylock(1)) { + if (lock = realbuffer->nostash->trylock(1)) { #ifdef PG_DEBUGRACE conxsess sess = conxsess(realbuffer); realbuffer->started = lock; @@ -237,39 +255,39 @@ class conxiin { protected final bool range_error(int howmuch) { #ifdef PG_DEBUG - if(howmuch<=0) - error("Out of range %d\n",howmuch); + if (howmuch <= 0) + error("Out of range %d\n", howmuch); #endif - if(fillread) { - Thread.MutexKey lock=fillreadmux->lock(); - if(!didreadcb) + if (fillread) { + Thread.MutexKey lock = fillreadmux->lock(); + if (!didreadcb) fillread.wait(lock); - didreadcb=0; - lock=0; + didreadcb = 0; + lock = 0; } else throw(MAGICTERMINATE); return true; } - final int read_cb(mixed id,mixed b) { + final int read_cb(mixed id, mixed b) { PD("Read callback %O\n", b && ((string)b) #ifndef PG_DEBUGMORE [..255] #endif ); - Thread.MutexKey lock=fillreadmux->lock(); - if(procmsg&&id) - procmsg=0,lock=0,Thread.Thread(id); - else if(fillread) - didreadcb=1, fillread.signal(); - lock=0; + Thread.MutexKey lock = fillreadmux->lock(); + if (procmsg && id) + procmsg = 0, lock = 0, Thread.Thread(id); + else if (fillread) + didreadcb = 1, fillread.signal(); + lock = 0; return 0; } private void create() { i::create(); - fillreadmux=Thread.Mutex(); - fillread=Thread.Condition(); + fillreadmux = Thread.Mutex(); + fillread = Thread.Condition(); } }; @@ -288,16 +306,20 @@ class conxion { final Thread.Mutex shortmux; private int closenext; - final sfile socket; + final sfile +#if constant(SSL.File) + |SSL.File +#endif + socket; private int towrite; - final multiset(function(void|mixed:void)) closecallbacks=(<>); + final multiset(sql_result) runningportals = (<>); final Thread.Mutex nostash; final Thread.MutexKey started; final Thread.Queue stashqueue; final Thread.Condition stashavail; final Stdio.Buffer stash; - final int stashflushmode; + final int(KEEP..SYNCSEND) stashflushmode; final Thread.ResourceCount stashcount; final int synctransact; #ifdef PG_DEBUGRACE @@ -305,27 +327,27 @@ class conxion { #endif #ifdef PG_DEBUG final int queueoutidx; - final int queueinidx=-1; + final int queueinidx = -1; #endif private inline void queueup(sql_result portal) { - qportals->write(portal); portal->_synctransact=synctransact; - PD("%d>%O %d %d Queue portal %d bytes\n",socket->query_fd(), - portal._portalname,++queueoutidx,synctransact,sizeof(this)); + qportals->write(portal); portal->_synctransact = synctransact; + PD("%d>%O %d %d Queue portal %d bytes\n", socket->query_fd(), + portal._portalname, ++queueoutidx, synctransact, sizeof(this)); } final bufcon|conxsess start(void|int waitforreal) { Thread.MutexKey lock; - if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) { + if (lock = (waitforreal ? nostash->lock : nostash->trylock)(1)) { int mode; #ifdef PG_DEBUGRACE conxsess sess = conxsess(this); #endif started = lock; - lock=shortmux->lock(); + lock = shortmux->lock(); stashcount->wait_till_drained(lock); mode = getstash(KEEP); - lock=0; + lock = 0; if (mode > KEEP) sendcmd(mode); // Force out stash to the server #ifdef PG_DEBUGRACE @@ -351,7 +373,7 @@ class conxion { private int getstash(int mode) { if (sizeof(stash)) { add(stash); stash->clear(); - foreach (stashqueue->try_read_array();; int|sql_result portal) + foreach (stashqueue->try_read_array(); ; int|sql_result portal) if (intp(portal)) qportals->write(synctransact++); else @@ -386,13 +408,16 @@ unfinalised: mode = FLUSHSEND; } qportals->write(synctransact++); - } while(0); + } while (0); lock = shortmux->lock(); mode = getstash(mode); +#ifdef PG_DEBUG + mixed err = +#endif catch { outer: do { - switch(mode) { + switch (mode) { default: PD("%d>Skip flush %d Queue %O\n", socket->query_fd(), mode, (string)this); @@ -402,115 +427,134 @@ outer: add(PGFLUSH); case SENDOUT:; } - if(towrite=sizeof(this)) { - PD("%d>Sendcmd %O\n",socket->query_fd(),((string)this)[..towrite-1]); - towrite-=output_to(socket,towrite); + if (towrite = sizeof(this)) { + PD("%d>Sendcmd %O\n", + socket->query_fd(), ((string)this)[..towrite-1]); + towrite -= output_to(socket, towrite); } - } while(0); - lock=started=0; + } while (0); + lock = started = 0; return; }; - lock=0; + lock = 0; + PD("Sendcmd failed %s\n", describe_backtrace(err)); destruct(this); } final int close() { - if(!closenext && nostash) { - closenext=1; - Thread.MutexKey lock=i->fillreadmux->lock(); - if(i->fillread) { // Delayed close() after flushing the output buffer + if (!closenext && nostash) { + closenext = 1; + Thread.MutexKey lock = i->fillreadmux->lock(); + if (i->fillread) { // Delayed close() after flushing the output buffer i->fillread.signal(); - i->fillread=0; + i->fillread = 0; } - lock=0; - PD("%d>Delayed close, flush write\n",socket->query_fd()); - i->read_cb(socket->query_id(),0); + lock = 0; + PD("%d>Delayed close, flush write\n", socket->query_fd()); + i->read_cb(socket->query_id(), 0); return 0; } else return -1; } - private void destroy() { - PD("%d>Close conxion %d\n", socket ? socket->query_fd() : -1, !!nostash); - int|.pgsql_util.sql_result portal; - if (qportals) // CancelRequest does not use qportals - while (portal = qportals->try_read()) - if (objectp(portal)) - portal->_purgeportal(); - if(nostash) { - catch { - while(sizeof(closecallbacks)) - foreach(closecallbacks;function(void|mixed:void) closecb;) - closecb(); + final void purge() { + if (stashcount) { + stashcount = 0; + PD("%d>Purge conxion %d\n", socket ? socket->query_fd() : -1, !!nostash); + int|sql_result portal; + if (qportals) // CancelRequest does not use qportals + while (portal = qportals->try_read()) + if (objectp(portal)) + portal->_purgeportal(); + if (nostash) { + while (sizeof(runningportals)) + catch { + foreach (runningportals; sql_result result; ) + if (!result.datarowtypes) { + result.datarowtypes = emptyarray; + if (result._state != PURGED && !result.delayederror) + result.delayederror = LOSTERROR; + result._ddescribe->broadcast(); + runningportals[result] = 0; + } else + destruct(result); + }; destruct(nostash); - socket->set_nonblocking(); // Drop all callbacks - PD("%d>Close socket\n",socket->query_fd()); - socket->close(); - }; + socket->set_non_blocking(); // Drop all callbacks + PD("%d>Close socket\n", socket->query_fd()); + socket->close(); // This will be an asynchronous close + } + destruct(this); } } - final void connectloop(object pgsqlsess, int nossl) { - mixed err=catch { - for(;;clear()) { - socket->connect(pgsqlsess._host,pgsqlsess._port); + private void destroy() { + PD("%d>Close conxion %d\n", socket ? socket->query_fd() : -1, !!nostash); + catch(purge()); + } + + final void connectloop(proxy pgsqlsess, int nossl) { + mixed err = catch { + for (; ; clear()) { + socket->connect(pgsqlsess. host, pgsqlsess. port); #if constant(SSL.File) - if(!nossl && !pgsqlsess->nossl - && (pgsqlsess._options.use_ssl || pgsqlsess._options.force_ssl)) { + if (!nossl && !pgsqlsess->nossl + && (pgsqlsess.options.use_ssl || pgsqlsess.options.force_ssl)) { PD("SSLRequest\n"); - start()->add_int32(8)->add_int32(PG_PROTOCOL(1234,5679)) + start()->add_int32(8)->add_int32(PG_PROTOCOL(1234, 5679)) ->sendcmd(SENDOUT); string s = socket.read(1); switch (sizeof(s) && s[0]) { case 'S': - object fcon = SSL.File(socket, SSL.Context()); - if(fcon->connect()) { + SSL.File fcon = SSL.File(socket, SSL.Context()); + if (fcon->connect()) { socket->set_backend(local_backend); - socket=fcon; + socket = fcon; break; } default: PD("%d>Close socket short\n", socket->query_fd()); socket->close(); - pgsqlsess.nossl=1; + pgsqlsess.nossl = 1; continue; case 'N': - if(pgsqlsess._options.force_ssl) + if (pgsqlsess.options.force_ssl) error("Encryption not supported on connection to %s:%d\n", - pgsqlsess.host,pgsqlsess.port); + pgsqlsess.host, pgsqlsess.port); } } #else - if(pgsqlsess._options.force_ssl) + if (pgsqlsess.options.force_ssl) error("Encryption library missing," " cannot establish connection to %s:%d\n", - pgsqlsess.host,pgsqlsess.port); + pgsqlsess.host, pgsqlsess.port); #endif break; } - if(!socket->is_open()) - error(strerror(socket->errno())+".\n"); + if (!socket->is_open()) + error(strerror(socket->errno()) + ".\n"); socket->set_backend(local_backend); - socket->set_buffer_mode(i,0); - socket->set_nonblocking(i->read_cb,write_cb,close); + socket->set_buffer_mode(i, 0); + socket->set_nonblocking(i->read_cb, write_cb, close); if (nossl != 2) - Thread.Thread(pgsqlsess->_processloop,this); + Thread.Thread(pgsqlsess->processloop, this); return; }; + catch(destruct(pgsqlsess->waitforauthready)); destruct(this); } - private string _sprintf(int type, void|mapping flags) { - string res=UNDEFINED; - switch(type) { + private string _sprintf(int type) { + string res; + switch (type) { case 'O': - int fd=-1; - if(socket) - catch(fd=socket->query_fd()); - res=predef::sprintf("conxion fd: %d input queue: %d/%d " + int fd = -1; + if (socket) + catch(fd = socket->query_fd()); + res = predef::sprintf("conxion fd: %d input queue: %d/%d " "queued portals: %d output queue: %d/%d\n" "started: %d\n", - fd,sizeof(i),i->_size_object(), + fd, sizeof(i), i->_size_object(), qportals && qportals->size(), sizeof(this), _size_object(), !!started); break; @@ -518,20 +562,20 @@ outer: return res; } - private void create(object pgsqlsess,Thread.Queue _qportals,int nossl) { + private void create(proxy pgsqlsess, Thread.Queue _qportals, int nossl) { o::create(); qportals = _qportals; synctransact = 1; - socket=sfile(); - i=conxiin(); - shortmux=Thread.Mutex(); - nostash=Thread.Mutex(); + socket = sfile(); + i = conxiin(); + shortmux = Thread.Mutex(); + nostash = Thread.Mutex(); closenext = 0; - stashavail=Thread.Condition(); - stashqueue=Thread.Queue(); - stash=Stdio.Buffer(); + stashavail = Thread.Condition(); + stashqueue = Thread.Queue(); + stash = Stdio.Buffer(); stashcount = Thread.ResourceCount(); - Thread.Thread(connectloop,pgsqlsess,nossl); + Thread.Thread(connectloop, pgsqlsess, nossl); } }; @@ -548,7 +592,7 @@ class conxsess { chain = parent; } - final void sendcmd(int mode,void|sql_result portal) { + final void sendcmd(int mode, void|sql_result portal) { chain->sendcmd(mode, portal); chain = 0; } @@ -568,15 +612,15 @@ class conxsess { //! @[Sql.sql_result], @[Sql.pgsql], @[Sql.Sql], @[Sql.pgsql()->big_query()] class sql_result { - private object pgsqlsess; - private int eoffound; + private proxy pgsqlsess; + private int(0..1) eoffound; private conxion c; private conxiin cr; - final mixed _delayederror; - final int _state; + final mixed delayederror; + final int(PORTALINIT..PURGED) _state; final int _fetchlimit; - private int alltext; - final int _forcetext; + private int(0..1) alltext; + final int(0..1) _forcetext; private int syncparse; private int transtype; @@ -584,18 +628,18 @@ class sql_result { private int rowsreceived; private int inflight; - private int portalbuffersize; + int portalbuffersize; private Thread.Mutex closemux; private Thread.Queue datarows; private Thread.ResourceCountKey stmtifkey, portalsifkey; private array(mapping(string:mixed)) datarowdesc; - private array(int) datarowtypes; // types from datarowdesc + final array(int) datarowtypes; // types from datarowdesc private string statuscmdcomplete; private int bytesreceived; final int _synctransact; final Thread.Condition _ddescribe; final Thread.Mutex _ddescribemux; - final Thread.MutexKey _unnamedportalkey,_unnamedstatementkey; + final Thread.MutexKey _unnamedportalkey, _unnamedstatementkey; final array _params; final string _query; final string _preparedname; @@ -604,37 +648,38 @@ class sql_result { private int timeout; protected string _sprintf(int type) { - string res=UNDEFINED; - switch(type) { + string res; + switch (type) { case 'O': - int fd=-1; - if(c&&c->socket) - catch(fd=c->socket->query_fd()); - res=sprintf("sql_result state: %d numrows: %d eof: %d inflight: %d\n" + int fd = -1; + if (c && c->socket) + catch(fd = c->socket->query_fd()); + res = sprintf("sql_result state: %d numrows: %d eof: %d inflight: %d\n" "query: %O\n" "fd: %O portalname: %O datarows: %d" " synctransact: %d laststatus: %s\n", - _state,rowsreceived,eoffound,inflight, - _query,fd,_portalname,datarowtypes&&sizeof(datarowtypes), - _synctransact, - statuscmdcomplete||(_unnamedstatementkey?"*parsing*":"")); + _state, rowsreceived, eoffound, inflight, + _query, fd, _portalname, + datarowtypes && sizeof(datarowtypes), _synctransact, + statuscmdcomplete + || (_unnamedstatementkey ? "*parsing*" : "")); break; } return res; } - protected void create(object _pgsqlsess,conxion _c,string query, - int _portalbuffersize,int alltyped,array params,int forcetext, + protected void create(proxy _pgsqlsess, conxion _c, string query, + int _portalbuffersize, int alltyped, array params, int forcetext, int _timeout, int _syncparse, int _transtype) { pgsqlsess = _pgsqlsess; if (catch(cr = (c = _c)->i)) losterror(); _query = query; datarows = Thread.Queue(); - _ddescribe=Thread.Condition(); - _ddescribemux=Thread.Mutex(); - closemux=Thread.Mutex(); - portalbuffersize=_portalbuffersize; + _ddescribe = Thread.Condition(); + _ddescribemux = Thread.Mutex(); + closemux = Thread.Mutex(); + portalbuffersize = _portalbuffersize; alltext = !alltyped; _params = params; _forcetext = forcetext; @@ -642,7 +687,7 @@ class sql_result { timeout = _timeout; syncparse = _syncparse; gottimeout = _pgsqlsess->cancelquery; - c->closecallbacks+=(<destroy>); + c->runningportals[this] = 1; transtype = _transtype; } @@ -666,32 +711,35 @@ class sql_result { //! This function is PostgreSQL-specific. /*semi*/final int affected_rows() { int rows; - if(statuscmdcomplete) - sscanf(statuscmdcomplete,"%*s %d",rows); + if (statuscmdcomplete) + sscanf(statuscmdcomplete, "%*s %d", rows); return rows; } final void _storetiming() { - if(_tprepared) { - _tprepared.trun=gethrtime()-_tprepared.trunstart; - m_delete(_tprepared,"trunstart"); - _tprepared = UNDEFINED; + if (_tprepared) { + _tprepared.trun = gethrtime() - _tprepared.trunstart; + m_delete(_tprepared, "trunstart"); + _tprepared = 0; } } private void waitfordescribe() { - Thread.MutexKey lock=_ddescribemux->lock(); - if(!datarowtypes) + Thread.MutexKey lock = _ddescribemux->lock(); + if (!datarowtypes) PT(_ddescribe->wait(lock)); - lock=0; + lock = 0; + if (this) // If object already destructed, skip the next call + trydelayederror(); // since you cannot call functions anymore + else + error(LOSTERROR); } //! @seealso //! @[Sql.sql_result()->num_fields()] /*semi*/final int num_fields() { - if(!datarowtypes) + if (!datarowtypes) waitfordescribe(); - trydelayederror(); return sizeof(datarowtypes); } @@ -711,12 +759,12 @@ class sql_result { private void losterror() { string err; if (pgsqlsess) - err = pgsqlsess->error(1); - error("%s\n", err || "Database connection lost"); + err = pgsqlsess->geterror(1); + error("%s\n", err || LOSTERROR); } private void trydelayederror() { - if(_delayederror) + if (delayederror) throwdelayederror(this); else if (_state == PURGED) losterror(); @@ -732,10 +780,11 @@ class sql_result { //! @seealso //! @[Sql.sql_result()->fetch_fields()] /*semi*/final array(mapping(string:mixed)) fetch_fields() { - if(!datarowtypes) + if (!datarowtypes) waitfordescribe(); - trydelayederror(); - return datarowdesc+emptyarray; + if (!datarowdesc) + error(LOSTERROR); + return datarowdesc + emptyarray; } #ifdef PG_DEBUG @@ -743,79 +792,79 @@ class sql_result { #else #define INTVOID void #endif - final INTVOID _decodedata(int msglen,string cenc) { + final INTVOID _decodedata(int msglen, string cenc) { _storetiming(); _releasestatement(); string serror; - bytesreceived+=msglen; - int cols=cr->read_int16(); - array a=allocate(cols,!alltext&&Val.null); + bytesreceived += msglen; + int cols = cr->read_int16(); + array a = allocate(cols, !alltext && Val.null); #ifdef PG_DEBUG - msglen-=2+4*cols; + msglen -= 2 + 4 * cols; #endif - foreach(datarowtypes;int i;int typ) { - int collen=cr->read_sint(4); - if(collen>0) { + foreach (datarowtypes; int i; int typ) { + int collen = cr->read_sint(4); + if (collen > 0) { #ifdef PG_DEBUG - msglen-=collen; + msglen -= collen; #endif mixed value; - switch(typ) { + switch (typ) { case FLOAT4OID: #if SIZEOF_FLOAT>=8 case FLOAT8OID: #endif - if(!alltext) { - value=(float)cr->read(collen); + if (!alltext) { + value = (float)cr->read(collen); break; } - default:value=cr->read(collen); + default:value = cr->read(collen); break; case CHAROID: - value=alltext?cr->read(1):cr->read_int8(); + value = alltext ? cr->read(1) : cr->read_int8(); break; - case BOOLOID:value=cr->read_int8(); - switch(value) { - case 'f':value=0; + case BOOLOID:value = cr->read_int8(); + switch (value) { + case 'f':value = 0; break; - case 't':value=1; + case 't':value = 1; } - if(alltext) - value=value?"t":"f"; + if (alltext) + value = value ? "t" : "f"; break; case TEXTOID: case BPCHAROID: case VARCHAROID: - value=cr->read(collen); - if(cenc==UTF8CHARSET && catch(value=utf8_to_string(value)) + value = cr->read(collen); + if (cenc == UTF8CHARSET && catch(value = utf8_to_string(value)) && !serror) - serror=SERROR("%O contains non-%s characters\n", - value,UTF8CHARSET); + serror = SERROR("%O contains non-%s characters\n", + value, UTF8CHARSET); break; case INT8OID:case INT2OID: case OIDOID:case INT4OID: - if(_forcetext) { - value=cr->read(collen); - if(!alltext) - value=(int)value; + if (_forcetext) { + value = cr->read(collen); + if (!alltext) + value = (int)value; } else { - switch(typ) { - case INT8OID:value=cr->read_sint(8); + switch (typ) { + case INT8OID:value = cr->read_sint(8); break; - case INT2OID:value=cr->read_sint(2); + case INT2OID:value = cr->read_sint(2); break; case OIDOID: - case INT4OID:value=cr->read_sint(4); + case INT4OID:value = cr->read_sint(4); } - if(alltext) - value=(string)value; + if (alltext) + value = (string)value; } } a[i]=value; - } else if(!collen) + } else if (!collen) a[i]=""; } _processdataready(a); - if(serror) + if (serror) error(serror); #ifdef PG_DEBUG return msglen; @@ -824,44 +873,45 @@ class sql_result { final void _setrowdesc(array(mapping(string:mixed)) drowdesc, array(int) drowtypes) { - Thread.MutexKey lock=_ddescribemux->lock(); - datarowdesc=drowdesc; - datarowtypes=drowtypes; + Thread.MutexKey lock = _ddescribemux->lock(); + datarowdesc = drowdesc; + datarowtypes = drowtypes; _ddescribe->broadcast(); - lock=0; + lock = 0; } final void _preparebind(array dtoid) { - array(string|int) paramValues=_params?_params[2]:({}); - if(sizeof(dtoid)!=sizeof(paramValues)) + array(string|int) paramValues =_params ? _params[2] : emptyarray; + if (sizeof(dtoid) != sizeof(paramValues)) SUSERERROR("Invalid number of bindings, expected %d, got %d\n", - sizeof(dtoid),sizeof(paramValues)); - Thread.MutexKey lock=_ddescribemux->lock(); - if(!_portalname) { - _portalname=(_unnamedportalkey=pgsqlsess._unnamedportalmux->trylock(1)) + sizeof(dtoid), sizeof(paramValues)); + Thread.MutexKey lock = _ddescribemux->lock(); + if (!_portalname) { + _portalname + = (_unnamedportalkey = pgsqlsess.unnamedportalmux->trylock(1)) ? "" : PORTALPREFIX #ifdef PG_DEBUG - +(string)(c->socket->query_fd())+"_" + + (string)(c->socket->query_fd()) + "_" #endif - +int2hex(pgsqlsess._pportalcount++); - lock=0; + + String.int2hex(pgsqlsess.pportalcount++); + lock = 0; #ifdef PG_DEBUGMORE - PD("ParamValues to bind: %O\n",paramValues); + PD("ParamValues to bind: %O\n", paramValues); #endif - Stdio.Buffer plugbuffer=Stdio.Buffer(); - { array dta=({sizeof(dtoid)}); - plugbuffer->add(_portalname,0,_preparedname,0) - ->add_ints(dta+map(dtoid,oidformat)+dta,2); + Stdio.Buffer plugbuffer = Stdio.Buffer(); + { array dta = ({sizeof(dtoid)}); + plugbuffer->add(_portalname, 0, _preparedname, 0) + ->add_ints(dta + map(dtoid, oidformat) + dta, 2); } - string cenc=pgsqlsess._runtimeparameter[CLIENT_ENCODING]; - foreach(paramValues;int i;mixed value) { - if(undefinedp(value) || objectp(value)&&value->is_val_null) + string cenc = pgsqlsess.runtimeparameter[CLIENT_ENCODING]; + foreach (paramValues; int i; mixed value) { + if (undefinedp(value) || objectp(value) && value->is_val_null) plugbuffer->add_int32(-1); // NULL - else if(stringp(value) && !sizeof(value)) { - int k=0; - switch(dtoid[i]) { + else if (stringp(value) && !sizeof(value)) { + int k = 0; + switch (dtoid[i]) { default: - k=-1; // cast empty strings to NULL for non-string types + k = -1; // cast empty strings to NULL for non-string types case BYTEAOID: case TEXTOID: case XMLOID: @@ -870,107 +920,107 @@ class sql_result { } plugbuffer->add_int32(k); } else - switch(dtoid[i]) { + switch (dtoid[i]) { case TEXTOID: case BPCHAROID: case VARCHAROID: { - if(!value) { + if (!value) { plugbuffer->add_int32(-1); break; } - value=(string)value; - switch(cenc) { + value = (string)value; + switch (cenc) { case UTF8CHARSET: - value=string_to_utf8(value); + value = string_to_utf8(value); break; default: - if(String.width(value)>8) { + if (String.width(value)>8) { SUSERERROR("Don't know how to convert %O to %s encoding\n", - value,cenc); + value, cenc); value=""; } } - plugbuffer->add_hstring(value,4); + plugbuffer->add_hstring(value, 4); break; } default: { - if(!value) { + if (!value) { plugbuffer->add_int32(-1); break; } - if (!objectp(value) || typeof(value) != stdiobuftype) - /* - * Like Oracle and SQLite, we accept literal binary values - * from single-valued multisets. - */ - if (multisetp(value) && sizeof(value) == 1) - value = indices(value)[0]; - else { + if (!objectp(value) || typeof(value) != typeof(Stdio.Buffer())); + /* + * Like Oracle and SQLite, we accept literal binary values + * from single-valued multisets. + */ + if (multisetp(value) && sizeof(value) == 1) + value = indices(value)[0]; + else { value = (string)value; if (String.width(value) > 8) if (dtoid[i] == BYTEAOID) - /* + /* * FIXME We should throw an error here, it would - * have been correct, but for historical reasons and - * as a DWIM convenience we autoconvert to UTF8 here. + * have been correct, but for historical reasons and + * as a DWIM convenience we autoconvert to UTF8 here. */ value = string_to_utf8(value); else { SUSERERROR( - "Wide string %O not supported for type OID %d\n", - value,dtoid[i]); - value=""; + "Wide string %O not supported for type OID %d\n", + value, dtoid[i]); + value = ""; } - } - plugbuffer->add_hstring(value,4); + } + plugbuffer->add_hstring(value, 4); break; } case BOOLOID: do { int tval; - if(stringp(value)) - tval=value[0]; - else if(!intp(value)) { - value=!!value; // cast to boolean + if (stringp(value)) + tval = value[0]; + else if (!intp(value)) { + value = !!value; // cast to boolean break; } else - tval=value; - switch(tval) { + tval = value; + switch (tval) { case 'o':case 'O': catch { - tval=value[1]; - value=tval=='n'||tval=='N'; + tval = value[1]; + value = tval == 'n' || tval == 'N'; }; break; default: - value=1; + value = 1; break; case 0:case '0':case 'f':case 'F':case 'n':case 'N': - value=0; + value = 0; break; } - } while(0); + } while (0); plugbuffer->add_int32(1)->add_int8(value); break; case CHAROID: - if(intp(value)) - plugbuffer->add_hstring(value,4); + if (intp(value)) + plugbuffer->add_hstring(value, 4); else { - value=(string)value; - switch(sizeof(value)) { + value = (string)value; + switch (sizeof(value)) { default: SUSERERROR( - "\"char\" types must be 1 byte wide, got %O\n",value); + "\"char\" types must be 1 byte wide, got %O\n", value); case 0: plugbuffer->add_int32(-1); // NULL break; case 1: - plugbuffer->add_hstring(value[0],4); + plugbuffer->add_hstring(value[0], 4); } } break; case INT8OID: - plugbuffer->add_int32(8)->add_int((int)value,8); + plugbuffer->add_int32(8)->add_int((int)value, 8); break; case OIDOID: case INT4OID: @@ -981,64 +1031,64 @@ class sql_result { break; } } - if(!datarowtypes) { - if(_tprepared && dontcacheprefix->match(_query)) - m_delete(pgsqlsess->_prepareds,_query),_tprepared=0; + if (!datarowtypes) { + if (_tprepared && dontcacheprefix->match(_query)) + m_delete(pgsqlsess->prepareds, _query), _tprepared = 0; waitfordescribe(); } - if(_state>=CLOSING) - lock=_unnamedstatementkey=0; + if (_state >= CLOSING) + lock = _unnamedstatementkey = 0; else { plugbuffer->add_int16(sizeof(datarowtypes)); - if(sizeof(datarowtypes)) - plugbuffer->add_ints(map(datarowtypes,oidformat),2); - else if (syncparse < 0 && !pgsqlsess->_wasparallelisable - && !pgsqlsess->_statementsinflight->drained(1)) { - lock = pgsqlsess->_shortmux->lock(); + if (sizeof(datarowtypes)) + plugbuffer->add_ints(map(datarowtypes, oidformat), 2); + else if (syncparse < 0 && !pgsqlsess->wasparallelisable + && !pgsqlsess->statementsinflight->drained(1)) { + lock = pgsqlsess->shortmux->lock(); PD("Commit waiting for statements to finish\n"); - catch(PT(pgsqlsess->_statementsinflight->wait_till_drained(lock, 1))); + catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lock, 1))); } - lock=0; - PD("Bind portal %O statement %O\n",_portalname,_preparedname); - _fetchlimit=pgsqlsess->_fetchlimit; + lock = 0; + PD("Bind portal %O statement %O\n", _portalname, _preparedname); + _fetchlimit = pgsqlsess->_fetchlimit; _bindportal(); conxsess bindbuffer = c->start(); - _unnamedstatementkey=0; + _unnamedstatementkey = 0; CHAIN(bindbuffer)->add_int8('B')->add_hstring(plugbuffer, 4, 4); - if(!_tprepared && sizeof(_preparedname)) + if (!_tprepared && sizeof(_preparedname)) closestatement(CHAIN(bindbuffer), _preparedname); _sendexecute(_fetchlimit && !(transtype != NOTRANS - || sizeof(_query)>=MINPREPARELENGTH && + || sizeof(_query) >= MINPREPARELENGTH && execfetchlimit->match(_query)) - && _fetchlimit,bindbuffer); + && _fetchlimit, bindbuffer); } } else - lock=0; + lock = 0; } final void _processrowdesc(array(mapping(string:mixed)) datarowdesc, array(int) datarowtypes) { - _setrowdesc(datarowdesc,datarowtypes); - if(_tprepared) { - _tprepared.datarowdesc=datarowdesc; - _tprepared.datarowtypes=datarowtypes; + _setrowdesc(datarowdesc, datarowtypes); + if (_tprepared) { + _tprepared.datarowdesc = datarowdesc; + _tprepared.datarowtypes = datarowtypes; } } final void _parseportal() { Thread.MutexKey lock = closemux->lock(); - _state=PARSING; - Thread.MutexKey lockc = pgsqlsess->_shortmux->lock(); - if (syncparse || syncparse < 0 && pgsqlsess->_wasparallelisable) { + _state = PARSING; + Thread.MutexKey lockc = pgsqlsess->shortmux->lock(); + if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) { PD("Commit waiting for statements to finish\n"); - catch(PT(pgsqlsess->_statementsinflight->wait_till_drained(lockc))); + catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lockc))); } - stmtifkey = pgsqlsess->_statementsinflight->acquire(); + stmtifkey = pgsqlsess->statementsinflight->acquire(); lockc = 0; - lock=0; - statuscmdcomplete=UNDEFINED; - pgsqlsess->_wasparallelisable = paralleliseprefix->match(_query); + lock = 0; + statuscmdcomplete = 0; + pgsqlsess->wasparallelisable = paralleliseprefix->match(_query); } final void _releasestatement(void|int nolock) { @@ -1054,117 +1104,118 @@ class sql_result { final void _bindportal() { Thread.MutexKey lock = closemux->lock(); - _state=BOUND; - portalsifkey = pgsqlsess->_portalsinflight->acquire(); - lock=0; + _state = BOUND; + portalsifkey = pgsqlsess->portalsinflight->acquire(); + lock = 0; } final void _purgeportal() { PD("Purge portal\n"); datarows->write(1); // Signal EOF - Thread.MutexKey lock=closemux->lock(); - _fetchlimit=0; // disables further Executes - switch(_state) { + Thread.MutexKey lock = closemux->lock(); + _fetchlimit = 0; // disables further Executes + switch (_state) { case COPYINPROGRESS: case COMMITTED: case BOUND: portalsifkey = 0; } - switch(_state) { + switch (_state) { case BOUND: case PARSING: stmtifkey = 0; } _state = PURGED; - lock=0; + lock = 0; releaseconditions(); } final int _closeportal(conxsess cs) { - object plugbuffer = CHAIN(cs); - int retval=KEEP; - PD("%O Try Closeportal %d\n",_portalname,_state); - Thread.MutexKey lock=closemux->lock(); - _fetchlimit=0; // disables further Executes - switch(_state) { + void|bufcon|conxsess plugbuffer = CHAIN(cs); + int retval = KEEP; + PD("%O Try Closeportal %d\n", _portalname, _state); + Thread.MutexKey lock = closemux->lock(); + _fetchlimit = 0; // disables further Executes + switch (_state) { case PARSING: case BOUND: _releasestatement(1); } - switch(_state) { + switch (_state) { case PORTALINIT: case PARSING: - _unnamedstatementkey=0; - _state=CLOSING; + _unnamedstatementkey = 0; + _state = CLOSING; break; case COPYINPROGRESS: PD("CopyDone\n"); plugbuffer->add("c\0\0\0\4"); case COMMITTED: case BOUND: - _state=CLOSING; - lock=0; - PD("Close portal %O\n",_portalname); + _state = CLOSING; + lock = 0; + PD("Close portal %O\n", _portalname); if (_portalname && sizeof(_portalname)) { - plugbuffer->add_int8('C')->add_hstring(({'P',_portalname,0}),4,4); - retval=FLUSHSEND; + plugbuffer->add_int8('C') + ->add_hstring(({'P', _portalname, 0}), 4, 4); + retval = FLUSHSEND; } else - _unnamedportalkey=0; + _unnamedportalkey = 0; portalsifkey = 0; - if (pgsqlsess->_portalsinflight->drained()) { + if (pgsqlsess->portalsinflight->drained()) { if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN) /* * stashcount will be non-zero if a parse request has been queued * before the close was initiated. * It's a bit of a tricky race, but this check should be sufficient. */ - pgsqlsess->_readyforquerycount++, retval=SYNCSEND; - pgsqlsess->_pportalcount=0; + pgsqlsess->readyforquerycount++, retval = SYNCSEND; + pgsqlsess->pportalcount = 0; } } - lock=0; + lock = 0; return retval; } - final void _processdataready(array datarow,void|int msglen) { - bytesreceived+=msglen; + final void _processdataready(array datarow, void|int msglen) { + bytesreceived += msglen; inflight--; - if(_state<CLOSED) + if (_state<CLOSED) datarows->write(datarow); - if(++rowsreceived==1) - PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n",_portalname, - _fetchlimit,(portalbuffersize>>1)*rowsreceived/bytesreceived, - pgsqlsess._fetchlimit,inflight); - if(_fetchlimit) { - _fetchlimit= - min((portalbuffersize>>1)*rowsreceived/bytesreceived||1, + if (++rowsreceived == 1) + PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n", _portalname, + _fetchlimit, (portalbuffersize >> 1) * rowsreceived / bytesreceived, + pgsqlsess._fetchlimit, inflight); + if (_fetchlimit) { + _fetchlimit = + min((portalbuffersize >> 1) * rowsreceived / bytesreceived || 1, pgsqlsess._fetchlimit); - Thread.MutexKey lock=closemux->lock(); - if(_fetchlimit && inflight<=(_fetchlimit-1)>>1) + Thread.MutexKey lock = closemux->lock(); + if (_fetchlimit && inflight <= (_fetchlimit - 1) >> 1) _sendexecute(_fetchlimit); - else if(!_fetchlimit) + else if (!_fetchlimit) PD("<%O _fetchlimit %d, inflight %d, skip execute\n", - _portalname,_fetchlimit,inflight); - lock=0; + _portalname, _fetchlimit, inflight); + lock = 0; } } private void releaseconditions() { - _unnamedportalkey=_unnamedstatementkey=0; - pgsqlsess=0; - if(!datarowtypes) { - Thread.MutexKey lock=_ddescribemux->lock(); - datarowdesc = datarowtypes = emptyarray; + _unnamedportalkey = _unnamedstatementkey = 0; + if (!datarowtypes) { + if (_state != PURGED && !delayederror) + delayederror = LOSTERROR; + datarowtypes = emptyarray; _ddescribe->broadcast(); - lock=0; } + pgsqlsess = 0; } final void _releasesession(void|string statusccomplete) { - c->closecallbacks-=(<destroy>); - if(statusccomplete && !statuscmdcomplete) - statuscmdcomplete=statusccomplete; - inflight=0; + c->runningportals[this] = 0; + if (statusccomplete && !statuscmdcomplete) + statuscmdcomplete = statusccomplete; + inflight = 0; conxsess plugbuffer; if (!catch(plugbuffer = c->start())) plugbuffer->sendcmd(_closeportal(plugbuffer)); @@ -1180,22 +1231,22 @@ class sql_result { }; } - final void _sendexecute(int fetchlimit,void|bufcon|conxsess plugbuffer) { + final void _sendexecute(int fetchlimit, void|bufcon|conxsess plugbuffer) { int flushmode; PD("Execute portal %O fetchlimit %d transtype %d\n", _portalname, fetchlimit, transtype); - if(!plugbuffer) - plugbuffer=c->start(1); - CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname,0}), 4, 8) + if (!plugbuffer) + plugbuffer = c->start(1); + CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8) ->add_int32(fetchlimit); if (!fetchlimit) { if (transtype != NOTRANS) - pgsqlsess._intransaction = transtype == TRANSBEGIN; + pgsqlsess.intransaction = transtype == TRANSBEGIN; flushmode = _closeportal(plugbuffer) == SYNCSEND || transtype == TRANSEND ? SYNCSEND : FLUSHSEND; } else - inflight+=fetchlimit, flushmode=FLUSHSEND; - plugbuffer->sendcmd(flushmode,this); + inflight += fetchlimit, flushmode = FLUSHSEND; + plugbuffer->sendcmd(flushmode, this); } //! @returns @@ -1208,18 +1259,18 @@ class sql_result { //! @[eof()], @[send_row()] /*semi*/final array(mixed) fetch_row() { int|array datarow; - if(arrayp(datarow=datarows->try_read())) + if (arrayp(datarow = datarows->try_read())) return datarow; - if(!eoffound) { - if(!datarow) { - PD("%O Block for datarow\n",_portalname); - array cid=callout(gottimeout,timeout); - PT(datarow=datarows->read()); + if (!eoffound) { + if (!datarow) { + PD("%O Block for datarow\n", _portalname); + array cid = callout(gottimeout, timeout); + PT(datarow = datarows->read()); local_backend->remove_call_out(cid); - if(arrayp(datarow)) + if (arrayp(datarow)) return datarow; } - eoffound=1; + eoffound = 1; datarows->write(1); // Signal EOF for other threads } trydelayederror(); @@ -1235,20 +1286,20 @@ class sql_result { //! @seealso //! @[eof()], @[fetch_row()] /*semi*/final array(array(mixed)) fetch_row_array() { - if(eoffound) + if (eoffound) return 0; - array(array|int) datarow=datarows->try_read_array(); - if(!datarow) { - array cid=callout(gottimeout,timeout); - PT(datarow=datarows->read_array()); + array(array|int) datarow = datarows->try_read_array(); + if (!datarow) { + array cid = callout(gottimeout, timeout); + PT(datarow = datarows->read_array()); local_backend->remove_call_out(cid); } - if(arrayp(datarow[-1])) + if (arrayp(datarow[-1])) return datarow; trydelayederror(); - eoffound=1; + eoffound = 1; datarows->write(1); // Signal EOF for other threads - return (datarow=datarow[..<1]); + return (datarow = datarow[..<1]); } //! @param copydata @@ -1265,9 +1316,9 @@ class sql_result { //! @[fetch_row()], @[eof()] /*semi*/final void send_row(void|string|array(string) copydata) { trydelayederror(); - if(copydata) { + if (copydata) { PD("CopyData\n"); - object cs = c->start(); + void|bufcon|conxsess cs = c->start(); CHAIN(cs)->add_int8('d')->add_hstring(copydata, 4, 4); cs->sendcmd(SENDOUT); } else @@ -1278,16 +1329,16 @@ class sql_result { function(sql_result, array(mixed), mixed ...:void) callback, array(mixed) args) { int|array datarow; - for(;;) { - array cid=callout(gottimeout,timeout); - PT(datarow=datarows->read()); + for (;;) { + array cid = callout(gottimeout, timeout); + PT(datarow = datarows->read()); local_backend->remove_call_out(cid); - if(!arrayp(datarow)) + if (!arrayp(datarow)) break; callout(callback, 0, this, datarow, @args); } trydelayederror(); - eoffound=1; + eoffound = 1; callout(callback, 0, this, 0, @args); } @@ -1300,26 +1351,26 @@ class sql_result { /*semi*/final void set_result_callback( function(sql_result, array(mixed), mixed ...:void) callback, mixed ... args) { - if(callback) - Thread.Thread(run_result_cb,callback,args); + if (callback) + Thread.Thread(run_result_cb, callback, args); } private void run_result_array_cb( function(sql_result, array(array(mixed)), mixed ...:void) callback, array(mixed) args) { array(array|int) datarow; - for(;;) { - array cid=callout(gottimeout,timeout); - PT(datarow=datarows->read_array()); + for (;;) { + array cid = callout(gottimeout, timeout); + PT(datarow = datarows->read_array()); local_backend->remove_call_out(cid); - if(!datarow || !arrayp(datarow[-1])) + if (!datarow || !arrayp(datarow[-1])) break; callout(callback, 0, this, datarow, @args); } trydelayederror(); - eoffound=1; - if(sizeof(datarow)>1) - callout(callback, 0, this, datarow=datarow[..<1], @args); + eoffound = 1; + if (sizeof(datarow)>1) + callout(callback, 0, this, datarow = datarow[..<1], @args); callout(callback, 0, this, 0, @args); } @@ -1332,8 +1383,957 @@ class sql_result { /*semi*/final void set_result_array_callback( function(sql_result, array(array(mixed)), mixed ...:void) callback, mixed ... args) { - if(callback) - Thread.Thread(run_result_array_cb,callback,args); + if (callback) + Thread.Thread(run_result_array_cb, callback, args); + } + +}; + +class proxy { + final int _fetchlimit = FETCHLIMIT; + final Thread.Mutex unnamedportalmux; + final Thread.Mutex unnamedstatement; + private Thread.MutexKey termlock; + private Thread.ResourceCountKey backendreg; + final Thread.ResourceCount portalsinflight, statementsinflight; + final int(0..1) wasparallelisable; + final int(0..1) intransaction; + + final conxion c; + private string cancelsecret; + private int backendpid; + final int(-128..127) backendstatus; + final mapping(string:mixed) options; + private array(string) lastmessage = emptyarray; + final int(0..1) clearmessage; + final int(0..1) untolderror; + private mapping(string:array(mixed)) notifylist = ([]); + final mapping(string:string) runtimeparameter; + final mapping(string:mapping(string:mixed)) prepareds = ([]); + final int pportalcount; + final int totalhits; + final int msgsreceived; // Number of protocol messages received + final int bytesreceived; // Number of bytes received + final int warningsdropcount; // Number of uncollected warnings + private int warningscollected; + final int(0..1) invalidatecache; + private Thread.Queue qportals; + final mixed delayederror; + private function (:void) readyforquery_cb; + + final string host; + final int(0..65535) port; + private string database, user, pass; + private Crypto.SCRAM SASLcontext; + final Thread.Condition waitforauthready; + final Thread.Mutex shortmux; + final int readyforquerycount; + + private string _sprintf(int type) { + string res; + switch (type) { + case 'O': + res = sprintf(DRIVERNAME".proxy(%s@%s:%d/%s,%d,%d)", + user, host, port, database, c?->socket && c->socket->query_fd(), + backendpid); + break; + } + return res; + } + + private void create(void|string host, void|string database, + void|string user, void|string pass, + void|mapping(string:mixed) options) { + if (this::pass = pass) { + String.secure(pass); + pass = "CENSORED"; + } + this::user = user; + this::database = database; + this::options = options; + + if (!host) host = PGSQL_DEFAULT_HOST; + if (has_value(host,":") && sscanf(host,"%s:%d", host, port) != 2) + error("Error in parsing the hostname argument\n"); + this::host = host; + + if (!port) + port = PGSQL_DEFAULT_PORT; + backendreg = register_backend(); + shortmux = Thread.Mutex(); + PD("Connect\n"); + waitforauthready = Thread.Condition(); + qportals = Thread.Queue(); + readyforquerycount = 1; + qportals->write(1); + if (!(c = conxion(this, qportals, 0))) + error("Couldn't connect to database on %s:%d\n", host, port); + runtimeparameter = ([]); + unnamedportalmux = Thread.Mutex(); + unnamedstatement = Thread.Mutex(); + readyforquery_cb = connect_cb; + portalsinflight = Thread.ResourceCount(); + statementsinflight = Thread.ResourceCount(); + wasparallelisable = 0; } + final int is_open() { + catch { + return c->socket->is_open(); + }; + return 0; + } + + final string geterror(void|int clear) { + throwdelayederror(this); + untolderror = 0; + string s = lastmessage * "\n"; + if (clear) + lastmessage = emptyarray; + warningscollected = 0; + return sizeof(s) && s; + } + + final string host_info() { + return sprintf("fd:%d TCP/IP %s:%d PID %d", + c ? c->socket->query_fd() : -1, host, port, backendpid); + } + + final void cancelquery() { + if (cancelsecret > "") { + PD("CancelRequest\n"); + conxion lcon = conxion(this, 0, 2); +#ifdef PG_DEBUG + mixed err = +#endif + catch(lcon->add_int32(16)->add_int32(PG_PROTOCOL(1234, 5678)) + ->add_int32(backendpid)->add(cancelsecret)->sendcmd(FLUSHSEND)); +#ifdef PG_DEBUG + if (err) + PD("CancelRequest failed to connect %O\n", describe_backtrace(err)); +#endif + destruct(lcon); // Destruct explicitly to avoid delayed close +#ifdef PG_DEBUGMORE + PD("Closetrace %O\n", backtrace()); +#endif + } else + error("Connection not established, cannot cancel any query\n"); + } + + private string a2nls(array(string) msg) { + return msg * "\n" + "\n"; + } + + private string pinpointerror(void|string query, void|string offset) { + if (!query) + return ""; + int k = (int)offset; + if (k <= 0) + return MARKSTART + query + MARKEND; + return MARKSTART + (k > 1 ? query[..k-2] : "") + + MARKERROR + query[k - 1..] + MARKEND; + } + + private void connect_cb() { + PD("%O\n", runtimeparameter); + } + + private array(string) showbindings(sql_result portal) { + array(string) msgs = emptyarray; + array from; + if (portal && (from = portal._params)) { + array to, paramValues; + [from, to, paramValues] = from; + if (sizeof(paramValues)) { + string val; + int i; + string fmt = sprintf("%%%ds %%3s %%.61s", max(@map(from, sizeof))); + foreach (paramValues; i; val) + msgs += ({sprintf(fmt, from[i], to[i], sprintf("%O", val))}); + } + } + return msgs; + } + + private void preplastmessage(mapping(string:string) msgresponse) { + lastmessage = ({ + sprintf("%s %s:%s %s\n (%s:%s:%s)", + msgresponse.S, msgresponse.C, msgresponse.P || "", + msgresponse.M, msgresponse.F || "", msgresponse.R || "", + msgresponse.L||"")}); + } + + private int|sql_result portal; // state information procmessage + #ifdef PG_DEBUG + private string datarowdebug; + private int datarowdebugcount; + #endif + + final void processloop(conxion ci) { + (c = ci)->socket->set_id(procmessage); + cancelsecret = 0; + portal = 0; + { + Stdio.Buffer plugbuffer = Stdio.Buffer()->add_int32(PG_PROTOCOL(3, 0)); + if (user) + plugbuffer->add("user\0", user, 0); + if (database) + plugbuffer->add("database\0", database, 0); + foreach (options - censoroptions; string name; mixed value) + plugbuffer->add(name, 0, (string)value, 0); + plugbuffer->add_int8(0); + PD("%O\n", (string)plugbuffer); + void|bufcon|conxsess cs; + if (catch(cs = ci->start())) { + destruct(waitforauthready); + unnamedstatement = 0; + termlock = 0; + return; + } else { + CHAIN(cs)->add_hstring(plugbuffer, 4, 4); + cs->sendcmd(SENDOUT); + } + } // Do not flush at this point, PostgreSQL 9.4 disapproves + procmessage(); + } + + private void procmessage() { + mixed err; + int terminating = 0; + err = catch { + conxion ci = c; // cache value FIXME sensible? + conxiin cr = ci->i; // cache value FIXME sensible? + #ifdef PG_DEBUG + PD("Processloop\n"); + + #ifdef PG_DEBUGMORE + void showportalstack(string label) { + PD(sprintf(">>>>>>>>>>> Portalstack %s: %O\n", label, portal)); + foreach (qportals->peek_array(); ; int|sql_result qp) + PD(" =========== Portal: %O\n", qp); + PD("<<<<<<<<<<<<<< Portalstack end\n"); + }; + #endif + void showportal(int msgtype) { + if (objectp(portal)) + PD("%d<%O %d %c switch portal\n", + ci->socket->query_fd(), portal._portalname, ++ci->queueinidx, msgtype); + else if (portal>0) + PD("%d<Sync %d %d %c portal\n", + ci->socket->query_fd(), ++ci->queueinidx, portal, msgtype); + }; + #endif + int msgisfatal(mapping(string:string) msgresponse) { + if (!terminating) // Run the callback once per lost connection + runcallback(backendpid,"_lost",""); + return (has_prefix(msgresponse.C, "53") + || has_prefix(msgresponse.C, "3D") + || has_prefix(msgresponse.C, "57P")) && MAGICTERMINATE; + }; + for (;;) { + err = catch { + #ifdef PG_DEBUG + if (!portal && datarowdebug) { + PD("%s rows %d\n", datarowdebug, datarowdebugcount); + datarowdebug = 0; datarowdebugcount = 0; + } + #endif + #ifdef PG_DEBUGMORE + showportalstack("LOOPTOP"); + #endif + if (!sizeof(cr)) { // Preliminary check, fast path + Thread.MutexKey lock = cr->fillreadmux->lock(); + if (!sizeof(cr)) { // Check for real + if (!cr->fillread) { + lock = 0; + throw(MAGICTERMINATE); // Force proper termination + } + cr->procmsg = 1; + lock = 0; + return; // Terminate thread, wait for callback + } + lock = 0; + } + int msgtype = cr->read_int8(); + if (!portal) { + portal = qportals->try_read(); + #ifdef PG_DEBUG + showportal(msgtype); + #endif + } + int msglen = cr->read_int32(); + msgsreceived++; + bytesreceived += 1 + msglen; + int errtype = NOERROR; + PD("%d<", ci->socket->query_fd()); + switch (msgtype) { + array getcols() { + int bintext = cr->read_int8(); + int cols = cr->read_int16(); + #ifdef PG_DEBUG + array a; + msglen -= 4 + 1 + 2 + 2 * cols; + foreach (a = allocate(cols, ([])); ; mapping m) + m.type = cr->read_int16(); + #else + cr->consume(cols << 1); + #endif // Discard column info, and make it line oriented + return ({ ({(["name":"line"])}), ({bintext?BYTEAOID:TEXTOID}) }); + }; + array(string) reads() { + #ifdef PG_DEBUG + if (msglen < 1) + errtype = PROTOCOLERROR; + #endif + array ret = emptyarray, aw = ({0}); + do { + string w = cr->read_cstring(); + msglen -= sizeof(w) + 1; aw[0] = w; ret += aw; + } while (msglen); + return ret; + }; + mapping(string:string) getresponse() { + mapping(string:string) msgresponse = ([]); + msglen -= 4; + foreach (reads(); ; string f) + if (sizeof(f)) + msgresponse[f[..0]] = f[1..]; + PD("%O\n", msgresponse); + return msgresponse; + }; + case 'R': { + void authresponse(string|array msg) { + void|bufcon|conxsess cs = ci->start(); + CHAIN(cs)->add_int8('p')->add_hstring(msg, 4, 4); + cs->sendcmd(SENDOUT); // No flushing, PostgreSQL 9.4 disapproves + }; + PD("Authentication "); + msglen -= 4 + 4; + int authtype, k; + switch (authtype = cr->read_int32()) { + case 0: + PD("Ok\n"); + if (SASLcontext) { + PD("Authentication validation still in progress\n"); + errtype = PROTOCOLUNSUPPORTED; + } else + cancelsecret = ""; + break; + case 2: + PD("KerberosV5\n"); + errtype = PROTOCOLUNSUPPORTED; + break; + case 3: + PD("ClearTextPassword\n"); + authresponse(({pass, 0})); + break; + case 4: + PD("CryptPassword\n"); + errtype = PROTOCOLUNSUPPORTED; + break; + case 5: + PD("MD5Password\n"); + #ifdef PG_DEBUG + if (msglen < 4) + errtype = PROTOCOLERROR; + #endif + #define md5hex(x) String.string2hex(Crypto.MD5.hash(x)) + authresponse(({"md5", + md5hex(md5hex(pass + user) + cr->read(msglen)), 0})); + #ifdef PG_DEBUG + msglen = 0; + #endif + break; + case 6: + PD("SCMCredential\n"); + errtype = PROTOCOLUNSUPPORTED; + break; + case 7: + PD("GSS\n"); + errtype = PROTOCOLUNSUPPORTED; + break; + case 9: + PD("SSPI\n"); + errtype = PROTOCOLUNSUPPORTED; + break; + case 8: + PD("GSSContinue\n"); + errtype = PROTOCOLUNSUPPORTED; + cr->read(msglen); // SSauthdata + #ifdef PG_DEBUG + if (msglen<1) + errtype = PROTOCOLERROR; + msglen = 0; + #endif + break; + case 10: { + string word; + PD("AuthenticationSASL\n"); + k = 0; + while (sizeof(word = cr->read_cstring())) { + switch (word) { + case "SCRAM-SHA-256": + k = 1; + } + #ifdef PG_DEBUG + msglen -= sizeof(word) + 1; + if (msglen < 1) + break; + #endif + } + if (k) { + SASLcontext = Crypto.SCRAM(Crypto.SHA256); + word = SASLcontext.client_1(); + authresponse(({ + "SCRAM-SHA-256", 0, sprintf("%4c", sizeof(word)), word + })); + } else + errtype = PROTOCOLUNSUPPORTED; + #ifdef PG_DEBUG + if (msglen != 1) + errtype = PROTOCOLERROR; + msglen = 0; + #endif + break; + } + case 11: { + PD("AuthenticationSASLContinue\n"); + string response; + if (response + = SASLcontext.client_2(cr->read(msglen), pass)) + authresponse(response); + else + errtype = PROTOCOLERROR; + #ifdef PG_DEBUG + msglen = 0; + #endif + break; + } + case 12: + PD("AuthenticationSASLFinal\n"); + if (SASLcontext.client_3(cr->read(msglen))) + SASLcontext = 0; // Clears context and approves server + else + errtype = PROTOCOLERROR; + #ifdef PG_DEBUG + msglen = 0; + #endif + break; + default: + PD("Unknown Authentication Method %c\n", authtype); + errtype = PROTOCOLUNSUPPORTED; + break; + } + switch (errtype) { + default: + case PROTOCOLUNSUPPORTED: + error("Unsupported authenticationmethod %c\n", authtype); + case NOERROR: + break; + } + break; + } + case 'K': + msglen -= 4 + 4; backendpid = cr->read_int32(); + cancelsecret = cr->read(msglen); + #ifdef PG_DEBUG + PD("BackendKeyData %O\n", cancelsecret); + msglen = 0; + #endif + break; + case 'S': { + PD("ParameterStatus "); + msglen -= 4; + array(string) ts = reads(); + #ifdef PG_DEBUG + if (sizeof(ts) == 2) { + #endif + runtimeparameter[ts[0]] = ts[1]; + #ifdef PG_DEBUG + PD("%O=%O\n", ts[0], ts[1]); + } else + errtype = PROTOCOLERROR; + #endif + break; + } + case '3': + #ifdef PG_DEBUG + PD("CloseComplete\n"); + msglen -= 4; + #endif + break; + case 'Z': { + backendstatus = cr->read_int8(); + #ifdef PG_DEBUG + msglen -= 4 + 1; + PD("ReadyForQuery %c\n", backendstatus); + #endif + #ifdef PG_DEBUGMORE + showportalstack("READYFORQUERY"); + #endif + int keeplooking; + do + for (keeplooking = 0; + objectp(portal); + portal = qportals->read()) { + #ifdef PG_DEBUG + showportal(msgtype); + #endif + if (backendstatus == 'I' && intransaction + && portal->transtype != TRANSEND) + keeplooking = 1; + portal->_purgeportal(); + } + while (keeplooking && (portal = qportals->read())); + if (backendstatus == 'I') + intransaction = 0; + foreach (qportals->peek_array(); ; sql_result qp) { + if (objectp(qp) && qp._synctransact + && qp._synctransact <= portal) { + PD("Checking portal %O %d<=%d\n", + qp._portalname, qp._synctransact, portal); + qp->_purgeportal(); + } + } + portal = 0; + #ifdef PG_DEBUGMORE + showportalstack("AFTER READYFORQUERY"); + #endif + readyforquerycount--; + if (readyforquery_cb) + readyforquery_cb(), readyforquery_cb = 0; + destruct(waitforauthready); + break; + } + case '1': + #ifdef PG_DEBUG + PD("ParseComplete portal %O\n", portal); + msglen -= 4; + #endif + break; + case 't': { + array a; + #ifdef PG_DEBUG + int cols = cr->read_int16(); + PD("%O ParameterDescription %d values\n", portal._query, cols); + msglen -= 4 + 2 + 4 * cols; + a = cr->read_ints(cols, 4); + #else + a = cr->read_ints(cr->read_int16(), 4); + #endif + #ifdef PG_DEBUGMORE + PD("%O\n", a); + #endif + if (portal._tprepared) + portal._tprepared.datatypeoid = a; + Thread.Thread(portal->_preparebind, a); + break; + } + case 'T': { + array a, at; + int cols = cr->read_int16(); + #ifdef PG_DEBUG + PD("RowDescription %d columns %O\n", cols, portal._query); + msglen -= 4 + 2; + #endif + at = allocate(cols); + foreach (a = allocate(cols); int i; ) { + string s = cr->read_cstring(); + mapping(string:mixed) res = (["name":s]); + #ifdef PG_DEBUG + msglen -= sizeof(s) + 1 + 4 + 2 + 4 + 2 + 4 + 2; + res.tableoid = cr->read_int32() || UNDEFINED; + res.tablecolattr = cr->read_int16() || UNDEFINED; + #else + cr->consume(6); + #endif + at[i] = cr->read_int32(); + #ifdef PG_DEBUG + res.type = at[i]; + { + int len = cr->read_sint(2); + res.length = len >= 0 ? len : "variable"; + } + res.atttypmod = cr->read_int32(); + /* formatcode contains just a zero when Bind has not been issued + * yet, but the content is irrelevant because it's determined + * at query time + */ + res.formatcode = cr->read_int16(); + #else + cr->consume(8); + #endif + a[i] = res; + } + #ifdef PG_DEBUGMORE + PD("%O\n", a); + #endif + if (portal._forcetext) + portal->_setrowdesc(a, at); // Do not consume queued portal + else { + portal->_processrowdesc(a, at); + portal = 0; + } + break; + } + case 'n': { + #ifdef PG_DEBUG + msglen -= 4; + PD("NoData %O\n", portal._query); + #endif + portal._fetchlimit = 0; // disables subsequent Executes + portal->_processrowdesc(emptyarray, emptyarray); + portal = 0; + break; + } + case 'H': + portal->_processrowdesc(@getcols()); + PD("CopyOutResponse %O\n", portal. _query); + break; + case '2': { + mapping tp; + #ifdef PG_DEBUG + msglen -= 4; + PD("%O BindComplete\n", portal._portalname); + #endif + if (tp = portal._tprepared) { + int tend = gethrtime(); + int tstart = tp.trun; + if (tend == tstart) + m_delete(prepareds, portal._query); + else { + tp.hits++; + totalhits++; + if (!tp.preparedname) { + if (sizeof(portal._preparedname)) { + PD("Finalising stored statement %s\n", + portal._preparedname); + tp.preparedname = portal._preparedname; + } + tstart = tend - tstart; + if (!tp.tparse || tp.tparse>tstart) + tp.tparse = tstart; + } + tp.trunstart = tend; + } + } + break; + } + case 'D': + msglen -= 4; + #ifdef PG_DEBUG + #ifdef PG_DEBUGMORE + PD("%O DataRow %d bytes\n", portal._portalname, msglen); + #endif + datarowdebugcount++; + if (!datarowdebug) + datarowdebug = sprintf( + "%O DataRow %d bytes", portal._portalname, msglen); + #endif + #ifdef PG_DEBUG + msglen= + #endif + portal->_decodedata(msglen, runtimeparameter[CLIENT_ENCODING]); + break; + case 's': + #ifdef PG_DEBUG + PD("%O PortalSuspended\n", portal._portalname); + msglen -= 4; + #endif + portal = 0; + break; + case 'C': { + msglen -= 4; + #ifdef PG_DEBUG + if (msglen < 1) + errtype = PROTOCOLERROR; + #endif + string s = cr->read(msglen - 1); + portal->_storetiming(); + PD("%O CommandComplete %O\n", portal._portalname, s); + #ifdef PG_DEBUG + if (cr->read_int8()) + errtype = PROTOCOLERROR; + msglen = 0; + #else + cr->consume(1); + #endif + #ifdef PG_DEBUGMORE + showportalstack("COMMANDCOMPLETE"); + #endif + portal->_releasesession(s); + portal = 0; + break; + } + case 'I': + #ifdef PG_DEBUG + PD("EmptyQueryResponse %O\n", portal._portalname); + msglen -= 4; + #endif + #ifdef PG_DEBUGMORE + showportalstack("EMPTYQUERYRESPONSE"); + #endif + portal->_releasesession(); + portal = 0; + break; + case 'd': + PD("%O CopyData\n", portal._portalname); + portal->_storetiming(); + msglen -= 4; + #ifdef PG_DEBUG + if (msglen < 0) + errtype = PROTOCOLERROR; + #endif + portal->_processdataready(({cr->read(msglen)}), msglen); + #ifdef PG_DEBUG + msglen = 0; + #endif + break; + case 'G': + portal->_releasestatement(); + portal->_setrowdesc(@getcols()); + PD("%O CopyInResponse\n", portal._portalname); + portal._state = COPYINPROGRESS; + break; + case 'c': + #ifdef PG_DEBUG + PD("%O CopyDone\n", portal._portalname); + msglen -= 4; + #endif + portal = 0; + break; + case 'E': { + #ifdef PG_DEBUGMORE + showportalstack("ERRORRESPONSE"); + #endif + if (portalsinflight->drained() && !readyforquerycount) + sendsync(); + PD("%O ErrorResponse %O\n", + objectp(portal) && (portal._portalname || portal._preparedname), + objectp(portal) && portal._query); + mapping(string:string) msgresponse; + msgresponse = getresponse(); + warningsdropcount += warningscollected; + warningscollected = 0; + untolderror = 1; + switch (msgresponse.C) { + case "P0001": + lastmessage + = ({sprintf("%s: %s", msgresponse.S, msgresponse.M)}); + USERERROR(a2nls(lastmessage + +({pinpointerror(portal._query, msgresponse.P)}) + +showbindings(portal))); + case "53000":case "53100":case "53200":case "53300":case "53400": + case "57P01":case "57P02":case "57P03":case "57P04":case "3D000": + preplastmessage(msgresponse); + PD(a2nls(lastmessage)); throw(msgisfatal(msgresponse)); + case "08P01":case "42P05": + errtype = PROTOCOLERROR; + case "XX000":case "42883":case "42P01": + invalidatecache = 1; + default: + preplastmessage(msgresponse); + if (msgresponse.D) + lastmessage += ({msgresponse.D}); + if (msgresponse.H) + lastmessage += ({msgresponse.H}); + lastmessage += ({ + pinpointerror(objectp(portal) && portal._query, msgresponse.P) + + pinpointerror(msgresponse.q, msgresponse.p)}); + if (msgresponse.W) + lastmessage += ({msgresponse.W}); + if (objectp(portal)) + lastmessage += showbindings(portal); + switch (msgresponse.S) { + case "PANIC":werror(a2nls(lastmessage)); + } + case "25P02": // Preserve last error message + USERERROR(a2nls(lastmessage)); // Implicitly closed portal + } + break; + } + case 'N': { + PD("NoticeResponse\n"); + mapping(string:string) msgresponse = getresponse(); + if (clearmessage) { + warningsdropcount += warningscollected; + clearmessage = warningscollected = 0; + lastmessage = emptyarray; + } + warningscollected++; + lastmessage = ({sprintf("%s %s: %s", + msgresponse.S, msgresponse.C, msgresponse.M)}); + int val; + if (val = msgisfatal(msgresponse)) { // Some warnings are fatal + preplastmessage(msgresponse); + PD(a2nls(lastmessage)); throw(val); + } + break; + } + case 'A': { + PD("NotificationResponse\n"); + msglen -= 4 + 4; + int pid = cr->read_int32(); + string condition, extrainfo; + { + array(string) ts = reads(); + switch (sizeof(ts)) { + #if PG_DEBUG + case 0: + errtype = PROTOCOLERROR; + break; + default: + errtype = PROTOCOLERROR; + #endif + case 2: + extrainfo = ts[1]; + case 1: + condition = ts[0]; + } + } + PD("%d %s\n%s\n", pid, condition, extrainfo); + runcallback(pid, condition, extrainfo); + break; + } + default: + if (msgtype != -1) { + string s; + PD("Unknown message received %c\n", msgtype); + s = cr->read(msglen -= 4); PD("%O\n", s); + #ifdef PG_DEBUG + msglen = 0; + #endif + errtype = PROTOCOLUNSUPPORTED; + } else { + lastmessage += ({ + sprintf("Connection lost to database %s@%s:%d/%s %d\n", + user, host, port, database, backendpid)}); + runcallback(backendpid, "_lost", ""); + if (!waitforauthready) + throw(0); + USERERROR(a2nls(lastmessage)); + } + break; + } + #ifdef PG_DEBUG + if (msglen) + errtype = PROTOCOLERROR; + #endif + { + string msg; + switch (errtype) { + case PROTOCOLUNSUPPORTED: + msg + = sprintf("Unsupported servermessage received %c\n", msgtype); + break; + case PROTOCOLERROR: + msg = sprintf("Protocol error with database %s", host_info()); + break; + case NOERROR: + continue; // Normal production loop + } + error(a2nls(lastmessage += ({msg}))); + } + }; // We only get here if there is an error + if (err == MAGICTERMINATE) { // Announce connection termination to server + catch { + void|bufcon|conxsess cs = ci->start(); + CHAIN(cs)->add("X\0\0\0\4"); + cs->sendcmd(SENDOUT); + }; + terminating = 1; + err = 0; + } else if (stringp(err)) { + sql_result or; + if (!objectp(or = portal)) + or = this; + if (!or.delayederror) + or.delayederror = err; + #ifdef PG_DEBUGMORE + showportalstack("THROWN"); + #endif + if (objectp(portal)) + portal->_releasesession(); + portal = 0; + if (!waitforauthready) + continue; // Only continue if authentication did not fail + } + break; + } + PD("Closing database processloop %s\n", err ? describe_backtrace(err) : ""); + delayederror = err; + if (objectp(portal)) { + #ifdef PG_DEBUG + showportal(0); + #endif + portal->_purgeportal(); + } + destruct(waitforauthready); + termlock = 0; + if (err && !stringp(err)) + throw(err); + }; + catch { + unnamedstatement = 0; + termlock = 0; + if (err) { + PD("Terminating processloop due to %s\n", describe_backtrace(err)); + delayederror = err; + } + destruct(waitforauthready); + c->purge(); + }; + } + + final void close() { + throwdelayederror(this); + Thread.MutexKey lock; + if (qportals && qportals->size()) + catch(cancelquery()); + if (unnamedstatement) + termlock = unnamedstatement->lock(1); + if (c) // Prevent trivial backtraces + c->close(); + if (unnamedstatement) + lock = unnamedstatement->lock(1); + c->purge(); + lock = 0; + destruct(waitforauthready); + } + + private void destroy() { + string errstring; + mixed err = catch(close()); + backendreg = 0; + if (untolderror) { + /* + * Flush out any asynchronously reported errors to stderr; because we are + * inside a destructor, throwing an error will not work anymore. + * Warnings will be silently discarded at this point. + */ + lastmessage = filter(lastmessage, lambda(string val) { + return has_prefix(val, "ERROR ") || has_prefix(val, "FATAL "); }); + if (err || (err = catch(errstring = geterror(1)))) + werror(describe_backtrace(err)); + else if (errstring && sizeof(errstring)) + werror("%s\n", errstring); // Add missing terminating newline + } + } + + private void sendsync() { + readyforquerycount++; + c->start()->sendcmd(SYNCSEND); + } + + private void runcallback(int pid, string condition, string extrainfo) { + array cb; + if ((cb = notifylist[condition] || notifylist[""]) + && (pid != backendpid || sizeof(cb) > 1 && cb[1])) + callout(cb[0], 0, pid, condition, extrainfo, @cb[2..]); + } + + private inline void closestatement( + bufcon|conxsess plugbuffer, string oldprep) { + closestatement(plugbuffer, oldprep); + } };