kernel: Fix qio flow control

There is a pathological case with qio that triggers
a dead-lock for single threaded servers and
multiple requesters that can be reproduced like this:

int pfd[2];

void
main(int argc, char *argv[])
{
	char buf[0x10000];
	int i, n;

	ARGBEGIN {
	} ARGEND;

	if(pipe(pfd) < 0)
		sysfatal("pipe: %r");

	if(fork() == 0){
		while((n = read(pfd[0], buf, sizeof(buf))) > 0){
			sleep(10);
			write(pfd[0], buf, n);
		}
		exits(nil);
	}

	for(i = 0; i < PROCS; i++){
		if(fork() == 0){
			buf[0] = i;
			for(;;){
				write(pfd[1], buf, sizeof(buf));
				if(read(pfd[1], buf, sizeof(buf)) <= 0)
					break;
				print("%d %d\n", i, buf[0]);
			}
			exits(nil);
		}
	}
	waitpid();
}

The problem is how the reader decides to wake up the writer,
which was based only on the global queue length, but it should
really depend on the local queuing position of the writers
and their distance to the reader position.

Otherwise, a writer can be blocked even tho its message
has already been consumed by the reader.
When the reader tries to reply, it can get blocked himself
on writing the reply.

The new qio code basically makes sure that writers get
unblocked in order avoiding the issue.

The qio block statistics and qwindow() are gone now as
they where mostly unused.
This commit is contained in:
cinap_lenrek 2024-03-24 20:37:01 +00:00
parent 0f9c172712
commit ad9748422e
4 changed files with 581 additions and 667 deletions

View file

@ -55,15 +55,6 @@ defn imagecache() {
}
}
defn qiostats() {
print ("padblockcnt=", *padblockcnt\D, "\n");
print ("concatblockcnt=", *concatblockcnt\D, "\n");
print ("pullupblockcnt=", *pullupblockcnt\D, "\n");
print ("copyblockcnt=", *copyblockcnt\D, "\n");
print ("consumecnt=", *consumecnt\D, "\n");
print ("producecnt=", *producecnt\D, "\n");
}
// dump channels
defn chan(c) {
local d, q;

View file

@ -370,8 +370,6 @@ pipewstat(Chan *c, uchar *dp, int n)
n = convM2D(dp, n, &d, nil);
if(n == 0)
error(Eshortstat);
if(d.length < 1 || d.length > conf.pipeqsize)
error(Ebadarg);
p = c->aux;
switch(NETTYPE(c->qid.path)){
@ -379,6 +377,8 @@ pipewstat(Chan *c, uchar *dp, int n)
error(Eperm);
case Qdata0:
case Qdata1:
if((uvlong)d.length > conf.pipeqsize)
error(Ebadarg);
qsetlimit(p->q[0], d.length);
qsetlimit(p->q[1], d.length);
break;

View file

@ -301,7 +301,6 @@ Block* qremove(Queue*);
void qreopen(Queue*);
void qsetlimit(Queue*, int);
void qunlock(QLock*);
int qwindow(Queue*);
int qwrite(Queue*, void*, int);
void qnoblock(Queue*, int);
void randominit(void);

File diff suppressed because it is too large Load diff