Skip to content
Snippets Groups Projects
Commit 042bcdd8 authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

pgsql: Stall fetching rows if they are not being consumed yet.

parent c5d38e35
Branches
Tags
No related merge requests found
...@@ -1339,16 +1339,8 @@ class sql_result { ...@@ -1339,16 +1339,8 @@ class sql_result {
return retval; return retval;
} }
final void _processdataready(array datarow, void|int msglen) { private void replenishrows() {
bytesreceived += msglen; if (_fetchlimit && sizeof(datarows) <= _fetchlimit >> 1) {
inflight--;
if (_state<CLOSED)
datarows->write(datarow);
if (++index == 1)
PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n", _portalname,
_fetchlimit, (portalbuffersize >> 1) * index / bytesreceived,
pgsqlsess._fetchlimit, inflight);
if (_fetchlimit) {
_fetchlimit = _fetchlimit =
min((portalbuffersize >> 1) * index / bytesreceived || 1, min((portalbuffersize >> 1) * index / bytesreceived || 1,
pgsqlsess._fetchlimit); pgsqlsess._fetchlimit);
...@@ -1361,6 +1353,18 @@ class sql_result { ...@@ -1361,6 +1353,18 @@ class sql_result {
} }
} }
final void _processdataready(array datarow, void|int msglen) {
bytesreceived += msglen;
inflight--;
if (_state<CLOSED)
datarows->write(datarow);
if (++index == 1)
PD("<%O _fetchlimit %d=min(%d||1,%d), inflight %d\n", _portalname,
_fetchlimit, (portalbuffersize >> 1) * index / bytesreceived,
pgsqlsess._fetchlimit, inflight);
replenishrows();
}
private void releaseconditions() { private void releaseconditions() {
_unnamedportalkey = _unnamedstatementkey = 0; _unnamedportalkey = _unnamedstatementkey = 0;
if (!datarowtypes) { if (!datarowtypes) {
...@@ -1426,6 +1430,7 @@ class sql_result { ...@@ -1426,6 +1430,7 @@ class sql_result {
//! @[eof()], @[send_row()] //! @[eof()], @[send_row()]
/*semi*/final array(mixed) fetch_row() { /*semi*/final array(mixed) fetch_row() {
int|array datarow; int|array datarow;
replenishrows();
if (arrayp(datarow = datarows->try_read())) if (arrayp(datarow = datarows->try_read()))
return datarow; return datarow;
if (!eoffound) { if (!eoffound) {
...@@ -1455,12 +1460,14 @@ class sql_result { ...@@ -1455,12 +1460,14 @@ class sql_result {
/*semi*/final array(array(mixed)) fetch_row_array() { /*semi*/final array(array(mixed)) fetch_row_array() {
if (eoffound) if (eoffound)
return 0; return 0;
replenishrows();
array(array|int) datarow = datarows->try_read_array(); array(array|int) datarow = datarows->try_read_array();
if (!sizeof(datarow)) { if (!sizeof(datarow)) {
array cid = setuptimeout(); array cid = setuptimeout();
PT(datarow = datarows->read_array()); PT(datarow = datarows->read_array());
local_backend->remove_call_out(cid); local_backend->remove_call_out(cid);
} }
replenishrows();
if (arrayp(datarow[-1])) if (arrayp(datarow[-1]))
return datarow; return datarow;
do datarow = datarow[..<1]; // Swallow EOF mark(s) do datarow = datarow[..<1]; // Swallow EOF mark(s)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment