Skip to content
Snippets Groups Projects
Commit 1c847c7a authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

pgsql: Disentangle portal-sync race for concurrent db-errors.

parent bd47ec9a
Branches
Tags
No related merge requests found
......@@ -25,6 +25,8 @@ private Thread.Mutex backendmux = Thread.Mutex();
private int clientsregistered;
constant emptyarray = ({});
constant describenodata
= (["datarowdesc":emptyarray,"datarowtypes":emptyarray]);
final multiset censoroptions=(<"use_ssl","force_ssl",
"cache_autoprepared_statements","reconnect","text_query","is_superuser",
"server_encoding","server_version","integer_datetimes",
......@@ -192,10 +194,16 @@ class bufcon {
final void sendcmd(int mode,void|sql_result portal) {
Thread.MutexKey lock=realbuffer->shortmux->lock();
if(portal)
if(portal) {
realbuffer->stashqueue->write(portal);
if (mode == SYNCSEND) {
add(PGSYNC);
realbuffer->stashqueue->write(1);
mode = SENDOUT;
}
}
realbuffer->stash->add(this);
mode=mergemode(realbuffer,mode);
mergemode(realbuffer, mode);
if(!--realbuffer->stashcount)
realbuffer->stashavail.signal();
lock=0;
......@@ -299,6 +307,7 @@ class conxion {
final bufcon|conxsess start(void|int waitforreal) {
Thread.MutexKey lock;
if(lock=(waitforreal?nostash->lock:nostash->trylock)(1)) {
int mode;
#ifdef PG_DEBUGRACE
conxsess sess = conxsess(this);
#endif
......@@ -306,10 +315,10 @@ class conxion {
lock=shortmux->lock();
if(stashcount)
PT(stashavail.wait(lock));
add(stash); stash->clear();
foreach(stashqueue->try_read_array();;sql_result portal)
queueup(portal);
mode = getstash(KEEP);
lock=0;
if (mode > KEEP)
sendcmd(mode); // Force out stash to the server
#ifdef PG_DEBUGRACE
return sess;
#else
......@@ -331,33 +340,29 @@ class conxion {
return 0;
}
final void sendcmd(void|int mode, void|sql_result portal) {
Thread.MutexKey lock;
int emptystash() {
int ret = 0;
if (started) {
lock = shortmux->lock();
private int getstash(int mode) {
if (sizeof(stash)) {
add(stash);
stash->clear();
foreach (stashqueue->try_read_array();; sql_result portal)
add(stash); stash->clear();
foreach (stashqueue->try_read_array();; int|sql_result portal)
if (intp(portal))
qportals->write(synctransact++);
else
queueup(portal);
ret = 1;
}
mode = mergemode(this, mode);
stashflushmode = KEEP;
}
return ret;
};
return mode;
}
emptystash();
do {
final void sendcmd(void|int mode, void|sql_result portal) {
Thread.MutexKey lock;
if (portal)
queueup(portal);
unfinalised:
do {
switch (mode) {
default:
continue;
break unfinalised;
case SYNCSEND:
PD("%d>Sync %d %d Queue\n",
socket->query_fd(), synctransact, ++queueoutidx);
......@@ -365,12 +370,14 @@ class conxion {
mode = SENDOUT;
break;
case FLUSHLOGSEND:
PD("%d>%O %d Queue simplequery %d bytes\n",
socket->query_fd(), portal._portalname, ++queueoutidx, sizeof(this));
PD("%d>%O %d Queue simplequery %d bytes\n", socket->query_fd(),
portal._portalname, ++queueoutidx, sizeof(this));
mode = FLUSHSEND;
}
qportals->write(synctransact++);
} while (!lock && emptystash());
} while(0);
lock = shortmux->lock();
mode = getstash(mode);
catch {
outer:
do {
......@@ -379,13 +386,13 @@ outer:
PD("%d>Skip flush %d Queue %O\n",
socket->query_fd(), mode, (string)this);
break outer;
case FLUSHLOGSEND:
case FLUSHSEND:
PD("Flush\n");
add(PGFLUSH);
case SYNCSEND:
case SENDOUT:;
}
if(!lock)
lock=shortmux->lock();
if(towrite=sizeof(this)) {
PD("%d>Sendcmd %O\n",socket->query_fd(),((string)this)[..towrite-1]);
towrite-=output_to(socket,towrite);
......@@ -1164,7 +1171,8 @@ class sql_result {
final void _sendexecute(int fetchlimit,void|bufcon|conxsess plugbuffer) {
int flushmode;
PD("Execute portal %O fetchlimit %d\n",_portalname,fetchlimit);
PD("Execute portal %O fetchlimit %d transtype %d\n", _portalname,
fetchlimit, transtype);
if(!plugbuffer)
plugbuffer=c->start(1);
CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname,0}), 4, 8)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment