mirror of
https://github.com/9fans/plan9port.git
synced 2025-01-27 11:52:03 +00:00
251 lines
4.3 KiB
C
251 lines
4.3 KiB
C
#include <u.h>
|
|
#include <libc.h>
|
|
#include <venti.h>
|
|
#include "queue.h"
|
|
|
|
long ventisendbytes, ventisendpackets;
|
|
long ventirecvbytes, ventirecvpackets;
|
|
|
|
static int
|
|
_vtsend(VtConn *z, Packet *p)
|
|
{
|
|
IOchunk ioc;
|
|
int n, tot;
|
|
uchar buf[2];
|
|
|
|
if(z->state != VtStateConnected) {
|
|
werrstr("session not connected");
|
|
return -1;
|
|
}
|
|
|
|
/* add framing */
|
|
n = packetsize(p);
|
|
if(n >= (1<<16)) {
|
|
werrstr("packet too large");
|
|
packetfree(p);
|
|
return -1;
|
|
}
|
|
buf[0] = n>>8;
|
|
buf[1] = n;
|
|
packetprefix(p, buf, 2);
|
|
ventisendbytes += n+2;
|
|
ventisendpackets++;
|
|
|
|
tot = 0;
|
|
for(;;){
|
|
n = packetfragments(p, &ioc, 1, 0);
|
|
if(n == 0)
|
|
break;
|
|
if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
|
|
vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
|
|
packetfree(p);
|
|
return -1;
|
|
}
|
|
packetconsume(p, nil, ioc.len);
|
|
tot += ioc.len;
|
|
}
|
|
vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
|
|
packetfree(p);
|
|
return 1;
|
|
}
|
|
|
|
static int
|
|
interrupted(void)
|
|
{
|
|
char e[ERRMAX];
|
|
|
|
rerrstr(e, sizeof e);
|
|
return strstr(e, "interrupted") != nil;
|
|
}
|
|
|
|
|
|
static Packet*
|
|
_vtrecv(VtConn *z)
|
|
{
|
|
uchar buf[10], *b;
|
|
int n;
|
|
Packet *p;
|
|
int size, len;
|
|
|
|
if(z->state != VtStateConnected) {
|
|
werrstr("session not connected");
|
|
return nil;
|
|
}
|
|
|
|
p = z->part;
|
|
/* get enough for head size */
|
|
size = packetsize(p);
|
|
while(size < 2) {
|
|
b = packettrailer(p, 2);
|
|
assert(b != nil);
|
|
if(0) fprint(2, "%d read hdr\n", getpid());
|
|
n = read(z->infd, b, 2);
|
|
if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
|
|
if(n==0 || (n<0 && !interrupted()))
|
|
goto Err;
|
|
size += n;
|
|
packettrim(p, 0, size);
|
|
}
|
|
|
|
if(packetconsume(p, buf, 2) < 0)
|
|
goto Err;
|
|
len = (buf[0] << 8) | buf[1];
|
|
size -= 2;
|
|
|
|
while(size < len) {
|
|
n = len - size;
|
|
if(n > MaxFragSize)
|
|
n = MaxFragSize;
|
|
b = packettrailer(p, n);
|
|
if(0) fprint(2, "%d read body %d\n", getpid(), n);
|
|
n = read(z->infd, b, n);
|
|
if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
|
|
if(n > 0)
|
|
size += n;
|
|
packettrim(p, 0, size);
|
|
if(n==0 || (n<0 && !interrupted()))
|
|
goto Err;
|
|
}
|
|
ventirecvbytes += len;
|
|
ventirecvpackets++;
|
|
p = packetsplit(p, len);
|
|
vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
|
|
return p;
|
|
Err:
|
|
vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
|
|
return nil;
|
|
}
|
|
|
|
/*
|
|
* If you fork off two procs running vtrecvproc and vtsendproc,
|
|
* then vtrecv/vtsend (and thus vtrpc) will never block except on
|
|
* rendevouses, which is nice when it's running in one thread of many.
|
|
*/
|
|
void
|
|
vtrecvproc(void *v)
|
|
{
|
|
Packet *p;
|
|
VtConn *z;
|
|
Queue *q;
|
|
|
|
z = v;
|
|
q = _vtqalloc();
|
|
|
|
qlock(&z->lk);
|
|
z->readq = q;
|
|
qlock(&z->inlk);
|
|
rwakeup(&z->rpcfork);
|
|
qunlock(&z->lk);
|
|
|
|
while((p = _vtrecv(z)) != nil)
|
|
if(_vtqsend(q, p) < 0){
|
|
packetfree(p);
|
|
break;
|
|
}
|
|
qunlock(&z->inlk);
|
|
qlock(&z->lk);
|
|
_vtqhangup(q);
|
|
while((p = _vtnbqrecv(q)) != nil)
|
|
packetfree(p);
|
|
_vtqdecref(q);
|
|
z->readq = nil;
|
|
rwakeup(&z->rpcfork);
|
|
qunlock(&z->lk);
|
|
vthangup(z);
|
|
}
|
|
|
|
void
|
|
vtsendproc(void *v)
|
|
{
|
|
Queue *q;
|
|
Packet *p;
|
|
VtConn *z;
|
|
|
|
z = v;
|
|
q = _vtqalloc();
|
|
|
|
qlock(&z->lk);
|
|
z->writeq = q;
|
|
qlock(&z->outlk);
|
|
rwakeup(&z->rpcfork);
|
|
qunlock(&z->lk);
|
|
|
|
while((p = _vtqrecv(q)) != nil)
|
|
if(_vtsend(z, p) < 0)
|
|
break;
|
|
qunlock(&z->outlk);
|
|
qlock(&z->lk);
|
|
_vtqhangup(q);
|
|
while((p = _vtnbqrecv(q)) != nil)
|
|
packetfree(p);
|
|
_vtqdecref(q);
|
|
z->writeq = nil;
|
|
rwakeup(&z->rpcfork);
|
|
qunlock(&z->lk);
|
|
return;
|
|
}
|
|
|
|
Packet*
|
|
vtrecv(VtConn *z)
|
|
{
|
|
Packet *p;
|
|
Queue *q;
|
|
|
|
qlock(&z->lk);
|
|
if(z->state != VtStateConnected){
|
|
werrstr("not connected");
|
|
qunlock(&z->lk);
|
|
return nil;
|
|
}
|
|
if(z->readq){
|
|
q = _vtqincref(z->readq);
|
|
qunlock(&z->lk);
|
|
p = _vtqrecv(q);
|
|
_vtqdecref(q);
|
|
return p;
|
|
}
|
|
|
|
qlock(&z->inlk);
|
|
qunlock(&z->lk);
|
|
p = _vtrecv(z);
|
|
qunlock(&z->inlk);
|
|
if(!p)
|
|
vthangup(z);
|
|
return p;
|
|
}
|
|
|
|
int
|
|
vtsend(VtConn *z, Packet *p)
|
|
{
|
|
Queue *q;
|
|
|
|
qlock(&z->lk);
|
|
if(z->state != VtStateConnected){
|
|
packetfree(p);
|
|
werrstr("not connected");
|
|
qunlock(&z->lk);
|
|
return -1;
|
|
}
|
|
if(z->writeq){
|
|
q = _vtqincref(z->writeq);
|
|
qunlock(&z->lk);
|
|
if(_vtqsend(q, p) < 0){
|
|
_vtqdecref(q);
|
|
packetfree(p);
|
|
return -1;
|
|
}
|
|
_vtqdecref(q);
|
|
return 0;
|
|
}
|
|
|
|
qlock(&z->outlk);
|
|
qunlock(&z->lk);
|
|
if(_vtsend(z, p) < 0){
|
|
qunlock(&z->outlk);
|
|
vthangup(z);
|
|
return -1;
|
|
}
|
|
qunlock(&z->outlk);
|
|
return 0;
|
|
}
|
|
|