Continue switching library over to pthreads when possible.

Tprimes works on Linux 2.6.  You can only have 128 procs
though.
This commit is contained in:
rsc 2004-09-17 03:34:32 +00:00
parent 06bb4ed20d
commit bcf527a98e
15 changed files with 146 additions and 561 deletions

View file

@ -454,6 +454,7 @@ struct _Procrend
int asleep;
Lock *l;
void *arg;
int pid;
#ifdef PLAN9_PTHREADS
pthread_cond_t cond;
#endif

View file

@ -24,6 +24,7 @@ extern "C" {
#include <fmt.h>
#include <math.h>
#include <ctype.h> /* for tolower */
#include <pthread.h> /* for Locks */
/*
* OS-specific crap

View file

@ -1,3 +1,6 @@
#ifndef _9PROC_H_
#define _9PROC_H_ 1
enum
{
NPRIV = 16,
@ -20,3 +23,4 @@ extern Uproc *_p9uproc(int);
extern void _p9uprocdie(void);
extern void _clearuproc(void);
#endif

View file

@ -1,197 +1,32 @@
#define ffork ffork_clone
#define getfforkid getfforkid_clone
#include "ffork-Linux-clone.c"
#undef ffork
#undef getfforkid
#define ffork ffork_pthread
#define getfforkid getfforkid_pthread
#include "ffork-pthread.c"
#undef ffork
#undef getfforkid
#ifdef OLD
/*
* Is nothing simple?
*
* We can't free the stack until we've finished executing,
* but once we've finished executing, we can't do anything
* at all, including call free. So instead we keep a linked list
* of all stacks for all processes, and every few times we try
* to allocate a new stack we scan the current stack list for
* dead processes and reclaim those stacks.
*/
#include <u.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sched.h>
#include <signal.h>
#include <errno.h>
#include <libc.h>
#include "9proc.h"
int fforkstacksize = 16384;
typedef struct Stack Stack;
struct Stack
{
Stack *next;
Stack *fnext;
int pid;
};
static Lock stacklock;
static Stack *freestacks;
static Stack *allstacks;
static int stackmallocs;
static void gc(void);
static void*
mallocstack(void)
{
Stack *p;
lock(&stacklock);
top:
p = freestacks;
if(p)
freestacks = p->fnext;
else{
if(stackmallocs++%1 == 0)
gc();
if(freestacks)
goto top;
p = malloc(fforkstacksize);
p->next = allstacks;
allstacks = p;
}
if(p)
p->pid = 1;
unlock(&stacklock);
return p;
}
static void
gc(void)
{
Stack *p;
for(p=allstacks; p; p=p->next){
if(p->pid > 1)
if(kill(p->pid, 0) < 0 && errno == ESRCH){
if(0) fprint(2, "reclaim stack from %d\n", p->pid);
p->pid = 0;
}
if(p->pid == 0){
p->fnext = freestacks;
freestacks = p;
}
}
}
static void
freestack(void *v)
{
Stack *p;
p = v;
if(p == nil)
return;
lock(&stacklock);
p->fnext = freestacks;
p->pid = 0;
freestacks = p;
unlock(&stacklock);
return;
}
static int
tramp(void *v)
{
void (*fn)(void*), *arg;
void **v2;
void *p;
_p9uproc(0);
v2 = v;
fn = v2[0];
arg = v2[1];
p = v2[2];
free(v2);
fn(arg);
_exit(0);
return 0;
}
static int
trampnowait(void *v)
{
int pid;
int cloneflag;
void **v2;
int *pidp;
void *p;
v2 = v;
cloneflag = (int)v2[4];
pidp = v2[3];
p = v2[2];
pid = clone(tramp, p+fforkstacksize-512, cloneflag, v);
*pidp = pid;
_exit(0);
return 0;
}
extern int _islinuxnptl(void);
int
ffork(int flags, void (*fn)(void*), void *arg)
{
void **v;
char *p;
int cloneflag, pid, thepid, status, nowait;
_p9uproc(0);
p = mallocstack();
v = malloc(sizeof(void*)*5);
if(p==nil || v==nil){
freestack(p);
free(v);
return -1;
}
cloneflag = 0;
flags &= ~RFPROC;
if(flags&RFMEM){
cloneflag |= CLONE_VM;
flags &= ~RFMEM;
}
if(!(flags&RFFDG))
cloneflag |= CLONE_FILES;
if(_islinuxnptl())
return ffork_pthread(flags, fn, arg);
else
flags &= ~RFFDG;
nowait = flags&RFNOWAIT;
if(!(flags&RFNOWAIT))
cloneflag |= SIGCHLD;
else
flags &= ~RFNOWAIT;
if(flags){
fprint(2, "unknown rfork flags %x\n", flags);
freestack(p);
free(v);
return -1;
}
v[0] = fn;
v[1] = arg;
v[2] = p;
v[3] = &thepid;
v[4] = (void*)cloneflag;
thepid = -1;
pid = clone(nowait ? trampnowait : tramp, p+fforkstacksize-16, cloneflag, v);
if(pid > 0 && nowait){
if(wait4(pid, &status, __WALL, 0) < 0)
fprint(2, "ffork wait4: %r\n");
}else
thepid = pid;
if(thepid == -1)
freestack(p);
else
((Stack*)p)->pid = thepid;
return thepid;
return ffork_clone(flags, fn, arg);
}
int
getfforkid(void)
{
return getpid();
if(_islinuxnptl())
return getfforkid_pthread();
else
return getfforkid_clone();
}
#endif

View file

@ -141,7 +141,6 @@ LIB9OFILES=\
strecpy.$O\
sysfatal.$O\
sysname.$O\
tas-$OBJTYPE.$O\
time.$O\
tokenize.$O\
truerand.$O\

View file

@ -375,23 +375,3 @@ rwakeupall(Rendez *r)
;
return i;
}
void
_procsleep(_Procrend *rend)
{
//print("sleep %p %d\n", rend, getpid());
pthread_cond_init(&rend->cond, 0);
rend->asleep = 1;
while(rend->asleep)
pthread_cond_wait(&rend->cond, &rend->l->mutex);
pthread_cond_destroy(&rend->cond);
}
void
_procwakeup(_Procrend *rend)
{
//print("wakeup %p\n", rend);
rend->asleep = 0;
pthread_cond_signal(&rend->cond);
}

