Skip to content
Snippets Groups Projects
Commit 279d8426 authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

pgsql: Terminate threads when connections are idle.

parent 1adaf636
No related branches found
No related tags found
No related merge requests found
...@@ -587,11 +587,16 @@ private inline mixed callout(function(mixed ...:void) f, ...@@ -587,11 +587,16 @@ private inline mixed callout(function(mixed ...:void) f,
return .pgsql_util.local_backend->call_out(f,delay,@args); return .pgsql_util.local_backend->call_out(f,delay,@args);
} }
private int|.pgsql_util.sql_result portal; // state information procmessage
#ifdef PG_DEBUG
private string datarowdebug;
private int datarowdebugcount;
#endif
final void _processloop(.pgsql_util.conxion ci) { final void _processloop(.pgsql_util.conxion ci) {
int terminating=0; (c=ci)->socket->set_id(procmessage);
.pgsql_util.conxiin cr=ci->i; cancelsecret=0;
int|.pgsql_util.sql_result portal; portal=0;
mixed err;
{ {
Stdio.Buffer plugbuffer=Stdio.Buffer()->add_int32(PG_PROTOCOL(3,0)); Stdio.Buffer plugbuffer=Stdio.Buffer()->add_int32(PG_PROTOCOL(3,0));
if(user) if(user)
...@@ -612,14 +617,18 @@ final void _processloop(.pgsql_util.conxion ci) { ...@@ -612,14 +617,18 @@ final void _processloop(.pgsql_util.conxion ci) {
_connectfail(); _connectfail();
else else
destruct(waitforauthready); destruct(waitforauthready);
return; } else // Do not flush at this point, PostgreSQL 9.4 disapproves
procmessage();
} }
} // Do not flush at this point, PostgreSQL 9.4 disapproves }
cancelsecret=0;
private void procmessage() {
int terminating=0;
.pgsql_util.conxion ci=c; // cache value FIXME sensible?
.pgsql_util.conxiin cr=ci->i; // cache value FIXME sensible?
mixed err;
#ifdef PG_DEBUG #ifdef PG_DEBUG
PD("Processloop\n"); PD("Processloop\n");
string datarowdebug;
int datarowdebugcount;
void showportal(int msgtype) { void showportal(int msgtype) {
if(objectp(portal)) if(objectp(portal))
...@@ -638,6 +647,15 @@ final void _processloop(.pgsql_util.conxion ci) { ...@@ -638,6 +647,15 @@ final void _processloop(.pgsql_util.conxion ci) {
datarowdebug=0; datarowdebugcount=0; datarowdebug=0; datarowdebugcount=0;
} }
#endif #endif
if(!sizeof(cr)) { // Preliminary check, fast path
Thread.MutexKey lock=cr->fillreadmux->lock();
if(!sizeof(cr)) { // Check for real
cr->procmsg=1;
lock=0;
return; // Terminate thread, wait for callback
}
lock=0;
}
int msgtype=cr->read_int8(); int msgtype=cr->read_int8();
if(!portal) { if(!portal) {
portal=qportals->try_read(); portal=qportals->try_read();
......
...@@ -185,6 +185,7 @@ class conxiin { ...@@ -185,6 +185,7 @@ class conxiin {
final Thread.Condition fillread; final Thread.Condition fillread;
final Thread.Mutex fillreadmux; final Thread.Mutex fillreadmux;
final int procmsg;
private int didreadcb; private int didreadcb;
protected bool range_error(int howmuch) { protected bool range_error(int howmuch) {
...@@ -204,8 +205,11 @@ class conxiin { ...@@ -204,8 +205,11 @@ class conxiin {
} }
final int read_cb(mixed id,mixed b) { final int read_cb(mixed id,mixed b) {
PD("Read callback %O\n",(string)b);
Thread.MutexKey lock=fillreadmux->lock(); Thread.MutexKey lock=fillreadmux->lock();
if(fillread) if(procmsg)
procmsg=0,lock=0,Thread.Thread(id);
else if(fillread)
didreadcb=1, fillread.signal(); didreadcb=1, fillread.signal();
lock=0; lock=0;
return 0; return 0;
...@@ -225,7 +229,7 @@ class conxion { ...@@ -225,7 +229,7 @@ class conxion {
private Thread.Queue qportals; private Thread.Queue qportals;
final Thread.Mutex shortmux; final Thread.Mutex shortmux;
final Stdio.File socket; final Stdio.File socket;
private object pgsqlsess; private function(void|mixed:void) connectfail;
private int towrite; private int towrite;
final Thread.Mutex nostash; final Thread.Mutex nostash;
...@@ -327,8 +331,7 @@ outer: ...@@ -327,8 +331,7 @@ outer:
return; return;
}; };
lock=0; lock=0;
if(pgsqlsess) catch(connectfail());
pgsqlsess->_connectfail();
} }
final void sendterminate() { final void sendterminate() {
...@@ -349,10 +352,10 @@ outer: ...@@ -349,10 +352,10 @@ outer:
protected void destroy() { protected void destroy() {
catch(close()); // Exceptions don't work inside destructors catch(close()); // Exceptions don't work inside destructors
pgsqlsess=0; connectfail=0;
} }
final void connectloop(int nossl) { final void connectloop(object pgsqlsess, int nossl) {
mixed err=catch { mixed err=catch {
for(;;clear()) { for(;;clear()) {
socket->connect(pgsqlsess._host,pgsqlsess._port); socket->connect(pgsqlsess._host,pgsqlsess._port);
...@@ -392,11 +395,11 @@ outer: ...@@ -392,11 +395,11 @@ outer:
socket->set_backend(local_backend); socket->set_backend(local_backend);
socket->set_buffer_mode(i,0); socket->set_buffer_mode(i,0);
socket->set_nonblocking(i->read_cb,write_cb,close); socket->set_nonblocking(i->read_cb,write_cb,close);
connectfail=pgsqlsess->_connectfail;
Thread.Thread(pgsqlsess->_processloop,this); Thread.Thread(pgsqlsess->_processloop,this);
return; return;
}; };
if(pgsqlsess) catch(connectfail(err));
pgsqlsess->_connectfail(err);
} }
private string _sprintf(int type, void|mapping flags) { private string _sprintf(int type, void|mapping flags) {
...@@ -413,7 +416,7 @@ outer: ...@@ -413,7 +416,7 @@ outer:
return res; return res;
} }
protected void create(object _pgsqlsess,Thread.Queue _qportals,int nossl) { protected void create(object pgsqlsess,Thread.Queue _qportals,int nossl) {
o::create(); o::create();
qportals = _qportals; qportals = _qportals;
synctransact = 1; synctransact = 1;
...@@ -424,8 +427,7 @@ outer: ...@@ -424,8 +427,7 @@ outer:
stashavail=Thread.Condition(); stashavail=Thread.Condition();
stashqueue=Thread.Queue(); stashqueue=Thread.Queue();
stash=Stdio.Buffer(); stash=Stdio.Buffer();
pgsqlsess=_pgsqlsess; Thread.Thread(connectloop,pgsqlsess,nossl);
Thread.Thread(connectloop,nossl);
} }
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment