1
0
Fork 0
mirror of https://github.com/9fans/plan9port.git synced 2025-01-24 11:41:58 +00:00
plan9port/src/cmd/9pserve.c

1252 lines
22 KiB
C
Raw Normal View History

2003-12-06 18:08:52 +00:00
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <errno.h>
2003-12-06 18:08:52 +00:00
enum
{
STACK = 32768,
NHASH = 31,
MAXMSG = 64, /* per connection */
};
typedef struct Hash Hash;
typedef struct Fid Fid;
typedef struct Msg Msg;
typedef struct Conn Conn;
typedef struct Queue Queue;
struct Hash
{
Hash *next;
uint n;
void *v;
};
struct Fid
{
int fid;
int ref;
int cfid;
int openfd;
2003-12-06 18:08:52 +00:00
Fid *next;
};
struct Msg
{
Conn *c;
int internal;
int ref;
int ctag;
int tag;
int isopenfd;
2003-12-06 18:08:52 +00:00
Fcall tx;
Fcall rx;
Fid *fid;
Fid *newfid;
Fid *afid;
Msg *oldm;
Msg *next;
uchar *tpkt;
uchar *rpkt;
};
struct Conn
{
int fd;
int fdmode;
Fid *fdfid;
2003-12-06 18:08:52 +00:00
int nmsg;
int nfid;
Channel *inc;
Channel *internal;
int inputstalled;
char dir[40];
Hash *tag[NHASH];
Hash *fid[NHASH];
Queue *outq;
Queue *inq;
};
char *addr;
int afd;
char adir[40];
int isunix;
Queue *outq;
Queue *inq;
int verbose = 0;
2005-01-04 21:24:52 +00:00
int logging = 0;
int msize = 8192;
2003-12-06 18:08:52 +00:00
void *gethash(Hash**, uint);
int puthash(Hash**, uint, void*);
int delhash(Hash**, uint, void*);
Msg *mread9p(Ioproc*, int);
int mwrite9p(Ioproc*, int, uchar*);
uchar *read9ppkt(Ioproc*, int);
2003-12-06 18:08:52 +00:00
int write9ppkt(int, uchar*);
2005-01-04 21:24:52 +00:00
Msg *msgnew(int);
2003-12-06 18:08:52 +00:00
void msgput(Msg*);
2005-01-04 21:24:52 +00:00
void msgclear(Msg*);
2003-12-06 18:08:52 +00:00
Msg *msgget(int);
2005-01-04 21:24:52 +00:00
void msgincref(Msg*);
2003-12-06 18:08:52 +00:00
Fid *fidnew(int);
void fidput(Fid*);
void *emalloc(int);
void *erealloc(void*, int);
Queue *qalloc(void);
2003-12-06 18:08:52 +00:00
int sendq(Queue*, void*);
void *recvq(Queue*);
void connthread(void*);
void connoutthread(void*);
2003-12-06 18:08:52 +00:00
void listenthread(void*);
void outputthread(void*);
void inputthread(void*);
2003-12-06 18:08:52 +00:00
void rewritehdr(Fcall*, uchar*);
int tlisten(char*, char*);
int taccept(int, char*);
int iolisten(Ioproc*, char*, char*);
int ioaccept(Ioproc*, int, char*);
int iorecvfd(Ioproc*, int);
int iosendfd(Ioproc*, int, int);
void mainproc(void*);
int ignorepipe(void*, char*);
2005-01-04 21:24:52 +00:00
int timefmt(Fmt*);
2003-12-06 18:08:52 +00:00
void
usage(void)
{
2005-01-04 21:24:52 +00:00
fprint(2, "usage: 9pserve [-lv] address\n");
2003-12-06 18:08:52 +00:00
fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
exits("usage");
}
uchar vbuf[128];
extern int _threaddebuglevel;
2003-12-06 18:08:52 +00:00
void
threadmain(int argc, char **argv)
{
char *file;
2005-01-04 21:24:52 +00:00
int fd;
2003-12-06 18:08:52 +00:00
ARGBEGIN{
default:
usage();
case 'v':
verbose++;
break;
2003-12-06 18:08:52 +00:00
case 'u':
isunix = 1;
break;
2005-01-04 21:24:52 +00:00
case 'l':
logging++;
break;
2003-12-06 18:08:52 +00:00
}ARGEND
if(argc != 1)
usage();
addr = argv[0];
2003-12-06 18:08:52 +00:00
2005-01-04 21:24:52 +00:00
fmtinstall('T', timefmt);
2003-12-06 18:08:52 +00:00
if((afd = announce(addr, adir)) < 0)
sysfatal("announce %s: %r", addr);
2005-01-04 21:24:52 +00:00
if(logging){
if(strncmp(addr, "unix!", 5) == 0)
addr += 5;
file = smprint("%s.log", addr);
if(file == nil)
sysfatal("smprint log: %r");
if((fd = create(file, OWRITE, 0666)) < 0)
sysfatal("create %s: %r", file);
dup(fd, 2);
if(fd > 2)
close(fd);
}
if(verbose) fprint(2, "%T 9pserve running\n");
proccreate(mainproc, nil, STACK);
}
void
mainproc(void *v)
{
int n, nn;
Fcall f;
USED(v);
atnotify(ignorepipe, 1);
fmtinstall('D', dirfmt);
fmtinstall('M', dirmodefmt);
fmtinstall('F', fcallfmt);
fmtinstall('H', encodefmt);
outq = qalloc();
inq = qalloc();
f.type = Tversion;
f.version = "9P2000";
f.msize = msize;
f.tag = NOTAG;
n = convS2M(&f, vbuf, sizeof vbuf);
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
nn = write(1, vbuf, n);
if(n != nn)
sysfatal("error writing Tversion: %r\n");
2004-12-27 16:56:16 +00:00
n = read9pmsg(0, vbuf, sizeof vbuf);
if(convM2S(vbuf, n, &f) != n)
sysfatal("convM2S failure");
if(f.msize < msize)
msize = f.msize;
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
threadcreate(inputthread, nil, STACK);
threadcreate(outputthread, nil, STACK);
threadcreate(listenthread, nil, STACK);
threadexits(0);
}
int
ignorepipe(void *v, char *s)
{
USED(v);
if(strcmp(s, "sys: write on closed pipe") == 0)
return 1;
2005-01-04 21:24:52 +00:00
fprint(2, "%T msg: %s\n", s);
return 0;
2003-12-06 18:08:52 +00:00
}
void
listenthread(void *arg)
{
Conn *c;
Ioproc *io;
2003-12-06 18:08:52 +00:00
io = ioproc();
2003-12-06 18:08:52 +00:00
USED(arg);
2004-12-28 01:37:18 +00:00
threadsetname("listen %s", adir);
2003-12-06 18:08:52 +00:00
for(;;){
c = emalloc(sizeof(Conn));
c->fd = iolisten(io, adir, c->dir);
2003-12-06 18:08:52 +00:00
if(c->fd < 0){
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T listen: %r\n");
2003-12-06 18:08:52 +00:00
close(afd);
free(c);
return;
}
c->inc = chancreate(sizeof(void*), 0);
c->internal = chancreate(sizeof(void*), 0);
c->inq = qalloc();
c->outq = qalloc();
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
2003-12-06 18:08:52 +00:00
threadcreate(connthread, c, STACK);
}
}
void
send9pmsg(Msg *m)
2003-12-06 18:08:52 +00:00
{
int n, nn;
n = sizeS2M(&m->rx);
m->rpkt = emalloc(n);
nn = convS2M(&m->rx, m->rpkt, n);
if(nn != n)
sysfatal("sizeS2M + convS2M disagree");
sendq(m->c->outq, m);
}
void
sendomsg(Msg *m)
{
int n, nn;
n = sizeS2M(&m->tx);
m->tpkt = emalloc(n);
nn = convS2M(&m->tx, m->tpkt, n);
if(nn != n)
sysfatal("sizeS2M + convS2M disagree");
sendq(outq, m);
}
void
err(Msg *m, char *ename)
{
m->rx.type = Rerror;
m->rx.ename = ename;
m->rx.tag = m->tx.tag;
send9pmsg(m);
}
2003-12-06 18:08:52 +00:00
void
connthread(void *arg)
{
int i, fd;
Conn *c;
Hash *h, *hnext;
Msg *m, *om, *mm;
2003-12-06 18:08:52 +00:00
Fid *f;
Ioproc *io;
2003-12-06 18:08:52 +00:00
c = arg;
2004-12-28 01:37:18 +00:00
threadsetname("conn %s", c->dir);
io = ioproc();
fd = ioaccept(io, c->fd, c->dir);
2003-12-06 18:08:52 +00:00
if(fd < 0){
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
2003-12-06 18:08:52 +00:00
goto out;
}
close(c->fd);
c->fd = fd;
threadcreate(connoutthread, c, STACK);
while((m = mread9p(io, c->fd)) != nil){
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
2003-12-06 18:08:52 +00:00
m->c = c;
m->ctag = m->tx.tag;
2003-12-06 18:08:52 +00:00
c->nmsg++;
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
2003-12-06 18:08:52 +00:00
if(puthash(c->tag, m->tx.tag, m) < 0){
err(m, "duplicate tag");
continue;
}
2005-01-04 21:24:52 +00:00
msgincref(m);
2003-12-06 18:08:52 +00:00
switch(m->tx.type){
case Tversion:
m->rx.tag = m->tx.tag;
m->rx.msize = m->tx.msize;
if(m->rx.msize > msize)
m->rx.msize = msize;
m->rx.version = "9P2000";
m->rx.type = Rversion;
send9pmsg(m);
continue;
2003-12-06 18:08:52 +00:00
case Tflush:
if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
m->rx.tag = m->tx.tag;
m->rx.type = Rflush;
send9pmsg(m);
2003-12-06 18:08:52 +00:00
continue;
}
2005-01-04 21:24:52 +00:00
msgincref(m->oldm);
2003-12-06 18:08:52 +00:00
break;
case Tattach:
m->afid = nil;
if(m->tx.afid != NOFID
&& (m->afid = gethash(c->fid, m->tx.afid)) == nil){
err(m, "unknown fid");
continue;
}
2003-12-06 18:08:52 +00:00
m->fid = fidnew(m->tx.fid);
if(puthash(c->fid, m->tx.fid, m->fid) < 0){
err(m, "duplicate fid");
continue;
}
m->fid->ref++;
break;
case Twalk:
if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
err(m, "unknown fid");
continue;
}
m->fid->ref++;
2003-12-06 18:08:52 +00:00
if(m->tx.newfid == m->tx.fid){
m->fid->ref++;
m->newfid = m->fid;
}else{
m->newfid = fidnew(m->tx.newfid);
if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
err(m, "duplicate fid");
continue;
}
m->newfid->ref++;
}
break;
case Tauth:
m->afid = fidnew(m->tx.afid);
if(puthash(c->fid, m->tx.afid, m->afid) < 0){
2003-12-06 18:08:52 +00:00
err(m, "duplicate fid");
continue;
}
m->afid->ref++;
2003-12-06 18:08:52 +00:00
break;
case Topenfd:
if(m->tx.mode&~(OTRUNC|3)){
err(m, "bad openfd mode");
continue;
}
m->isopenfd = 1;
m->tx.type = Topen;
m->tpkt[4] = Topen;
/* fall through */
case Tcreate:
2003-12-06 18:08:52 +00:00
case Topen:
case Tclunk:
case Tread:
case Twrite:
case Tremove:
2003-12-06 18:08:52 +00:00
case Tstat:
case Twstat:
if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
err(m, "unknown fid");
continue;
}
m->fid->ref++;
break;
}
/* have everything - translate and send */
m->c = c;
m->ctag = m->tx.tag;
m->tx.tag = m->tag;
if(m->fid)
m->tx.fid = m->fid->fid;
if(m->newfid)
m->tx.newfid = m->newfid->fid;
if(m->afid)
m->tx.afid = m->afid->fid;
if(m->oldm)
m->tx.oldtag = m->oldm->tag;
/* reference passes to outq */
2003-12-06 18:08:52 +00:00
sendq(outq, m);
while(c->nmsg >= MAXMSG){
c->inputstalled = 1;
recvp(c->inc);
}
}
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
/* flush the output queue */
sendq(c->outq, nil);
while(c->outq != nil)
yield();
2003-12-06 18:08:52 +00:00
/* flush all outstanding messages */
for(i=0; i<NHASH; i++){
for(h=c->tag[i]; h; h=hnext){
2003-12-06 18:08:52 +00:00
om = h->v;
2005-01-04 21:24:52 +00:00
m = msgnew(0);
2003-12-06 18:08:52 +00:00
m->internal = 1;
m->c = c;
c->nmsg++;
2003-12-06 18:08:52 +00:00
m->tx.type = Tflush;
m->tx.tag = m->tag;
m->tx.oldtag = om->tag;
m->oldm = om;
2005-01-04 21:24:52 +00:00
msgincref(om);
msgincref(m); /* for outq */
sendomsg(m);
mm = recvp(c->internal);
assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
msgput(om); /* got from hash table */
hnext = h->next;
free(h);
2003-12-06 18:08:52 +00:00
}
}
/* clunk all outstanding fids */
for(i=0; i<NHASH; i++){
for(h=c->fid[i]; h; h=hnext){
2003-12-06 18:08:52 +00:00
f = h->v;
2005-01-04 21:24:52 +00:00
m = msgnew(0);
2003-12-06 18:08:52 +00:00
m->internal = 1;
m->c = c;
c->nmsg++;
2003-12-06 18:08:52 +00:00
m->tx.type = Tclunk;
m->tx.tag = m->tag;
m->tx.fid = f->fid;
m->fid = f;
f->ref++;
2005-01-04 21:24:52 +00:00
msgincref(m);
sendomsg(m);
mm = recvp(c->internal);
assert(mm == m);
2005-01-04 21:24:52 +00:00
msgclear(m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
fidput(f); /* got from hash table */
hnext = h->next;
free(h);
2003-12-06 18:08:52 +00:00
}
}
out:
2005-01-04 21:24:52 +00:00
closeioproc(io);
2003-12-06 18:08:52 +00:00
assert(c->nmsg == 0);
assert(c->nfid == 0);
close(c->fd);
chanfree(c->internal);
c->internal = 0;
chanfree(c->inc);
c->inc = 0;
free(c->inq);
c->inq = 0;
free(c);
}
static void
openfdthread(void *v)
{
Conn *c;
Fid *fid;
Msg *m;
int n;
vlong tot;
Ioproc *io;
char buf[1024];
c = v;
fid = c->fdfid;
io = ioproc();
2004-12-28 01:37:18 +00:00
threadsetname("openfd %s", c->fdfid);
tot = 0;
m = nil;
if(c->fdmode == OREAD){
for(;;){
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T tread...");
m = msgnew(0);
m->internal = 1;
m->c = c;
m->tx.type = Tread;
m->tx.count = msize - IOHDRSZ;
m->tx.fid = fid->fid;
m->tx.tag = m->tag;
m->tx.offset = tot;
m->fid = fid;
fid->ref++;
2005-01-04 21:24:52 +00:00
msgincref(m);
sendomsg(m);
recvp(c->internal);
if(m->rx.type == Rerror){
2005-01-04 21:24:52 +00:00
// fprint(2, "%T read error: %s\n", m->rx.ename);
break;
}
if(m->rx.count == 0)
break;
tot += m->rx.count;
if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
2005-01-04 21:24:52 +00:00
// fprint(2, "%T pipe write error: %r\n");
break;
}
msgput(m);
msgput(m);
m = nil;
}
}else{
for(;;){
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T twrite...");
n = sizeof buf;
if(n > msize)
n = msize;
if((n=ioread(io, c->fd, buf, n)) <= 0){
if(n < 0)
2005-01-04 21:24:52 +00:00
fprint(2, "%T pipe read error: %r\n");
break;
}
2005-01-04 21:24:52 +00:00
m = msgnew(0);
m->internal = 1;
m->c = c;
m->tx.type = Twrite;
m->tx.fid = fid->fid;
m->tx.data = buf;
m->tx.count = n;
m->tx.tag = m->tag;
m->tx.offset = tot;
m->fid = fid;
fid->ref++;
2005-01-04 21:24:52 +00:00
msgincref(m);
sendomsg(m);
recvp(c->internal);
if(m->rx.type == Rerror){
2005-01-04 21:24:52 +00:00
// fprint(2, "%T write error: %s\n", m->rx.ename);
}
tot += n;
msgput(m);
msgput(m);
m = nil;
}
}
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
close(c->fd);
closeioproc(io);
if(m){
msgput(m);
msgput(m);
}
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
if(--fid->openfd == 0){
2005-01-04 21:24:52 +00:00
m = msgnew(0);
m->internal = 1;
m->c = c;
m->tx.type = Tclunk;
m->tx.tag = m->tag;
m->tx.fid = fid->fid;
m->fid = fid;
fid->ref++;
2005-01-04 21:24:52 +00:00
msgincref(m);
sendomsg(m);
recvp(c->internal);
msgput(m);
msgput(m);
}
fidput(fid);
c->fdfid = nil;
chanfree(c->internal);
c->internal = 0;
2003-12-06 18:08:52 +00:00
free(c);
}
int
xopenfd(Msg *m)
{
char errs[ERRMAX];
int n, p[2];
Conn *nc;
if(pipe(p) < 0){
rerrstr(errs, sizeof errs);
err(m, errs);
/* XXX return here? */
}
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
/* now we're committed. */
/* a new connection for this fid */
nc = emalloc(sizeof(Conn));
nc->internal = chancreate(sizeof(void*), 0);
/* a ref for us */
nc->fdfid = m->fid;
m->fid->ref++;
nc->fdfid->openfd++;
nc->fdmode = m->tx.mode;
nc->fd = p[0];
/* a thread to tend the pipe */
threadcreate(openfdthread, nc, STACK);
/* if mode is ORDWR, that openfdthread will write; start a reader */
if((m->tx.mode&3) == ORDWR){
nc = emalloc(sizeof(Conn));
nc->internal = chancreate(sizeof(void*), 0);
nc->fdfid = m->fid;
m->fid->ref++;
nc->fdfid->openfd++;
nc->fdmode = OREAD;
nc->fd = dup(p[0], -1);
threadcreate(openfdthread, nc, STACK);
}
/* steal fid from other connection */
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
/* rewrite as Ropenfd */
m->rx.type = Ropenfd;
n = GBIT32(m->rpkt);
m->rpkt = erealloc(m->rpkt, n+4);
PBIT32(m->rpkt+n, p[1]);
n += 4;
PBIT32(m->rpkt, n);
m->rpkt[4] = Ropenfd;
m->rx.unixfd = p[1];
return 0;
2003-12-06 18:08:52 +00:00
}
void
connoutthread(void *arg)
{
int err;
Conn *c;
Queue *outq;
2003-12-06 18:08:52 +00:00
Msg *m, *om;
Ioproc *io;
2003-12-06 18:08:52 +00:00
c = arg;
outq = c->outq;
io = ioproc();
2004-12-28 01:37:18 +00:00
threadsetname("connout %s", c->dir);
while((m = recvq(outq)) != nil){
2003-12-06 18:08:52 +00:00
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
continue;
2003-12-06 18:08:52 +00:00
switch(m->tx.type){
case Tflush:
om = m->oldm;
if(om)
if(delhash(om->c->tag, om->ctag, om) == 0)
msgput(om);
2003-12-06 18:08:52 +00:00
break;
case Tclunk:
case Tremove:
if(m->fid)
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
2003-12-06 18:08:52 +00:00
break;
case Tauth:
if(err && m->afid){
2005-01-04 21:24:52 +00:00
fprint(2, "%T auth error\n");
if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
2003-12-06 18:08:52 +00:00
fidput(m->fid);
}
break;
2003-12-06 18:08:52 +00:00
case Tattach:
if(err && m->fid)
2003-12-06 18:08:52 +00:00
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
break;
case Twalk:
if(err && m->tx.fid != m->tx.newfid && m->newfid)
2003-12-06 18:08:52 +00:00
if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
fidput(m->newfid);
break;
}
if(delhash(m->c->tag, m->ctag, m) == 0)
msgput(m);
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
rewritehdr(&m->rx, m->rpkt);
if(mwrite9p(io, c->fd, m->rpkt) < 0)
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T write error: %r\n");
2003-12-06 18:08:52 +00:00
msgput(m);
if(c->inputstalled && c->nmsg < MAXMSG)
nbsendp(c->inc, 0);
}
closeioproc(io);
free(outq);
c->outq = nil;
2003-12-06 18:08:52 +00:00
}
void
outputthread(void *arg)
{
Msg *m;
Ioproc *io;
2003-12-06 18:08:52 +00:00
USED(arg);
io = ioproc();
2004-12-28 01:37:18 +00:00
threadsetname("output");
2003-12-06 18:08:52 +00:00
while((m = recvq(outq)) != nil){
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
rewritehdr(&m->tx, m->tpkt);
if(mwrite9p(io, 1, m->tpkt) < 0)
2003-12-06 18:08:52 +00:00
sysfatal("output error: %r");
msgput(m);
}
closeioproc(io);
2005-01-04 21:24:52 +00:00
fprint(2, "%T output eof\n");
threadexitsall(0);
2003-12-06 18:08:52 +00:00
}
void
inputthread(void *arg)
{
uchar *pkt;
int n, nn, tag;
Msg *m;
Ioproc *io;
2003-12-06 18:08:52 +00:00
2004-12-28 01:37:18 +00:00
threadsetname("input");
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T input thread\n");
io = ioproc();
USED(arg);
while((pkt = read9ppkt(io, 0)) != nil){
2003-12-06 18:08:52 +00:00
n = GBIT32(pkt);
if(n < 7){
2005-01-04 21:24:52 +00:00
fprint(2, "%T short 9P packet from server\n");
2003-12-06 18:08:52 +00:00
free(pkt);
continue;
}
2005-01-04 21:24:52 +00:00
if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
2003-12-06 18:08:52 +00:00
tag = GBIT16(pkt+5);
if((m = msgget(tag)) == nil){
2005-01-04 21:24:52 +00:00
fprint(2, "%T unexpected 9P response tag %d\n", tag);
2003-12-06 18:08:52 +00:00
free(pkt);
continue;
}
if((nn = convM2S(pkt, n, &m->rx)) != n){
2005-01-04 21:24:52 +00:00
fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
2003-12-06 18:08:52 +00:00
free(pkt);
msgput(m);
continue;
}
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
m->internal ? " (internal)" : "");
2003-12-06 18:08:52 +00:00
m->rpkt = pkt;
m->rx.tag = m->ctag;
if(m->internal)
sendp(m->c->internal, m);
else if(m->c->outq)
sendq(m->c->outq, m);
else
msgput(m);
2003-12-06 18:08:52 +00:00
}
closeioproc(io);
2005-01-04 21:24:52 +00:00
//fprint(2, "%T input eof\n");
threadexitsall(0);
2003-12-06 18:08:52 +00:00
}
void*
gethash(Hash **ht, uint n)
{
Hash *h;
for(h=ht[n%NHASH]; h; h=h->next)
if(h->n == n)
return h->v;
return nil;
}
int
delhash(Hash **ht, uint n, void *v)
{
Hash *h, **l;
for(l=&ht[n%NHASH]; h=*l; l=&h->next)
if(h->n == n){
if(h->v != v){
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
return -1;
}
2003-12-06 18:08:52 +00:00
*l = h->next;
free(h);
return 0;
}
return -1;
}
int
puthash(Hash **ht, uint n, void *v)
{
Hash *h;
if(gethash(ht, n))
return -1;
h = emalloc(sizeof(Hash));
h->next = ht[n%NHASH];
h->n = n;
h->v = v;
ht[n%NHASH] = h;
return 0;
}
Fid **fidtab;
int nfidtab;
Fid *freefid;
Fid*
fidnew(int cfid)
{
Fid *f;
if(freefid == nil){
fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
2003-12-06 18:08:52 +00:00
fidtab[nfidtab] = emalloc(sizeof(Fid));
freefid = fidtab[nfidtab];
freefid->fid = nfidtab++;
2003-12-06 18:08:52 +00:00
}
f = freefid;
freefid = f->next;
f->cfid = cfid;
2003-12-06 18:08:52 +00:00
f->ref = 1;
return f;
}
void
fidput(Fid *f)
{
if(f == nil)
return;
2003-12-06 18:08:52 +00:00
assert(f->ref > 0);
if(--f->ref > 0)
return;
f->next = freefid;
f->cfid = -1;
freefid = f;
}
Msg **msgtab;
int nmsgtab;
Msg *freemsg;
2005-01-04 21:24:52 +00:00
void
msgincref(Msg *m)
{
if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
m->ref++;
}
2003-12-06 18:08:52 +00:00
Msg*
2005-01-04 21:24:52 +00:00
msgnew(int x)
2003-12-06 18:08:52 +00:00
{
Msg *m;
if(freemsg == nil){
msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
2003-12-06 18:08:52 +00:00
msgtab[nmsgtab] = emalloc(sizeof(Msg));
freemsg = msgtab[nmsgtab];
freemsg->tag = nmsgtab++;
2003-12-06 18:08:52 +00:00
}
m = freemsg;
freemsg = m->next;
m->ref = 1;
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
getcallerpc(&x), m, m->tag, m->ref);
2003-12-06 18:08:52 +00:00
return m;
}
2005-01-04 21:24:52 +00:00
/*
* Clear data associated with connections, so that
* if all msgs have been msgcleared, the connection
* can be freed. Note that this does *not* free the tpkt
* and rpkt; they are freed in msgput with the msg itself.
* The io write thread might still be holding a ref to msg
* even once the connection has finished with it.
*/
void
msgclear(Msg *m)
{
if(m->c){
m->c->nmsg--;
m->c = nil;
}
if(m->oldm){
msgput(m->oldm);
m->oldm = nil;
}
if(m->fid){
fidput(m->fid);
m->fid = nil;
}
if(m->afid){
fidput(m->afid);
m->afid = nil;
}
if(m->newfid){
fidput(m->newfid);
m->newfid = nil;
}
if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
close(m->rx.unixfd);
m->rx.unixfd = -1;
}
}
2003-12-06 18:08:52 +00:00
void
msgput(Msg *m)
{
if(m == nil)
return;
2005-01-04 21:24:52 +00:00
if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
getcallerpc(&m), m, m->tag, m->ctag, m->ref);
2003-12-06 18:08:52 +00:00
assert(m->ref > 0);
if(--m->ref > 0)
return;
2005-01-04 21:24:52 +00:00
msgclear(m);
if(m->tpkt){
free(m->tpkt);
m->tpkt = nil;
}
if(m->rpkt){
free(m->rpkt);
m->rpkt = nil;
}
m->isopenfd = 0;
m->internal = 0;
2003-12-06 18:08:52 +00:00
m->next = freemsg;
freemsg = m;
}
Msg*
msgget(int n)
{
Msg *m;
if(n < 0 || n >= nmsgtab)
return nil;
m = msgtab[n];
if(m->ref == 0)
return nil;
2005-01-04 21:24:52 +00:00
if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
msgincref(m);
return m;
}
2003-12-06 18:08:52 +00:00
void*
emalloc(int n)
{
void *v;
v = mallocz(n, 1);
if(v == nil){
abort();
sysfatal("out of memory allocating %d", n);
}
2003-12-06 18:08:52 +00:00
return v;
}
void*
erealloc(void *v, int n)
{
v = realloc(v, n);
if(v == nil){
abort();
sysfatal("out of memory reallocating %d", n);
}
2003-12-06 18:08:52 +00:00
return v;
}
typedef struct Qel Qel;
struct Qel
{
Qel *next;
void *p;
};
struct Queue
{
int hungup;
QLock lk;
Rendez r;
Qel *head;
Qel *tail;
};
Queue*
qalloc(void)
{
Queue *q;
q = mallocz(sizeof(Queue), 1);
if(q == nil)
return nil;
q->r.l = &q->lk;
return q;
}
int
sendq(Queue *q, void *p)
{
Qel *e;
e = emalloc(sizeof(Qel));
qlock(&q->lk);
if(q->hungup){
werrstr("hungup queue");
qunlock(&q->lk);
return -1;
}
e->p = p;
e->next = nil;
if(q->head == nil)
q->head = e;
else
q->tail->next = e;
q->tail = e;
rwakeup(&q->r);
qunlock(&q->lk);
return 0;
}
void*
recvq(Queue *q)
{
void *p;
Qel *e;
qlock(&q->lk);
while(q->head == nil && !q->hungup)
rsleep(&q->r);
if(q->hungup){
qunlock(&q->lk);
return nil;
}
e = q->head;
q->head = e->next;
qunlock(&q->lk);
p = e->p;
free(e);
return p;
}
uchar*
read9ppkt(Ioproc *io, int fd)
{
uchar buf[4], *pkt;
int n, nn;
n = ioreadn(io, fd, buf, 4);
if(n != 4)
return nil;
n = GBIT32(buf);
pkt = emalloc(n);
PBIT32(pkt, n);
nn = ioreadn(io, fd, pkt+4, n-4);
if(nn != n-4){
free(pkt);
return nil;
}
/* would do this if we ever got one of these, but we only generate them
if(pkt[4] == Ropenfd){
newfd = iorecvfd(io, fd);
PBIT32(pkt+n-4, newfd);
}
*/
return pkt;
}
Msg*
mread9p(Ioproc *io, int fd)
{
int n, nn;
uchar *pkt;
Msg *m;
if((pkt = read9ppkt(io, fd)) == nil)
return nil;
2005-01-04 21:24:52 +00:00
m = msgnew(0);
m->tpkt = pkt;
n = GBIT32(pkt);
nn = convM2S(pkt, n, &m->tx);
if(nn != n){
2005-01-04 21:24:52 +00:00
fprint(2, "%T read bad packet from %d\n", fd);
return nil;
}
return m;
}
int
mwrite9p(Ioproc *io, int fd, uchar *pkt)
{
int n, nfd;
n = GBIT32(pkt);
2005-01-04 21:24:52 +00:00
if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
if(iowrite(io, fd, pkt, n) != n){
2005-01-04 21:24:52 +00:00
fprint(2, "%T write error: %r\n");
return -1;
}
if(pkt[4] == Ropenfd){
nfd = GBIT32(pkt+n-4);
if(iosendfd(io, fd, nfd) < 0){
2005-01-04 21:24:52 +00:00
fprint(2, "%T send fd error: %r\n");
return -1;
}
}
return 0;
}
void
restring(uchar *pkt, int pn, char *s)
{
int n;
if(s < (char*)pkt || s >= (char*)pkt+pn)
return;
n = strlen(s);
memmove(s+1, s, n);
PBIT16((uchar*)s-1, n);
}
void
rewritehdr(Fcall *f, uchar *pkt)
{
int i, n;
n = GBIT32(pkt);
PBIT16(pkt+5, f->tag);
switch(f->type){
case Tversion:
case Rversion:
restring(pkt, n, f->version);
break;
case Tauth:
PBIT32(pkt+7, f->afid);
restring(pkt, n, f->uname);
restring(pkt, n, f->aname);
break;
case Tflush:
PBIT16(pkt+7, f->oldtag);
break;
case Tattach:
restring(pkt, n, f->uname);
restring(pkt, n, f->aname);
PBIT32(pkt+7, f->fid);
PBIT32(pkt+11, f->afid);
break;
case Twalk:
PBIT32(pkt+7, f->fid);
PBIT32(pkt+11, f->newfid);
for(i=0; i<f->nwname; i++)
restring(pkt, n, f->wname[i]);
break;
case Tcreate:
restring(pkt, n, f->name);
/* fall through */
case Topen:
case Tread:
case Twrite:
case Tclunk:
case Tremove:
case Tstat:
case Twstat:
PBIT32(pkt+7, f->fid);
break;
case Rerror:
restring(pkt, n, f->ename);
break;
}
}
static long
_iolisten(va_list *arg)
{
char *a, *b;
a = va_arg(*arg, char*);
b = va_arg(*arg, char*);
return listen(a, b);
}
int
iolisten(Ioproc *io, char *a, char *b)
{
return iocall(io, _iolisten, a, b);
}
static long
_ioaccept(va_list *arg)
{
int fd;
char *dir;
fd = va_arg(*arg, int);
dir = va_arg(*arg, char*);
return accept(fd, dir);
}
int
ioaccept(Ioproc *io, int fd, char *dir)
{
return iocall(io, _ioaccept, fd, dir);
}
2005-01-04 21:24:52 +00:00
int
timefmt(Fmt *fmt)
{
static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
Tm tm;
tm = *localtime(time(0));
return fmtprint(fmt, "%s %2d %02d:%02d:%02d",
mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec);
}