diff --git a/sys/lib/acid/kernel b/sys/lib/acid/kernel index 803703077..a5a090ef1 100644 --- a/sys/lib/acid/kernel +++ b/sys/lib/acid/kernel @@ -55,15 +55,6 @@ defn imagecache() { } } -defn qiostats() { - print ("padblockcnt=", *padblockcnt\D, "\n"); - print ("concatblockcnt=", *concatblockcnt\D, "\n"); - print ("pullupblockcnt=", *pullupblockcnt\D, "\n"); - print ("copyblockcnt=", *copyblockcnt\D, "\n"); - print ("consumecnt=", *consumecnt\D, "\n"); - print ("producecnt=", *producecnt\D, "\n"); -} - // dump channels defn chan(c) { local d, q; diff --git a/sys/src/9/port/devpipe.c b/sys/src/9/port/devpipe.c index ebb7233c1..2dd78d157 100644 --- a/sys/src/9/port/devpipe.c +++ b/sys/src/9/port/devpipe.c @@ -370,8 +370,6 @@ pipewstat(Chan *c, uchar *dp, int n) n = convM2D(dp, n, &d, nil); if(n == 0) error(Eshortstat); - if(d.length < 1 || d.length > conf.pipeqsize) - error(Ebadarg); p = c->aux; switch(NETTYPE(c->qid.path)){ @@ -379,6 +377,8 @@ pipewstat(Chan *c, uchar *dp, int n) error(Eperm); case Qdata0: case Qdata1: + if((uvlong)d.length > conf.pipeqsize) + error(Ebadarg); qsetlimit(p->q[0], d.length); qsetlimit(p->q[1], d.length); break; diff --git a/sys/src/9/port/portfns.h b/sys/src/9/port/portfns.h index 1d4ae875e..9dafb37ea 100644 --- a/sys/src/9/port/portfns.h +++ b/sys/src/9/port/portfns.h @@ -301,7 +301,6 @@ Block* qremove(Queue*); void qreopen(Queue*); void qsetlimit(Queue*, int); void qunlock(QLock*); -int qwindow(Queue*); int qwrite(Queue*, void*, int); void qnoblock(Queue*, int); void randominit(void); diff --git a/sys/src/9/port/qio.c b/sys/src/9/port/qio.c index 048248049..7de2e09a2 100644 --- a/sys/src/9/port/qio.c +++ b/sys/src/9/port/qio.c @@ -5,41 +5,34 @@ #include "fns.h" #include "../port/error.h" -static ulong padblockcnt; -static ulong concatblockcnt; -static ulong pullupblockcnt; -static ulong copyblockcnt; -static ulong consumecnt; -static ulong producecnt; - -#define QDEBUG if(0) +#define QDEBUG if(1) /* * IO queues */ typedef struct Queue Queue; - struct Queue { Lock; + int state; + int dlen; /* data length in bytes */ + uint rp, wp; /* read/write position (counting BALLOC() bytes) */ + int limit; /* max BALLOC() bytes in queue */ + int inilim; /* initial limit */ + uchar noblock; /* true if writes return immediately when q full */ + uchar eof; /* number of eofs read by user */ + Block* bfirst; /* buffer */ Block* blast; - int len; /* bytes allocated to queue */ - int dlen; /* data bytes in queue */ - int limit; /* max bytes in queue */ - int inilim; /* initial limit */ - int state; - int noblock; /* true if writes return immediately when q full */ - int eof; /* number of eofs read by user */ - + void* arg; /* argument to kick and bypass */ void (*kick)(void*); /* restart output */ void (*bypass)(void*, Block*); /* bypass queue altogether */ - void* arg; /* argument to kick */ QLock rlock; /* mutex for reading processes */ Rendez rr; /* process waiting to read */ + QLock wlock; /* mutex for writing processes */ Rendez wr; /* process waiting to write */ @@ -68,44 +61,6 @@ freeblist(Block *b) } } -/* - * pad a block to the front (or the back if size is negative) - */ -Block* -padblock(Block *bp, int size) -{ - int n; - Block *nbp; - - QDEBUG checkb(bp, "padblock 0"); - if(size >= 0){ - if(bp->rp - bp->base >= size){ - bp->rp -= size; - return bp; - } - n = BLEN(bp); - nbp = allocb(size+n); - nbp->rp += size; - nbp->wp = nbp->rp; - memmove(nbp->wp, bp->rp, n); - nbp->wp += n; - nbp->rp -= size; - } else { - size = -size; - if(bp->lim - bp->wp >= size) - return bp; - n = BLEN(bp); - nbp = allocb(n+size); - memmove(nbp->wp, bp->rp, n); - nbp->wp += n; - } - nbp->next = bp->next; - freeb(bp); - padblockcnt++; - QDEBUG checkb(nbp, "padblock 1"); - return nbp; -} - /* * return count of bytes in a string of blocks */ @@ -123,19 +78,33 @@ blocklen(Block *bp) } /* - * return count of space in blocks + * copy the contents of a string of blocks into + * memory from an offset. blocklist kept unchanged. + * return number of copied bytes. */ -int -blockalloclen(Block *bp) +long +readblist(Block *b, uchar *p, long n, ulong o) { - int len; + ulong m, r; - len = 0; - while(bp != nil) { - len += BALLOC(bp); - bp = bp->next; + r = 0; + while(n > 0 && b != nil){ + m = BLEN(b); + if(o >= m) + o -= m; + else { + m -= o; + if(n < m) + m = n; + memmove(p, b->rp + o, m); + p += m; + r += m; + n -= m; + o = 0; + } + b = b->next; } - return len; + return r; } /* @@ -150,7 +119,6 @@ concatblock(Block *bp) if(bp->next == nil) return bp; len = blocklen(bp); - concatblockcnt += len; return pullupblock(bp, len); } @@ -163,6 +131,8 @@ pullupblock(Block *bp, int n) Block *nbp; int i; + assert(n >= 0); + /* * this should almost always be true, it's * just to avoid every caller checking. @@ -185,7 +155,6 @@ pullupblock(Block *bp, int n) */ n -= BLEN(bp); while((nbp = bp->next) != nil){ - pullupblockcnt++; i = BLEN(nbp); if(i > n) { memmove(bp->wp, nbp->rp, n); @@ -224,6 +193,8 @@ pullupqueue(Queue *q, int n) { Block *b; + assert(n >= 0); + if(BLEN(q->bfirst) >= n) return q->bfirst; q->bfirst = pullupblock(q->bfirst, n); @@ -242,6 +213,9 @@ trimblock(Block *bp, int offset, int len) ulong l; Block *nb, *startb; + assert(len >= 0); + assert(offset >= 0); + QDEBUG checkb(bp, "trimblock 1"); l = blocklen(bp); if(offset == 0 && len == l) @@ -277,6 +251,43 @@ trimblock(Block *bp, int offset, int len) return startb; } +/* + * pad a block to the front (or the back if size is negative) + */ +Block* +padblock(Block *bp, int size) +{ + int n; + Block *nbp; + + QDEBUG checkb(bp, "padblock 0"); + if(size >= 0){ + if(bp->rp - bp->base >= size){ + bp->rp -= size; + return bp; + } + n = BLEN(bp); + nbp = allocb(size+n); + nbp->rp += size; + nbp->wp = nbp->rp; + memmove(nbp->wp, bp->rp, n); + nbp->wp += n; + nbp->rp -= size; + } else { + size = -size; + if(bp->lim - bp->wp >= size) + return bp; + n = BLEN(bp); + nbp = allocb(n+size); + memmove(nbp->wp, bp->rp, n); + nbp->wp += n; + } + nbp->next = bp->next; + freeb(bp); + QDEBUG checkb(nbp, "padblock 1"); + return nbp; +} + /* * copy 'count' bytes into a new block */ @@ -286,6 +297,8 @@ copyblock(Block *bp, int count) int l; Block *nbp; + assert(count >= 0); + QDEBUG checkb(bp, "copyblock 0"); nbp = allocb(count); for(; count > 0 && bp != nil; bp = bp->next){ @@ -300,7 +313,6 @@ copyblock(Block *bp, int count) memset(nbp->wp, 0, count); nbp->wp += count; } - copyblockcnt++; QDEBUG checkb(nbp, "copyblock 1"); return nbp; @@ -334,275 +346,6 @@ adjustblock(Block* bp, int len) return bp; } - -/* - * throw away up to count bytes from a - * list of blocks. Return count of bytes - * thrown away. - */ -int -pullblock(Block **bph, int count) -{ - Block *bp; - int n, bytes; - - bytes = 0; - if(bph == nil) - return 0; - - while(*bph != nil && count != 0) { - bp = *bph; - n = BLEN(bp); - if(count < n) - n = count; - bytes += n; - count -= n; - bp->rp += n; - QDEBUG checkb(bp, "pullblock "); - if(BLEN(bp) == 0) { - *bph = bp->next; - bp->next = nil; - freeb(bp); - } - } - return bytes; -} - -/* - * get next block from a queue, return null if nothing there - */ -Block* -qget(Queue *q) -{ - int dowakeup; - Block *b; - - /* sync with qwrite */ - ilock(q); - - b = q->bfirst; - if(b == nil){ - q->state |= Qstarve; - iunlock(q); - return nil; - } - QDEBUG checkb(b, "qget"); - q->bfirst = b->next; - b->next = nil; - q->len -= BALLOC(b); - q->dlen -= BLEN(b); - - /* if writer flow controlled, restart */ - if((q->state & Qflow) && q->len < q->limit/2){ - q->state &= ~Qflow; - dowakeup = 1; - } else - dowakeup = 0; - - iunlock(q); - - if(dowakeup) - wakeup(&q->wr); - - return b; -} - -/* - * throw away the next 'len' bytes in the queue - */ -int -qdiscard(Queue *q, int len) -{ - Block *b, *tofree = nil; - int dowakeup, n, sofar; - - ilock(q); - for(sofar = 0; sofar < len; sofar += n){ - b = q->bfirst; - if(b == nil) - break; - QDEBUG checkb(b, "qdiscard"); - n = BLEN(b); - if(n <= len - sofar){ - q->bfirst = b->next; - q->len -= BALLOC(b); - q->dlen -= BLEN(b); - - /* remember to free this */ - b->next = tofree; - tofree = b; - } else { - n = len - sofar; - b->rp += n; - q->dlen -= n; - } - } - - /* - * if writer flow controlled, restart - * - * This used to be - * q->len < q->limit/2 - * but it slows down tcp too much for certain write sizes. - * I really don't understand it completely. It may be - * due to the queue draining so fast that the transmission - * stalls waiting for the app to produce more data. - presotto - */ - if((q->state & Qflow) && q->len < q->limit){ - q->state &= ~Qflow; - dowakeup = 1; - } else - dowakeup = 0; - - iunlock(q); - - if(dowakeup) - wakeup(&q->wr); - - if(tofree != nil) - freeblist(tofree); - - return sofar; -} - -/* - * Interrupt level copy out of a queue, return # bytes copied. - */ -int -qconsume(Queue *q, void *vp, int len) -{ - Block *b, *tofree = nil; - int n, dowakeup; - uchar *p = vp; - - /* sync with qwrite */ - ilock(q); - - for(;;) { - b = q->bfirst; - if(b == nil){ - q->state |= Qstarve; - len = -1; - goto out; - } - QDEBUG checkb(b, "qconsume 1"); - - n = BLEN(b); - if(n > 0) - break; - q->bfirst = b->next; - q->len -= BALLOC(b); - - /* remember to free this */ - b->next = tofree; - tofree = b; - }; - - consumecnt += n; - if(n < len) - len = n; - memmove(p, b->rp, len); - b->rp += len; - q->dlen -= len; - - /* discard the block if we're done with it */ - if((q->state & Qmsg) || len == n){ - q->bfirst = b->next; - q->len -= BALLOC(b); - q->dlen -= BLEN(b); - - /* remember to free this */ - b->next = tofree; - tofree = b; - } - -out: - /* if writer flow controlled, restart */ - if((q->state & Qflow) && q->len < q->limit/2){ - q->state &= ~Qflow; - dowakeup = 1; - } else - dowakeup = 0; - - iunlock(q); - - if(dowakeup) - wakeup(&q->wr); - - if(tofree != nil) - freeblist(tofree); - - return len; -} - -int -qpass(Queue *q, Block *b) -{ - int len, dowakeup; - - /* sync with qread */ - dowakeup = 0; - ilock(q); - if(q->len >= q->limit){ - iunlock(q); - freeblist(b); - return -1; - } - if(q->state & Qclosed){ - iunlock(q); - freeblist(b); - return 0; - } - - len = qaddlist(q, b); - - if(q->len >= q->limit/2) - q->state |= Qflow; - - if(q->state & Qstarve){ - q->state &= ~Qstarve; - dowakeup = 1; - } - iunlock(q); - - if(dowakeup) - wakeup(&q->rr); - - return len; -} - -int -qpassnolim(Queue *q, Block *b) -{ - int len, dowakeup; - - /* sync with qread */ - dowakeup = 0; - ilock(q); - - if(q->state & Qclosed){ - iunlock(q); - freeblist(b); - return 0; - } - - len = qaddlist(q, b); - - if(q->len >= q->limit/2) - q->state |= Qflow; - - if(q->state & Qstarve){ - q->state &= ~Qstarve; - dowakeup = 1; - } - iunlock(q); - - if(dowakeup) - wakeup(&q->rr); - - return len; -} - /* * if the allocated space is way out of line with the used * space, reallocate to a smaller block @@ -627,48 +370,323 @@ packblock(Block *bp) return bp; } +/* + * throw away up to count bytes from a + * list of blocks. Return count of bytes + * thrown away. + */ +int +pullblock(Block **bph, int count) +{ + Block *bp; + int n, bytes; + + bytes = 0; + if(bph == nil) + return 0; + + while((bp = *bph) != nil && count > 0) { + QDEBUG checkb(bp, "pullblock "); + n = BLEN(bp); + if(count < n) + n = count; + bytes += n; + count -= n; + bp->rp += n; + if(BLEN(bp) == 0) { + *bph = bp->next; + bp->next = nil; + freeb(bp); + } + } + return bytes; +} + +/* + * remove a block from the front of the queue + */ +Block* +qremove(Queue *q) +{ + Block *b; + + b = q->bfirst; + if(b == nil) + return nil; + QDEBUG checkb(b, "qremove"); + q->bfirst = b->next; + b->next = nil; + q->dlen -= BLEN(b); + q->rp += BALLOC(b); + return b; +} + +/* + * put a block back to the front of the queue + */ +void +qputback(Queue *q, Block *b) +{ + QDEBUG checkb(b, "qputback"); + b->next = q->bfirst; + if(q->bfirst == nil) + q->blast = b; + q->bfirst = b; + q->dlen += BLEN(b); + q->rp -= BALLOC(b); +} + +/* + * after removing data from the queue, + * unlock queue and wakeup blocked writer. + * called at interrupt level. + */ +static int +iunlock_consumer(Queue *q) +{ + int s = q->state; + + /* stop flow control when back at or below the limit */ + if((int)(q->wp - q->rp) <= q->limit) + q->state = s & ~Qflow; + + iunlock(q); + + if(s & Qflow){ + /* + * wakeup flow controlled writers. + * note that this is done even when q->state + * still has Qflow set, as the unblocking + * condition depends on the writers local queuing + * position, not on the global queue length. + */ + wakeup(&q->wr); + } + return s; +} + +/* + * after removing data from the queue, + * unlock queue and wakeup blocked writer. + * get output going again when it was blocked. + * called at process level. + */ +static int +iunlock_reader(Queue *q) +{ + int s = iunlock_consumer(q); + + if(q->kick != nil && s & Qflow) + (*q->kick)(q->arg); + + return s; +} + +/* + * after inserting into queue, + * unlock queue and wakeup starved reader. + * called at interrupt level. + */ +static int +iunlock_producer(Queue *q) +{ + int s = q->state; + + /* start flow control when above the limit */ + if((int)(q->wp - q->rp) > q->limit) + s |= Qflow; + + q->state = s & ~Qstarve; + iunlock(q); + + if(s & Qstarve){ + Proc *p = wakeup(&q->rr); + + /* if we just wokeup a higher priority process, let it run */ + if(p != nil && up != nil && p->priority > up->priority && islo()) + sched(); + } + return s; +} + +/* + * unlock queue and wakeup starved reader. + * get output going again when it was starved. + * called at process level. + */ +static int +iunlock_writer(Queue *q) +{ + int s = iunlock_producer(q); + + if(q->kick != nil && s & (Qstarve|Qkick)) + (*q->kick)(q->arg); + + return s; +} + +/* + * get next block from a queue, return null if nothing there + * called at interrupt level. + */ +Block* +qget(Queue *q) +{ + Block *b; + + ilock(q); + if((b = qremove(q)) == nil){ + q->state |= Qstarve; + iunlock(q); + return nil; + } + iunlock_consumer(q); + + return b; +} + +/* + * Interrupt level copy out of a queue, return # bytes copied. + */ +int +qconsume(Queue *q, void *vp, int len) +{ + Block *b, *tofree = nil; + int n; + + assert(len >= 0); + + ilock(q); + for(;;) { + b = q->bfirst; + if(b == nil){ + q->state |= Qstarve; + len = -1; + goto out; + } + QDEBUG checkb(b, "qconsume 1"); + + n = BLEN(b); + if(n > 0) + break; + + /* get rid of zero-length blocks */ + q->bfirst = b->next; + q->rp += BALLOC(b); + + /* remember to free this */ + b->next = tofree; + tofree = b; + }; + + if(n < len) + len = n; + memmove(vp, b->rp, len); + b->rp += len; + q->dlen -= len; + + /* discard the block if we're done with it */ + if((q->state & Qmsg) || len == n){ + q->bfirst = b->next; + q->rp += BALLOC(b); + q->dlen -= BLEN(b); + + /* remember to free this */ + b->next = tofree; + tofree = b; + } +out: + iunlock_consumer(q); + + freeblist(tofree); + + return len; +} + +/* + * add a block list to a queue, return bytes added + */ +int +qaddlist(Queue *q, Block *b) +{ + int len; + + QDEBUG checkb(b, "qaddlist 1"); + + /* queue the block */ + if(q->bfirst != nil) + q->blast->next = b; + else + q->bfirst = b; + + len = BLEN(b); + q->wp += BALLOC(b); + while(b->next != nil){ + b = b->next; + QDEBUG checkb(b, "qaddlist 2"); + len += BLEN(b); + q->wp += BALLOC(b); + } + q->dlen += len; + q->blast = b; + return len; +} + +int +qpass(Queue *q, Block *b) +{ + int len; + + ilock(q); + if(q->state & Qclosed){ + iunlock(q); + freeblist(b); + return 0; + } + if(q->state & Qflow){ + iunlock(q); + freeblist(b); + return -1; + } + len = qaddlist(q, b); + iunlock_producer(q); + + return len; +} + +int +qpassnolim(Queue *q, Block *b) +{ + int len; + + ilock(q); + if(q->state & Qclosed){ + iunlock(q); + freeblist(b); + return 0; + } + len = qaddlist(q, b); + iunlock_producer(q); + + return len; +} + int qproduce(Queue *q, void *vp, int len) { Block *b; - int dowakeup; - uchar *p = vp; + + assert(len >= 0); b = iallocb(len); if(b == nil) return 0; - /* sync with qread */ - dowakeup = 0; - ilock(q); - - /* no waiting receivers, room in buffer? */ - if(q->len >= q->limit){ - q->state |= Qflow; - iunlock(q); - freeb(b); - return -1; - } - producecnt += len; - /* save in buffer */ - memmove(b->wp, p, len); + memmove(b->wp, vp, len); b->wp += len; - qaddlist(q, b); - if(q->state & Qstarve){ - q->state &= ~Qstarve; - dowakeup = 1; - } - - if(q->len >= q->limit) - q->state |= Qflow; - iunlock(q); - - if(dowakeup) - wakeup(&q->rr); - - return len; + return qpass(q, b); } /* @@ -679,6 +697,8 @@ qcopy(Queue *q, int len, ulong offset) { Block *b; + assert(len >= 0); + b = allocb(len); ilock(q); b->wp += readblist(q->bfirst, b->wp, len, offset); @@ -694,16 +714,18 @@ qopen(int limit, int msg, void (*kick)(void*), void *arg) { Queue *q; + assert(limit >= 0); + q = malloc(sizeof(Queue)); if(q == nil) return nil; + q->dlen = 0; + q->wp = q->rp = 0; q->limit = q->inilim = limit; q->kick = kick; q->arg = arg; - q->state = msg; - - q->state |= Qstarve; + q->state = msg | Qstarve; q->eof = 0; q->noblock = 0; @@ -720,10 +742,14 @@ qbypass(void (*bypass)(void*, Block*), void *arg) if(q == nil) return nil; + q->dlen = 0; + q->wp = q->rp = 0; q->limit = 0; q->arg = arg; q->bypass = bypass; q->state = 0; + q->eof = 0; + q->noblock = 0; return q; } @@ -733,7 +759,7 @@ notempty(void *a) { Queue *q = a; - return (q->state & Qclosed) || q->bfirst != nil; + return q->bfirst != nil || (q->state & Qclosed); } /* @@ -749,10 +775,9 @@ qwait(Queue *q) break; if(q->state & Qclosed){ - if(++q->eof > 3) - return -1; - if(*q->err && strcmp(q->err, Ehungup) != 0) + if(q->eof >= 3 || *q->err && strcmp(q->err, Ehungup) != 0) return -1; + q->eof++; return 0; } @@ -764,101 +789,6 @@ qwait(Queue *q) return 1; } -/* - * add a block list to a queue, return bytes added - */ -int -qaddlist(Queue *q, Block *b) -{ - int len, dlen; - - QDEBUG checkb(b, "qaddlist 1"); - - /* queue the block */ - if(q->bfirst != nil) - q->blast->next = b; - else - q->bfirst = b; - - len = BALLOC(b); - dlen = BLEN(b); - while(b->next != nil){ - b = b->next; - QDEBUG checkb(b, "qaddlist 2"); - - len += BALLOC(b); - dlen += BLEN(b); - } - q->blast = b; - q->len += len; - q->dlen += dlen; - return dlen; -} - -/* - * called with q ilocked - */ -Block* -qremove(Queue *q) -{ - Block *b; - - b = q->bfirst; - if(b == nil) - return nil; - QDEBUG checkb(b, "qremove"); - q->bfirst = b->next; - b->next = nil; - q->dlen -= BLEN(b); - q->len -= BALLOC(b); - return b; -} - -/* - * copy the contents of a string of blocks into - * memory from an offset. blocklist kept unchanged. - * return number of copied bytes. - */ -long -readblist(Block *b, uchar *p, long n, ulong o) -{ - ulong m, r; - - r = 0; - while(n > 0 && b != nil){ - m = BLEN(b); - if(o >= m) - o -= m; - else { - m -= o; - if(n < m) - m = n; - memmove(p, b->rp + o, m); - p += m; - r += m; - n -= m; - o = 0; - } - b = b->next; - } - return r; -} - -/* - * put a block back to the front of the queue - * called with q ilocked - */ -void -qputback(Queue *q, Block *b) -{ - b->next = q->bfirst; - if(q->bfirst == nil) - q->blast = b; - q->bfirst = b; - q->len += BALLOC(b); - q->dlen += BLEN(b); -} - /* * cut off n bytes from the end of *h. return a new * block with the tail and change *h to refer to the @@ -888,31 +818,6 @@ splitblock(Block **h, int n) } } -/* - * flow control, get producer going again - * called with q ilocked - */ -static void -qwakeup_iunlock(Queue *q) -{ - int dowakeup = 0; - - /* if writer flow controlled, restart */ - if((q->state & Qflow) && q->len < q->limit/2){ - q->state &= ~Qflow; - dowakeup = 1; - } - - iunlock(q); - - /* wakeup flow controlled writers */ - if(dowakeup){ - if(q->kick != nil) - q->kick(q->arg); - wakeup(&q->wr); - } -} - /* * get next block from a queue (up to a limit) */ @@ -922,6 +827,8 @@ qbread(Queue *q, int len) Block *b; int n; + assert(len >= 0); + eqlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); @@ -954,9 +861,7 @@ qbread(Queue *q, int len) else b->wp -= n; } - - /* restart producer */ - qwakeup_iunlock(q); + iunlock_reader(q); qunlock(&q->rlock); poperror(); @@ -974,6 +879,8 @@ qread(Queue *q, void *vp, int len) Block *b, *first, **last; int m, n; + assert(len >= 0); + eqlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); @@ -1005,8 +912,8 @@ again: freeb(qremove(q)); goto again; } - - /* grab the first block plus as many + /* + * grab the first block plus as many * following blocks as will partially * fit in the read. */ @@ -1029,8 +936,7 @@ again: if(n > len && (q->state & Qmsg) == 0) qputback(q, splitblock(last, n - len)); - /* restart producer */ - qwakeup_iunlock(q); + iunlock_reader(q); qunlock(&q->rlock); poperror(); @@ -1046,34 +952,39 @@ again: return n; } -static int -qnotfull(void *a) -{ - Queue *q = a; +/* + * a Flow represens a flow controlled + * writer on queue q with position p. + */ +typedef struct { + Queue* q; + uint p; +} Flow; - return q->len < q->limit || (q->state & Qclosed); +static int +unblocked(void *a) +{ + Flow *f = a; + Queue *q = f->q; + + return q->noblock || (int)(f->p - q->rp) <= q->limit || (q->state & Qclosed); } /* - * flow control, wait for queue to get below the limit + * flow control, wait for queue to drain back to the limit */ static void -qflow(Queue *q) +qflow(Flow *f) { - for(;;){ - if(q->noblock || qnotfull(q)) - break; - - ilock(q); - q->state |= Qflow; - iunlock(q); + Queue *q = f->q; + while(!unblocked(f)){ eqlock(&q->wlock); if(waserror()){ qunlock(&q->wlock); nexterror(); } - sleep(&q->wr, qnotfull, q); + sleep(&q->wr, unblocked, f); qunlock(&q->wlock); poperror(); } @@ -1085,8 +996,8 @@ qflow(Queue *q) long qbwrite(Queue *q, Block *b) { - int len, dowakeup; - Proc *p; + Flow flow; + int len; if(q->bypass != nil){ len = blocklen(b); @@ -1094,7 +1005,6 @@ qbwrite(Queue *q, Block *b) return len; } - dowakeup = 0; if(waserror()){ freeblist(b); nexterror(); @@ -1106,51 +1016,58 @@ qbwrite(Queue *q, Block *b) iunlock(q); error(q->err); } - - /* don't queue over the limit */ - if(q->len >= q->limit && q->noblock){ + /* + * if the queue is full, + * silently discard when non-blocking + */ + if(q->state & Qflow && q->noblock){ iunlock(q); poperror(); len = blocklen(b); freeblist(b); return len; } - len = qaddlist(q, b); - - /* make sure other end gets awakened */ - if(q->state & Qstarve){ - q->state &= ~Qstarve; - dowakeup = 1; - } - iunlock(q); poperror(); - /* get output going again */ - if(q->kick != nil && (dowakeup || (q->state&Qkick))) - q->kick(q->arg); - - /* wakeup anyone consuming at the other end */ - if(dowakeup){ - p = wakeup(&q->rr); - - /* if we just wokeup a higher priority process, let it run */ - if(p != nil && p->priority > up->priority) - sched(); + /* + * save our current position in queue + * for flow control below. + */ + flow.q = q; + flow.p = q->wp; + if(iunlock_writer(q) & Qflow){ + /* + * flow control, before allowing the process to continue and + * queue more. We do this here so that postnote can only + * interrupt us after the data has been queued. This means that + * things like 9p flushes and ssl messages will not be disrupted + * by software interrupts. + */ + qflow(&flow); } - /* - * flow control, before allowing the process to continue and - * queue more. We do this here so that postnote can only - * interrupt us after the data has been queued. This means that - * things like 9p flushes and ssl messages will not be disrupted - * by software interrupts. - */ - qflow(q); - return len; } +/* + * block here uninterruptable until queue drains. + */ +static void +qbloated(Queue *q) +{ + Flow flow; + + flow.q = q; + flow.p = q->wp; + while(waserror()){ + if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig) + error(Egreg); + } + qflow(&flow); + poperror(); +} + /* * write to a queue. only Maxatomic bytes at a time is atomic. */ @@ -1161,8 +1078,11 @@ qwrite(Queue *q, void *vp, int len) Block *b; uchar *p = vp; + assert(len >= 0); + QDEBUG if(!islo()) print("qwrite hi %#p\n", getcallerpc(&q)); + /* * when the queue length grew over twice the limit, * block here before allocating more blocks. @@ -1170,14 +1090,8 @@ qwrite(Queue *q, void *vp, int len) * interrupted by notes, preventing effective * flow control. */ - if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){ - while(waserror()){ - if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig) - error(Egreg); - } - qflow(q); - poperror(); - } + if(q->state & Qflow && (int)(q->wp - q->rp)/2 > q->limit) + qbloated(q); sofar = 0; do { @@ -1207,11 +1121,11 @@ qwrite(Queue *q, void *vp, int len) int qiwrite(Queue *q, void *vp, int len) { - int n, sofar, dowakeup; + int n, sofar; Block *b; uchar *p = vp; - dowakeup = 0; + assert(len >= 0); sofar = 0; do { @@ -1226,34 +1140,105 @@ qiwrite(Queue *q, void *vp, int len) b->wp += n; ilock(q); - - if((q->state & Qclosed) != 0 || q->len >= q->limit){ + if(q->state & (Qflow|Qclosed)){ iunlock(q); freeb(b); break; } - - qaddlist(q, b); - - if(q->state & Qstarve){ - q->state &= ~Qstarve; - dowakeup = 1; - } - - iunlock(q); - - if(dowakeup){ - if(q->kick != nil) - q->kick(q->arg); - wakeup(&q->rr); - } - - sofar += n; + sofar += qaddlist(q, b); + iunlock_writer(q); } while(sofar < len && (q->state & Qmsg) == 0); return sofar; } +/* + * throw away the next 'len' bytes in the queue + */ +int +qdiscard(Queue *q, int len) +{ + Block *b, *tofree = nil; + int n, sofar; + + assert(len >= 0); + + ilock(q); + for(sofar = 0; sofar < len; sofar += n){ + b = q->bfirst; + if(b == nil) + break; + QDEBUG checkb(b, "qdiscard"); + n = BLEN(b); + if(n <= len - sofar){ + q->bfirst = b->next; + q->rp += BALLOC(b); + + /* remember to free this */ + b->next = tofree; + tofree = b; + } else { + n = len - sofar; + b->rp += n; + } + q->dlen -= n; + } + iunlock_reader(q); + + freeblist(tofree); + + return sofar; +} + +/* + * flush the output queue + */ +void +qflush(Queue *q) +{ + Block *tofree; + + ilock(q); + tofree = q->bfirst; + q->bfirst = nil; + q->rp = q->wp; + q->dlen = 0; + iunlock_reader(q); + + freeblist(tofree); +} + +/* + * Mark a queue as closed. No further IO is permitted. + * All blocks are released. + */ +void +qclose(Queue *q) +{ + Block *tofree; + + if(q == nil) + return; + + ilock(q); + q->state |= Qclosed; + q->state &= ~(Qflow|Qstarve); + kstrcpy(q->err, Ehungup, ERRMAX); + tofree = q->bfirst; + q->bfirst = nil; + q->rp = q->wp; + q->dlen = 0; + q->noblock = 0; + iunlock(q); + + /* wake up readers/writers */ + wakeup(&q->rr); + wakeup(&q->wr); + + /* free queued blocks */ + freeblist(tofree); +} + /* * be extremely careful when calling this, * as there is no reference accounting @@ -1265,38 +1250,6 @@ qfree(Queue *q) free(q); } -/* - * Mark a queue as closed. No further IO is permitted. - * All blocks are released. - */ -void -qclose(Queue *q) -{ - Block *bfirst; - - if(q == nil) - return; - - /* mark it */ - ilock(q); - q->state |= Qclosed; - q->state &= ~(Qflow|Qstarve); - kstrcpy(q->err, Ehungup, ERRMAX); - bfirst = q->bfirst; - q->bfirst = nil; - q->len = 0; - q->dlen = 0; - q->noblock = 0; - iunlock(q); - - /* free queued blocks */ - freeblist(bfirst); - - /* wake up readers/writers */ - wakeup(&q->rr); - wakeup(&q->wr); -} - /* * Mark a queue as closed. Wakeup any readers. Don't remove queued * blocks. @@ -1304,7 +1257,6 @@ qclose(Queue *q) void qhangup(Queue *q, char *msg) { - /* mark it */ ilock(q); q->state |= Qclosed; if(msg == nil || *msg == '\0') @@ -1349,20 +1301,6 @@ qlen(Queue *q) return q->dlen; } -/* - * return space remaining before flow control - */ -int -qwindow(Queue *q) -{ - int l; - - l = q->limit - q->len; - if(l < 0) - l = 0; - return l; -} - /* * return true if we can read without blocking */ @@ -1372,13 +1310,26 @@ qcanread(Queue *q) return q->bfirst != nil; } +/* + * return non-zero when the queue is full + */ +int +qfull(Queue *q) +{ + return q->state & Qflow; +} + /* * change queue limit */ void qsetlimit(Queue *q, int limit) { + assert(limit >= 0); + + ilock(q); q->limit = limit; + iunlock_consumer(q); } /* @@ -1387,34 +1338,7 @@ qsetlimit(Queue *q, int limit) void qnoblock(Queue *q, int onoff) { - q->noblock = onoff; -} - -/* - * flush the output queue - */ -void -qflush(Queue *q) -{ - Block *bfirst; - - /* mark it */ ilock(q); - bfirst = q->bfirst; - q->bfirst = nil; - q->len = 0; - q->dlen = 0; - iunlock(q); - - /* free queued blocks */ - freeblist(bfirst); - - /* wake up writers */ - wakeup(&q->wr); -} - -int -qfull(Queue *q) -{ - return q->state & Qflow; + q->noblock = onoff; + iunlock_consumer(q); }