gefs: properly close and free connections

Before, we would just leak all the "Conn"
structures.

fshangup() could cause problems as it just
forcefully closes the file descriptors,
not considering someone else going to
write them afterwards.

Instead, we add a "hangup" flag to Conn,
which readers and writers check before
attempting i/o.

And we only close the file-descriptors
when the last reader/writer drops the
connection. (Make it ref-counted).

For faster teardown, also preserve the
"ctl" file descriptor from listen()
amd use it to kill the connection quickly
when fshangup() is called.
This commit is contained in:
cinap_lenrek 2024-08-28 18:34:14 +00:00
parent b061a21bfa
commit d8cf26110d
4 changed files with 91 additions and 41 deletions

View file

@ -654,12 +654,18 @@ struct Mount {
struct Conn {
Conn *next;
QLock wrlk;
int rfd;
int wfd;
int cfd;
int iounit;
int versioned;
int authok;
int hangup;
long ref;
/* fid hash table */
Lock fidtablk[Nfidtab];

View file

@ -94,7 +94,8 @@ void compresslog(Arena*);
void dlsync(void);
void setval(Blk*, Kvp*);
Conn* newconn(int, int);
Conn* newconn(int, int, int);
void putconn(Conn*);
int walk1(Tree*, vlong, char*, Qid*, vlong*);
void loadusers(int, Tree*);

View file

@ -344,38 +344,23 @@ fshangup(Conn *c, char *fmt, ...)
{
char buf[ERRMAX];
va_list ap;
Amsg *a;
Fid *f;
int i;
c->hangup = 1;
va_start(ap, fmt);
vsnprint(buf, sizeof(buf), fmt, ap);
va_end(ap);
fprint(2, "hangup: %s\n", buf);
close(c->rfd);
close(c->wfd);
for(i = 0; i < Nfidtab; i++){
lock(&c->fidtablk[i]);
for(f = c->fidtab[i]; f != nil; f = f->next){
lock(f);
if(waserror()){
unlock(f);
continue;
}
a = nil;
clunkfid(c, f, &a);
unlock(f);
if(a != nil)
chsend(fs->admchan, a);
nexterror();
}
unlock(&c->fidtablk[i]);
}
if(c->cfd >= 0)
hangup(c->cfd);
}
static void
respond(Fmsg *m, Fcall *r)
{
Conn *c;
RWLock *lk;
uchar buf[Max9p+IOHDRSZ];
int w, n;
@ -385,11 +370,12 @@ respond(Fmsg *m, Fcall *r)
assert(m->type+1 == r->type || r->type == Rerror);
if((n = convS2M(r, buf, sizeof(buf))) == 0)
abort();
qlock(&m->conn->wrlk);
w = write(m->conn->wfd, buf, n);
qunlock(&m->conn->wrlk);
c = m->conn;
qlock(&c->wrlk);
w = c->hangup? n: write(c->wfd, buf, n);
qunlock(&c->wrlk);
if(w != n)
fshangup(m->conn, Eio);
fshangup(c, Eio);
if(m->type == Tflush){
lk = &fs->flushq[ihash(m->oldtag) % Nflushtab];
wunlock(lk);
@ -398,6 +384,7 @@ respond(Fmsg *m, Fcall *r)
runlock(lk);
}
free(m);
putconn(c);
}
static void
@ -850,6 +837,7 @@ readmsg(Conn *c, Fmsg **pm)
free(m);
return -1;
}
aincl(&c->ref, 1);
m->conn = c;
m->sz = sz;
PBIT32(m->buf, sz);
@ -2256,22 +2244,75 @@ fsflush(Fmsg *m)
}
Conn *
newconn(int rfd, int wfd)
newconn(int rfd, int wfd, int cfd)
{
Conn *c;
if((c = mallocz(sizeof(*c), 1)) == nil)
return nil;
c->rfd = rfd;
c->wfd = wfd;
c->cfd = cfd;
c->iounit = Max9p;
c->next = fs->conns;
c->ref = 1;
lock(&fs->connlk);
c->next = fs->conns;
fs->conns = c;
unlock(&fs->connlk);
return c;
}
void
putconn(Conn *c)
{
Conn **pp;
Amsg *a;
Fid *f;
int i;
if(aincl(&c->ref, -1))
return;
lock(&fs->connlk);
for(pp = &fs->conns; *pp != nil; pp = &((*pp)->next)){
if(*pp == c){
*pp = c->next;
break;
}
}
unlock(&fs->connlk);
close(c->rfd);
if(c->rfd != c->wfd)
close(c->wfd);
if(c->cfd >= 0)
close(c->cfd);
for(i = 0; i < Nfidtab; i++){
lock(&c->fidtablk[i]);
for(f = c->fidtab[i]; f != nil; f = f->next){
lock(f);
if(waserror()){
unlock(f);
continue;
}
a = nil;
clunkfid(c, f, &a);
unlock(f);
if(a != nil)
chsend(fs->admchan, a);
nexterror();
}
unlock(&c->fidtablk[i]);
}
free(c);
}
void
runfs(int, void *pc)
{
@ -2284,20 +2325,20 @@ runfs(int, void *pc)
u32int h;
c = pc;
while(1){
while(!c->hangup){
if(readmsg(c, &m) < 0){
fshangup(c, "read message: %r");
return;
break;
}
if(m == nil)
break;
if(convM2S(m->buf, m->sz, m) == 0){
fshangup(c, "invalid message: %r");
return;
break;
}
if(m->type != Tversion && !c->versioned){
fshangup(c, "version required");
return;
break;
}
dprint("← %F\n", &m->Fcall);
@ -2350,6 +2391,7 @@ runfs(int, void *pc)
if(a != nil)
chsend(fs->admchan, a);
}
putconn(c);
}
void

View file

@ -257,21 +257,22 @@ runannounce(int, void *arg)
sysfatal("announce %s: %r", ann);
while(1){
if((lctl = listen(adir, ldir)) < 0){
fprint(2, "listen %s: %r", adir);
fprint(2, "listen %s: %r\n", adir);
break;
}
fd = accept(lctl, ldir);
close(lctl);
if(fd < 0){
fprint(2, "accept %s: %r", ldir);
fprint(2, "accept %s: %r\n", ldir);
close(lctl);
continue;
}
if(!(c = newconn(fd, fd))){
c = newconn(fd, fd, lctl);
if(c == nil){
fprint(2, "newconn: %r\n");
close(lctl);
close(fd);
fprint(2, "%r");
continue;
}
launch(runfs, c, "netio");
}
close(actl);
@ -440,12 +441,12 @@ main(int argc, char **argv)
for(i = 0; i < nann; i++)
launch(runannounce, ann[i], "announce");
if(srvfd != -1){
if((c = newconn(srvfd, srvfd)) == nil)
if((c = newconn(srvfd, srvfd, -1)) == nil)
sysfatal("%r");
launch(runfs, c, "srvio");
}
if(stdio){
if((c = newconn(0, 1)) == nil)
if((c = newconn(0, 1, -1)) == nil)
sysfatal("%r");
launch(runfs, c, "stdio");
}