View file

@ -1,3 +1,60 @@
/* Could use futex(2) here instead of signals? */
/*
* On Linux 2.6 and later, we can use pthreads (in fact, we must),
* but on earlier Linux, pthreads are incompatible with using our
* own coroutines in libthread. In order to make binaries that work
* on either system, we detect the pthread library in use and call
* the appropriate functions.
*/
#include <u.h>
#include <signal.h>
#include <pthread.h>
#include <libc.h>
#define _procsleep _procsleep_signal
#define _procwakeup _procwakeup_signal
#include "rendez-signal.c"
#undef _procsleep
#undef _procwakeup
#define _procsleep _procsleep_pthread
#define _procwakeup _procwakeup_pthread
#include "rendez-pthread.c"
#undef _procsleep
#undef _procwakeup
int
_islinuxnptl(void)
{
static char buf[100];
static int isnptl = -1;
if(isnptl == -1){
if(confstr(_CS_GNU_LIBPTHREAD_VERSION, buf, sizeof buf) > 0
&& strncmp(buf, "NPTL", 4) == 0)
isnptl = 1;
else
isnptl = 0;
}
return isnptl;
}
void
_procsleep(_Procrend *r)
{
if(_islinuxnptl())
return _procsleep_pthread(r);
else
return _procsleep_signal(r);
}
void
_procwakeup(_Procrend *r)
{
if(_islinuxnptl())
return _procwakeup_pthread(r);
else
return _procwakeup_signal(r);
}

View file

