Re: Number 9 of the mytest summary store: writing things
- From: Dave Cridland <dave cridland net>
- To: Dave Cridland <dave cridland net>
- Cc: Jeffrey Stedfast <fejj novell com>, tinymail-devel-list gnome org, Philip Van Hoof <spam pvanhoof be>
- Subject: Re: Number 9 of the mytest summary store: writing things
- Date: Mon, 07 Jan 2008 21:59:28 +0000
And another shot. This time I've included a uid method for
translating UIDs to sequence numbers, and various, probably
non-working expunge methods.
Dave.
--
Dave Cridland - mailto:dave cridland net - xmpp:dwd jabber org
- acap://acap.dave.cridland.net/byowner/user/dwd/bookmarks/
- http://dave.cridland.net/
Infotrope Polymer - ACAP, IMAP, ESMTP, and Lemonade
# Prefix of every block file name; filename() appends the block number to it
# and scan() only considers directory entries that start with it.
BLOCKID = 'blockid-'
class Error(Exception):
    """Common base class for the cache-specific exceptions defined below."""
class RangeNeededError(Error):
    """
    The cache is missing data entirely for the range.
    Issue a UID SEARCH.

    Raised as RangeNeededError(seq_start, seq_end). The two values stay
    available through ``args`` and are also exposed as the ``seq_start`` and
    ``seq_end`` attributes (None when the corresponding argument is omitted).
    """

    def __init__(self, *args):
        Error.__init__(self, *args)
        self.seq_start = args[0] if len(args) > 0 else None
        self.seq_end = args[1] if len(args) > 1 else None
class RefreshNeededError(Error):
    """
    Flags for the supplied range are out of date, and need refreshing.

    Raised as RefreshNeededError(seq_start, seq_end). The two values stay
    available through ``args`` and are also exposed as the ``seq_start`` and
    ``seq_end`` attributes (None when the corresponding argument is omitted).
    """

    def __init__(self, *args):
        Error.__init__(self, *args)
        self.seq_start = args[0] if len(args) > 0 else None
        self.seq_end = args[1] if len(args) > 1 else None
class CorruptedCacheError(Error):
    """
    The cache contradicts itself, for example UIDs are out of order or an
    asserted UID match is failing.

    The cache should be destroyed and reinitialized from the server.
    """
