/*
 * Some pgsql utility functions.
 * They are kept here to avoid circular references.
 */

//! The pgsql backend, shared between all connection instances.
//! It runs even in non-callback mode in a separate thread and makes sure
//! that communication with the database is real-time and event driven
//! at all times.
//!
//! @note
//! Callbacks running from this backend directly determine the latency
//! in reacting to communication with the database server; so it
//! would be prudent not to block in these callbacks.

#pike __REAL_VERSION__
#require constant(Thread.Thread)

#include "pgsql.h"

#define PORTALINIT		0	// Portal states
#define PARSING	  		1
#define BOUND			2
#define COMMITTED		3
#define COPYINPROGRESS		4
#define CLOSING			5
#define CLOSED			6
#define PURGED			7

#define NOERROR			0	// Error states networkparser
#define PROTOCOLERROR		1
#define PROTOCOLUNSUPPORTED	2

#define LOSTERROR	"Database connection lost"

//! The instance of the pgsql dedicated backend.
final Pike.Backend local_backend;

private Pike.Backend cb_backend;
private Thread.Mutex backendmux = Thread.Mutex();
private Thread.ResourceCount clientsregistered = Thread.ResourceCount();

constant emptyarray = ({});
constant describenodata
 = (["datarowdesc":emptyarray, "datarowtypes":emptyarray,
     "datatypeoid":emptyarray]);
private constant censoroptions = (<"use_ssl", "force_ssl",
 "cache_autoprepared_statements", "reconnect", "text_query", "is_superuser",
 "server_encoding", "server_version", "integer_datetimes",
 "session_authorization">);

 /* Statements matching createprefix cause the prepared statement cache
  * to be flushed to prevent stale references to (temporary) tables
  */
final Regexp createprefix = iregexp("^\a*(CREATE|DROP)\a");

 /* Statements matching dontcacheprefix never enter the cache
  */
private Regexp dontcacheprefix = iregexp("^\a*(FETCH|COPY)\a");

 /* Statements not matching paralleliseprefix will cause the driver
  * to stall submission until all previously started statements have
  * run to completion
  */
private Regexp paralleliseprefix
 = iregexp("^\a*((SELEC|INSER)T|(UPDA|DELE)TE|(FETC|WIT)H)\a");

 /* Statements matching transbeginprefix will cause the driver
  * insert a sync after the statement.
  * Failure to do so, will result in portal synchronisation errors
  * in the event of an ErrorResponse.
  */
final Regexp transbeginprefix
 = iregexp("^\a*(BEGIN|START)([; \t\f\r\n]|$)");

 /* Statements matching transendprefix will cause the driver
  * insert a sync after the statement.
  * Failure to do so, will result in portal synchronisation errors
  * in the event of an ErrorResponse.
  */
final Regexp transendprefix
 = iregexp("^\a*(COMMIT|ROLLBACK|END)([; \t\f\r\n]|$)");

 /* For statements matching execfetchlimit the resultrows will not be
  * fetched in pieces.  This heuristic will be sub-optimal whenever
  * either an UPDATE/DELETE/INSERT statement is prefixed by WITH, or
  * if there is a RETURNING with a *lot* of results.  In those cases
  * the portal will be busy until all results have been fetched, and will
  * not be able to deliver results belonging to other parallel queries
  * running on the same filedescriptor.
  *
  * However, considering that the current heuristic increases query-speed
  * in the majority of the real-world cases, it would be considered a good
  * tradeoff.
  */
private Regexp execfetchlimit
 = iregexp("^\a*((UPDA|DELE)TE|INSERT)\a|\aLIMIT\a+[1-9][; \t\f\r\n]*$");

private void default_backend_runs() {		// Runs as soon as the
  cb_backend = Pike.DefaultBackend;		// DefaultBackend has started
}

private void create() {
  // Run callbacks from our local_backend until DefaultBackend has started
  cb_backend = local_backend = Pike.SmallBackend();
  call_out(default_backend_runs, 0);
}

private Regexp iregexp(string expr) {
  Stdio.Buffer ret = Stdio.Buffer();
  foreach (expr; ; int c)
    if (c >= 'A' && c <= 'Z')
      ret->add('[', c, c + 'a' - 'A', ']');
    else if (c == '\a')			// Replace with generic whitespace
      ret->add("[ \t\f\r\n]");
    else
      ret->add_int8(c);
  return Regexp(ret->read());
}

final void closestatement(bufcon|conxsess plugbuffer, string oldprep) {
  if (oldprep) {
    PD("Close statement %s\n", oldprep);
    CHAIN(plugbuffer)->add_int8('C')->add_hstring(({'S', oldprep, 0}), 4, 4);
  }
}

private void run_local_backend() {
  Thread.MutexKey lock;
  int looponce;
  do {
    looponce = 0;
    if (lock = backendmux->trylock()) {
      PD("Starting local backend\n");
      while (!clientsregistered->drained()	// Autoterminate when not needed
       || sizeof(local_backend->call_out_info())) {
        mixed err;
        if (err = catch(local_backend(4096.0)))
          master()->handle_error(err);
      }
      PD("Terminating local backend\n");
      lock = 0;
      looponce = !clientsregistered->drained();
    }
  } while (looponce);
}

//! Registers yourself as a user of this backend.  If the backend
//! has not been started yet, it will be spawned automatically.
final Thread.ResourceCountKey register_backend() {
  int startbackend = clientsregistered->drained();
  Thread.ResourceCountKey key = clientsregistered->acquire();
  if (startbackend)
    Thread.Thread(run_local_backend);
  return key;
}

final void throwdelayederror(sql_result|proxy parent) {
  if (mixed err = parent->delayederror) {
    if (!objectp(parent->pgsqlsess))
      parent->untolderror = 0;
    else if (parent->pgsqlsess)
      parent->pgsqlsess->untolderror = 0;
    parent.delayederror = 0;
    if (stringp(err))
      err = ({err, backtrace()[..<2]});
    throw(err);
  }
}

private int readoidformat(int oid) {
  switch (oid) {
    case BOOLOID:
    case BYTEAOID:
    case CHAROID:
    case INT8OID:
    case INT2OID:
    case INT4OID:
    case FLOAT4OID:
#if !constant(__builtin.__SINGLE_PRECISION_FLOAT__)
    case FLOAT8OID:
#endif
    case NUMERICOID:
    case TEXTOID:
    case OIDOID:
    case XMLOID:
    case MACADDROID:
    case BPCHAROID:
    case VARCHAROID:
    case CTIDOID:
    case UUIDOID:
      return 1; //binary
  }
  return 0;	// text
}

private int writeoidformat(int oid, array(string|int) paramValues,
 array(int) ai) {
  mixed value = paramValues[ai[0]++];
  switch (oid) {
    case BOOLOID:
    case BYTEAOID:
    case CHAROID:
    case INT8OID:
    case INT2OID:
    case INT4OID:
    case TEXTOID:
    case OIDOID:
    case XMLOID:
    case MACADDROID:
    case BPCHAROID:
    case VARCHAROID:
    case CTIDOID:
    case UUIDOID:
      return 1; //binary
    case NUMERICOID:
    case FLOAT4OID:
#if !constant(__builtin.__SINGLE_PRECISION_FLOAT__)
    case FLOAT8OID:
#endif
      if (!stringp(value))
        return 1;
  }
  return 0;	// text
}

private inline mixed callout(function(mixed ...:void) f,
 float|int delay, mixed ... args) {
  return cb_backend->call_out(f, delay, @args);
}

// Some pgsql utility functions

class bufcon {
  inherit Stdio.Buffer;
  private Thread.ResourceCountKey dirty;

#ifdef PG_DEBUGRACE
  final bufcon `chain() {
    return this;
  }
#endif

  private conxion realbuffer;

  private void create(conxion _realbuffer) {
    realbuffer = _realbuffer;
  }

  final Thread.ResourceCount `stashcount() {
    return realbuffer->stashcount;
  }

  final bufcon start(void|int waitforreal) {
    dirty = realbuffer->stashcount->acquire();
#ifdef PG_DEBUG
    if (waitforreal)
      error("pgsql.bufcon not allowed here\n");
#endif
    return this;
  }

  final void sendcmd(int mode, void|sql_result portal) {
    Thread.MutexKey lock = realbuffer->shortmux->lock();
    if (portal)
      realbuffer->stashqueue->write(portal);
    if (mode == SYNCSEND) {
      add(PGSYNC);
      realbuffer->stashqueue->write(1);
      mode = SENDOUT;	    // Demote it to prevent an extra SYNC upon stashflush
    }
    realbuffer->stash->add(this);
    PD("%d>Stashed mode %d > %d\n",
     realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode);
    if (mode > realbuffer->stashflushmode)
      realbuffer->stashflushmode = mode;
    dirty = 0;
    this->clear();
    if (lock = realbuffer->nostash->trylock(1)) {
#ifdef PG_DEBUGRACE
      conxsess sess = conxsess(realbuffer);
      realbuffer->started = lock;
      lock = 0;
      sess->sendcmd(SENDOUT);
#else
      realbuffer->started = lock;
      lock = 0;
      realbuffer->sendcmd(SENDOUT);
#endif
    }
  }
};

class conxiin {
  inherit Stdio.Buffer:i;

  final Thread.Condition fillread;
  final Thread.Mutex fillreadmux;
  final int procmsg;
  private int didreadcb;

  protected final bool range_error(int howmuch) {
#ifdef PG_DEBUG
    if (howmuch <= 0)
      error("Out of range %d\n", howmuch);
#endif
    if (fillread) {
      Thread.MutexKey lock = fillreadmux->lock();
      if (!didreadcb)
        fillread.wait(lock);
      didreadcb = 0;
    } else
      throw(MAGICTERMINATE);
    return true;
  }

  final int read_cb(mixed id, mixed b) {
    PD("Read callback %O\n", b && ((string)b)
#ifndef PG_DEBUGMORE
      [..255]
#endif
     );
    Thread.MutexKey lock = fillreadmux->lock();
    if (procmsg && id)
      procmsg = 0, lock = 0, Thread.Thread(id);
    else if (fillread)
      didreadcb = 1, fillread.signal();
    return 0;
  }

  private void create() {
    i::create();
    fillreadmux = Thread.Mutex();
    fillread = Thread.Condition();
  }
};

class sfile {
  inherit Stdio.File;
  final int query_fd() {
    return is_open() ? ::query_fd() : -1;
  }
};

class conxion {
  inherit Stdio.Buffer:o;
  final conxiin i;

  private Thread.Queue qportals;
  final Thread.Mutex shortmux;
  private int closenext;

  final sfile
#if constant(SSL.File)
             |SSL.File
#endif
                       socket;
  private int towrite;
  final multiset(sql_result) runningportals = (<>);

  final Thread.Mutex nostash;
  final Thread.MutexKey started;
  final Thread.Queue stashqueue;
  final Thread.Condition stashavail;
  final Stdio.Buffer stash;
  //! @ignore
  final int(KEEP..SYNCSEND) stashflushmode;
  //! @endignore
  final Thread.ResourceCount stashcount;
  final int synctransact;
#ifdef PG_DEBUGRACE
  final mixed nostrack;
#endif
#ifdef PG_DEBUG
  final int queueoutidx;
  final int queueinidx = -1;
#endif

  private inline void queueup(sql_result portal) {
    qportals->write(portal); portal->_synctransact = synctransact;
    PD("%d>%O %d %d Queue portal %d bytes\n", socket->query_fd(),
     portal._portalname, ++queueoutidx, synctransact, sizeof(this));
  }

  final bufcon|conxsess start(void|int waitforreal) {
    Thread.MutexKey lock;
    if (lock = (waitforreal ? nostash->lock : nostash->trylock)(1)) {
      int mode;
#ifdef PG_DEBUGRACE
      conxsess sess = conxsess(this);
#endif
      started = lock;
      lock = shortmux->lock();
      stashcount->wait_till_drained(lock);
      mode = getstash(KEEP);
      lock = 0;
      if (mode > KEEP)
        sendcmd(mode);			// Force out stash to the server
#ifdef PG_DEBUGRACE
      return sess;
#else
      return this;
#endif
    }
    return bufcon(this)->start();
  }

  private int write_cb() {
    Thread.MutexKey lock = shortmux->lock();
    if (this) {				// Guard against async destructs
      towrite -= output_to(socket, towrite);
      lock = 0;
      if (!i->fillread && !sizeof(this))
        close();
    }
    return 0;
  }

  private int getstash(int mode) {
    if (sizeof(stash)) {
      add(stash); stash->clear();
      foreach (stashqueue->try_read_array(); ; int|sql_result portal)
        if (intp(portal))
          qportals->write(synctransact++);
        else
          queueup(portal);
      PD("%d>Got stash mode %d > %d\n",
       socket->query_fd(), stashflushmode, mode);
      if (stashflushmode > mode)
        mode = stashflushmode;
      stashflushmode = KEEP;
    }
    return mode;
  }

  final void sendcmd(void|int mode, void|sql_result portal) {
    Thread.MutexKey lock;
    if (portal)
      queueup(portal);
unfinalised:
    do {
      switch (mode) {
        default:
          break unfinalised;
        case SYNCSEND:
          PD("%d>Sync %d %d Queue\n",
           socket->query_fd(), synctransact, ++queueoutidx);
          add(PGSYNC);
          mode = SENDOUT;
          break;
        case FLUSHLOGSEND:
          PD("%d>%O %d Queue simplequery %d bytes\n", socket->query_fd(),
           portal._portalname, ++queueoutidx, sizeof(this));
          mode = FLUSHSEND;
      }
      qportals->write(synctransact++);
    } while (0);
    lock = shortmux->lock();
    mode = getstash(mode);
#ifdef PG_DEBUG
    mixed err =
#endif
    catch {
outer:
      do {
        switch (mode) {
          default:
            PD("%d>Skip flush %d Queue %O\n",
             socket->query_fd(), mode, (string)this);
            break outer;
          case FLUSHSEND:
            PD("Flush\n");
            add(PGFLUSH);
          case SENDOUT:;
        }
        if (towrite = sizeof(this)) {
          PD("%d>Sendcmd %O\n",
           socket->query_fd(), ((string)this)[..towrite-1]);
          towrite -= output_to(socket, towrite);
        }
      } while (0);
      started = 0;
      return;
    };
    lock = 0;
    PD("Sendcmd failed %s\n", describe_backtrace(err));
    destruct(this);
  }

  final int close() {
    if (!closenext && nostash) {
      closenext = 1;
      {
        Thread.MutexKey lock = i->fillreadmux->lock();
        if (i->fillread) {  // Delayed close() after flushing the output buffer
          i->fillread.signal();
          i->fillread = 0;
        }
      }
      PD("%d>Delayed close, flush write\n", socket->query_fd());
      i->read_cb(socket->query_id(), 0);
      return 0;
    } else
      return -1;
  }

  final void purge() {
    if (stashcount) {
      stashcount = 0;
      PD("%d>Purge conxion %d\n", socket ? socket->query_fd() : -1, !!nostash);
      int|sql_result portal;
      if (qportals)			// CancelRequest does not use qportals
        while (portal = qportals->try_read())
          if (objectp(portal))
            portal->_purgeportal();
      if (nostash) {
        while (sizeof(runningportals))
          catch {
            foreach (runningportals; sql_result result; )
              if (!result.datarowtypes) {
                result.datarowtypes = emptyarray;
                if (result._state != PURGED && !result.delayederror)
                  result.delayederror = LOSTERROR;
                result._ddescribe->broadcast();
                runningportals[result] = 0;
              } else
                destruct(result);
          };
        destruct(nostash);
        socket->set_non_blocking();			// Drop all callbacks
        PD("%d>Close socket\n", socket->query_fd());
        socket->close();		// This will be an asynchronous close
      }
      destruct(this);
    }
  }

  private void destroy() {
    PD("%d>Close conxion %d\n", socket ? socket->query_fd() : -1, !!nostash);
    catch(purge());
  }

  final void connectloop(proxy pgsqlsess, int nossl) {
#ifdef PG_DEBUG
    mixed err =
#endif
    catch {
      for (; ; clear()) {
        socket->connect(pgsqlsess. host, pgsqlsess. port);
#if constant(SSL.File)
        if (!nossl && !pgsqlsess->nossl
         && (pgsqlsess.options.use_ssl || pgsqlsess.options.force_ssl)) {
          PD("SSLRequest\n");
          start()->add_int32(8)->add_int32(PG_PROTOCOL(1234, 5679))
           ->sendcmd(SENDOUT);
          string s = socket.read(1);
          switch (sizeof(s) && s[0]) {
            case 'S':
              SSL.File fcon = SSL.File(socket, SSL.Context());
              if (fcon->connect()) {
                socket->set_backend(local_backend);
                socket = fcon;
                break;
              }
            default:
              PD("%d>Close socket short\n", socket->query_fd());
              socket->close();
              pgsqlsess.nossl = 1;
              continue;
            case 'N':
              if (pgsqlsess.options.force_ssl)
                error("Encryption not supported on connection to %s:%d\n",
                      pgsqlsess.host, pgsqlsess.port);
          }
        }
#else
        if (pgsqlsess.options.force_ssl)
          error("Encryption library missing,"
                " cannot establish connection to %s:%d\n",
                pgsqlsess.host, pgsqlsess.port);
#endif
        break;
      }
      if (!socket->is_open())
        error(strerror(socket->errno()) + ".\n");
      socket->set_backend(local_backend);
      socket->set_buffer_mode(i, 0);
      socket->set_nonblocking(i->read_cb, write_cb, close);
      if (nossl != 2)
        Thread.Thread(pgsqlsess->processloop, this);
      return;
    };
    PD("Connect error %s\n", describe_backtrace(err));
    catch(destruct(pgsqlsess->waitforauthready));
    destruct(this);
  }

  private string _sprintf(int type) {
    string res;
    switch (type) {
      case 'O':
        int fd = -1;
        if (socket)
          catch(fd = socket->query_fd());
        res = predef::sprintf("conxion  fd: %d input queue: %d/%d "
                    "queued portals: %d  output queue: %d/%d\n"
                    "started: %d\n",
                    fd, sizeof(i), i->_size_object(),
                    qportals && qportals->size(), sizeof(this), _size_object(),
                    !!started);
        break;
    }
    return res;
  }

  private void create(proxy pgsqlsess, Thread.Queue _qportals, int nossl) {
    o::create();
    qportals = _qportals;
    synctransact = 1;
    socket = sfile();
    i = conxiin();
    shortmux = Thread.Mutex();
    nostash = Thread.Mutex();
    closenext = 0;
    stashavail = Thread.Condition();
    stashqueue = Thread.Queue();
    stash = Stdio.Buffer();
    stashcount = Thread.ResourceCount();
    Thread.Thread(connectloop, pgsqlsess, nossl);
  }
};

#ifdef PG_DEBUGRACE
class conxsess {
  final conxion chain;

  private void create(conxion parent) {
    if (parent->started)
      werror("Overwriting conxsess %s %s\n",
        describe_backtrace(({"new ", backtrace()[..<1]})),
        describe_backtrace(({"old ", parent->nostrack})));
    parent->nostrack = backtrace();
    chain = parent;
  }

  final void sendcmd(int mode, void|sql_result portal) {
    chain->sendcmd(mode, portal);
    chain = 0;
  }

  private void destroy() {
    if (chain)
      werror("Untransmitted conxsess %s\n",
       describe_backtrace(({"", backtrace()[..<1]})));
  }
};
#endif