@ -1,170 +1,23 @@
/*
NAME
rendezvous - user level process synchronization
SYNOPSIS
ulong rendezvous(ulong tag, ulong value)
DESCRIPTION
The rendezvous system call allows two processes to synchro-
nize and exchange a value. In conjunction with the shared
memory system calls (see segattach(2) and fork(2)), it
enables parallel programs to control their scheduling.
Two processes wishing to synchronize call rendezvous with a
common tag, typically an address in memory they share. One
process will arrive at the rendezvous first; it suspends
execution until a second arrives. When a second process
meets the rendezvous the value arguments are exchanged
between the processes and returned as the result of the
respective rendezvous system calls. Both processes are
awakened when the rendezvous succeeds.
The set of tag values which two processes may use to
rendezvous-their tag space-is inherited when a process
forks, unless RFREND is set in the argument to rfork; see
fork(2).
If a rendezvous is interrupted the return value is ~0, so
that value should not be used in normal communication.
* This assumes we're using pthreads and simulates rendezvous using
* shared memory and mutexes.
*/
#include <u.h>
#include <pthread.h>
#include <signal.h>
#include <libc.h>
enum
void
_procsleep(_Procrend *rend)
{
VOUSHASH = 257,
};
typedef struct Vous Vous;
struct Vous
{
Vous *link;
Lock lk;
ulong val;
ulong tag;
pthread_mutex_t mutex;
};
static void
ign(int x)
{
USED(x);
//print("sleep %p %d\n", rend, getpid());
pthread_cond_init(&rend->cond, 0);
rend->asleep = 1;
while(rend->asleep)
pthread_cond_wait(&rend->cond, &rend->l->mutex);
pthread_cond_destroy(&rend->cond);
}
void /*__attribute__((constructor))*/
ignusr1(void)
void
_procwakeup(_Procrend *rend)
{
signal(SIGUSR1, ign);
}
static Vous vouspool[2048];
static int nvousused;
static Vous *vousfree;
static Vous *voushash[VOUSHASH];
static Lock vouslock;
static Vous*
getvous(void)
{
Vous *v;
if(vousfree){
v = vousfree;
vousfree = v->link;
}else if(nvousused < nelem(vouspool)){
v = &vouspool[nvousused++];
pthread_mutex_init(&v->mutex, NULL);
}else
abort();
return v;
}
static void
putvous(Vous *v)
{
lock(&vouslock);
v->link = vousfree;
vousfree = v;
unlock(&vouslock);
}
static Vous*
findvous(ulong tag, ulong val, int *found)
{
int h;
Vous *v, **l;
lock(&vouslock);
h = tag%VOUSHASH;
for(l=&voushash[h], v=*l; v; l=&(*l)->link, v=*l){
if(v->tag == tag){
*l = v->link;
*found = 1;
unlock(&vouslock);
return v;
}
}
v = getvous();
v->link = voushash[h];
v->val = val;
v->tag = tag;
lock(&v->lk);
voushash[h] = v;
unlock(&vouslock);
*found = 0;
return v;
}
#define DBG 0
ulong
rendezvous(ulong tag, ulong val)
{
int found;
ulong rval;
Vous *v;
v = findvous(tag, val, &found);
if(!found){
if(DBG)fprint(2, "tag %lux, sleeping on %p\n", tag, v);
/*
* No rendezvous partner was found; the next guy
* through will find v and wake us, so we must go
* to sleep. Do this by locking the mutex (it is
* unlocked) and then locking it again (our waker will
* unlock it for us).
*/
if(pthread_mutex_lock(&v->mutex) != 0)
abort();
unlock(&v->lk);
if(pthread_mutex_lock(&v->mutex) != 0)
abort();
rval = v->val;
pthread_mutex_unlock(&v->mutex);
if(DBG)fprint(2, " awake on %p\n", v);
unlock(&v->lk);
putvous(v);
}else{
/*
* Found someone to meet. Wake him:
*
* A. lock v->lk (waits for him to lock the mutex once.
* B. unlock the mutex (wakes him up)
*/
if(DBG)fprint(2, "found tag %lux on %p, waking\n", tag, v);
lock(&v->lk);
rval = v->val;
v->val = val;
if(pthread_mutex_unlock(&v->mutex) != 0)
abort();
/* lock passes to him */
}
return rval;
//print("wakeup %p\n", rend);
rend->asleep = 0;
pthread_cond_signal(&rend->cond);
}

View file

@ -1,58 +1,9 @@
/*
NAME
rendezvous - user level process synchronization
SYNOPSIS
ulong rendezvous(ulong tag, ulong value)
DESCRIPTION
The rendezvous system call allows two processes to synchro-
nize and exchange a value. In conjunction with the shared
memory system calls (see segattach(2) and fork(2)), it
enables parallel programs to control their scheduling.
Two processes wishing to synchronize call rendezvous with a
common tag, typically an address in memory they share. One
process will arrive at the rendezvous first; it suspends
execution until a second arrives. When a second process
meets the rendezvous the value arguments are exchanged
between the processes and returned as the result of the
respective rendezvous system calls. Both processes are
awakened when the rendezvous succeeds.
The set of tag values which two processes may use to
rendezvous-their tag space-is inherited when a process
forks, unless RFREND is set in the argument to rfork; see
fork(2).
If a rendezvous is interrupted the return value is ~0, so
that value should not be used in normal communication.
* This simulates rendezvous with shared memory, sigsuspend, and SIGUSR1.
*/
#include <u.h>
#include <signal.h>
#include <libc.h>
#define DBG 0
enum
{
VOUSHASH = 257,
};
typedef struct Vous Vous;
struct Vous
{
Vous *link;
int pid;
int wokeup;
ulong tag;
ulong val1; /* value for the sleeper */
ulong val2; /* value for the waker */
};
static void
ign(int x)
{
@ -73,128 +24,40 @@ ignusr1(int restart)
sigaction(SIGUSR1, &sa, nil);
}
static Vous vouspool[2048];
static int nvousused;
static Vous *vousfree;
static Vous *voushash[VOUSHASH];
static Lock vouslock;
static Vous*
getvous(void)
void
_procsleep(_Procrend *r)
{
Vous *v;
if(vousfree){
v = vousfree;
vousfree = v->link;
}else if(nvousused < nelem(vouspool))
v = &vouspool[nvousused++];
else{
fprint(2, "rendezvous: out of vous!\n");
abort();
}
return v;
}
static void
putvous(Vous *v)
{
v->link = vousfree;
vousfree = v;
}
static Vous*
findvous(ulong tag)
{
int h;
Vous *v, **l;
h = tag%VOUSHASH;
for(l=&voushash[h], v=*l; v; l=&(*l)->link, v=*l){
if(v->tag == tag){
*l = v->link;
v->link = nil;
return v;
}
}
return nil;
}
static Vous*
mkvous(ulong tag)
{
Vous *v;
int h;
h = tag%VOUSHASH;
v = getvous();
v->link = voushash[h];
v->tag = tag;
voushash[h] = v;
return v;
}
ulong
rendezvous(ulong tag, ulong val)
{
int vpid, pid;
ulong rval;
Vous *v;
sigset_t mask;
pid = getpid();
lock(&vouslock);
if((v = findvous(tag)) == nil){
/*
* Go to sleep.
*
* Block USR1, set the handler to interrupt system calls,
* unlock the vouslock so our waker can wake us,
* and then suspend.
*/
v = mkvous(tag);
v->pid = pid;
v->val2 = val;
v->wokeup = 0;
sigprocmask(SIG_SETMASK, nil, &mask);
sigaddset(&mask, SIGUSR1);
sigprocmask(SIG_SETMASK, &mask, nil);
ignusr1(0);
if(DBG) fprint(2, "%d rv(%lux, %lux) -> s\n", pid, tag, val);
unlock(&vouslock);
sigdelset(&mask, SIGUSR1);
sigsuspend(&mask);
/*
* Go to sleep.
*
* Block USR1, set the handler to interrupt system calls,
* unlock the vouslock so our waker can wake us,
* and then suspend.
*/
r->asleep = 1;
r->pid = getpid();
/*
* We're awake. Make USR1 not interrupt system calls.
* Were we awakened or interrupted?
*/
ignusr1(1);
lock(&vouslock);
if(v->wokeup){
rval = v->val1;
if(DBG) fprint(2, "%d rv(%lux, %lux) -> g %lux\n", pid, tag, val, rval);
}else{
if(findvous(tag) != v){
fprint(2, "rendezvous: interrupted but not found in hash table\n");
abort();
}
rval = ~(ulong)0;
if(DBG) fprint(2, "%d rv(%lux, %lux) -> g i\n", pid, tag, val);
}
putvous(v);
unlock(&vouslock);
}else{
/*
* Wake up sleeper.
*/
rval = v->val2;
v->val1 = val;
vpid = v->pid;
v->wokeup = 1;
if(DBG) fprint(2, "%d rv(%lux, %lux) -> g %lux, w %d\n", pid, tag, val, rval, vpid);
unlock(&vouslock);
kill(vpid, SIGUSR1);
}
return rval;
sigprocmask(SIG_SETMASK, nil, &mask);
sigaddset(&mask, SIGUSR1);
sigprocmask(SIG_SETMASK, &mask, nil);
ignusr1(0);
unlock(r->l);
sigdelset(&mask, SIGUSR1);
sigsuspend(&mask);
/*
* We're awake. Make USR1 not interrupt system calls.
*/
ignusr1(1);
assert(r->asleep == 0);
}
void
_procwakeup(_Procrend *r)
{
r->asleep = 0;
assert(r->pid >= 1);
kill(r->pid, SIGUSR1);
}

View file

