plan9port/src/cmd/venti/srv/bloom.c

211 lines
3.6 KiB
C
Raw Normal View History

2005-07-12 15:23:36 +00:00
/*
* Bloom filter tracking which scores are present in our arenas
* and (more importantly) which are not.
*/
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
int
bloominit(Bloom *b, vlong vsize, u8int *data)
{
ulong size;
size = vsize;
if(size != vsize){ /* truncation */
werrstr("bloom data too big");
return -1;
}
b->size = size;
b->nhash = 32; /* will be fixed by caller on initialization */
if(data != nil)
if(unpackbloomhead(b, data) < 0)
return -1;
fprint(2, "bloom size %lud nhash %d\n", b->size, b->nhash);
b->mask = b->size-1;
b->data = data;
return 0;
}
void
wbbloomhead(Bloom *b)
{
packbloomhead(b, b->data);
}
Bloom*
readbloom(Part *p)
{
int i, n;
uint ones;
uchar buf[512];
uchar *data;
u32int *a;
Bloom *b;
b = vtmallocz(sizeof *b);
if(readpart(p, 0, buf, sizeof buf) < 0)
return nil;
fprint(2, "header %.16H\n", buf);
if(bloominit(b, 0, buf) < 0){
vtfree(b);
return nil;
}
data = vtmallocz(b->size);
if(readpart(p, 0, data, b->size) < 0){
vtfree(b);
vtfree(data);
return nil;
}
b->data = data;
b->part = p;
a = (u32int*)b->data;
n = b->size/4;
ones = 0;
for(i=0; i<n; i++)
ones += countbits(a[i]);
addstat(StatBloomOnes, ones);
if(b->size == MaxBloomSize) /* 2^32 overflows ulong */
addstat(StatBloomBits, b->size*8-1);
else
addstat(StatBloomBits, b->size*8);
return b;
}
int
writebloom(Bloom *b)
{
wbbloomhead(b);
return writepart(b->part, 0, b->data, b->size);
}
/*
* Derive two random 32-bit quantities a, b from the score
* and then use a+b*i as a sequence of bloom filter indices.
* Michael Mitzenmacher has a recent (2005) paper saying this is okay.
* We reserve the bottom bytes (BloomHeadSize*8 bits) for the header.
*/
static void
gethashes(u8int *score, ulong *h)
{
int i;
u32int a, b;
a = 0;
b = 0;
for(i=4; i+8<=VtScoreSize; i+=8){
a ^= *(u32int*)(score+i);
b ^= *(u32int*)(score+i+4);
}
if(i+4 <= VtScoreSize) /* 20 is not 4-aligned */
a ^= *(u32int*)(score+i);
for(i=0; i<BloomMaxHash; i++, a+=b)
h[i] = a < BloomHeadSize*8 ? BloomHeadSize*8 : a;
}
static void
_markbloomfilter(Bloom *b, u8int *score)
{
int i, nnew;
ulong h[BloomMaxHash];
u32int x, *y, z, *tab;
trace("markbloomfilter", "markbloomfilter %V", score);
gethashes(score, h);
nnew = 0;
tab = (u32int*)b->data;
for(i=0; i<b->nhash; i++){
x = h[i];
y = &tab[(x&b->mask)>>5];
z = 1<<(x&31);
if(!(*y&z)){
nnew++;
*y |= z;
}
}
if(nnew)
addstat(StatBloomOnes, nnew);
trace("markbloomfilter", "markbloomfilter exit");
}
static int
_inbloomfilter(Bloom *b, u8int *score)
{
int i;
ulong h[BloomMaxHash], x;
u32int *tab;
gethashes(score, h);
tab = (u32int*)b->data;
for(i=0; i<b->nhash; i++){
x = h[i];
if(!(tab[(x&b->mask)>>5] & (1<<(x&31))))
return 0;
}
return 1;
}
int
inbloomfilter(Bloom *b, u8int *score)
{
int r;
uint ms;
if(b == nil)
return 1;
ms = msec();
rlock(&b->lk);
r = _inbloomfilter(b, score);
runlock(&b->lk);
ms = ms - msec();
addstat2(StatBloomLookup, 1, StatBloomLookupTime, ms);
if(r)
addstat(StatBloomMiss, 1);
else
addstat(StatBloomHit, 1);
return r;
}
void
markbloomfilter(Bloom *b, u8int *score)
{
if(b == nil)
return;
rlock(&b->lk);
qlock(&b->mod);
_markbloomfilter(b, score);
qunlock(&b->mod);
runlock(&b->lk);
}
static void
bloomwriteproc(void *v)
{
Bloom *b;
b = v;
for(;;){
recv(b->writechan, 0);
if(writebloom(b) < 0)
fprint(2, "oops! writing bloom: %r\n");
send(b->writedonechan, 0);
}
}
void
startbloomproc(Bloom *b)
{
b->writechan = chancreate(sizeof(void*), 0);
b->writedonechan = chancreate(sizeof(void*), 0);
vtproc(bloomwriteproc, b);
}