Re: Number 9 of the mytest summary store: writing things



And another shot. This time I've included a uid method for translating UIDs to sequence numbers, and various, probably non-working expunge methods.

Dave.
--
Dave Cridland - mailto:dave cridland net - xmpp:dwd jabber org
 - acap://acap.dave.cridland.net/byowner/user/dwd/bookmarks/
 - http://dave.cridland.net/
Infotrope Polymer - ACAP, IMAP, ESMTP, and Lemonade
BLOCKID = 'blockid-'

class Error(Exception):
    pass

class RangeNeededError(Error):
    """
    The cache is missing data entirely for the range.
    Issue a UID SEARCH.
    """
    pass

class RefreshNeededError(Error):
    """
    Flags for the supplied range are out of date, and need refreshing.
    """
    pass

class CorruptedCacheError(Error):
    """
    Cache is mismatching, such as UIDs out of order,
    or an asserted UID match is failing.
    Cache should be destroyed and reinitialized from the server.
    """
    pass

"""
A cache, designed for holding UID mappings, and mutable message data such as
FLAGS and MODSEQ.
Errors include:
RangeNeededError - contains seq_start and seq_end members which will tell you
what to get from the server.
RefreshNeededError - likewise for FLAGS/MODSEQ data.
KeyError - data does not exist, such as UID not found.
IndexError - sequence number out of range.
"""

class modcache:
    def __init__(self, dirname, uids_only=False):
        """
        Initialize a modcache.
        The first argument is a directory name.
        The second optional argument is the struct format
        of each record. The first member of the struct MUST be the UID,
        although the actual format of it doesn't matter.
        """
        import struct
        self.__dirname = dirname
        self.__blocks = None
        self.__record_fmt = 'L4xQQ'
        self.__uids_only = uids_only
        if self.__uids_only:
            self.__record_fmt = 'L'
        self.__record_size = struct.calcsize(self.__record_fmt)
        self.__maps = {}
        self.__map_lru = []
        self.__exists = None
        self.__block_size = 32*1024/self.__record_size

    def block_size(self, s):
        """
        Return a suitable blocksize for this seqno.
        """
        l = self.__exists
        bs = self.__block_size
        bs0 = 0
        while l > s:
            t = bs+bs0
            bs0 = bs
            bs = t
            l -= bs
        return bs
        
    def set_exists(self, e):
        self.__exists = e
        
    def filename(self, b):
        """
        Return a filename for the mapfile for this block.
        """
        import os.path
        return os.path.join(self.__dirname, '%s%d' % (BLOCKID, b))

    def scan(self):
        """
        Scan the directory for existing mapfiles.
        """
        import os
        import stat
        self.__blocks = {}
        for x in os.listdir(self.__dirname):
            if x.startswith(BLOCKID):
                import stat
                b = int(x[len(BLOCKID):])
                l = os.stat(x)[stat.ST_SIZE] / self.__record_size
                self.__blocks[b] = l
                print "Found block from %d, length %d" % (b,l)

    def map(self, b):
        """
        mmap a particular block.
        """
        if b in self.__maps:
            self.__map_lru.remove(b)
            self.__map_lru.append(b)
            return
        import mmap
        f = file(self.filename(b),'rb+')
        m = mmap.mmap(f.fileno(), self.__blocks[b] * self.__record_size)
        self.__map_lru.append(b)
        self.__maps[b] = m

    def getrecord(self, m, o):
        import struct
        o *= self.__record_size
        return struct.unpack(self.__record_fmt, m[o:o+self.__record_size])

    def unmap_all(self):
        """
        Unmap all blocks, needed for certain cases.
        """
        self.__maps = {}

    def __getitem__(self, seq):
        """
        Return the unpacked record at this sequence number, if it exists.
        """
        if seq > self.__exists:
            raise IndexError, seq
        if seq < 1:
            raise IndexError, seq
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b <= seq:
                if (b + l - 1) >= seq:
                    import struct
                    self.map(b)
                    tmp = struct.unpack(self.__record_fmt, self.__maps[b][(seq - b)*self.__record_size:(seq - b + 1)*self.__record_size])
                    if tmp[0]:
                        return tmp
                    else:
                        raise CorruptedCacheError, "UID for record %d is 0" % seq
        n = self.block_needed(seq)
        if n:
            raise RangeNeededError, n
        else:
            raise CorruptedCacheError, "%d not found, but no records needed?" % seq

    def set_flags_now(self, seqno, uid, flags, modseq):
        if seqno is None:
            seqno = self.uid(uid)
        elif uid is None:
            uid = self.seqno(uid)
        else:
            self.set(seqno, [uid])
        b = self.find_block(seqno)
        self.unmap_all()
        f = file(self.filename(b), "wb+")
        f.seek((seqno-b)*self.__record_size)
        f.write(struct.pack(self.__record_fmt, (uid,flags,modseq)))
        f.close()

    def set_flags(self, seqno, uid, flags, modseq):
        self.set_flags_now(seqno, uid, flags, modseq)

    def seqno(self, s):
        """
        Given a seqno (msgno), find the corresponding UID.
        May throw a RangeNeededError or an IndexError.
        """
        return self[s][0]

    def uid(self, u):
        """
        Given a UID, find the corresponding sequence number.
        May throw a RangeNeededError or a KeyError.
        """
        if self.__blocks is None:
            self.scan()
        # To begin with, find the earliest and latest UIDs we have.
        all_blocks = self.__blocks.keys()
        all_blocks.sort()
        if all_blocks:
            seqno0 = all_blocks[0]
            seqnon = all_blocks[-1]
            seqnon += self.__blocks[seqnon] - 2
        else: # Use the mailbox extents.
            seqno0 = 1
            seqnon = self.__exists
        if self.__exists == 0:
            raise KeyError, u
        if self.__exists == 1:
            if u != self.seqno(1):
                raise KeyError, u
            else:
                return 1
        if u < self.seqno(seqno0):
            if seqno0 == 1:
                raise KeyError, u
            seqno0 = 1
        if u > self.seqno(seqnon):
            if seqnon == self.__exists:
                raise KeyError, u
            seqnon = self.__exists
        # Now assume that UIDs increase uniformly.
        uid0 = self.seqno(seqno0)
        uidn = self.seqno(seqnon)
        while True:
            #print "For %d: s0: %d=>%d, sN: %d=>%d" % (u,seqno0,uid0,seqnon,uidn)
            # Check we're still sane
            if uid0 == u:
                return seqno0
            if uid0 == uidn:
                raise KeyError, u
            if uidn == u:
                return seqnon
            # Make a guess.
            seqnox = seqno0 + int( ( u - uid0 ) * ( 1.0 * ( seqnon - seqno0 ) ) / ( uidn - uid0 ) )
            uidx = self.seqno(seqnox)
            #print "  ==> %d=>%d" % (seqnox,uidx)
            if uidx == u:
                return seqnox
            if uidx > u:
                seqnon = seqnox - 1
                uidn = self.seqno(seqnon)
                seqno0_t = max(seqno0, seqnox - (uidx - u) - 1)
                if seqno0_t != seqno0:
                    try:
                        self.seqno(seqno0_t)
                        seqno0 = seqno0_t
                        uid0 = self.seqno(seqno0)
                    except RangeNeededError:
                        pass
            else:
                seqno0 = seqnox + 1
                uid0 = self.seqno(seqno0)
                seqnon_t = min(seqnon, seqnox + (u - uidx) + 1)
                if seqnon_t != seqnon:
                    try:
                        self.seqno(seqnon_t)
                        seqnon = seqnon_t
                        uidn = self.seqno(seqnon)
                    except RangeNeededError:
                        pass

    def maps(self):
        """
        For debugging.
        """
        return self.__maps

    def consolidate(self):
        """
        Run after lots of things have happened.
        Really ought to happen automagically.
        """
        import os
        all_blocks = self.__blocks.keys()
        if not all_blocks:
            print "No blocks to consolidate"
            return
        all_blocks.sort()
        blocks = {}
        blocks[all_blocks[0]] = self.__blocks[all_blocks[0]]
        last_block = all_blocks[0]
        for i in range(1, len(all_blocks)):
            #print "Last block is",last_block
            #print "Extent:",last_block + blocks[last_block]
            #print "This block:",all_blocks[i]
            if last_block + blocks[last_block] == all_blocks[i]:
                #print "Candidate"
                if self.__blocks[last_block] + self.__blocks[all_blocks[i]] < self.block_size(all_blocks[i]):
                    #print "Not too long"
                    self.unmap_all()
                    self.map(all_blocks[i])
                    f = file(self.filename(last_block),"ab")
                    f.write(self.__maps[all_blocks[i]])
                    f.close()
                    self.unmap_all()
                    os.remove(self.filename(all_blocks[i]))
                    blocks[last_block] += self.__blocks[all_blocks[i]]
                    continue
            blocks[all_blocks[i]] = self.__blocks[all_blocks[i]]
            last_block = all_blocks[i]
        self.__blocks = blocks

    def genrecord(self,uid):
        import struct
        if self.__uids_only:
            return struct.pack(self.__record_fmt,uid)
        else:
            return struct.pack(self.__record_fmt,uid,0,0)

    def set(self, seq_start, uids):
        import struct
        import os
        near_block = None
        stop = None
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b + l == seq_start:
                near_block = b
            if b > seq_start and b < (seq_start + len(uids)):
                uids = uids[0:(b - seq_start)]
        # Add some UIDs to any near block
        if near_block is not None:
            self.unmap_all()
            offset = seq_start - near_block
            length = min((self.block_size(seq_start)) - self.__blocks[near_block], len(uids))
            print "Adding length",`length`
            f = file(self.filename(near_block), 'ab')
            f.seek(offset * self.__record_size)
            f.write(''.join([self.genrecord(u) for u in uids[0:length]]))
            f.close()
            self.__blocks[near_block] += length
            uids = uids[length:]
            seq_start += length
        # Now create new blocks
        while uids:
            length = min((self.block_size(seq_start)), len(uids))
            f = file(self.filename(seq_start), 'wb')
            f.write(''.join([self.genrecord(u) for u in uids[0:length]]))
            f.close()
            self.__blocks[seq_start] = length
            uids = uids[length:]
            seq_start += length

    def addto(self, seq_start, uids):
        """
        Insert records containing only the UIDs supplied at this point.
        This might happen for CONTEXT.
        """
        import struct
        import os
        # First move all subsequent UIDs.
        blocks = {}
        near_block = None
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b >= seq_start:
                self.unmap_all()
                os.rename(self.filename(b), self.filename(b+len(uids)))
                blocks[b+len(uids)] = l
            else:
                blocks[b] = l
                if b + l == seq_start:
                    near_block = b
        self.__blocks = blocks
        # Add some UIDs to any near block
        if near_block is not None:
            self.unmap_all()
            offset = seq_start - near_block
            length = min(self.block_size(seq_start) - self.__blocks[near_block], len(uids))
            f = file(self.filename(near_block), 'rb+')
            f.truncate((offset + 1 + length) * self.__record_size)
            f.seek(offset * self.__record_size)
            f.write(''.join([self.genrecord(u) for u in uids[0:length]]))
            f.close()
            self.__blocks[near_block] += length
            uids = uids[length:]
            seq_start += length
        # Now create new blocks
        while uids:
            length = min(self.block_size(seq_start), len(uids))
            f = file(self.filename(seq_start), 'wb')
            f.write(''.join([self.genrecord(u) for u in uids[0:length]]))
            f.close()
            self.__blocks[seq_start] = length
            uids = uids[length:]
            seq_start += length

    def block_needed(self, seq):
        """
        Given a sequence number, find the start and end of a good range
        to get from the server. Typically, you'd use UID SEARCH RETURN ()
        If no range is needed, it will return None.
        """
        low = None
        high = None
        if self.__blocks is None:
            self.scan()
        for b,l in self.__blocks.items():
            if b <= seq:
                if (b + l - 1) >= seq:
                    # We already have this seqno, no need to fetch anything.
                    return None
                if low is None or b > low:
                    low = b
            elif high is None or b < high:
                high = b
        print "Initial:",`low`,`high`,`self.block_size(seq)`
        if low is None:
            low = 1
        if high is None:
            high = self.__exists + 1
        low += self.__blocks.get(low,0)
        high -= 1
        print "Fixed:",`low`,`high`
        if high - low < 2*self.block_size(seq):
            if high < low:
                raise CorruptedCacheError, "High came out lower than low for %d" % (seq)
            return (low,high)
        clow = seq - self.block_size(seq)
        chigh = seq + self.block_size(seq)
        if chigh > high:
            clow -= (chigh - high)
            chigh = high
        if clow < low:
            chigh += (low - clow)
            clow = low
        if chigh <= clow:
            raise CorruptedCacheError, "CHigh came out lower than CLow for %d" % (seq)
        return clow,chigh
    
    def expunge(self, seq):
        """
        EXPUNGE.
        """
        import os
        self.__exists -= 1
        self.unmap_all()
        offset = 0
        all_blocks = self.__blocks.keys()
        if not all_blocks:
            return
        all_blocks.sort()
        all_blocks.reverse()
        for b in all_blocks:
            if b > seq:
                os.rename(self.filename(b), self.filename(b-1))
            else:
                if (self.__blocks[b] - 1) > (seq - b):
                    if self.__blocks[b] == 1:
                        os.remove(self.filename(b))
                        break
                    f = file(self.filename(b), "wb+")
                    data = ''
                    if (self.__blocks[b] - 2) != (seq - b):
                        f.seek((seq - b + 1) * self.__record_size)
                        data = f.read()
                    f.seek((seq - b) * self.__record_size)
                    f.write(data)
                    f.truncate((self.__blocks[b] - 1) * self.__record_size)
                    f.close()
                    break
        self.scan()

    def expunge_multi(self, seqs_in):
        """
        For batch processing expunges.
        """
        all_blocks = self.__blocks.keys()
        all_blocks.sort()
        # First, preprocess the seqs_in list so it's sorted.
        seqs = seqs_in[:]
        # We need to adjust the list so it's referencing seqnos
        # right.
        for x in range(len(seqs)):
            s = seqs[x]
            for i in range(len(seqs)-1,-1,-1):
                if seqs[i] > seqs[x]:
                    seqs[i] -= 1
        seqs.sort()
        for x in range(len(seqs)):
            seqs[x] += x
        seqs.reverse()
        # Now, we walk through the list of blocks we have, adjusting them
        # as approriate.
        offset = 0
        self.unmap_all()
        s0 = seqs.pop()
        for b in all_blocks:
            if not seqs:
                break
            if b <= s0:
                if (self.__blocks[b] - 1) < (s0 - b):
                    if b == s0:
                        s = [s0 - b]
                    else:
                        s = [-1,s0-b]
                    noffset = 1
                    while len(seqs) and (self.__blocks[b] - 1) < (seqs[0] - b):
                        s.append(seqs.pop() - b)
                        noffset += 1
                    s.append(l)
                    self.map(b)
                    fname = self.filename(None)
                    f = file(fname, 'wb')
                    m = self.__maps[b]
                    for i in range(len(s)-1):
                        f.write(m[s[i]+1:s[i+1]])
                    f.close()
                    self.unmap_all()
                    os.remove(self.filename(b))
                    os.rename(fname,self.filename(b-offset))
                    offset += noffset
                    s0 = seqs.pop()
                    continue
            if offset:
                os.rename(self.filename(b),self.filename(b-offset))
        self.scan()

    def removefrom(self, where, what):
        if where:
            for i in range(len(what)):
                self.expunge(where)
        else:
            self.vanished(what)

    def vanished(self, uids):
        self.expunge_multi([self.seqno(u) for u in uids])
import infotrope

sm = infotrope.sm()#trace=True)
imap = sm['imap://dwd turner dave cridland net/']

import newcache

c = newcache.modcache('.')

t,r,s = imap.send('SELECT','INBOX')
imap.wait(t)

print ""
print " Ready"
print ""

exists = int(imap.mailbox_info['EXISTS'])

print "%d messages." % exists

c.set_exists(exists)

class foo:
    def __init__(self, c):
        self.c = c

    def _handle_search(self, t,r,s):
        self.c.set(t.search_base,s)
        return t,r,s

l = range(1,exists+1)
import random
random.shuffle(l)

def fetch_block(start, end):
    if start == 0:
        start = 1
    print "Searching for %d:%d" % (start, end)
    t,r,s = imap.send('UID SEARCH', '%d:%d' % (start, end))
    t.search_base = start
    handler = foo(c)
    imap.register(t,handler)
    imap.wait(t)

while True:
    try:
        print "UID %d is seqno %d" % (15970, c.uid(15970))
        break
    except newcache.RangeNeededError, e:
        fetch_block(*e)

while l:
    try:
        c.seqno(l[-1])
        l.pop()
    except newcache.RangeNeededError, e:
        fetch_block(*e)

c.consolidate()



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]