//! The result object returned by @[Sql.pgsql()->big_query()], except for
//! the noted differences it behaves the same as @[Sql.sql_result].
//!
//! @seealso
//!   @[Sql.sql_result], @[Sql.pgsql], @[Sql.Sql], @[Sql.pgsql()->big_query()]
class sql_result {

  private proxy pgsqlsess;
  private int(0..1) eoffound;
  private conxion c;
  private conxiin cr;
  final int(0..1) untolderror;
  final mixed delayederror;
  //! @ignore
  final int(PORTALINIT..PURGED) _state;
  //! @endignore
  final int _fetchlimit;
  private int(0..1) alltext;
  final int(0..1) _forcetext;
  private int syncparse;
  private int transtype;

  final string _portalname;

  private int index;
  private int inflight;
  int portalbuffersize;
  private Thread.Mutex closemux;
  private Thread.Queue datarows;
  private Thread.ResourceCountKey stmtifkey, portalsifkey;
  private array(mapping(string:mixed)) datarowdesc;
  final array(int) datarowtypes;	// types from datarowdesc
  private string statuscmdcomplete;
  private int bytesreceived;
  final int _synctransact;
  final Thread.Condition _ddescribe;
  final Thread.Mutex _ddescribemux;
  final Thread.MutexKey _unnamedportalkey, _unnamedstatementkey;
  final array _params;
  final string _query;
  final string _preparedname;
  final mapping(string:mixed) _tprepared;
  private function(:void) gottimeout;
  private int timeout;

  protected string _sprintf(int type) {
    string res;
    switch (type) {
      case 'O':
        int fd = -1;
        if (c && c->socket)
          catch(fd = c->socket->query_fd());
        res = sprintf("sql_result state: %d numrows: %d eof: %d inflight: %d\n"
                    "query: %O\n"
                    "fd: %O portalname: %O  datarows: %d"
                    "  synctransact: %d laststatus: %s\n",
                    _state, index, eoffound, inflight,
                    _query, fd, _portalname,
                    datarowtypes && sizeof(datarowtypes), _synctransact,
                    statuscmdcomplete
                    || (_unnamedstatementkey ? "*parsing*" : ""));
        break;
    }
    return res;
  }

  protected void create(proxy _pgsqlsess, conxion _c, string query,
   int _portalbuffersize, int alltyped, array params, int forcetext,
   int _timeout, int _syncparse, int _transtype) {
    pgsqlsess = _pgsqlsess;
    if (c = _c)
      cr = c->i;
    else
      losterror();
    _query = query;
    datarows = Thread.Queue();
    _ddescribe = Thread.Condition();
    _ddescribemux = Thread.Mutex();
    closemux = Thread.Mutex();
    portalbuffersize = _portalbuffersize;
    alltext = !alltyped;
    _params = params;
    _forcetext = forcetext;
    _state = PORTALINIT;
    timeout = _timeout;
    syncparse = _syncparse;
    gottimeout = _pgsqlsess->cancelquery;
    c->runningportals[this] = 1;
    transtype = _transtype;
  }

  //! Returns the command-complete status for this query.
  //!
  //! @note
  //! This function is PostgreSQL-specific.
  //!
  //! @seealso
  //!  @[affected_rows()]
  /*semi*/final string status_command_complete() {
    if (!statuscmdcomplete) {
      if (!datarowtypes)
        waitfordescribe();
      {
        Thread.MutexKey lock = closemux->lock();
        if (_fetchlimit)
          _sendexecute(_fetchlimit = 0);
        lock = _ddescribemux->lock();
        if (!statuscmdcomplete)
          PT(_ddescribe->wait(lock));
      }
      if (this)		// If object already destructed, skip the next call
        trydelayederror();	// since you cannot call functions anymore
      else
        error(LOSTERROR);
    }
    return statuscmdcomplete;
  }

  //! Returns the number of affected rows by this query.
  //!
  //! @note
  //! This function is PostgreSQL-specific.
  //!
  //! @seealso
  //!  @[num_rows()], @[status_command_complete()]
  /*semi*/final int affected_rows() {
    int rows;
    sscanf(status_command_complete() || "", "%*s %d %d", rows, rows);
    return rows;
  }

  final void _storetiming() {
    if (_tprepared) {
      _tprepared.trun = gethrtime() - _tprepared.trunstart;
      m_delete(_tprepared, "trunstart");
      _tprepared = 0;
    }
  }

  private void waitfordescribe() {
    {
      Thread.MutexKey lock = _ddescribemux->lock();
      if (!datarowtypes)
        PT(_ddescribe->wait(lock));
    }
    if (this)		// If object already destructed, skip the next call
      trydelayederror();	// since you cannot call functions anymore
    else
      error(LOSTERROR);
  }

  //! @seealso
  //!  @[Sql.sql_result()->num_fields()]
  /*semi*/final int num_fields() {
    if (!datarowtypes)
      waitfordescribe();
    return sizeof(datarowtypes);
  }

  //! @note
  //!  This method returns the number of rows already received from
  //!  the database for the current query.  Note that this number
  //!  can still increase between subsequent calls if the results from
  //!  the query are not complete yet.  This function is only guaranteed to
  //!  return the correct count after EOF has been reached.
  //! @seealso
  //!  @[Sql.sql_result()->num_rows()]
  /*semi*/final int num_rows() {
    trydelayederror();
    return index;
  }

  private void losterror() {
    string err;
    if (pgsqlsess)
      err = pgsqlsess->geterror(1);
    error("%s\n", err || LOSTERROR);
  }

  private void trydelayederror() {
    if (delayederror)
      throwdelayederror(this);
    else if (_state == PURGED)
      losterror();
  }

  //! @seealso
  //!  @[Sql.sql_result()->eof()]
  /*semi*/final int eof() {
    trydelayederror();
    return eoffound;
  }

  //! @seealso
  //!  @[Sql.sql_result()->fetch_fields()]
  /*semi*/final array(mapping(string:mixed)) fetch_fields() {
    if (!datarowtypes)
      waitfordescribe();
    if (!datarowdesc)
      error(LOSTERROR);
    return datarowdesc + emptyarray;
  }

#ifdef PG_DEBUG
#define INTVOID int
#else
#define INTVOID void
#endif
  final INTVOID _decodedata(int msglen, string cenc) {
    _storetiming(); _releasestatement();
    string serror;
    bytesreceived += msglen;
    int cols = cr->read_int16();
    array a = allocate(cols, !alltext && Val.null);
#ifdef PG_DEBUG
    msglen -= 2 + 4 * cols;
#endif
    foreach (datarowtypes; int i; int typ) {
      int collen = cr->read_sint(4);
      if (collen > 0) {
#ifdef PG_DEBUG
        msglen -= collen;
#endif
        mixed value;
        switch (typ) {
          case FLOAT4OID:
#if !constant(__builtin.__SINGLE_PRECISION_FLOAT__)
          case FLOAT8OID:
#endif
            if (_forcetext) {
              if (!alltext) {
                value = (float)cr->read(collen);
                break;
              }
            } else {
              [ value ] = cr->sscanf(collen == 4 ? "%4F" : "%8F");
              if (alltext)
                value = sprintf("%.*g", collen == 4 ? 9 : 17, value);
              break;
            }
          default:value = cr->read(collen);
            break;
          case CHAROID:
            value = alltext ? cr->read(1) : cr->read_int8();
            break;
          case BOOLOID:value = cr->read_int8();
            switch (value) {
              case 'f':value = 0;
                break;
              case 't':value = 1;
            }
            if (alltext)
              value = value ? "t" : "f";
            break;
          case TEXTOID:
          case BPCHAROID:
          case VARCHAROID:
            value = cr->read(collen);
            if (cenc == UTF8CHARSET && catch(value = utf8_to_string(value))
             && !serror)
              serror = SERROR("%O contains non-%s characters\n",
                                                     value, UTF8CHARSET);
            break;
          case NUMERICOID:
            if (_forcetext) {
              value = cr->read(collen);
              if (!alltext) {
                value = value/".";
                if (sizeof(value) == 1)
                  value = (int)value[0];
                else {
                  int i, denom;
                  for (i = sizeof(value[1]), denom = 1; --i >= 0; denom *= 10);
                  i = (int)value[0];
                  value = (int)value[1];
                  value = Gmp.mpq(i * denom + (i >= 0 ? value : -value),
                   denom);
                }
              }
            } else {
              int nwords = cr->read_int16();
              int magnitude = cr->read_sint(2);
              int sign = cr->read_int16();
              cr->consume(2);
              if (nwords) {
                for (value = cr->read_int16(); --nwords; magnitude--)
                  value = value * NUMERIC_MAGSTEP + cr->read_int16();
                if (sign)
                  value = -value;
                if (magnitude > 0)
                  do
                    value *= NUMERIC_MAGSTEP;
                  while (--magnitude);
                else if (magnitude < 0) {
                  for (sign = NUMERIC_MAGSTEP;
                       ++magnitude;
                       sign *= NUMERIC_MAGSTEP);
                  value = Gmp.mpq(value, sign);
                }
              } else
                value = 0;
              if (alltext)
                value = (string)value;
            }
            break;
          case INT8OID:case INT2OID:
          case OIDOID:case INT4OID:
            if (_forcetext) {
              value = cr->read(collen);
              if (!alltext)
                value = (int)value;
            } else {
              switch (typ) {
                case INT8OID:value = cr->read_sint(8);
                  break;
                case INT2OID:value = cr->read_sint(2);
                  break;
                case OIDOID:
                case INT4OID:value = cr->read_sint(4);
              }
              if (alltext)
                value = (string)value;
            }
        }
        a[i]=value;
      } else if (!collen)
        a[i]="";
    }
    _processdataready(a);
    if (serror)
      error(serror);
#ifdef PG_DEBUG
    return msglen;
#endif
  }

