Re: Number 9 of the mytest summary store: writing things
- From: Dave Cridland <dave@cridland.net>
- To: Philip Van Hoof <spam@pvanhoof.be>
- Cc: Jeffrey Stedfast <fejj@novell.com>, tinymail-devel-list@gnome.org
- Subject: Re: Number 9 of the mytest summary store: writing things
- Date: Mon, 07 Jan 2008 09:47:21 +0000
If you show me yours, I'll show you mine. :-)
Attached is a pure python mmap cache, designed solely to address:
- UID mapping.
- FLAGS
- MODSEQ
I'm mildly concerned that not all clients will want MODSEQ, and that
it's not storing enough bit-space for FLAGS, but these are pretty
minor issues, and in fact, I'm not bothering to store either
FLAGS or MODSEQ yet anyway.
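To make the record shape concrete, here's roughly what one cache entry
looks like with the default 'L4xQQ' format used in newcache.py below
(the UID is made up, and the FLAGS/MODSEQ values are placeholders,
since I'm not storing them yet):

import struct
# Default record format: a UID, four pad bytes, then two
# 64-bit slots earmarked for FLAGS and MODSEQ (unused so far).
RECORD_FMT = 'L4xQQ'
record = struct.pack(RECORD_FMT, 9001, 0, 0)   # hypothetical UID 9001
print struct.unpack(RECORD_FMT, record)        # (9001, 0, 0)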
Basically, this is handling the stuff that I think yours is currently
doing least optimally. ENVELOPEs don't change, so storing them in the
same place as FLAGS doesn't make sense. Admittedly, if we want to
handle ANNOTATE, we need something more complex than this, but it'd
still be a bright idea to keep it all away from the more static data.
That all said, it's working, and seems reasonably quick - faster than
the code I have, so I'll probably try to blend it in.
So... How it works.
I'm blocking the data in multiple mmap files - this needs more
cleverness, because really, I need to be using smaller blocks toward
the end of larger mailboxes. mmap files are named after the sequence
number they start at.
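For illustration, a hypothetical 1200-message mailbox with the default
BLOCKSZ of 500 might end up stored as something like:

# blockid-1     records for seqnos 1-500
# blockid-501   records for seqnos 501-1000
# blockid-1001  records for seqnos 1001-1200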
Finding data by sequence number is relatively easy, although less
than efficient as yet: I'm just running through all blocks in hash
order to find one that looks as if it might be right.
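The test itself is just containment against each block's start and
length; something like this hypothetical helper (not in the attached
code, which inlines it):

def covers(block_start, block_length, seq):
    # A block holds seqnos block_start .. block_start+block_length-1.
    return block_start <= seq and (seq - block_start) < block_length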
When a UID gets removed - currently not really implemented - I don't
rewrite all subsequent blocks. Instead, I rename them. This means
less I/O, which is always a good thing.
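So expunging one message ahead of blockid-1001 in the hypothetical
layout above costs a single rename, rather than rewriting the block's
200 records:

import os
os.rename('blockid-1001', 'blockid-1000')  # block contents untouched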
I think that this is basically a sound design, albeit badly
implemented.
Some potential improvements:
1) I suspect that having an index file containing block sequence
starts, lengths, and UID extents would be faster than using the
directory listing. Renaming files causes the directory "file" to be
rewritten, so it makes sense to avoid this, and give the blocks a
64-bit ID instead. (There's a sketch of such an index entry after this
list.)
2) Lots of bits of this are very badly done and it's filled with
bugs. That said, the basic "set" and "[]" operations work. I think.
Mostly.
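For what it's worth, an index entry for (1) might pack something like
this - entirely hypothetical, nothing in the attached code implements
it:

import struct
# block ID, starting seqno, record count, lowest and highest UID covered
INDEX_FMT = 'QQQLL'
entry = struct.pack(INDEX_FMT, 1, 501, 500, 9001, 9500)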
I've attached both newcache.py, which contains the implementation,
and ct1.py, which contains a simple test driver.
Dave.
--
Dave Cridland - mailto:dave@cridland.net - xmpp:dwd@jabber.org
- acap://acap.dave.cridland.net/byowner/user/dwd/bookmarks/
- http://dave.cridland.net/
Infotrope Polymer - ACAP, IMAP, ESMTP, and Lemonade
# ct1.py - simple test driver
import infotrope
sm = infotrope.sm(trace=True)
imap = sm['imap://dwd@turner.dave.cridland.net/']
import newcache
c = newcache.modcache('.')
t,r,s = imap.send('SELECT','INBOX')
imap.wait(t)
print ""
print " Ready"
print ""
exists = int(imap.mailbox_info['EXISTS'])
print "%d messages." % exists

class foo:
    """Handler feeding UID SEARCH results into the cache."""
    def __init__(self, c):
        self.c = c
    def _handle_search(self, t, r, s):
        self.c.set(t.search_base, s)
        return t, r, s

# Walk the mailbox in random order, asking the cache which range of
# UIDs (if any) needs fetching before each seqno can be resolved.
l = range(exists)
import random
random.shuffle(l)
for i in l:
    feed = c.block_needed(i+1, exists)
    if feed is not None:
        if feed[0] == 0:
            feed = (1, feed[1])
        t,r,s = imap.send('UID SEARCH', '%d:%d' % feed)
        t.search_base = feed[0]
        handler = foo(c)
        imap.register(t, handler)
        imap.wait(t)
    #print "Message %d, UID %d" % (i+1, c[i+1][0])
c.consolidate()
# newcache.py - pure python mmap cache
BLOCKID = 'blockid-'
BLOCKSZ = 500

class modcache:
    def __init__(self, dirname, fmt=None):
        """
        Initialize a modcache.
        The first argument is a directory name.
        The second optional argument is the struct format
        of each record. The first member of the struct MUST be the UID,
        although the actual format of it doesn't matter.
        """
        import struct
        self.__dirname = dirname
        self.__blocks = None
        # Default record: UID, 4 pad bytes, then two 64-bit slots
        # intended for FLAGS and MODSEQ.
        self.__record_fmt = fmt or 'L4xQQ'
        self.__record_size = struct.calcsize(self.__record_fmt)
        self.__maps = {}
        self.__map_lru = []
    def filename(self, b):
        """
        Return a filename for the mapfile for this block.
        """
        import os.path
        return os.path.join(self.__dirname, '%s%d' % (BLOCKID, b))

    def scan(self):
        """
        Scan the directory for existing mapfiles.
        """
        import os
        import stat
        self.__blocks = {}
        for x in os.listdir(self.__dirname):
            if x.startswith(BLOCKID):
                b = int(x[len(BLOCKID):])
                # Block length in records, taken from the file size.
                l = os.stat(self.filename(b))[stat.ST_SIZE] / self.__record_size
                self.__blocks[b] = l
                print "Found block from %d, length %d" % (b,l)
    def map(self, b):
        """
        mmap a particular block.
        """
        if b in self.__maps:
            self.__map_lru.remove(b)
            self.__map_lru.append(b)
            return
        import mmap
        f = file(self.filename(b), 'rb+')
        m = mmap.mmap(f.fileno(), self.__blocks[b] * self.__record_size)
        self.__map_lru.append(b)
        self.__maps[b] = m

    def unmap_all(self):
        """
        Unmap all blocks, needed before renaming or rewriting them.
        """
        self.__maps = {}
        self.__map_lru = []
    def __getitem__(self, seq):
        """
        Return the unpacked record at this sequence number, if it exists.
        """
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b <= seq:
                if l > (seq - b):
                    import struct
                    self.map(b)
                    tmp = struct.unpack(self.__record_fmt, self.__maps[b][(seq - b)*self.__record_size:(seq - b + 1)*self.__record_size])
                    # A zero UID marks a hole left by truncation.
                    if tmp[0]:
                        return tmp
        raise KeyError, seq
    def __setitem__(self, seq, data):
        """
        Replace the existing record at this point.
        """
        import struct
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b <= seq:
                if l > (seq - b):
                    # Record exists; overwrite it in place.
                    self.unmap_all()
                    f = file(self.filename(b), 'rb+')
                    f.seek((seq - b) * self.__record_size)
                    f.write(struct.pack(self.__record_fmt, *data))
                    f.close()
                    return
                elif (seq - b) <= BLOCKSZ:
                    # Close enough to extend this block up to the record.
                    self.unmap_all()
                    f = file(self.filename(b), 'rb+')
                    f.truncate((seq - b + 1) * self.__record_size)
                    f.seek((seq - b) * self.__record_size)
                    f.write(struct.pack(self.__record_fmt, *data))
                    f.close()
                    self.__blocks[b] = seq - b + 1
                    return
        # No suitable block; start a new one on a BLOCKSZ boundary.
        b = int(seq / BLOCKSZ) * BLOCKSZ
        f = file(self.filename(b), 'wb+')
        f.truncate((seq - b + 1) * self.__record_size)
        f.seek((seq - b) * self.__record_size)
        f.write(struct.pack(self.__record_fmt, *data))
        f.close()
        self.__blocks[b] = seq - b + 1
    def consolidate(self):
        """
        Merge adjacent blocks, up to a maximum of 4*BLOCKSZ records.
        """
        import os
        if self.__blocks is None:
            self.scan()
        all_blocks = self.__blocks.keys()
        if not all_blocks:
            print "No blocks to consolidate"
            return
        all_blocks.sort()
        blocks = {}
        blocks[all_blocks[0]] = self.__blocks[all_blocks[0]]
        last_block = all_blocks[0]
        for i in range(1, len(all_blocks)):
            print "Last block is", last_block
            if last_block + blocks[last_block] >= all_blocks[i]:
                print "Candidate"
                if blocks[last_block] + self.__blocks[all_blocks[i]] < 4*BLOCKSZ:
                    print "Not too long"
                    # Append the later block's records, then delete it.
                    self.unmap_all()
                    self.map(all_blocks[i])
                    f = file(self.filename(last_block), "ab")
                    f.write(self.__maps[all_blocks[i]][:])
                    f.close()
                    self.unmap_all()
                    os.remove(self.filename(all_blocks[i]))
                    blocks[last_block] += self.__blocks[all_blocks[i]]
                    continue
            blocks[all_blocks[i]] = self.__blocks[all_blocks[i]]
            last_block = all_blocks[i]
        self.__blocks = blocks
    def set(self, seq_start, uids):
        """
        Store records containing only the UIDs supplied, starting at
        this sequence number.
        """
        import struct
        near_block = None
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b + l == seq_start:
                near_block = b
            if b > seq_start and b < (seq_start + len(uids)):
                # Don't run into a later block; trim the new UIDs.
                uids = uids[0:(b - seq_start)]
        # Add some UIDs to any near block
        if near_block is not None:
            self.unmap_all()
            offset = seq_start - near_block
            length = min((4*BLOCKSZ) - self.__blocks[near_block], len(uids))
            f = file(self.filename(near_block), 'rb+')
            f.truncate((offset + length) * self.__record_size)
            f.seek(offset * self.__record_size)
            f.write(''.join([struct.pack(self.__record_fmt, u, 0, 0) for u in uids[0:length]]))
            f.close()
            self.__blocks[near_block] += length
            uids = uids[length:]
            seq_start += length
        # Now create new blocks
        while uids:
            length = min((4*BLOCKSZ), len(uids))
            f = file(self.filename(seq_start), 'wb')
            f.write(''.join([struct.pack(self.__record_fmt, u, 0, 0) for u in uids[0:length]]))
            f.close()
            self.__blocks[seq_start] = length
            uids = uids[length:]
            seq_start += length
    def insert(self, seq_start, uids):
        """
        Insert records containing only the UIDs supplied at this point.
        Typically, you'd do this for either a FETCH response you got a UID in,
        or else a UID SEARCH result.
        """
        import struct
        import os
        # First move all subsequent UIDs.
        blocks = {}
        near_block = None
        if self.__blocks is None:
            self.scan()
        self.unmap_all()
        # Rename the highest-numbered blocks first to avoid collisions.
        for b in sorted(self.__blocks.keys(), reverse=True):
            l = self.__blocks[b]
            if b >= seq_start:
                os.rename(self.filename(b), self.filename(b+len(uids)))
                blocks[b+len(uids)] = l
            else:
                blocks[b] = l
                if b + l == seq_start:
                    near_block = b
        self.__blocks = blocks
        # Add some UIDs to any near block
        if near_block is not None:
            self.unmap_all()
            offset = seq_start - near_block
            length = min(BLOCKSZ - self.__blocks[near_block], len(uids))
            f = file(self.filename(near_block), 'rb+')
            f.truncate((offset + length) * self.__record_size)
            f.seek(offset * self.__record_size)
            f.write(''.join([struct.pack(self.__record_fmt, u, 0, 0) for u in uids[0:length]]))
            f.close()
            self.__blocks[near_block] += length
            uids = uids[length:]
            seq_start += length
        # Now create new blocks
        while uids:
            length = min(BLOCKSZ, len(uids))
            f = file(self.filename(seq_start), 'wb')
            f.write(''.join([struct.pack(self.__record_fmt, u, 0, 0) for u in uids[0:length]]))
            f.close()
            self.__blocks[seq_start] = length
            uids = uids[length:]
            seq_start += length
    def block_needed(self, seq, mango_max=None):
        """
        Given a sequence number, find the start and end of a good range
        to get from the server. Typically, you'd use UID SEARCH RETURN ().
        If no range is needed, it will return None.
        Note that it may return a seqno higher than the number of messages,
        so you'll need to trim it down, unless you supply a message count.
        """
        low = None
        high = None
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b <= seq:
                if l > (seq - b):
                    # We already have this seqno, no need to fetch anything.
                    return None
                if low is None or b > low:
                    low = b
            elif high is None or b < high:
                high = b
        if low is None:
            low = 0
        if high is None:
            if mango_max is not None:
                high = mango_max + 1
            else:
                high = seq + BLOCKSZ
        # Fetch from the end of the nearest block below to just short
        # of the nearest block above.
        low += self.__blocks.get(low, 0)
        high -= 1
        if high - low < 2*BLOCKSZ:
            return (low, high)
        # Otherwise clamp to a 2*BLOCKSZ window around the wanted seqno.
        clow = seq - BLOCKSZ
        chigh = seq + BLOCKSZ
        if chigh > high:
            clow -= (chigh - high)
            chigh = high
        if clow < low:
            chigh += (low - clow)
            clow = low
        return clow, chigh
    def kill(self, seqs):
        """
        Remove the records for these sequence numbers. Later blocks
        are renamed down rather than rewritten; records inside a block
        are spliced out of a copy of its contents.
        """
        import os
        if not seqs:
            return
        self.unmap_all()
        offset = 0
        seqs = sorted(seqs)  # list.sort() returns None, so sort a copy
        blocks = {}
        for b in sorted(self.__blocks.keys()):
            l = self.__blocks[b]
            block_array = None
            # Count the killed seqnos falling entirely before this block.
            while seqs and (seqs[0] - offset) < b:
                offset += 1
                seqs.pop(0)
            if offset:
                os.rename(self.filename(b), self.filename(b - offset))
            b -= offset
            # Splice out any killed records inside this block.
            while seqs and l > (seqs[0] - offset - b) >= 0:
                if block_array is None:
                    block_array = file(self.filename(b), 'rb').read()
                off = (seqs[0] - offset - b) * self.__record_size
                block_array = block_array[0:off] + block_array[off+self.__record_size:]
                offset += 1
                seqs.pop(0)
                l -= 1
            if block_array is not None:
                f = file(self.filename(b), 'wb')
                f.write(block_array)
                f.close()
            blocks[b] = l
        self.__blocks = blocks