From 51319cc5b56924b03d9c6188f6a1e189a497d42c Mon Sep 17 00:00:00 2001 From: Ori Bernstein Date: Sat, 23 Jan 2021 16:05:21 -0800 Subject: [PATCH] upas/runq: bring back -a Turns out -a is useful in crontab, so bring back a simplified version of it. This only iterates through directories one at a time. --- sys/man/8/qer | 57 ++++++++------- sys/src/cmd/upas/q/runq.c | 142 ++++++++++++++++++++++++-------------- 2 files changed, 124 insertions(+), 75 deletions(-) diff --git a/sys/man/8/qer b/sys/man/8/qer index 675286bc4..ba4a17e26 100644 --- a/sys/man/8/qer +++ b/sys/man/8/qer @@ -15,7 +15,7 @@ qer, runq \- queue management for spooled files .br .B runq [ -.B -dER +.B -adER ] [ .B -f @@ -50,13 +50,7 @@ separated by spaces. The data file contains the standard input to .IR qer . The files are created in the directory -.IR root / subdir , -where -.I subdir -is the argument to -.B -q -if present, else the contents of -.BR /dev/user . +.IR root / subdi The names of the control and data files differ only in the first character which is `C' and `D' respectively. .IR Mktemp (2) @@ -77,6 +71,18 @@ of the copies differ from the name of the data file only in the first character. The first one starts with 'F', the second 'G', etc. .P +Qer takes the following arguments: +.TP +.B -q subdir +Specifies the queue subdirectory to use. If +unspecified, the contents of +.B /dev/user +are used. +.TP +.B -f file +Specifies the files to copy into the queue +directory, in the manner described above. +.P .I Runq processes the files queued by .IR qer . @@ -124,32 +130,34 @@ a data file younger than one hour will not be processed if its error file exists and was last modified within the preceding 10 minutes. A data file older than one hour will not be processed if its error file exists and was last modified within the preceding hour. -The +.PP +The following flags are accepted by runq: +.TP +.B -a +Causes runq to process all user directories in sequence, instead +of only the directory of the current user. +.TP .B -E -flag causes all files to be reprocessed regardless of +Causes all files to be reprocessed regardless of the file times. -.P -The +.TP .B -R -flag instructs +Instructs .I runq never to give up on a failed queue job, instead leaving it in the queue to be retried. -.P -The +.TP .B -d -option causes debugging output on standard error +Causes debugging output on standard error describing the progress through the queues. -.P -The +.TP .B -t -flags specifies the number of hours +Specifies the number of hours that retries will continue after a send failure. The default is 48 hours. -.P -The +.TP .BR -r -flag limits the number of files that are processed in a single pass of a queue. +Limits the number of files that are processed in a single pass of a queue. .I Runq accumulates the entire directory containing a queue before processing any files. When a queue contains many files and the system does not @@ -161,10 +169,9 @@ to process the directory in chunks, allowing the queue to be drained incrementally. It is most useful in combination with the .I -q flag. -.P -The argument following the +.TP .B -n -flag specifies the number of queued jobs that are processed +Specifies the number of queued jobs that are processed in parallel from the queue; the default is 1. This is useful for a large queue to be processed with a bounded amount of parallelism. diff --git a/sys/src/cmd/upas/q/runq.c b/sys/src/cmd/upas/q/runq.c index 16445ea36..e0dc777a9 100644 --- a/sys/src/cmd/upas/q/runq.c +++ b/sys/src/cmd/upas/q/runq.c @@ -2,6 +2,13 @@ #include typedef struct Job Job; +typedef struct Wdir Wdir; + +struct Wdir { + Dir *d; + int nd; + char *name; +}; struct Job { Job *next; @@ -10,6 +17,7 @@ struct Job { int dfd; char **av; char *buf; /* backing for av */ + Wdir *wdir; /* work dir */ Dir *dp; /* not owned */ Mlock *l; Biobuf *b; @@ -17,30 +25,23 @@ struct Job { void doalldirs(void); void dodir(char*); -Job* dofile(Dir*); +Job* dofile(Wdir*, Dir*); Job* donefile(Job*, Waitmsg*); void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); void error(char*, void*); -int returnmail(char**, char*, char*); -void logit(char*, char*, char**); +int returnmail(char**, Wdir*, char*, char*); +void logit(char*, Wdir*, char*, char**); void doload(int); -#define HUNK 32 char *cmd; char *root; int debug; int giveup = 2*24*60*60; int limit; -/* the current directory */ -Dir *dirbuf; -long ndirbuf = 0; -int nfiles; -char *curdir; - char *runqlog = "runq"; char **badsys; /* array of recalcitrant systems */ @@ -48,6 +49,7 @@ int nbad; int njob = 1; /* number of concurrent jobs to invoke */ int Eflag; /* ignore E.xxxxxx dates */ int Rflag; /* no giving up, ever */ +int aflag; /* do all dirs */ void usage(void) @@ -82,27 +84,37 @@ main(int argc, char **argv) case 'q': qdir = EARGF(usage()); break; + case 'a': + aflag++; + break; case 'n': njob = atoi(EARGF(usage())); if(njob == 0) usage(); break; + default: + usage(); + break; }ARGEND; if(argc != 2) usage(); - if(qdir == nil) + if(!aflag && qdir == nil){ qdir = getuser(); - if(qdir == nil) - error("unknown user", 0); + if(qdir == nil) + error("unknown user", 0); + } root = argv[0]; cmd = argv[1]; if(chdir(root) < 0) error("can't cd to %s", root); - dodir(qdir); + if(aflag) + doalldirs(); + else + dodir(qdir); exits(0); } @@ -128,14 +140,42 @@ emptydir(char *name) return 0; } +/* + * run all user directories, must be bootes (or root on unix) to do this + */ +void +doalldirs(void) +{ + Dir *db; + int fd; + long i, n; + + + if((fd = open(".", OREAD)) == -1) + warning("opening %s", root); + return; + } + if((n = dirreadall(fd, &db)) == -1){ + warning("reading %s: ", root); + return; + } + for(i=0; i 0 || fidx< nfiles){ - while(fidx< nfiles && nlive < njob){ - if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + wd.name = name; + wd.nd = dirreadall(fd, &wd.d); + while(nlive > 0 || fidx< wd.nd){ + while(fidx< wd.nd && nlive < njob){ + if(strncmp(wd.d[fidx].name, "C.", 2) != 0){ fidx++; continue; } - if((j = dofile(&dirbuf[fidx])) != nil){ + if((j = dofile(&wd, &wd.d[fidx])) != nil){ nlive++; j->next = hd; hd = j; @@ -201,7 +243,7 @@ rescan: goto rescan; } assert(hd == nil); - free(dirbuf); + free(wd.d); close(fd); } @@ -209,15 +251,15 @@ rescan: * free files matching name in the current directory */ void -remmatch(char *name) +remmatch(Wdir *w, char *name) { long i; - syslog(0, runqlog, "removing %s/%s", curdir, name); + syslog(0, runqlog, "removing %s/%s", w->name, name); - for(i=0; ind; i++){ + if(strcmp(&w->d[i].name[1], &name[1]) == 0) + remove(w->d[i].name); } /* error file (may have) appeared after we read the directory */ @@ -263,7 +305,7 @@ keeplockalive(char *path, int fd) * tracks the running pid. */ Job* -dofile(Dir *dp) +dofile(Wdir *w, Dir *dp) { int dtime, efd, i, etime; Job *j; @@ -280,13 +322,13 @@ dofile(Dir *dp) d = dirstat(file(dp->name, 'D')); if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); - remmatch(dp->name); + remmatch(w, dp->name); return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); - remmatch(dp->name); + remmatch(w, dp->name); } return nil; } @@ -338,7 +380,7 @@ dofile(Dir *dp) * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - + j->wdir = w; j->buf = malloc(dp->length+1); if(j->buf == nil){ warning("buffer allocation", 0); @@ -381,9 +423,9 @@ dofile(Dir *dp) j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(j->av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, j->av); - remmatch(dp->name); + if(returnmail(j->av, w, dp->name, "Giveup") != 0) + logit("returnmail failed", w, dp->name, j->av); + remmatch(w, dp->name); goto done; } @@ -415,7 +457,7 @@ dofile(Dir *dp) fprint(2, " %s", j->av[i]); fprint(2, "\n"); } - logit("execing", dp->name, j->av); + logit("execing", w, dp->name, j->av); close(0); dup(j->dfd, 0); close(j->dfd); @@ -461,9 +503,9 @@ donefile(Job *j, Waitmsg *wm) fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); if(!Rflag && strstr(wm->msg, "Retry")==0){ /* return the message and remove it */ - if(returnmail(j->av, j->dp->name, wm->msg) != 0) - logit("returnmail failed", j->dp->name, j->av); - remmatch(j->dp->name); + if(returnmail(j->av, j->wdir, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->wdir, j->dp->name, j->av); + remmatch(j->wdir, j->dp->name); } else { /* add sys to bad list and try again later */ nbad++; @@ -472,7 +514,7 @@ donefile(Job *j, Waitmsg *wm) } } else { /* it worked remove the message */ - remmatch(j->dp->name); + remmatch(j->wdir, j->dp->name); } n = j->next; freejob(j); @@ -520,7 +562,7 @@ file(char *name, char type) * return 0 if successful */ int -returnmail(char **av, char *name, char *msg) +returnmail(char **av, Wdir *w, char *name, char *msg) { char buf[256], attachment[Pathlen], *sender; int i, fd, pfd[2]; @@ -529,7 +571,7 @@ returnmail(char **av, char *name, char *msg) String *s; if(av[1] == 0 || av[2] == 0){ - logit("runq - dumping bad file", name, av); + logit("runq - dumping bad file", w, name, av); return 0; } @@ -537,21 +579,21 @@ returnmail(char **av, char *name, char *msg) sender = s_to_c(s); if(!returnable(sender) || strcmp(sender, "postmaster") == 0) { - logit("runq - dumping p to p mail", name, av); + logit("runq - dumping p to p mail", w, name, av); return 0; } if(pipe(pfd) < 0){ - logit("runq - pipe failed", name, av); + logit("runq - pipe failed", w, name, av); return -1; } switch(rfork(RFFDG|RFPROC|RFENVG)){ case -1: - logit("runq - fork failed", name, av); + logit("runq - fork failed", w, name, av); return -1; case 0: - logit("returning", name, av); + logit("returning", w, name, av); close(pfd[1]); close(0); dup(pfd[0], 0); @@ -592,14 +634,14 @@ out: wm = wait(); if(wm == nil){ syslog(0, "runq", "wait: %r"); - logit("wait failed", name, av); + logit("wait failed", w, name, av); return -1; } i = 0; if(wm->msg[0]){ i = -1; syslog(0, "runq", "returnmail child: %s", wm->msg); - logit("returnmail child failed", name, av); + logit("returnmail child failed", w, name, av); } free(wm); return i; @@ -635,12 +677,12 @@ error(char *f, void *a) } void -logit(char *msg, char *file, char **av) +logit(char *msg, Wdir *w, char *file, char **av) { int n, m; char buf[256]; - n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg); + n = snprint(buf, sizeof(buf), "%s/%s: %s", w->name, file, msg); for(; *av; av++){ m = strlen(*av); if(n + m + 4 > sizeof(buf))