diff --git a/lib/modules/Sql.pmod/pgsql.h b/lib/modules/Sql.pmod/pgsql.h index 133f3f21665fb50041544a4768b5b5f0b7a5e5ef..62238d93df7a6a1e6322745d86acb5618608ac53 100644 --- a/lib/modules/Sql.pmod/pgsql.h +++ b/lib/modules/Sql.pmod/pgsql.h @@ -77,10 +77,13 @@ #define PD(X ...) 0 #endif -protected enum portalstate { - portalinit=0,bound,copyinprogress,closed -}; +#define PORTALINIT 0 // Portal states +#define BOUND 1 +#define COPYINPROGRESS 2 +#define CLOSED 3 -protected enum sctype { - keep=0,sendout,flushsend,flushlogsend,syncsend -}; +#define KEEP 0 // Sendcmd subcommands +#define SENDOUT 1 +#define FLUSHSEND 2 +#define FLUSHLOGSEND 3 +#define SYNCSEND 4 diff --git a/lib/modules/Sql.pmod/pgsql.pike b/lib/modules/Sql.pmod/pgsql.pike index 1a66722bce8d15b20a8975ec1bb0df06e815de56..700da729f73bc6f3f7f7339998572d54128a3b8e 100644 --- a/lib/modules/Sql.pmod/pgsql.pike +++ b/lib/modules/Sql.pmod/pgsql.pike @@ -299,7 +299,7 @@ int is_open() { //! @seealso //! @[is_open()] int ping() { - return is_open() && !catch(c->start()->sendcmd(flushsend)) + return is_open() && !catch(c->start()->sendcmd(FLUSHSEND)) ? !!reconnected : -1; } @@ -319,7 +319,7 @@ void cancelquery() { PD("CancelRequest\n"); .pgsql_util.conxion lcon=getsocket(1); lcon->add_int32(16)->add_int32(PG_PROTOCOL(1234,5678)) - ->add_int32(backendpid)->add(cancelsecret)->sendcmd(flushsend); + ->add_int32(backendpid)->add(cancelsecret)->sendcmd(FLUSHSEND); lcon->close(); #ifdef PG_DEBUGMORE PD("Closetrace %O\n",backtrace()); @@ -329,7 +329,7 @@ void cancelquery() { foreach(qportals->peek_array();;int|.pgsql_util.sql_result portal) if(objectp(portal)) portal->_closeportal(plugbuffer); - plugbuffer->sendcmd(sendout); + plugbuffer->sendcmd(SENDOUT); } } @@ -621,7 +621,7 @@ final void _processloop(.pgsql_util.conxion ci) { plugbuffer->add(name)->add_int8(0)->add((string)value)->add_int8(0); plugbuffer->add_int8(0); PD("%O\n",(string)plugbuffer); - ci->start()->add_hstring(plugbuffer,4,4)->sendcmd(sendout); + ci->start()->add_hstring(plugbuffer,4,4)->sendcmd(SENDOUT); } // Do not flush at this point, PostgreSQL 9.4 disapproves cancelsecret=0; #ifdef PG_DEBUG @@ -768,7 +768,7 @@ final void _processloop(.pgsql_util.conxion ci) { case noerror: if(cancelsecret!="") ci->start()->add_int8('p')->add_hstring(sendpass,4,5) - ->add_int8(0)->sendcmd(sendout); + ->add_int8(0)->sendcmd(SENDOUT); break; // No flushing here, PostgreSQL 9.4 disapproves default: case protocolunsupported: @@ -1091,7 +1091,7 @@ final void _processloop(.pgsql_util.conxion ci) { portal->_setrowdesc(getcols()); PD("<%O CopyInResponse %d columns\n", portal._portalname,sizeof(portal._datarowdesc)); - portal._state=copyinprogress; + portal._state=COPYINPROGRESS; { Thread.MutexKey resultlock=portal._resultmux->lock(); portal._newresult.signal(); @@ -1229,7 +1229,7 @@ final void _processloop(.pgsql_util.conxion ci) { } }; // We only get here if there is an error if(err==MAGICTERMINATE) { // Announce connection termination to server - ci->start()->add("X\0\0\0\4")->sendcmd(sendout); + ci->start()->add("X\0\0\0\4")->sendcmd(SENDOUT); terminating=1; if(!sizeof(ci)) break; @@ -1397,7 +1397,7 @@ private void resync_cb() { private void sendsync() { _readyforquerycount++; - c->start()->sendcmd(syncsend); + c->start()->sendcmd(SYNCSEND); } //! @decl void resync() @@ -1963,9 +1963,9 @@ private inline void throwdelayederror(object parent) { } if(sizeof(plugbuffer)) { PD("%O\n",(string)plugbuffer); - plugbuffer->sendcmd(flushsend); // close expireds + plugbuffer->sendcmd(FLUSHSEND); // close expireds } else - plugbuffer->sendcmd(); // close start() + plugbuffer->sendcmd(KEEP); // close start() tstart=gethrtime(); } else // sql_result autoassigns to portal tp=UNDEFINED; @@ -1984,7 +1984,7 @@ private inline void throwdelayederror(object parent) { _readyforquerycount++; Thread.MutexKey lock=unnamedstatement->lock(1); c->start(1)->add_int8('Q')->add_hstring(q,4,4+1)->add_int8(0) - ->sendcmd(flushlogsend,portal); + ->sendcmd(FLUSHLOGSEND,portal); lock=0; PD("Simple query: %O\n",q); } else { @@ -2008,10 +2008,10 @@ private inline void throwdelayederror(object parent) { if(!tp || !tp.datatypeoid) { PD("Describe statement %O\n",preparedname); (plugbuffer||c->start())->add_int8('D') - ->add_hstring(({'S',preparedname,0}),4,4)->sendcmd(flushsend,portal); + ->add_hstring(({'S',preparedname,0}),4,4)->sendcmd(FLUSHSEND,portal); } else { if(plugbuffer) - plugbuffer->sendcmd(); + plugbuffer->sendcmd(KEEP); #ifdef PG_STATS skippeddescribe++; #endif diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index 1766ba1b020a54f4a54257c606c612969d610b1d..f139c57288b4ecf20b38fb11c7c7a535b40b5830 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -102,7 +102,7 @@ final int oidformat(int oid) { return 0; // text } -private sctype mergemode(conxion realbuffer,sctype mode) { +private int mergemode(conxion realbuffer,int mode) { if(mode>realbuffer->stashflushmode) realbuffer->stashflushmode=mode; return realbuffer->stashflushmode; @@ -133,7 +133,7 @@ class bufcon { return this; } - final void sendcmd(void|sctype mode,void|sql_result portal) { + final void sendcmd(int mode,void|sql_result portal) { Thread.MutexKey lock=realbuffer->stashupdate->lock(); if(portal) realbuffer->stashqueue->write(portal); @@ -145,7 +145,7 @@ class bufcon { this->clear(); if(lock=realbuffer->nostash->trylock(1)) { realbuffer->started=lock; lock=0; - realbuffer->sendcmd(sendout); + realbuffer->sendcmd(SENDOUT); } } @@ -246,7 +246,7 @@ class conxion { final inline int read_int32() { return i::read_int32(); } final inline string read_cstring() { return i::read_cstring(); } - final void sendcmd(void|sctype mode,void|sql_result portal) { + final void sendcmd(void|int mode,void|sql_result portal) { if(portal) queueup(portal); nosync: @@ -254,16 +254,16 @@ nosync: switch(mode) { default: break nosync; - case syncsend: + case SYNCSEND: PD("%d>Sync %d %d Queue\n", socket->query_fd(),synctransact,++queueoutidx); add(PGSYNC); - mode=sendout; + mode=SENDOUT; break; - case flushlogsend: + case FLUSHLOGSEND: PD("%d>%O %d Queue simplequery %d bytes\n", socket->query_fd(),portal._portalname,++queueoutidx,sizeof(this)); - mode=flushsend; + mode=FLUSHSEND; } qportals->write(synctransact++); } while(0); @@ -275,7 +275,7 @@ nosync: queueup(portal); } mode=mergemode(this,mode); - stashflushmode=keep; + stashflushmode=KEEP; lock=0; } catch { @@ -284,10 +284,10 @@ outer: switch(mode) { default: break outer; - case flushsend: + case FLUSHSEND: PD("Flush\n"); add(PGFLUSH); - case sendout:; + case SENDOUT:; } if(towrite=sizeof(this)) { PD("%d>Sendcmd %O\n",socket->query_fd(),((string)this)[..towrite-1]); @@ -325,7 +325,7 @@ outer: && (pgsqlsess._options.use_ssl || pgsqlsess._options.force_ssl)) { PD("SSLRequest\n"); start()->add_int32(8)->add_int32(PG_PROTOCOL(1234,5679)) - ->sendcmd(sendout); + ->sendcmd(SENDOUT); switch(read_int8()) { case 'S': object fcon=SSL.File(socket,SSL.Context()); @@ -408,7 +408,7 @@ class sql_result { private int eoffound; private conxion c; final mixed _delayederror; - final portalstate _state; + final int _state; final int _fetchlimit; final int _alltext; final int _forcetext; @@ -468,7 +468,7 @@ class sql_result { _alltext = !alltyped; _params = params; _forcetext = forcetext; - _state = portalinit; + _state = PORTALINIT; } //! Returns the command-complete status for this query. @@ -706,7 +706,7 @@ class sql_result { Thread.MutexKey lock=prepbuffermux->lock(); prepbufferready->wait(lock); lock=0; - if(_state==closed) + if(_state==CLOSED) return; prepbuffermux=0; } @@ -744,7 +744,7 @@ class sql_result { final void _openportal() { pgsqlsess->_portalsinflight++; Thread.MutexKey lock=closemux->lock(); - _state=bound; + _state=BOUND; lock=0; _statuscmdcomplete=UNDEFINED; } @@ -754,18 +754,18 @@ class sql_result { Thread.MutexKey lock=closemux->lock(); _fetchlimit=0; // disables further Executes switch(_state) { - case copyinprogress: - case bound: + case COPYINPROGRESS: + case BOUND: _datarows->write(1); // Signal EOF --pgsqlsess->_portalsinflight; } - _state=closed; + _state=CLOSED; lock=0; releaseconditions(); } - final sctype _closeportal(bufcon plugbuffer) { - sctype retval=keep; + final int _closeportal(bufcon plugbuffer) { + int retval=KEEP; PD("%O Try Closeportal %d\n",_portalname,_state); Thread.MutexKey lock=closemux->lock(); _fetchlimit=0; // disables further Executes @@ -775,20 +775,20 @@ class sql_result { * It's a bit of a tricky race, but this check should be sufficient. */ switch(_state) { - case portalinit: + case PORTALINIT: _unnamedstatementkey=0; - _state=closed; + _state=CLOSED; break; - case copyinprogress: + case COPYINPROGRESS: PD("CopyDone\n"); plugbuffer->add("c\0\0\0\4"); - case bound: - _state=closed; + case BOUND: + _state=CLOSED; lock=0; PD("Close portal %O\n",_portalname); if(sizeof(_portalname)) { plugbuffer->add_int8('C')->add_hstring(({'P',_portalname,0}),4,4); - retval=flushsend; + retval=FLUSHSEND; } else _unnamedportalkey=0; Thread.MutexKey lockc=pgsqlsess->_commitmux->lock(); @@ -798,7 +798,7 @@ class sql_result { pgsqlsess->_readyforcommit->signal(); lockc=0; } else if(!alreadyfilled) - pgsqlsess->_readyforquerycount++, retval=syncsend; + pgsqlsess->_readyforquerycount++, retval=SYNCSEND; pgsqlsess->_pportalcount=0; } lockc=0; @@ -864,9 +864,9 @@ class sql_result { plugbuffer->add_int8('E')->add_hstring(_portalname,4,8+1) ->add_int8(0)->add_int32(fetchlimit); if(!fetchlimit) - flushmode=_closeportal(plugbuffer)==syncsend?syncsend:flushsend; + flushmode=_closeportal(plugbuffer)==SYNCSEND?SYNCSEND:FLUSHSEND; else - _inflight+=fetchlimit, flushmode=flushsend; + _inflight+=fetchlimit, flushmode=FLUSHSEND; plugbuffer->sendcmd(flushmode,this); } @@ -932,7 +932,7 @@ class sql_result { trydelayederror(); if(copydata) { PD("CopyData\n"); - c->start()->add_int8('d')->add_hstring(copydata,4,4)->sendcmd(sendout); + c->start()->add_int8('d')->add_hstring(copydata,4,4)->sendcmd(SENDOUT); } else _releasesession(); }