"""
A cache designed for holding UID mappings and mutable message data such as
FLAGS and MODSEQ.
Errors include:
RangeNeededError - raised with the (seq_start, seq_end) range that you need
to fetch from the server as its arguments.
RefreshNeededError - likewise, for FLAGS/MODSEQ data.
KeyError - the data does not exist, e.g. the UID was not found.
IndexError - the sequence number is out of range.
"""
class modcache:
    """
    On-disk cache of fixed-size message records, addressed by sequence number.

    Records live in "block" files inside one directory. A block file is named
    BLOCKID followed by the sequence number of its first record and holds
    consecutive records. Each record starts with the message UID; unless the
    cache was created with uids_only=True, two further 64-bit values follow,
    which set_flags_now() fills with flags and modseq.

    Naming note: seqno() maps a sequence number to a UID, and uid() maps a
    UID to a sequence number.

    The code is written to run unchanged on Python 2.6+ and Python 3.
    """

    def __init__(self, dirname, uids_only=False):
        """
        Initialize a modcache.

        dirname is the directory holding the block files.
        uids_only, when true, selects records that consist of the UID alone
        instead of UID + two 64-bit values.
        """
        import struct
        self.__dirname = dirname
        # {first seqno of block: number of records}; None until scan() ran.
        self.__blocks = None
        self.__uids_only = uids_only
        if uids_only:
            self.__record_fmt = 'L'
        else:
            self.__record_fmt = 'L4xQQ'
        self.__record_size = struct.calcsize(self.__record_fmt)
        self.__maps = {}
        # Most recently used block last. Nothing evicts based on it (yet).
        self.__map_lru = []
        self.__exists = None
        # Integer division: a float here would leak into every slice bound.
        self.__block_size = 32 * 1024 // self.__record_size

    def block_size(self, s):
        """
        Return a suitable blocksize (in records) for this seqno.

        Starts at the base size (32 KiB worth of records) and grows it in a
        Fibonacci-like fashion for as long as the remaining distance to the
        mailbox size is above s, so lower sequence numbers get larger blocks.
        When the mailbox size is not known yet the base size is returned.
        """
        if self.__exists is None:
            return self.__block_size
        remaining = self.__exists
        size = self.__block_size
        previous = 0
        while remaining > s:
            size, previous = size + previous, size
            remaining -= size
        return size

    def set_exists(self, e):
        """Record the number of messages in the mailbox."""
        self.__exists = e

    def filename(self, b):
        """
        Return a filename for the mapfile for this block.
        """
        import os.path
        return os.path.join(self.__dirname, '%s%d' % (BLOCKID, b))

    def scan(self):
        """
        Scan the directory for existing mapfiles and rebuild the block table.
        """
        import os
        self.__blocks = {}
        for name in os.listdir(self.__dirname):
            if not name.startswith(BLOCKID):
                continue
            try:
                start = int(name[len(BLOCKID):])
            except ValueError:
                # Not one of our block files after all; leave it alone.
                continue
            # Size the file inside the cache directory, not relative to the
            # current working directory.
            size = os.path.getsize(os.path.join(self.__dirname, name))
            count = size // self.__record_size
            self.__blocks[start] = count
            print("Found block from %d, length %d" % (start, count))

    def map(self, b):
        """
        mmap a particular block (no-op if it is already mapped).
        """
        if b in self.__maps:
            if b in self.__map_lru:
                self.__map_lru.remove(b)
            self.__map_lru.append(b)
            return
        import mmap
        f = open(self.filename(b), 'rb+')
        try:
            m = mmap.mmap(f.fileno(), self.__blocks[b] * self.__record_size)
        finally:
            # The mapping keeps its own handle on the file.
            f.close()
        self.__map_lru.append(b)
        self.__maps[b] = m

    def getrecord(self, m, o):
        """
        Unpack and return record number o (0-based) of the buffer m.
        """
        import struct
        start = o * self.__record_size
        return struct.unpack(self.__record_fmt,
                             m[start:start + self.__record_size])

    def unmap_all(self):
        """
        Unmap all blocks, needed before block files are rewritten or renamed.
        """
        for m in self.__maps.values():
            close = getattr(m, 'close', None)
            if close is not None:
                close()
        self.__maps = {}
        # Keep the LRU list in step, otherwise it accumulates duplicates.
        self.__map_lru = []

    def find_block(self, seq):
        """
        Return the first seqno of the block holding seq, or None if no
        cached block covers it.
        """
        if self.__blocks is None:
            self.scan()
        for b, l in self.__blocks.items():
            if b <= seq < b + l:
                return b
        return None

    def __getitem__(self, seq):
        """
        Return the unpacked record at this sequence number, if it exists.

        Raises IndexError when seq is outside 1..exists, RangeNeededError
        (with the range to fetch) when the record is not cached, and
        CorruptedCacheError when the stored UID is 0 or the block table is
        inconsistent.
        """
        if self.__exists is None or seq > self.__exists or seq < 1:
            raise IndexError(seq)
        if self.__blocks is None:
            self.scan()
        b = self.find_block(seq)
        if b is not None:
            self.map(b)
            record = self.getrecord(self.__maps[b], seq - b)
            if not record[0]:
                raise CorruptedCacheError("UID for record %d is 0" % seq)
            return record
        needed = self.block_needed(seq)
        if needed:
            raise RangeNeededError(*needed)
        raise CorruptedCacheError(
            "%d not found, but no records needed?" % seq)

    def set_flags_now(self, seqno, uid, flags, modseq):
        """
        Write the record for one message straight to its block file.

        Either seqno or uid may be None, in which case it is looked up from
        the other one. When both are given and the record is not cached yet
        it is created; when it is cached under a different UID a
        CorruptedCacheError is raised. flags and modseq are ignored for a
        uids_only cache.
        """
        import struct
        if seqno is None:
            seqno = self.uid(uid)
        elif uid is None:
            uid = self.seqno(seqno)
        if self.__blocks is None:
            self.scan()
        b = self.find_block(seqno)
        if b is None:
            # Only reachable when both values were supplied: the lookups
            # above raise if the record is missing.
            self.set(seqno, [uid])
            b = self.find_block(seqno)
            if b is None:
                raise CorruptedCacheError(
                    "Could not create record %d" % seqno)
        elif self.seqno(seqno) != uid:
            raise CorruptedCacheError(
                "UID mismatch for record %d" % seqno)
        if self.__uids_only:
            data = struct.pack(self.__record_fmt, uid)
        else:
            data = struct.pack(self.__record_fmt, uid, flags, modseq)
        self.unmap_all()
        # 'rb+' updates in place; a 'w' mode would wipe the whole block.
        with open(self.filename(b), 'rb+') as f:
            f.seek((seqno - b) * self.__record_size)
            f.write(data)

    def set_flags(self, seqno, uid, flags, modseq):
        """Store flags/modseq for a message; currently writes immediately."""
        self.set_flags_now(seqno, uid, flags, modseq)

    def seqno(self, s):
        """
        Given a seqno (msgno), find the corresponding UID.
        May throw a RangeNeededError or an IndexError.
        """
        return self[s][0]

    def uid(self, u):
        """
        Given a UID, find the corresponding sequence number.
        May throw a RangeNeededError or a KeyError.

        This is an interpolation search that assumes UIDs grow strictly
        with the sequence number.
        """
        if self.__blocks is None:
            self.scan()
        # To begin with, find the earliest and latest seqnos we have.
        starts = sorted(self.__blocks)
        if starts:
            seqno0 = starts[0]
            # Last record of the last block: start + length - 1.
            seqnon = starts[-1] + self.__blocks[starts[-1]] - 1
        else:  # Use the mailbox extents.
            seqno0 = 1
            seqnon = self.__exists
        if self.__exists == 0:
            raise KeyError(u)
        if self.__exists == 1:
            if u != self.seqno(1):
                raise KeyError(u)
            return 1
        if u < self.seqno(seqno0):
            if seqno0 == 1:
                raise KeyError(u)
            seqno0 = 1
        if u > self.seqno(seqnon):
            if seqnon == self.__exists:
                raise KeyError(u)
            seqnon = self.__exists
        # Now assume that UIDs increase uniformly.
        uid0 = self.seqno(seqno0)
        uidn = self.seqno(seqnon)
        while True:
            # Check we're still sane
            if uid0 == u:
                return seqno0
            if uid0 == uidn:
                raise KeyError(u)
            if uidn == u:
                return seqnon
            if u < uid0 or u > uidn:
                # The bracket no longer contains u, so it sits in a gap
                # between two cached UIDs. Without this the search below
                # keeps producing the same guess forever.
                raise KeyError(u)
            # Make a guess.
            seqnox = seqno0 + int((u - uid0) * (1.0 * (seqnon - seqno0))
                                  / (uidn - uid0))
            uidx = self.seqno(seqnox)
            if uidx == u:
                return seqnox
            if uidx > u:
                seqnon = seqnox - 1
                uidn = self.seqno(seqnon)
                # UIDs differ by at least one per message, so the target
                # cannot be further than (uidx - u) messages below the guess.
                seqno0_t = max(seqno0, seqnox - (uidx - u) - 1)
                if seqno0_t != seqno0:
                    try:
                        uid0_t = self.seqno(seqno0_t)
                    except RangeNeededError:
                        pass  # Not cached; keep the wider bracket.
                    else:
                        seqno0 = seqno0_t
                        uid0 = uid0_t
            else:
                seqno0 = seqnox + 1
                uid0 = self.seqno(seqno0)
                seqnon_t = min(seqnon, seqnox + (u - uidx) + 1)
                if seqnon_t != seqnon:
                    try:
                        uidn_t = self.seqno(seqnon_t)
                    except RangeNeededError:
                        pass  # Not cached; keep the wider bracket.
                    else:
                        seqnon = seqnon_t
                        uidn = uidn_t

    def maps(self):
        """
        For debugging.
        """
        return self.__maps

    def consolidate(self):
        """
        Merge adjacent block files while the result stays below block_size().

        Run after lots of things have happened.
        Really ought to happen automagically.
        """
        import os
        if self.__blocks is None:
            self.scan()
        starts = sorted(self.__blocks)
        if not starts:
            print("No blocks to consolidate")
            return
        self.unmap_all()
        merged = {starts[0]: self.__blocks[starts[0]]}
        last = starts[0]
        for cur in starts[1:]:
            cur_len = self.__blocks[cur]
            adjacent = (last + merged[last] == cur)
            # Compare against the length accumulated so far, not the length
            # the block had before earlier merges.
            if adjacent and merged[last] + cur_len < self.block_size(cur):
                with open(self.filename(cur), 'rb') as src:
                    data = src.read()
                with open(self.filename(last), 'ab') as dst:
                    dst.write(data)
                os.remove(self.filename(cur))
                merged[last] += cur_len
                continue
            merged[cur] = cur_len
            last = cur
        self.__blocks = merged

    def genrecord(self, uid):
        """Return a packed record for uid with any other fields zeroed."""
        import struct
        if self.__uids_only:
            return struct.pack(self.__record_fmt, uid)
        return struct.pack(self.__record_fmt, uid, 0, 0)

    def set(self, seq_start, uids):
        """
        Store uids as the records for seq_start, seq_start + 1, ...

        Sequence numbers that are already covered by a cached block are left
        untouched, and the list is cut short where it would run into a
        following block. New records are appended to a directly preceding
        block while block_size() allows, then put into new block files.
        """
        if self.__blocks is None:
            self.scan()
        uids = list(uids)
        near_block = None
        for b in sorted(self.__blocks):
            l = self.__blocks[b]
            if b <= seq_start < b + l:
                # Leading part is cached already: skip it instead of
                # creating a second, overlapping block.
                skip = b + l - seq_start
                uids = uids[skip:]
                seq_start += skip
                near_block = b
            elif b + l == seq_start:
                near_block = b
            elif seq_start < b < seq_start + len(uids):
                uids = uids[:b - seq_start]
        if not uids:
            return
        # Add some UIDs to any near block
        if near_block is not None:
            self.unmap_all()
            room = self.block_size(seq_start) - self.__blocks[near_block]
            # room is negative when the block already exceeds the size
            # currently considered suitable.
            length = max(0, min(room, len(uids)))
            print("Adding length %r" % (length,))
            if length:
                with open(self.filename(near_block), 'ab') as f:
                    f.write(b''.join(
                        [self.genrecord(u) for u in uids[:length]]))
                self.__blocks[near_block] += length
                uids = uids[length:]
                seq_start += length
        # Now create new blocks
        while uids:
            length = min(self.block_size(seq_start), len(uids))
            with open(self.filename(seq_start), 'wb') as f:
                f.write(b''.join(
                    [self.genrecord(u) for u in uids[:length]]))
            self.__blocks[seq_start] = length
            uids = uids[length:]
            seq_start += length

    def addto(self, seq_start, uids):
        """
        Insert records containing only the UIDs supplied at this point.
        This might happen for CONTEXT.

        Every cached record at or after seq_start moves up by len(uids).
        The message count given to set_exists() is not changed here.
        """
        import os
        if self.__blocks is None:
            self.scan()
        uids = list(uids)
        count = len(uids)
        if not count:
            return
        size = self.__record_size
        self.unmap_all()
        # First move all subsequent UIDs. Highest block first, so a rename
        # can never land on a file that still has to be moved.
        blocks = {}
        near_block = None
        for b in sorted(self.__blocks, reverse=True):
            l = self.__blocks[b]
            if b >= seq_start:
                os.rename(self.filename(b), self.filename(b + count))
                blocks[b + count] = l
            elif b + l > seq_start:
                # The insertion point is inside this block: split off the
                # tail into its own, shifted block.
                keep = seq_start - b
                with open(self.filename(b), 'rb+') as f:
                    f.seek(keep * size)
                    tail = f.read()
                    f.truncate(keep * size)
                with open(self.filename(seq_start + count), 'wb') as f:
                    f.write(tail)
                blocks[b] = keep
                blocks[seq_start + count] = l - keep
                near_block = b
            else:
                blocks[b] = l
                if b + l == seq_start:
                    near_block = b
        self.__blocks = blocks
        # Add some UIDs to any near block
        if near_block is not None:
            room = self.block_size(seq_start) - self.__blocks[near_block]
            length = max(0, min(room, len(uids)))
            if length:
                with open(self.filename(near_block), 'ab') as f:
                    f.write(b''.join(
                        [self.genrecord(u) for u in uids[:length]]))
                self.__blocks[near_block] += length
                uids = uids[length:]
                seq_start += length
        # Now create new blocks
        while uids:
            length = min(self.block_size(seq_start), len(uids))
            with open(self.filename(seq_start), 'wb') as f:
                f.write(b''.join(
                    [self.genrecord(u) for u in uids[:length]]))
            self.__blocks[seq_start] = length
            uids = uids[length:]
            seq_start += length

    def block_needed(self, seq):
        """
        Given a sequence number, find the start and end of a good range
        to get from the server. Typically, you'd use UID SEARCH RETURN ()
        If no range is needed, it will return None.

        Raises CorruptedCacheError when the computed range is empty.
        """
        if self.__blocks is None:
            self.scan()
        low = None   # Start of the closest block below seq.
        high = None  # Start of the closest block above seq.
        for b, l in self.__blocks.items():
            if b <= seq:
                if (b + l - 1) >= seq:
                    # We already have this seqno, no need to fetch anything.
                    return None
                if low is None or b > low:
                    low = b
            elif high is None or b < high:
                high = b
        span = self.block_size(seq)
        print("Initial: %r %r %r" % (low, high, span))
        if low is None:
            low = 1
        if high is None:
            high = self.__exists + 1
        # Turn the neighbouring blocks into the bounds of the gap itself.
        low += self.__blocks.get(low, 0)
        high -= 1
        print("Fixed: %r %r" % (low, high))
        if high - low < 2 * span:
            if high < low:
                raise CorruptedCacheError(
                    "High came out lower than low for %d" % (seq))
            return (low, high)
        # The gap is large: ask for a window around seq, kept inside the gap.
        clow = seq - span
        chigh = seq + span
        if chigh > high:
            clow -= (chigh - high)
            chigh = high
        if clow < low:
            chigh += (low - clow)
            clow = low
        if chigh <= clow:
            raise CorruptedCacheError(
                "CHigh came out lower than CLow for %d" % (seq))
        return clow, chigh

    def expunge(self, seq):
        """
        EXPUNGE: drop the record at seq and renumber everything after it.

        Also lowers the message count by one.
        """
        import os
        if self.__blocks is None:
            self.scan()
        if self.__exists is not None:
            self.__exists -= 1
        self.unmap_all()
        size = self.__record_size
        # Lowest block first: a block moving down by one can then only land
        # on a name that has already been vacated.
        for b in sorted(self.__blocks):
            l = self.__blocks[b]
            if b > seq:
                os.rename(self.filename(b), self.filename(b - 1))
            elif seq < b + l:
                if l == 1:
                    os.remove(self.filename(b))
                    continue
                # 'rb+' edits in place; a 'w' mode would wipe the block.
                with open(self.filename(b), 'rb+') as f:
                    f.seek((seq - b + 1) * size)
                    tail = f.read()
                    f.seek((seq - b) * size)
                    f.write(tail)
                    f.truncate((l - 1) * size)
        self.scan()

    def expunge_multi(self, seqs_in):
        """
        For batch processing expunges.

        seqs_in lists the sequence numbers to remove. Every affected block
        is rewritten once, later blocks are renumbered, and the message
        count is lowered by the number of removed messages (as expunge()
        does for a single message).
        """
        import os
        from bisect import bisect_left
        if not seqs_in:
            return
        if self.__blocks is None:
            self.scan()
        # First, preprocess the seqs_in list so it's sorted and references
        # seqnos as they are right now. In the cases checked, distinct
        # numbers simply come out in ascending order, while a repeated
        # number ends up on consecutive seqnos.
        seqs = list(seqs_in)
        for x in range(len(seqs)):
            for i in range(len(seqs) - 1, -1, -1):
                if seqs[i] > seqs[x]:
                    seqs[i] -= 1
        seqs.sort()
        for x in range(len(seqs)):
            seqs[x] += x
        # Now, we walk through the list of blocks we have, adjusting them
        # as appropriate. Lowest first, so a block only ever moves onto
        # names that were vacated before.
        self.unmap_all()
        size = self.__record_size
        for b in sorted(self.__blocks):
            l = self.__blocks[b]
            first = bisect_left(seqs, b)
            last = bisect_left(seqs, b + l)
            offset = first            # Removed messages below this block.
            gone = seqs[first:last]   # Removed messages inside this block.
            old_name = self.filename(b)
            if not gone:
                if offset:
                    os.rename(old_name, self.filename(b - offset))
                continue
            if len(gone) == l:
                os.remove(old_name)
                continue
            dropped = set(s - b for s in gone)
            with open(old_name, 'rb') as f:
                data = f.read()
            kept = b''.join([data[i * size:(i + 1) * size]
                             for i in range(l) if i not in dropped])
            # The scratch name does not start with BLOCKID, so a leftover
            # would never be mistaken for a block by scan().
            tmp_name = os.path.join(self.__dirname,
                                    'tmp-%s%d' % (BLOCKID, b))
            with open(tmp_name, 'wb') as f:
                f.write(kept)
            os.remove(old_name)
            os.rename(tmp_name, self.filename(b - offset))
        if self.__exists is not None:
            self.__exists -= len(seqs)
        self.scan()

    def removefrom(self, where, what):
        """
        Remove messages: len(what) expunges at seqno where when where is
        set, otherwise the UIDs listed in what.
        """
        if where:
            for _ in range(len(what)):
                self.expunge(where)
        else:
            self.vanished(what)

    def vanished(self, uids):
        """
        Remove the messages with these UIDs.

        May throw whatever uid() throws for a UID that is not cached.
        """
        # uid() is the UID -> seqno direction; seqno() goes the other way.
        self.expunge_multi([self.uid(u) for u in uids])
