diff --git a/lib/modules/Sql.pmod/pgsql_util.pmod b/lib/modules/Sql.pmod/pgsql_util.pmod index ae53380246fbb5d97733e0325b3e39bf0b2c2001..50aba734ea506127ab3747e53c3a1d30a5081f65 100644 --- a/lib/modules/Sql.pmod/pgsql_util.pmod +++ b/lib/modules/Sql.pmod/pgsql_util.pmod @@ -320,6 +320,7 @@ class bufcon { realbuffer->socket->query_fd(), mode, realbuffer->stashflushmode); if (mode > realbuffer->stashflushmode) realbuffer->stashflushmode = mode; + lock = 0; dirty = 0; this->clear(); if (lock = realbuffer->nostash->trylock(1)) { @@ -475,10 +476,7 @@ class conxion { #endif started = lock; lock = 0; // Force release before acquiring next - lock = shortmux->lock(); - mode = getstash(KEEP); - lock = 0; - if (mode > KEEP) + if (sizeof(stash) && getstash(KEEP) > KEEP) sendcmd(mode); // Force out stash to the server #ifdef PG_DEBUGRACE return sess; @@ -501,19 +499,18 @@ class conxion { } private int getstash(int mode) { - if (sizeof(stash)) { - add(stash); stash->clear(); - foreach (stashqueue->try_read_array(); ; int|sql_result portal) - if (intp(portal)) - qportals->write(synctransact++); - else - queueup(portal); - PD("%d>Got stash mode %d > %d\n", - socket->query_fd(), stashflushmode, mode); - if (stashflushmode > mode) - mode = stashflushmode; - stashflushmode = KEEP; - } + Thread.MutexKey lock = shortmux->lock(); + add(stash); stash->clear(); + foreach (stashqueue->try_read_array(); ; int|sql_result portal) + if (intp(portal)) + qportals->write(synctransact++); + else + queueup(portal); + PD("%d>Got stash mode %d > %d\n", + socket->query_fd(), stashflushmode, mode); + if (stashflushmode > mode) + mode = stashflushmode; + stashflushmode = KEEP; return mode; } @@ -538,34 +535,46 @@ unfinalised: } qportals->write(synctransact++); } while (0); - Thread.MutexKey lock = shortmux->lock(); - mode = getstash(mode); + if (sizeof(stash)) + mode = getstash(mode); + for(;;) { #ifdef PG_DEBUG - mixed err = + mixed err = #endif - catch { + catch { outer: - do { - switch (mode) { - default: - PD("%d>Skip flush %d Queue %O\n", - socket->query_fd(), mode, (string)this); - break outer; - case FLUSHSEND: - PD("Flush\n"); - add(PGFLUSH); - case SENDOUT:; - } - if (towrite = sizeof(this)) { - PD("%d>Sendcmd %O\n", - socket->query_fd(), ((string)this)[..towrite-1]); - towrite -= output_to(socket, towrite); + do { + switch (mode) { + default: + PD("%d>Skip flush %d Queue %O\n", + socket->query_fd(), mode, (string)this); + break outer; + case FLUSHSEND: + PD("Flush\n"); + add(PGFLUSH); + case SENDOUT:; + } + if (towrite = sizeof(this)) { + PD("%d>Sendcmd %O\n", + socket->query_fd(), ((string)this)[..towrite-1]); + towrite -= output_to(socket, towrite); + } + } while (0); + started = 0; + if (sizeof(stash) && (started = nostash->trylock(1))) { +#ifdef PG_DEBUGRACE + conxsess sess = conxsess(this); + sess->sendcmd(SENDOUT); +#else + mode = getstash(SENDOUT); + continue; +#endif } - } while (0); - started = 0; - return; - }; - lock = 0; + return; + }; + break; + } + started = 0; PD("Sendcmd failed %s\n", describe_backtrace(err)); destruct(this); }