  final void _setrowdesc(array(mapping(string:mixed)) drowdesc,
   array(int) drowtypes) {
    Thread.MutexKey lock = _ddescribemux->lock();
    datarowdesc = drowdesc;
    datarowtypes = drowtypes;
    _ddescribe->broadcast();
  }

  final void _preparebind(array dtoid) {
    array(string|int) paramValues =_params ? _params[2] : emptyarray;
    if (sizeof(dtoid) != sizeof(paramValues))
      SUSERERROR("Invalid number of bindings, expected %d, got %d\n",
                 sizeof(dtoid), sizeof(paramValues));
    Thread.MutexKey lock = _ddescribemux->lock();
    if (!_portalname) {
      _portalname
       = (_unnamedportalkey = pgsqlsess.unnamedportalmux->trylock(1))
         ? "" : PORTALPREFIX
#ifdef PG_DEBUG
          + (string)(c->socket->query_fd()) + "_"
#endif
          + String.int2hex(pgsqlsess.pportalcount++);
      lock = 0;
#ifdef PG_DEBUGMORE
      PD("ParamValues to bind: %O\n", paramValues);
#endif
      Stdio.Buffer plugbuffer = Stdio.Buffer();
      { array dta = ({sizeof(dtoid)});
        plugbuffer->add(_portalname, 0, _preparedname, 0)
         ->add_ints(dta
           + map(dtoid, writeoidformat, paramValues, ({0})) + dta, 2);
      }
      string cenc = pgsqlsess.runtimeparameter[CLIENT_ENCODING];
      foreach (paramValues; int i; mixed value) {
        if (undefinedp(value) || objectp(value) && value->is_val_null)
          plugbuffer->add_int32(-1);				// NULL
        else if (stringp(value) && !sizeof(value)) {
          int k = 0;
          switch (dtoid[i]) {
            default:
              k = -1;	     // cast empty strings to NULL for non-string types
            case BYTEAOID:
            case TEXTOID:
            case XMLOID:
            case BPCHAROID:
            case VARCHAROID:;
          }
          plugbuffer->add_int32(k);
        } else
          switch (dtoid[i]) {
            case TEXTOID:
            case BPCHAROID:
            case VARCHAROID: {
              if (!value) {
                plugbuffer->add_int32(-1);
                break;
              }
              value = (string)value;
              switch (cenc) {
                case UTF8CHARSET:
                  value = string_to_utf8(value);
                  break;
                default:
                  if (String.width(value)>8) {
                    SUSERERROR("Don't know how to convert %O to %s encoding\n",
                               value, cenc);
                    value="";
                  }
              }
              plugbuffer->add_hstring(value, 4);
              break;
            }
            default: {
              if (!value) {
                plugbuffer->add_int32(-1);
                break;
              }
              if (!objectp(value) || typeof(value) != typeof(Stdio.Buffer()));
                /*
                 *  Like Oracle and SQLite, we accept literal binary values
                 *  from single-valued multisets.
                 */
                if (multisetp(value) && sizeof(value) == 1)
                  value = indices(value)[0];
                else {
                  value = (string)value;
                  if (String.width(value) > 8)
                    if (dtoid[i] == BYTEAOID)
                      /*
                       *  FIXME We should throw an error here, it would
                       *  have been correct, but for historical reasons and
                       *  as a DWIM convenience we autoconvert to UTF8 here.
                       */
                      value = string_to_utf8(value);
                    else {
                      SUSERERROR(
                        "Wide string %O not supported for type OID %d\n",
                        value, dtoid[i]);
                      value = "";
                    }
                }
              plugbuffer->add_hstring(value, 4);
              break;
            }
            case BOOLOID:
              do {
                int tval;
                if (stringp(value))
                  tval = value[0];
                else if (!intp(value)) {
                  value = !!value;			// cast to boolean
                  break;
                } else
                  tval = value;
                switch (tval) {
                  case 'o':case 'O':
                    catch {
                      tval = value[1];
                      value = tval == 'n' || tval == 'N';
                    };
                    break;
                  default:
                    value = 1;
                    break;
                  case 0:case '0':case 'f':case 'F':case 'n':case 'N':
                    value = 0;
                      break;
                }
              } while (0);
              plugbuffer->add("\0\0\0\1", value);
              break;
            case CHAROID:
              if (intp(value))
                plugbuffer->add_hstring(value, 4);
              else {
                value = (string)value;
                switch (sizeof(value)) {
                  default:
                    SUSERERROR(
                     "\"char\" types must be 1 byte wide, got %O\n", value);
                  case 0:
                    plugbuffer->add_int32(-1);			// NULL
                    break;
                  case 1:
                    plugbuffer->add_hstring(value[0], 4);
                }
              }
              break;
            case NUMERICOID:
              if (stringp(value))
                plugbuffer->add_hstring(value, 4);
              else {
                int num, den, sign, magnitude = 0;
                value = Gmp.mpq(value);
                num = value->num();
                den = value->den();
                for (value = den;
                     value > NUMERIC_MAGSTEP;
                     value /= NUMERIC_MAGSTEP);
                if (value > 1) {
                  value = NUMERIC_MAGSTEP / value;
                  num *= value;
                  den *= value;
                }
                if (num < 0)
                  sign = 0x4000, num = -num;
                else
                  sign = 0;
                array stor = ({});
                if (num) {
                  while (!(num % NUMERIC_MAGSTEP))
                    num /= NUMERIC_MAGSTEP, magnitude++;
                  do
                    stor = ({num % NUMERIC_MAGSTEP}) + stor, magnitude++;
                  while (num /= NUMERIC_MAGSTEP);
                  num = --magnitude << 2;
                  while (den > 1)
                    magnitude--, den /= NUMERIC_MAGSTEP;
                }
                plugbuffer->add_int32(4 * 2 + (sizeof(stor) << 1))
                 ->add_int16(sizeof(stor))->add_int16(magnitude)
                 ->add_int16(sign)->add_int16(num)->add_ints(stor, 2);
              }
              break;
            case FLOAT4OID:
#if !constant(__builtin.__SINGLE_PRECISION_FLOAT__)
            case FLOAT8OID:
#endif
              if (stringp(value))
                plugbuffer->add_hstring(value, 4);
              else {
                int w = dtoid[i] == FLOAT4OID ? 4 : 8;
                plugbuffer->add_int32(w)
                 ->sprintf(w == 4 ? "%4F" : "%8F", value);
              }
              break;
            case INT8OID:
              plugbuffer->add_int32(8)->add_int((int)value, 8);
              break;
            case OIDOID:
            case INT4OID:
              plugbuffer->add_int32(4)->add_int32((int)value);
              break;
            case INT2OID:
              plugbuffer->add_int32(2)->add_int16((int)value);
              break;
          }
      }
      if (!datarowtypes) {
        if (_tprepared && dontcacheprefix->match(_query))
          m_delete(pgsqlsess->prepareds, _query), _tprepared = 0;
        waitfordescribe();
      }
      if (_state >= CLOSING)
        lock = _unnamedstatementkey = 0;
      else {
        plugbuffer->add_int16(sizeof(datarowtypes));
        if (sizeof(datarowtypes))
          plugbuffer->add_ints(map(datarowtypes, readoidformat), 2);
        else if (syncparse < 0 && !pgsqlsess->wasparallelisable
         && !pgsqlsess->statementsinflight->drained(1)) {
          lock = pgsqlsess->shortmux->lock();
          PD("Commit waiting for statements to finish\n");
          catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lock, 1)));
        }
        lock = 0;
        PD("Bind portal %O statement %O\n", _portalname, _preparedname);
        _fetchlimit = pgsqlsess->_fetchlimit;
        _bindportal();
        conxsess bindbuffer = c->start();
        _unnamedstatementkey = 0;
        stmtifkey = 0;
        CHAIN(bindbuffer)->add_int8('B')->add_hstring(plugbuffer, 4, 4);
        if (!_tprepared && sizeof(_preparedname))
          closestatement(CHAIN(bindbuffer), _preparedname);
        _sendexecute(_fetchlimit
                             && !(transtype != NOTRANS
                                  || sizeof(_query) >= MINPREPARELENGTH &&
                                  execfetchlimit->match(_query))
                             && _fetchlimit, bindbuffer);
      }
    }
  }

  final void _processrowdesc(array(mapping(string:mixed)) datarowdesc,
   array(int) datarowtypes) {
    _setrowdesc(datarowdesc, datarowtypes);
    if (_tprepared) {
      _tprepared.datarowdesc = datarowdesc;
      _tprepared.datarowtypes = datarowtypes;
    }
  }

  final void _parseportal() {
    {
      Thread.MutexKey lock = closemux->lock();
      _state = PARSING;
      {
        Thread.MutexKey lockc = pgsqlsess->shortmux->lock();
        if (syncparse || syncparse < 0 && pgsqlsess->wasparallelisable) {
          PD("Commit waiting for statements to finish\n");
          catch(PT(pgsqlsess->statementsinflight->wait_till_drained(lockc)));
        }
        stmtifkey = pgsqlsess->statementsinflight->acquire();
      }
    }
    statuscmdcomplete = 0;
    pgsqlsess->wasparallelisable = paralleliseprefix->match(_query);
  }

  final void _releasestatement(void|int nolock) {
    Thread.MutexKey lock;
    if (!nolock)
      lock = closemux->lock();
    if (_state <= BOUND) {
      _state = COMMITTED;
      stmtifkey = 0;
    }
  }

  final void _bindportal() {
    Thread.MutexKey lock = closemux->lock();
    _state = BOUND;
    portalsifkey = pgsqlsess->portalsinflight->acquire();
  }

  final void _purgeportal() {
    PD("Purge portal\n");
    datarows->write(1);				   // Signal EOF
    {
      Thread.MutexKey lock = closemux->lock();
      _fetchlimit = 0;				   // disables further Executes
      switch (_state) {
        case COPYINPROGRESS:
        case COMMITTED:
        case BOUND:
          portalsifkey = 0;
      }
      switch (_state) {
        case BOUND:
        case PARSING:
          stmtifkey = 0;
      }
      _state = PURGED;
    }
    releaseconditions();
  }

  final int _closeportal(conxsess cs) {
    void|bufcon|conxsess plugbuffer = CHAIN(cs);
    int retval = KEEP;
    PD("%O Try Closeportal %d\n", _portalname, _state);
    Thread.MutexKey lock = closemux->lock();
    _fetchlimit = 0;				   // disables further Executes
    switch (_state) {
      case PARSING:
      case BOUND:
        _releasestatement(1);
    }
    switch (_state) {
      case PORTALINIT:
      case PARSING:
        _unnamedstatementkey = 0;
        _state = CLOSING;
        break;
      case COPYINPROGRESS:
        PD("CopyDone\n");
        plugbuffer->add("c\0\0\0\4");
      case COMMITTED:
      case BOUND:
        _state = CLOSING;
        lock = 0;
        PD("Close portal %O\n", _portalname);
        if (_portalname && sizeof(_portalname)) {
          plugbuffer->add_int8('C')
           ->add_hstring(({'P', _portalname, 0}), 4, 4);
          retval = FLUSHSEND;
        } else
          _unnamedportalkey = 0;
        portalsifkey = 0;
        if (pgsqlsess->portalsinflight->drained()) {
          if (plugbuffer->stashcount->drained() && transtype != TRANSBEGIN)
           /*
            * stashcount will be non-zero if a parse request has been queued
            * before the close was initiated.
            * It's a bit of a tricky race, but this check should be sufficient.
            */
            pgsqlsess->readyforquerycount++, retval = SYNCSEND;
          pgsqlsess->pportalcount = 0;
        }
    }
    return retval;
  }

  private void replenishrows() {
    if (_fetchlimit && sizeof(datarows) <= _fetchlimit >> 1) {
      _fetchlimit = pgsqlsess._fetchlimit;
      if (bytesreceived)
        _fetchlimit =
         min((portalbuffersize >> 1) * index / bytesreceived || 1, _fetchlimit);
      Thread.MutexKey lock = closemux->lock();
      if (_fetchlimit && inflight <= (_fetchlimit - 1) >> 1)
        _sendexecute(_fetchlimit);
      else if (!_fetchlimit)
        PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
         _portalname, _fetchlimit, inflight);
    }
  }

  final void _processdataready(array datarow, void|int msglen) {
    bytesreceived += msglen;
    inflight--;
    if (_state<CLOSED)
      datarows->write(datarow);
    if (++index == 1)
      PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n", _portalname,
       _fetchlimit, (portalbuffersize >> 1) * index / bytesreceived,
       pgsqlsess._fetchlimit, inflight);
    replenishrows();
  }

  private void releaseconditions() {
    _unnamedportalkey = _unnamedstatementkey = 0;
    if (!datarowtypes) {
      if (_state != PURGED && !delayederror)
        delayederror = LOSTERROR;
      datarowtypes = emptyarray;
      _ddescribe->broadcast();
    }
    if (delayederror && !pgsqlsess.delayederror)
      pgsqlsess.delayederror = delayederror;	// Preserve error upstream
    pgsqlsess = 0;
  }

  final void _releasesession(void|string statusccomplete) {
    c->runningportals[this] = 0;
    if (statusccomplete && !statuscmdcomplete) {
      Thread.MutexKey lock = _ddescribemux->lock();
      statuscmdcomplete = statusccomplete;
      _ddescribe->broadcast();
    }
    inflight = 0;
    conxsess plugbuffer;
    if (!catch(plugbuffer = c->start()))
      plugbuffer->sendcmd(_closeportal(plugbuffer));
    if (_state < CLOSED)
      _state = CLOSED;
    datarows->write(1);				// Signal EOF
    releaseconditions();
  }

  protected void destroy() {
    catch {			   // inside destructors, exceptions don't work
      _releasesession();
    };
  }

  final void _sendexecute(int fetchlimit, void|bufcon|conxsess plugbuffer) {
    int flushmode;
    PD("Execute portal %O fetchlimit %d transtype %d\n", _portalname,
     fetchlimit, transtype);
    if (!plugbuffer)
      plugbuffer = c->start(1);
    CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8)
     ->add_int32(fetchlimit);
    if (!fetchlimit) {
      if (transtype != NOTRANS)
        pgsqlsess.intransaction = transtype == TRANSBEGIN;
      flushmode = _closeportal(plugbuffer) == SYNCSEND
       || transtype == TRANSEND ? SYNCSEND : FLUSHSEND;
    } else
      inflight += fetchlimit, flushmode = FLUSHSEND;
    plugbuffer->sendcmd(flushmode, this);
  }

  inline private array setuptimeout() {
    return local_backend->call_out(gottimeout, timeout);
  }

  //! @returns
  //!  One result row at a time.
  //!
  //! When using COPY FROM STDOUT, this method returns one row at a time
  //! as a single string containing the entire row.
  //!
  //! @seealso
  //!  @[eof()], @[send_row()]
  /*semi*/final array(mixed) fetch_row() {
    int|array datarow;
    replenishrows();
    if (arrayp(datarow = datarows->try_read()))
      return datarow;
    if (!eoffound) {
      if (!datarow) {
        PD("%O Block for datarow\n", _portalname);
        array cid = setuptimeout();
        PT(datarow = datarows->read());
        local_backend->remove_call_out(cid);
        if (arrayp(datarow))
          return datarow;
      }
      eoffound = 1;
      datarows->write(1);			// Signal EOF for other threads
    }
    trydelayederror();
    return 0;
  }

  //! @returns
  //!  Multiple result rows at a time (at least one).
  //!
  //! When using COPY FROM STDOUT, this method returns one row at a time
  //! as a single string containing the entire row.
  //!
  //! @seealso
  //!  @[eof()], @[fetch_row()]
  /*semi*/final array(array(mixed)) fetch_row_array() {
    if (eoffound)
      return 0;
    replenishrows();
    array(array|int) datarow = datarows->try_read_array();
    if (!sizeof(datarow)) {
      array cid = setuptimeout();
      PT(datarow = datarows->read_array());
      local_backend->remove_call_out(cid);
    }
    replenishrows();
    if (arrayp(datarow[-1]))
      return datarow;
    do datarow = datarow[..<1];			// Swallow EOF mark(s)
    while (sizeof(datarow) && !arrayp(datarow[-1]));
    trydelayederror();
    eoffound = 1;
    datarows->write(1);				// Signal EOF for other threads
    return datarow;
  }

  //! @param copydata
  //! When using COPY FROM STDIN, this method accepts a string or an
  //! array of strings to be processed by the COPY command; when sending
  //! the amount of data sent per call does not have to hit row or column
  //! boundaries.
  //!
  //! The COPY FROM STDIN sequence needs to be completed by either
  //! explicitly or implicitly destroying the result object, or by passing no
  //! argument to this method.
  //!
  //! @seealso
  //!  @[fetch_row()], @[eof()]
  /*semi*/final void send_row(void|string|array(string) copydata) {
    trydelayederror();
    if (copydata) {
      PD("CopyData\n");
      void|bufcon|conxsess cs = c->start();
      CHAIN(cs)->add_int8('d')->add_hstring(copydata, 4, 4);
      cs->sendcmd(SENDOUT);
    } else
      _releasesession();
  }

  private void run_result_cb(
   function(sql_result, array(mixed), mixed ...:void) callback,
   array(mixed) args) {
    int|array datarow;
    for (;;) {
      array cid = setuptimeout();
      PT(datarow = datarows->read());
      local_backend->remove_call_out(cid);
      if (!arrayp(datarow))
        break;
      callout(callback, 0, this, datarow, @args);
    }
    eoffound = 1;
    callout(callback, 0, this, 0, @args);
  }

  //! Sets up a callback for every row returned from the database.
  //! First argument passed is the resultobject itself, second argument
  //! is the result row (zero on EOF).
  //!
  //! @seealso
  //!  @[fetch_row()]
  /*semi*/final void set_result_callback(
   function(sql_result, array(mixed), mixed ...:void) callback,
   mixed ... args) {
    if (callback)
      Thread.Thread(run_result_cb, callback, args);
  }

  private void run_result_array_cb(
   function(sql_result, array(array(mixed)), mixed ...:void) callback,
   array(mixed) args) {
    array(array|int) datarow;
    for (;;) {
      array cid = setuptimeout();
      PT(datarow = datarows->read_array());
      local_backend->remove_call_out(cid);
      if (!datarow || !arrayp(datarow[-1]))
        break;
      callout(callback, 0, this, datarow, @args);
    }
    eoffound = 1;
    if (sizeof(datarow)>1)
      callout(callback, 0, this, datarow = datarow[..<1], @args);
    callout(callback, 0, this, 0, @args);
  }

  //! Sets up a callback for sets of rows returned from the database.
  //! First argument passed is the resultobject itself, second argument
  //! is the array of result rows (zero on EOF).
  //!
  //! @seealso
  //!  @[fetch_row()]
  /*semi*/final void set_result_array_callback(
   function(sql_result, array(array(mixed)), mixed ...:void) callback,
   mixed ... args) {
    if (callback)
      Thread.Thread(run_result_array_cb, callback, args);
  }

};

class proxy {
  final int _fetchlimit = FETCHLIMIT;
  final Thread.Mutex unnamedportalmux;
  final Thread.Mutex unnamedstatement;
  private Thread.MutexKey termlock;
  private Thread.ResourceCountKey backendreg;
  final Thread.ResourceCount portalsinflight, statementsinflight;
  final int(0..1) wasparallelisable;
  final int(0..1) intransaction;

  final conxion c;
  private string cancelsecret;
  private int backendpid;
  final int(-128..127) backendstatus;
  final mapping(string:mixed) options;
  private array(string) lastmessage = emptyarray;
  final int(0..1) clearmessage;
  final int(0..1) untolderror;
  private mapping(string:array(mixed)) notifylist = ([]);
  final mapping(string:string) runtimeparameter;
  final mapping(string:mapping(string:mixed)) prepareds = ([]);
  final int pportalcount;
  final int totalhits;
  final int msgsreceived;	// Number of protocol messages received
  final int bytesreceived;	// Number of bytes received
  final int warningsdropcount;	// Number of uncollected warnings
  private int warningscollected;
  final int(0..1) invalidatecache;
  private Thread.Queue qportals;
  final mixed delayederror;
  final function (:void) readyforquery_cb;

  final string host;
  final int(0..65535) port;
  private string database, user, pass;
  private Crypto.Hash.SCRAM SASLcontext;
  final Thread.Condition waitforauthready;
  final Thread.Mutex shortmux;
  final int readyforquerycount;

  private string _sprintf(int type) {
    string res;
    switch (type) {
      case 'O':
        res = sprintf(DRIVERNAME".proxy(%s@%s:%d/%s,%d,%d)",
          user, host, port, database, c && c->socket && c->socket->query_fd(),
          backendpid);
        break;
    }
    return res;
  }

  private void create(void|string host, void|string database,
                        void|string user, void|string pass,
                        void|mapping(string:mixed) options) {
    if (this::pass = pass) {
      String.secure(pass);
      pass = "CENSORED";
    }
    this::user = user;
    this::database = database;
    this::options = options;

    if (!host) host = PGSQL_DEFAULT_HOST;
    if (has_value(host,":") && sscanf(host,"%s:%d", host, port) != 2)
      error("Error in parsing the hostname argument\n");
    this::host = host;

    if (!port)
      port = PGSQL_DEFAULT_PORT;
    backendreg = register_backend();
    shortmux = Thread.Mutex();
    PD("Connect\n");
    waitforauthready = Thread.Condition();
    qportals = Thread.Queue();
    readyforquerycount = 1;
    qportals->write(1);
    if (!(c = conxion(this, qportals, 0)))
      error("Couldn't connect to database on %s:%d\n", host, port);
    runtimeparameter = ([]);
    unnamedportalmux = Thread.Mutex();
    unnamedstatement = Thread.Mutex();
    readyforquery_cb = connect_cb;
    portalsinflight = Thread.ResourceCount();
    statementsinflight = Thread.ResourceCount();
    wasparallelisable = 0;
  }

  final int is_open() {
    return c && c->socket && c->socket->is_open();
  }

  final string geterror(void|int clear) {
    throwdelayederror(this);
    untolderror = 0;
    string s = lastmessage * "\n";
    if (clear)
      lastmessage = emptyarray;
    warningscollected = 0;
    return sizeof(s) && s;
  }

  final string host_info() {
    return sprintf("fd:%d TCP/IP %s:%d PID %d",
                   c ? c->socket->query_fd() : -1, host, port, backendpid);
  }

  final void cancelquery() {
    if (cancelsecret > "") {
      PD("CancelRequest\n");
      conxion lcon = conxion(this, 0, 2);
#ifdef PG_DEBUG
      mixed err =
#endif
      catch(lcon->add_int32(16)->add_int32(PG_PROTOCOL(1234, 5678))
        ->add_int32(backendpid)->add(cancelsecret)->sendcmd(FLUSHSEND));
#ifdef PG_DEBUG
      if (err)
        PD("CancelRequest failed to connect %O\n", describe_backtrace(err));
#endif
      destruct(lcon);		// Destruct explicitly to avoid delayed close
#ifdef PG_DEBUGMORE
      PD("Closetrace %O\n", backtrace());
#endif
    } else
      error("Connection not established, cannot cancel any query\n");
  }

  private string a2nls(array(string) msg) {
    return msg * "\n" + "\n";
  }

  private string pinpointerror(void|string query, void|string offset) {
    if (!query)
      return "";
    int k = (int)offset;
    if (k <= 0)
      return MARKSTART + query + MARKEND;
    return MARKSTART + (k > 1 ? query[..k-2] : "")
         + MARKERROR + query[k - 1..] + MARKEND;
  }

  private void connect_cb() {
    PD("%O\n", runtimeparameter);
  }

  private array(string) showbindings(sql_result portal) {
    array(string) msgs = emptyarray;
    array from;
    if (portal && (from = portal._params)) {
      array to, paramValues;
      [from, to, paramValues] = from;
      if (sizeof(paramValues)) {
        string val;
        int i;
        string fmt = sprintf("%%%ds %%3s %%.61s", max(@map(from, sizeof)));
        foreach (paramValues; i; val)
          msgs += ({sprintf(fmt, from[i], to[i], sprintf("%O", val))});
      }
    }
    return msgs;
  }

  private void preplastmessage(mapping(string:string) msgresponse) {
    lastmessage = ({
      sprintf("%s %s:%s %s\n (%s:%s:%s)",
              msgresponse.S, msgresponse.C, msgresponse.P || "",
              msgresponse.M, msgresponse.F || "", msgresponse.R || "",
              msgresponse.L||"")});
  }

  private int|sql_result portal;  // state information procmessage
#ifdef PG_DEBUG
  private string datarowdebug;
  private int datarowdebugcount;
#endif

  final void processloop(conxion ci) {
    (c = ci)->socket->set_id(procmessage);
    cancelsecret = 0;
    portal = 0;
    {
      Stdio.Buffer plugbuffer = Stdio.Buffer()->add_int32(PG_PROTOCOL(3, 0));
      if (user)
        plugbuffer->add("user\0", user, 0);
      if (database)
        plugbuffer->add("database\0", database, 0);
      foreach (options - censoroptions; string name; mixed value)
        plugbuffer->add(name, 0, (string)value, 0);
      plugbuffer->add_int8(0);
      PD("%O\n", (string)plugbuffer);
      void|bufcon|conxsess cs;
      if (catch(cs = ci->start())) {
        destruct(waitforauthready);
        unnamedstatement = 0;
        termlock = 0;
        return;
      } else {
        CHAIN(cs)->add_hstring(plugbuffer, 4, 4);
        cs->sendcmd(SENDOUT);
      }
    }		      // Do not flush at this point, PostgreSQL 9.4 disapproves
    procmessage();
  }

  private void procmessage() {
    mixed err;
    int terminating = 0;
    err = catch {
    conxion ci = c;		// cache value FIXME sensible?
    conxiin cr = ci->i;		// cache value FIXME sensible?
#ifdef PG_DEBUG
    PD("Processloop\n");

#ifdef PG_DEBUGMORE
    void showportalstack(string label) {
      PD(sprintf(">>>>>>>>>>> Portalstack %s: %O\n", label, portal));
      foreach (qportals->peek_array(); ; int|sql_result qp)
        PD(" =========== Portal: %O\n", qp);
      PD("<<<<<<<<<<<<<< Portalstack end\n");
    };
#endif
    void showportal(int msgtype) {
      if (objectp(portal))
        PD("%d<%O %d %c switch portal\n",
         ci->socket->query_fd(), portal._portalname, ++ci->queueinidx, msgtype);
      else if (portal>0)
        PD("%d<Sync %d %d %c portal\n",
         ci->socket->query_fd(), ++ci->queueinidx, portal, msgtype);
    };
#endif
    int msgisfatal(mapping(string:string) msgresponse) {
      int isfatal = (has_prefix(msgresponse.C, "53")
                  || has_prefix(msgresponse.C, "3D")
                  || has_prefix(msgresponse.C, "57P")) && MAGICTERMINATE;
      if (isfatal && !terminating) // Run the callback once per lost connection
        runcallback(backendpid, "_lost", "");
      return isfatal;
    };
    for (;;) {
      err = catch {
#ifdef PG_DEBUG
        if (!portal && datarowdebug) {
          PD("%s rows %d\n", datarowdebug, datarowdebugcount);
          datarowdebug = 0; datarowdebugcount = 0;
        }
#endif
#ifdef PG_DEBUGMORE
        showportalstack("LOOPTOP");
#endif
        if (!sizeof(cr)) {			// Preliminary check, fast path
          Thread.MutexKey lock = cr->fillreadmux->lock();
          if (!sizeof(cr)) {			// Check for real
            if (!cr->fillread) {
              lock = 0;
              throw(MAGICTERMINATE);	// Force proper termination
            }
            cr->procmsg = 1;
            return;			// Terminate thread, wait for callback
          }
        }
        int msgtype = cr->read_int8();
        if (!portal) {
          portal = qportals->try_read();
#ifdef PG_DEBUG
          showportal(msgtype);
#endif
        }
        int msglen = cr->read_int32();
        msgsreceived++;
        bytesreceived += 1 + msglen;
        int errtype = NOERROR;
        PD("%d<", ci->socket->query_fd());
        switch (msgtype) {
          array getcols() {
            int bintext = cr->read_int8();
            int cols = cr->read_int16();
#ifdef PG_DEBUG
            array a;
            msglen -= 4 + 1 + 2 + 2 * cols;
            foreach (a = allocate(cols, ([])); ; mapping m)
              m.type = cr->read_int16();
#else
            cr->consume(cols << 1);
#endif		      // Discard column info, and make it line oriented
            return ({ ({(["name":"line"])}), ({bintext?BYTEAOID:TEXTOID}) });
          };
          array(string) reads() {
#ifdef PG_DEBUG
            if (msglen < 1)
              errtype = PROTOCOLERROR;
#endif
            array ret = emptyarray, aw = ({0});
            do {
              string w = cr->read_cstring();
              msglen -= sizeof(w) + 1; aw[0] = w; ret += aw;
            } while (msglen);
            return ret;
          };
          mapping(string:string) getresponse() {
            mapping(string:string) msgresponse = ([]);
            msglen -= 4;
            foreach (reads(); ; string f)
              if (sizeof(f))
                msgresponse[f[..0]] = f[1..];
            PD("%O\n", msgresponse);
            return msgresponse;
          };
          case 'R': {
            void authresponse(string|array msg) {
              void|bufcon|conxsess cs = ci->start();
              CHAIN(cs)->add_int8('p')->add_hstring(msg, 4, 4);
              cs->sendcmd(SENDOUT); // No flushing, PostgreSQL 9.4 disapproves
            };
            PD("Authentication ");
            msglen -= 4 + 4;
            int authtype, k;
            switch (authtype = cr->read_int32()) {
              case 0:
                PD("Ok\n");
                if (SASLcontext) {
                  PD("Authentication validation still in progress\n");
                  errtype = PROTOCOLUNSUPPORTED;
                } else
                  cancelsecret = "";
                break;
              case 2:
                PD("KerberosV5\n");
                errtype = PROTOCOLUNSUPPORTED;
                break;
              case 3:
                PD("ClearTextPassword\n");
                authresponse(({pass, 0}));
                break;
              case 4:
                PD("CryptPassword\n");
                errtype = PROTOCOLUNSUPPORTED;
                break;
              case 5:
                PD("MD5Password\n");
#ifdef PG_DEBUG
                if (msglen < 4)
                  errtype = PROTOCOLERROR;
#endif
#define md5hex(x) String.string2hex(Crypto.MD5.hash(x))
                authresponse(({"md5",
                 md5hex(md5hex(pass + user) + cr->read(msglen)), 0}));
#ifdef PG_DEBUG
                msglen = 0;
#endif
                break;
              case 6:
                PD("SCMCredential\n");
                errtype = PROTOCOLUNSUPPORTED;
                break;
              case 7:
                PD("GSS\n");
                errtype = PROTOCOLUNSUPPORTED;
                break;
              case 9:
                PD("SSPI\n");
                errtype = PROTOCOLUNSUPPORTED;
                break;
              case 8:
                PD("GSSContinue\n");
                errtype = PROTOCOLUNSUPPORTED;
                cr->read(msglen);				// SSauthdata
#ifdef PG_DEBUG
                if (msglen<1)
                  errtype = PROTOCOLERROR;
                msglen = 0;
#endif
                break;
              case 10: {
                string word;
                PD("AuthenticationSASL\n");
                k = 0;
                while (sizeof(word = cr->read_cstring())) {
                  switch (word) {
                    case "SCRAM-SHA-256":
                      k = 1;
                  }
#ifdef PG_DEBUG
                  msglen -= sizeof(word) + 1;
                  if (msglen < 1)
                    break;
#endif
                }
                if (k) {
                  SASLcontext = Crypto.SHA256.SCRAM();
                  word = SASLcontext.client_1();
                  authresponse(({
                    "SCRAM-SHA-256", 0, sprintf("%4c", sizeof(word)), word
                   }));
                } else
                  errtype = PROTOCOLUNSUPPORTED;
#ifdef PG_DEBUG
                if (msglen != 1)
                  errtype = PROTOCOLERROR;
                msglen = 0;
#endif
                break;
              }
              case 11: {
                PD("AuthenticationSASLContinue\n");
                string response;
                if (response
                 = SASLcontext.client_2(cr->read(msglen), pass))
                  authresponse(response);
                else
                  errtype = PROTOCOLERROR;
#ifdef PG_DEBUG
                msglen = 0;
#endif
                break;
              }
              case 12:
                PD("AuthenticationSASLFinal\n");
                if (SASLcontext.client_3(cr->read(msglen)))
                  SASLcontext = 0;	// Clears context and approves server
                else
                  errtype = PROTOCOLERROR;
#ifdef PG_DEBUG
                msglen = 0;
#endif
                break;
              default:
                PD("Unknown Authentication Method %c\n", authtype);
                errtype = PROTOCOLUNSUPPORTED;
                break;
            }
            switch (errtype) {
              default:
              case PROTOCOLUNSUPPORTED:
                error("Unsupported authenticationmethod %c\n", authtype);
              case NOERROR:
                break;
            }
            break;
          }
          case 'K':
            msglen -= 4 + 4; backendpid = cr->read_int32();
            cancelsecret = cr->read(msglen);
#ifdef PG_DEBUG
            PD("BackendKeyData %O\n", cancelsecret);
            msglen = 0;
#endif
            break;
          case 'S': {
            PD("ParameterStatus ");
            msglen -= 4;
            array(string) ts = reads();
#ifdef PG_DEBUG
            if (sizeof(ts) == 2) {
#endif
              runtimeparameter[ts[0]] = ts[1];
#ifdef PG_DEBUG
              PD("%O=%O\n", ts[0], ts[1]);
            } else
              errtype = PROTOCOLERROR;
#endif
            break;
          }
          case '3':
#ifdef PG_DEBUG
            PD("CloseComplete\n");
            msglen -= 4;
#endif
            break;
          case 'Z': {
            backendstatus = cr->read_int8();
#ifdef PG_DEBUG
            msglen -= 4 + 1;
            PD("ReadyForQuery %c\n", backendstatus);
#endif
#ifdef PG_DEBUGMORE
            showportalstack("READYFORQUERY");
#endif
            int keeplooking;
            do
              for (keeplooking = 0;
                   objectp(portal);
                   portal = qportals->read()) {
#ifdef PG_DEBUG
                showportal(msgtype);
#endif
                if (backendstatus == 'I' && intransaction
                 && portal->transtype != TRANSEND)
                  keeplooking = 1;
                portal->_purgeportal();
              }
            while (keeplooking && (portal = qportals->read()));
            if (backendstatus == 'I')
              intransaction = 0;
            foreach (qportals->peek_array(); ; sql_result qp) {
              if (objectp(qp) && qp._synctransact
               && qp._synctransact <= portal) {
                PD("Checking portal %O %d<=%d\n",
                 qp._portalname, qp._synctransact, portal);
                qp->_purgeportal();
              }
            }
            portal = 0;
#ifdef PG_DEBUGMORE
            showportalstack("AFTER READYFORQUERY");
#endif
            readyforquerycount--;
            if (readyforquery_cb)
              readyforquery_cb(), readyforquery_cb = 0;
            destruct(waitforauthready);
            break;
          }
          case '1':
#ifdef PG_DEBUG
            PD("ParseComplete portal %O\n", portal);
            msglen -= 4;
#endif
            break;
          case 't': {
            array a;
#ifdef PG_DEBUG
            int cols = cr->read_int16();
            PD("%O ParameterDescription %d values\n", portal._query, cols);
            msglen -= 4 + 2 + 4 * cols;
            a = cr->read_ints(cols, 4);
#else
            a = cr->read_ints(cr->read_int16(), 4);
#endif
#ifdef PG_DEBUGMORE
            PD("%O\n", a);
#endif
            if (portal._tprepared)
              portal._tprepared.datatypeoid = a;
            Thread.Thread(portal->_preparebind, a);
            break;
          }
          case 'T': {
            array a, at;
            int cols = cr->read_int16();
#ifdef PG_DEBUG
            PD("RowDescription %d columns %O\n", cols, portal._query);
            msglen -= 4 + 2;
#endif
            at = allocate(cols);
            foreach (a = allocate(cols); int i; ) {
              string s = cr->read_cstring();
              mapping(string:mixed) res = (["name":s]);
#ifdef PG_DEBUG
              msglen -= sizeof(s) + 1 + 4 + 2 + 4 + 2 + 4 + 2;
              res.tableoid = cr->read_int32() || UNDEFINED;
              res.tablecolattr = cr->read_int16() || UNDEFINED;
#else
              cr->consume(6);
#endif
              at[i] = cr->read_int32();
#ifdef PG_DEBUG
              res.type = at[i];
              {
                int len = cr->read_sint(2);
                res.length = len >= 0 ? len : "variable";
              }
              res.atttypmod = cr->read_int32();
              /* formatcode contains just a zero when Bind has not been issued
               * yet, but the content is irrelevant because it's determined
               * at query time
               */
              res.formatcode = cr->read_int16();
#else
              cr->consume(8);
#endif
              a[i] = res;
            }
#ifdef PG_DEBUGMORE
            PD("%O\n", a);
#endif
            if (portal._forcetext)
              portal->_setrowdesc(a, at);	// Do not consume queued portal
            else {
              portal->_processrowdesc(a, at);
              portal = 0;
            }
            break;
          }
          case 'n': {
#ifdef PG_DEBUG
            msglen -= 4;
            PD("NoData %O\n", portal._query);
#endif
            portal._fetchlimit = 0;		// disables subsequent Executes
            portal->_processrowdesc(emptyarray, emptyarray);
            portal = 0;
            break;
          }
          case 'H':
            portal->_processrowdesc(@getcols());
            PD("CopyOutResponse %O\n", portal. _query);
            break;
          case '2': {
            mapping tp;
#ifdef PG_DEBUG
            msglen -= 4;
            PD("%O BindComplete\n", portal._portalname);
#endif
            if (tp = portal._tprepared) {
              int tend = gethrtime();
              int tstart = tp.trun;
              if (tend == tstart)
                m_delete(prepareds, portal._query);
              else {
                tp.hits++;
                totalhits++;
                if (!tp.preparedname) {
                  if (sizeof(portal._preparedname)) {
                    PD("Finalising stored statement %s\n",
                     portal._preparedname);
                    tp.preparedname = portal._preparedname;
                  }
                  tstart = tend - tstart;
                  if (!tp.tparse || tp.tparse>tstart)
                    tp.tparse = tstart;
                }
                tp.trunstart = tend;
              }
            }
            break;
          }
          case 'D':
            msglen -= 4;
#ifdef PG_DEBUG
#ifdef PG_DEBUGMORE
            PD("%O DataRow %d bytes\n", portal._portalname, msglen);
#endif
            datarowdebugcount++;
            if (!datarowdebug)
              datarowdebug = sprintf(
               "%O DataRow %d bytes", portal._portalname, msglen);
#endif
#ifdef PG_DEBUG
            msglen=
#endif
            portal->_decodedata(msglen, runtimeparameter[CLIENT_ENCODING]);
            break;
          case 's':
#ifdef PG_DEBUG
            PD("%O PortalSuspended\n", portal._portalname);
            msglen -= 4;
#endif
            portal = 0;
            break;
          case 'C': {
            msglen -= 4;
#ifdef PG_DEBUG
            if (msglen < 1)
              errtype = PROTOCOLERROR;
#endif
            string s = cr->read(msglen - 1);
            portal->_storetiming();
            PD("%O CommandComplete %O\n", portal._portalname, s);
#ifdef PG_DEBUG
            if (cr->read_int8())
              errtype = PROTOCOLERROR;
            msglen = 0;
#else
            cr->consume(1);
#endif
#ifdef PG_DEBUGMORE
            showportalstack("COMMANDCOMPLETE");
#endif
            portal->_releasesession(s);
            portal = 0;
            break;
          }
          case 'I':
#ifdef PG_DEBUG
            PD("EmptyQueryResponse %O\n", portal._portalname);
            msglen -= 4;
#endif
#ifdef PG_DEBUGMORE
            showportalstack("EMPTYQUERYRESPONSE");
#endif
            portal->_releasesession();
            portal = 0;
            break;
          case 'd':
            PD("%O CopyData\n", portal._portalname);
            portal->_storetiming();
            msglen -= 4;
#ifdef PG_DEBUG
            if (msglen < 0)
              errtype = PROTOCOLERROR;
#endif
            portal->_processdataready(({cr->read(msglen)}), msglen);
#ifdef PG_DEBUG
            msglen = 0;
#endif
            break;
          case 'G':
            portal->_releasestatement();
            portal->_setrowdesc(@getcols());
            PD("%O CopyInResponse\n", portal._portalname);
            portal._state = COPYINPROGRESS;
            break;
          case 'c':
#ifdef PG_DEBUG
            PD("%O CopyDone\n", portal._portalname);
            msglen -= 4;
#endif
            portal = 0;
            break;
          case 'E': {
#ifdef PG_DEBUGMORE
            showportalstack("ERRORRESPONSE");
#endif
            if (portalsinflight->drained() && !readyforquerycount)
              sendsync();
            PD("%O ErrorResponse %O\n",
             objectp(portal) && (portal._portalname || portal._preparedname),
             objectp(portal) && portal._query);
            mapping(string:string) msgresponse;
            msgresponse = getresponse();
            warningsdropcount += warningscollected;
            warningscollected = 0;
            untolderror = 1;
            switch (msgresponse.C) {
              case "P0001":
                lastmessage
                 = ({sprintf("%s: %s", msgresponse.S, msgresponse.M)});
                USERERROR(a2nls(lastmessage
                               +({pinpointerror(portal._query, msgresponse.P)})
                               +showbindings(portal)));
              case "53000":case "53100":case "53200":case "53300":case "53400":
              case "57P01":case "57P02":case "57P03":case "57P04":case "3D000":
                preplastmessage(msgresponse);
                PD(a2nls(lastmessage)); throw(msgisfatal(msgresponse));
              case "08P01":case "42P05":
                errtype = PROTOCOLERROR;
              case "XX000":case "42883":case "42P01":
                invalidatecache = 1;
              default:
                preplastmessage(msgresponse);
                if (msgresponse.D)
                  lastmessage += ({msgresponse.D});
                if (msgresponse.H)
                  lastmessage += ({msgresponse.H});
                lastmessage += ({
                  pinpointerror(objectp(portal) && portal._query, msgresponse.P)
                  + pinpointerror(msgresponse.q, msgresponse.p)});
                if (msgresponse.W)
                  lastmessage += ({msgresponse.W});
                if (objectp(portal))
                  lastmessage += showbindings(portal);
                switch (msgresponse.S) {
                  case "PANIC":werror(a2nls(lastmessage));
                }
              case "25P02":		        // Preserve last error message
                USERERROR(a2nls(lastmessage));	// Implicitly closed portal
            }
            break;
          }
          case 'N': {
            PD("NoticeResponse\n");
            mapping(string:string) msgresponse = getresponse();
            if (clearmessage) {
              warningsdropcount += warningscollected;
              clearmessage = warningscollected = 0;
              lastmessage = emptyarray;
            }
            warningscollected++;
            lastmessage = ({sprintf("%s %s: %s",
                             msgresponse.S, msgresponse.C, msgresponse.M)});
            int val;
            if (val = msgisfatal(msgresponse)) {  // Some warnings are fatal
              preplastmessage(msgresponse);
              PD(a2nls(lastmessage)); throw(val);
            }
            break;
          }
          case 'A': {
            PD("NotificationResponse\n");
            msglen -= 4 + 4;
            int pid = cr->read_int32();
            string condition, extrainfo;
            {
              array(string) ts = reads();
              switch (sizeof(ts)) {
#if PG_DEBUG
                case 0:
                  errtype = PROTOCOLERROR;
                  break;
                default:
                  errtype = PROTOCOLERROR;
#endif
                case 2:
                  extrainfo = ts[1];
                case 1:
                  condition = ts[0];
              }
            }
            PD("%d %s\n%s\n", pid, condition, extrainfo);
            runcallback(pid, condition, extrainfo);
            break;
          }
          default:
            if (msgtype != -1) {
              string s;
              PD("Unknown message received %c\n", msgtype);
              s = cr->read(msglen -= 4); PD("%O\n", s);
#ifdef PG_DEBUG
              msglen = 0;
#endif
              errtype = PROTOCOLUNSUPPORTED;
            } else {
              lastmessage += ({
               sprintf("Connection lost to database %s@%s:%d/%s %d\n",
                    user, host, port, database, backendpid)});
              runcallback(backendpid, "_lost", "");
              if (!waitforauthready)
                throw(0);
              USERERROR(a2nls(lastmessage));
            }
            break;
        }
#ifdef PG_DEBUG
        if (msglen)
          errtype = PROTOCOLERROR;
#endif
        {
          string msg;
          switch (errtype) {
            case PROTOCOLUNSUPPORTED:
              msg
               = sprintf("Unsupported servermessage received %c\n", msgtype);
              break;
            case PROTOCOLERROR:
              msg = sprintf("Protocol error with database %s", host_info());
              break;
            case NOERROR:
              continue;				// Normal production loop
          }
          error(a2nls(lastmessage += ({msg})));
        }
      };			// We only get here if there is an error
      if (err == MAGICTERMINATE) { // Announce connection termination to server
        catch {
          void|bufcon|conxsess cs = ci->start();
          CHAIN(cs)->add("X\0\0\0\4");
          cs->sendcmd(SENDOUT);
        };
        terminating = 1;
        err = 0;
      } else if (stringp(err)) {
        sql_result or;
        if (!objectp(or = portal))
          or = this;
        if (!or.delayederror)
          or.delayederror = err;
#ifdef PG_DEBUGMORE
        showportalstack("THROWN");
#endif
        if (objectp(portal))
          portal->_releasesession();
        portal = 0;
        if (!waitforauthready)
          continue;		// Only continue if authentication did not fail
      }
      break;
    }
    PD("Closing database processloop %s\n", err ? describe_backtrace(err) : "");
    delayederror = err;
    if (objectp(portal)) {
#ifdef PG_DEBUG
      showportal(0);
#endif
      portal->_purgeportal();
    }
    destruct(waitforauthready);
    termlock = 0;
    if (err && !stringp(err))
      throw(err);
    };
    catch {
     unnamedstatement = 0;
     termlock = 0;
      if (err) {
        PD("Terminating processloop due to %s\n", describe_backtrace(err));
        delayederror = err;
      }
      destruct(waitforauthready);
      c->purge();
    };
  }

  final void close() {
    throwdelayederror(this);
    {
      Thread.MutexKey lock;
      if (unnamedstatement)
        termlock = unnamedstatement->lock(1);
      foreach (c->runningportals; sql_result result; )
        catch(result->status_command_complete());
      if (c)				// Prevent trivial backtraces
        c->close();
      if (unnamedstatement)
        lock = unnamedstatement->lock(1);
      if (c)
        c->purge();
    }
    destruct(waitforauthready);
  }

  private void destroy() {
    string errstring;
    mixed err = catch(close());
    backendreg = 0;
    if (untolderror) {
      /*
       * Flush out any asynchronously reported errors to stderr; because we are
       * inside a destructor, throwing an error will not work anymore.
       * Warnings will be silently discarded at this point.
       */
      lastmessage = filter(lastmessage, lambda(string val) {
        return has_prefix(val, "ERROR ") || has_prefix(val, "FATAL "); });
      if (err || (err = catch(errstring = geterror(1))))
        werror(describe_backtrace(err));
      else if (errstring && sizeof(errstring))
        werror("%s\n", errstring);	// Add missing terminating newline
    }
  }

  final void sendsync() {
    readyforquerycount++;
    c->start()->sendcmd(SYNCSEND);
  }

  private void runcallback(int pid, string condition, string extrainfo) {
    array cb;
    if (condition == "_lost")
      destruct(c);
    if ((cb = notifylist[condition] || notifylist[""])
       && (pid != backendpid || sizeof(cb) > 1 && cb[1]))
      callout(cb[0], 0, pid, condition, extrainfo, @cb[2..]);
  }

  private inline void closestatement(
   bufcon|conxsess plugbuffer, string oldprep) {
    closestatement(plugbuffer, oldprep);
  }
};