Skip to content
Snippets Groups Projects
Commit bac80cfd authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

Fix pgsql autoreconnect behaviour.

Rev: CHANGES:1.205
Rev: lib/modules/Sql.pmod/pgsql.pike:1.74
parent 41a3f648
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,8 @@ Bug fixes ...@@ -21,6 +21,8 @@ Bug fixes
o Fixed dangling cache reference in prepared statements when using CREATE o Fixed dangling cache reference in prepared statements when using CREATE
statements via Sql.pgsql. statements via Sql.pgsql.
o Fixed broken autoreconnect logic in Sql.pgsql.
o Improved widestring support for Parser.Tabular. o Improved widestring support for Parser.Tabular.
o Fixed segfault in combine_path_nt on windows when the first char o Fixed segfault in combine_path_nt on windows when the first char
......
...@@ -99,6 +99,7 @@ private int warningsdropcount; // Number of uncollected warnings ...@@ -99,6 +99,7 @@ private int warningsdropcount; // Number of uncollected warnings
private int prepstmtused; // Number of times prepared statements were used private int prepstmtused; // Number of times prepared statements were used
private int warningscollected; private int warningscollected;
private int invalidatecache; private int invalidatecache;
private int connectionclosed;
private string host, database, user, pass; private string host, database, user, pass;
private int port; private int port;
...@@ -538,6 +539,17 @@ final private string pinpointerror(void|string query,void|string offset) ...@@ -538,6 +539,17 @@ final private string pinpointerror(void|string query,void|string offset)
return MARKSTART+(k>1?query[..k-2]:"")+MARKERROR+query[k-1..]+MARKEND; return MARKSTART+(k>1?query[..k-2]:"")+MARKERROR+query[k-1..]+MARKEND;
} }
private void phasedreconnect()
{ connectionclosed=1;
if(!reconnect(1))
{ sleep(RECONNECTDELAY);
if(!reconnect(1))
{ sleep(RECONNECTBACKOFF);
reconnect(1);
}
}
}
final int _decodemsg(void|state waitforstate) final int _decodemsg(void|state waitforstate)
{ {
#ifdef DEBUG #ifdef DEBUG
...@@ -889,6 +901,11 @@ final int _decodemsg(void|state waitforstate) ...@@ -889,6 +901,11 @@ final int _decodemsg(void|state waitforstate)
case 'E':PD("ErrorResponse\n"); case 'E':PD("ErrorResponse\n");
{ mapping(string:string) msgresponse; { mapping(string:string) msgresponse;
msgresponse=getresponse(); msgresponse=getresponse();
void preplastmessage()
{ lastmessage=({sprintf("%s %s:%s %s\n (%s:%s:%s)",
msgresponse->S,msgresponse->C,msgresponse->P||"",msgresponse->M,
msgresponse->F||"",msgresponse->R||"",msgresponse->L||"")});
};
warningsdropcount+=warningscollected; warningsdropcount+=warningscollected;
warningscollected=0; warningscollected=0;
switch(msgresponse->C) switch(msgresponse->C)
...@@ -897,14 +914,16 @@ final int _decodemsg(void|state waitforstate) ...@@ -897,14 +914,16 @@ final int _decodemsg(void|state waitforstate)
USERERROR(a2nls(lastmessage USERERROR(a2nls(lastmessage
+({pinpointerror(_c.portal->_query,msgresponse->P)}) +({pinpointerror(_c.portal->_query,msgresponse->P)})
+showbindings())); +showbindings()));
case "57P01":case "57P02":case "57P03":
preplastmessage();phasedreconnect();
PD(a2nls(lastmessage));
USERERROR(a2nls(lastmessage));
case "08P01":case "42P05": case "08P01":case "42P05":
errtype=protocolerror; errtype=protocolerror;
case "XX000":case "42883":case "42P01": case "XX000":case "42883":case "42P01":
invalidatecache=1; invalidatecache=1;
default: default:
lastmessage=({sprintf("%s %s:%s %s\n (%s:%s:%s)", preplastmessage();
msgresponse->S,msgresponse->C,msgresponse->P||"",msgresponse->M,
msgresponse->F||"",msgresponse->R||"",msgresponse->L||"")});
if(msgresponse->D) if(msgresponse->D)
lastmessage+=({msgresponse->D}); lastmessage+=({msgresponse->D});
if(msgresponse->H) if(msgresponse->H)
...@@ -960,14 +979,7 @@ final int _decodemsg(void|state waitforstate) ...@@ -960,14 +979,7 @@ final int _decodemsg(void|state waitforstate)
} }
else else
{ array(string) msg=lastmessage; { array(string) msg=lastmessage;
if(!reconnect(1)) phasedreconnect();msg+=lastmessage;
{ sleep(RECONNECTDELAY);
if(!reconnect(1))
{ sleep(RECONNECTBACKOFF);
reconnect(1);
}
}
msg+=lastmessage;
string s=sizeof(msg)?a2nls(msg):""; string s=sizeof(msg)?a2nls(msg):"";
ERROR("%sConnection lost to database %s@%s:%d/%s %d\n", ERROR("%sConnection lost to database %s@%s:%d/%s %d\n",
s,user,host,port,database,backendpid); s,user,host,port,database,backendpid);
...@@ -982,9 +994,7 @@ final int _decodemsg(void|state waitforstate) ...@@ -982,9 +994,7 @@ final int _decodemsg(void|state waitforstate)
break; break;
case protocolerror: case protocolerror:
array(string) msg=lastmessage; array(string) msg=lastmessage;
lastmessage=({}); lastmessage=({});phasedreconnect();msg+=lastmessage;
reconnect(1);
msg+=lastmessage;
string s=sizeof(msg)?a2nls(msg):""; string s=sizeof(msg)?a2nls(msg):"";
ERROR("%sProtocol error with database %s\n",s,host_info()); ERROR("%sProtocol error with database %s\n",s,host_info());
break; break;
...@@ -1706,6 +1716,9 @@ object big_query(string q,void|mapping(string|int:mixed) bindings, ...@@ -1706,6 +1716,9 @@ object big_query(string q,void|mapping(string|int:mixed) bindings,
} // pgsql_result autoassigns to portal } // pgsql_result autoassigns to portal
else else
tp=UNDEFINED; tp=UNDEFINED;
connectionclosed=0;
for(;;)
{
.pgsql_util.pgsql_result(this,q,_fetchlimit,portalbuffersize,_alltyped,from); .pgsql_util.pgsql_result(this,q,_fetchlimit,portalbuffersize,_alltyped,from);
if(unnamedportalinuse) if(unnamedportalinuse)
portalname=PORTALPREFIX+(string)pportalcount++; portalname=PORTALPREFIX+(string)pportalcount++;
...@@ -1716,7 +1729,7 @@ object big_query(string q,void|mapping(string|int:mixed) bindings, ...@@ -1716,7 +1729,7 @@ object big_query(string q,void|mapping(string|int:mixed) bindings,
portalsinflight++; portalsopened++; portalsinflight++; portalsopened++;
clearmessage=1; clearmessage=1;
mixed err; mixed err;
if(err = catch if(!(err = catch
{ if(!sizeof(preparedname) || !tp || !tp->preparedname) { if(!sizeof(preparedname) || !tp || !tp->preparedname)
{ PD("Parse statement %s\n",preparedname); { PD("Parse statement %s\n",preparedname);
// Even though the protocol doesn't require the Parse command to be // Even though the protocol doesn't require the Parse command to be
...@@ -1910,11 +1923,14 @@ object big_query(string q,void|mapping(string|int:mixed) bindings, ...@@ -1910,11 +1923,14 @@ object big_query(string q,void|mapping(string|int:mixed) bindings,
} }
tprepared=tp; tprepared=tp;
} }
}) }))
{ PD("%O\n",err); break;
PD("%O\n",err);
resync(1); resync(1);
backendstatus=UNDEFINED; backendstatus=UNDEFINED;
if(!connectionclosed)
throw(err); throw(err);
tp=UNDEFINED;
} }
{ object tportal=_c.portal; // Make copy, because it might dislodge { object tportal=_c.portal; // Make copy, because it might dislodge
tportal->fetch_row(1); // upon initial fetch_row() tportal->fetch_row(1); // upon initial fetch_row()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment