venti: reduce locking contention in buildindex

This commit is contained in:
Russ Cox 2008-07-03 22:44:24 -04:00
parent cd87898f5d
commit ac5a97e6b3
3 changed files with 74 additions and 13 deletions

View file

@ -229,6 +229,22 @@ markbloomfilter(Bloom *b, u8int *score)
runlock(&b->lk);
}
void
markbloomfiltern(Bloom *b, u8int score[][20], int n)
{
int i;
if(b == nil || b->data == nil)
return;
rlock(&b->lk);
qlock(&b->mod);
for(i=0; i<n; i++)
_markbloomfilter(b, score[i]);
qunlock(&b->mod);
runlock(&b->lk);
}
static void
bloomwriteproc(void *v)
{

View file

@ -11,6 +11,20 @@ enum
MaxBufSize = 4*1024*1024,
};
typedef struct IEntryBuf IEntryBuf;
struct IEntryBuf
{
IEntry ie[100];
int nie;
};
typedef struct ScoreBuf ScoreBuf;
struct ScoreBuf
{
uchar score[100][VtScoreSize];
int nscore;
};
int dumb;
int errors;
char **isect;
@ -117,10 +131,10 @@ threadmain(int argc, char *argv[])
/* start index procs */
fprint(2, "%T read index\n");
isectdonechan = chancreate(sizeof(void*), 0);
isectdonechan = chancreate(sizeof(void*), 1);
for(i=0; i<ix->nsects; i++){
if(shouldprocess(ix->sects[i])){
ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
vtproc(isectproc, ix->sects[i]);
}
}
@ -208,12 +222,17 @@ arenapartproc(void *v)
ClumpInfo *ci, *cis;
IEntry ie;
Part *p;
IEntryBuf *buf, *b;
uchar *score;
ScoreBuf sb;
p = v;
threadsetname("arenaproc %s", p->name);
buf = MKNZ(IEntryBuf, ix->nsects);
nskip = 0;
tot = 0;
sb.nscore = 0;
cis = MKN(ClumpInfo, ClumpChunks);
for(i=0; i<ix->narenas; i++){
a = ix->arenas[i];
@ -252,10 +271,23 @@ arenapartproc(void *v)
tot++;
x = indexsect(ix, ie.score);
assert(0 <= x && x < ix->nsects);
if(ix->sects[x]->writechan)
send(ix->sects[x]->writechan, &ie);
if(ix->bloom)
markbloomfilter(ix->bloom, ie.score);
if(ix->sects[x]->writechan) {
b = &buf[x];
b->ie[b->nie] = ie;
b->nie++;
if(b->nie == nelem(b->ie)) {
send(ix->sects[x]->writechan, b);
b->nie = 0;
}
}
if(ix->bloom) {
score = sb.score[sb.nscore++];
scorecp(score, ie.score);
if(sb.nscore == nelem(sb.score)) {
markbloomfiltern(ix->bloom, sb.score, sb.nscore);
sb.nscore = 0;
}
}
}
}
}
@ -264,6 +296,14 @@ arenapartproc(void *v)
}
add(&arenaentries, tot);
add(&skipentries, nskip);
for(i=0; i<ix->nsects; i++)
if(ix->sects[i]->writechan && buf[i].nie > 0)
send(ix->sects[i]->writechan, &buf[i]);
free(buf);
free(cis);
if(ix->bloom && sb.nscore > 0)
markbloomfiltern(ix->bloom, sb.score, sb.nscore);
sendp(arenadonechan, p);
}
@ -743,6 +783,7 @@ isectproc(void *v)
uchar *data, *p;
Buf *buf;
IEntry ie;
IEntryBuf ieb;
IPool *ipool;
ISect *is;
Minibuf *mbuf, *mb;
@ -837,14 +878,17 @@ isectproc(void *v)
assert(p == data+nbuf*bufsize);
n = 0;
while(recv(is->writechan, &ie) == 1){
if(ie.ia.addr == 0)
while(recv(is->writechan, &ieb) == 1){
if(ieb.nie == 0)
break;
buck = score2bucket(is, ie.score);
i = buck/bufbuckets;
assert(i < nbuf);
bwrite(&buf[i], &ie);
n++;
for(j=0; j<ieb.nie; j++){
ie = ieb.ie[j];
buck = score2bucket(is, ie.score);
i = buck/bufbuckets;
assert(i < nbuf);
bwrite(&buf[i], &ie);
n++;
}
}
add(&indexentries, n);

View file

@ -105,6 +105,7 @@ Lump *lookuplump(u8int *score, int type);
int lookupscore(u8int *score, int type, IAddr *ia);
int maparenas(AMap *am, Arena **arenas, int n, char *what);
void markbloomfilter(Bloom*, u8int*);
void markbloomfiltern(Bloom*, u8int[][20], int);
uint msec(void);
int namecmp(char *s, char *t);
void namecp(char *dst, char *src);