# Manual test driver: fills a newcache.modcache in the current directory from
# the INBOX of a live IMAP server, fetching every range the cache asks for.
import random

import infotrope
import newcache

sm = infotrope.sm()  # trace=True)
# NOTE(review): the host part looks like it was mangled by the list archive
# (spaces where '@' and '.' would be expected) - confirm before running.
imap = sm['imap://dwd turner dave cridland net/']
c = newcache.modcache('.')
t, r, s = imap.send('SELECT', 'INBOX')
imap.wait(t)
print("")
print(" Ready")
print("")
exists = int(imap.mailbox_info['EXISTS'])
print("%d messages." % exists)
c.set_exists(exists)


class foo:
    """Search handler that stores the returned UIDs in the cache."""

    def __init__(self, c):
        self.c = c

    def _handle_search(self, t, r, s):
        # search_base is attached to the tag by fetch_block() below.
        self.c.set(t.search_base, s)
        return t, r, s


# Every sequence number once, in random order.
l = list(range(1, exists + 1))
random.shuffle(l)


def fetch_block(start, end):
    """Ask the server for the range start:end and feed the cache with it."""
    if start == 0:
        start = 1
    print("Searching for %d:%d" % (start, end))
    t, r, s = imap.send('UID SEARCH', '%d:%d' % (start, end))
    t.search_base = start
    handler = foo(c)
    imap.register(t, handler)
    imap.wait(t)


while True:
    try:
        print("UID %d is seqno %d" % (15970, c.uid(15970)))
        break
    except newcache.RangeNeededError as e:
        # The range travels in the exception arguments; unpacking the
        # exception object itself only works on old Python 2 releases.
        fetch_block(*e.args)

while l:
    try:
        c.seqno(l[-1])
        l.pop()
    except newcache.RangeNeededError as e:
        fetch_block(*e.args)

c.consolidate()
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]