@ -29,7 +29,7 @@ canexec(Alt *a)
/* are there senders or receivers blocked? */
otherop = (CHANSND+CHANRCV) - a->op;
for(i=0; i<c->nentry; i++)
if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil){
if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil){
_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
return 1;
}
@ -460,7 +460,7 @@ altexec(Alt *a, int spl)
b = nil;
me = a->v;
for(i=0; i<c->nentry; i++)
if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil)
if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil)
if(nrand(++n) == 0)
b = c->qentry[i];
if(b != nil){
@ -488,7 +488,7 @@ altexec(Alt *a, int spl)
altcopy(waiter, me, c->e);
}
b->thread->altc = c;
_procwakeup(&b->thread->altrend);
_threadwakeup(&b->thread->altrend);
_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
_threaddebug(DBGCHAN, "unlocking the chanlock");
unlock(&chanlock);

View file

@ -49,9 +49,7 @@ main(int argc, char **argv)
a = _threadmalloc(sizeof *a, 1);
a->argc = argc;
a->argv = argv;
malloc(10);
p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0);
malloc(10);
_schedinit(p);
abort(); /* not reached */
return 0;
@ -62,9 +60,7 @@ mainlauncher(void *arg)
{
Mainarg *a;
malloc(10);
a = arg;
malloc(10);
threadmain(a->argc, a->argv);
threadexits("threadmain");
}

View file

@ -41,6 +41,9 @@ HFILES=\
<$PLAN9/src/mksyslib
tfork: tfork.$O $PLAN9/lib/$LIB
$LD -o tfork tfork.$O $LDFLAGS -lthread -l9
tprimes: tprimes.$O $PLAN9/lib/$LIB
$LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9

View file

@ -36,7 +36,6 @@ _schedinit(void *arg)
unlock(&p->lock);
while(_setlabel(&p->sched))
;
malloc(10);
_threaddebug(DBGSCHED, "top of schedinit, _threadexitsallstatus=%p", _threadexitsallstatus);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
@ -148,12 +147,10 @@ relock:
_threaddebug(DBGSCHED, "sleeping for more work (%d threads)", p->nthreads);
q->asleep = 1;
unlock(&p->readylock);
while(rendezvous((ulong)q, 0) == ~0){
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
/* lock picked up from _threadready */
p->rend.l = &p->readylock;
_procsleep(&p->rend);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
t = q->head;
q->head = t->next;
@ -185,18 +182,15 @@ _sched(void)
Resched:
p = _threadgetproc();
//fprint(2, "p %p\n", p);
malloc(10);
if((t = p->thread) != nil){
needstack(512);
// _threaddebug(DBGSCHED, "pausing, state=%s set %p goto %p",
// psstate(t->state), &t->sched, &p->sched);
print("swap\n");
if(_setlabel(&t->sched)==0)
_gotolabel(&p->sched);
_threadstacklimit(t->stk, t->stk+t->stksize);
return p->nsched++;
}else{
malloc(10);
t = runthread(p);
if(t == nil){
_threaddebug(DBGSCHED, "all threads gone; exiting");
@ -211,8 +205,6 @@ malloc(10);
}
t->state = Running;
t->nextstate = Ready;
malloc(10);
print("gotolabel\n");
_gotolabel(&t->sched);
for(;;);
}
@ -253,13 +245,11 @@ _threadready(Thread *t)
assert(q->asleep == 1);
q->asleep = 0;
/* lock passes to runthread */
_threaddebug(DBGSCHED, "waking process %d", t->proc->pid);
while(rendezvous((ulong)q, 0) == ~0){
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
}else
unlock(&t->proc->readylock);
_procwakeup(&t->proc->rend);
}
unlock(&t->proc->readylock);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
void

View file

@ -95,6 +95,8 @@ struct Thread
Chanstate chan; /* which channel operation is current */
Alt *alt; /* pointer to current alt structure (debugging) */
ulong userpc;
Channel *c;
pthread_cond_t cond;
void* udata[NPRIV]; /* User per-thread data pointer */
int lastfd;
@ -136,6 +138,8 @@ struct Proc
uint nextID; /* ID of most recently created thread */
Proc *next; /* linked list of Procs */
_Procrend rend; /* sleep here for more ready threads */
void *arg; /* passed between shared and unshared stk */
char str[ERRMAX]; /* used by threadexits to avoid malloc */
char errbuf[ERRMAX]; /* errstr */

View file

@ -41,7 +41,6 @@ threadmain(int argc, char **argv)
int i;
Channel *c;
malloc(10);
ARGBEGIN{
case 'D':
_threaddebuglevel = atoi(ARGF());