Skip to content
Snippets Groups Projects
Commit c3317ce8 authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

pgsql: Track back deadlock, explicitly release lock, speeds up code.

parent 5f13d6fa
No related branches found
No related tags found
No related merge requests found
......@@ -473,7 +473,7 @@ class conxion {
PD("Nostash locked by %s\n",
describe_backtrace(nostash->current_locking_thread()->backtrace()));
#endif
while (lock = (waitforreal ? nostash->lock : nostash->trylock)(1)) {
while (lock = (waitforreal > 0 ? nostash->lock : nostash->trylock)(1)) {
int mode;
if (sizeof(stash) && (mode = getstash(KEEP)) > KEEP)
sendcmd(mode); // Force out stash to the server
......@@ -492,7 +492,7 @@ class conxion {
return this;
#endif
}
return bufcon(this)->start();
return !waitforreal && bufcon(this)->start();
}
private int write_cb() {
......@@ -894,9 +894,15 @@ class sql_result {
{
Thread.MutexKey lock = closemux->lock();
if (_fetchlimit) {
array reflock = ({ lock });
lock = 0;
_sendexecute(_fetchlimit = 0, reflock);
array reflock = ({ _fetchlimit = 0 });
for (;;) {
reflock[0] = lock;
lock = 0;
if (!_sendexecute(0, reflock)) {
lock = closemux->lock();
continue;
}
}
} else
lock = 0; // Force release before acquiring next
lock = _ddescribemux->lock();
......@@ -1467,20 +1473,25 @@ class sql_result {
private void replenishrows() {
if (_fetchlimit && datarows->size() <= _fetchlimit >> 1
&& _state >= COMMITTED) {
Thread.MutexKey lock = closemux->lock();
if (_fetchlimit) {
_fetchlimit = pgsqlsess._fetchlimit;
if (bytesreceived)
_fetchlimit =
min((portalbuffersize >> 1) * index / bytesreceived || 1, _fetchlimit);
if (_fetchlimit)
if (inflight <= (_fetchlimit - 1) >> 1) {
array reflock = ({ lock });
lock = 0;
_sendexecute(_fetchlimit, reflock);
} else
PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
_portalname, _fetchlimit, inflight);
Thread.MutexKey lock;
for (;;) {
lock = closemux->lock();
if (_fetchlimit) {
_fetchlimit = pgsqlsess._fetchlimit;
if (bytesreceived)
_fetchlimit = min((portalbuffersize >> 1)
* index / bytesreceived || 1, _fetchlimit);
if (_fetchlimit)
if (inflight <= (_fetchlimit - 1) >> 1) {
array reflock = ({ lock });
lock = 0;
if (!_sendexecute(_fetchlimit, reflock))
continue;
} else
PD("<%O _fetchlimit %d, inflight %d, skip execute\n",
_portalname, _fetchlimit, inflight);
}
break;
}
}
}
......@@ -1522,7 +1533,7 @@ class sql_result {
array(Thread.MutexKey) reflock = ({closemux->lock()});
if (!catch(plugbuffer = c->start()))
plugbuffer->sendcmd(_closeportal(plugbuffer, reflock));
reflock = 0;
reflock[0] = 0;
if (_state < CLOSED) {
stmtifkey = 0;
_state = CLOSED;
......@@ -1537,7 +1548,7 @@ class sql_result {
};
}
final void _sendexecute(int fetchlimit,
final int _sendexecute(int fetchlimit,
void|array(Thread.MutexKey)|bufcon|conxsess plugbuffer) {
int flushmode;
array(Thread.MutexKey) reflock;
......@@ -1545,7 +1556,10 @@ class sql_result {
fetchlimit, transtype);
if (arrayp(plugbuffer)) {
reflock = plugbuffer;
plugbuffer = c->start(1);
if (!(plugbuffer = c->start(-1))) {
reflock[0] = 0;
return 0; // Found potential deadlock, release and try again
}
}
CHAIN(plugbuffer)->add_int8('E')->add_hstring(({_portalname, 0}), 4, 8)
->add_int32(fetchlimit);
......@@ -1557,6 +1571,9 @@ class sql_result {
} else
inflight += fetchlimit, flushmode = FLUSHSEND;
plugbuffer->sendcmd(flushmode, this);
if (reflock)
reflock[0] = 0;
return 1;
}
inline private array setuptimeout() {
......@@ -1578,6 +1595,8 @@ class sql_result {
//! @[eof()], @[send_row()]
/*semi*/final array(mixed) fetch_row() {
int|array datarow;
if (!this) // If object already destructed, return fast
return 0;
replenishrows();
if (arrayp(datarow = datarows->try_read()))
return datarow;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment