Re: Number 9 of the mytest summary store: writing things



If you show me yours, I'll show you mine. :-)

Attached is a pure python mmap cache, designed solely to address:

- UID mapping.
- FLAGS
- MODSEQ

I'm mildly concerned that not all clients will want MODSEQ, and that it's not storing enough bit-space for FLAGS, but these are pretty minor issues, and in fact, I'm not bothering to store either FLAGS or MODSEQ yet anyway.

Basically, this is handling the stuff that I think yours is currently doing least optimally. ENVELOPEs don't change, so storing them in the same place as FLAGS doesn't make sense. Admittedly, if we want to handle ANNOTATE, we need something more complex than this, but it'd still be a bright idea to keep it all away from the more static data.

That all said, it's working, and seems reasonably quick - faster than the code I have, so I'll probably try to blend it in.

So... How it works.

I'm blocking the data in multiple mmap files - this needs more cleverness, because really, I need to be using smaller blocks toward the end of larger mailboxes. mmap files are called by the sequence number they start at.

Finding data by sequence number is relatively easy, although less than efficient as yet: I'm just running through all blocks in hash order to find one that looks as if it might be right.

When a UID gets removed - currently not really implemented - I don't rewrite all subsequent blocks. Instead, I rename them. This means less I/O, which is always a good thing.

I think that this is basically a sound design, albeit badly implemented.

Some potential improvements:

1) I suspect that having an index file containing block sequence starts, lengths, and UID extents would be faster than using the directory listing. Renaming files causes the directory "file" to be rewritten, so it makes sense to avoid this, and give the blocks a 64-bit ID instead.

2) Lots of bits of this are very badly done and it's filled with bugs. That said, the basic "set" and "[]" operations work. I think. Mostly.

I've attached both newcache.py, which contains the implementation, and ct1.py, which contains a simple test driver.

Dave.
--
Dave Cridland - mailto:dave cridland net - xmpp:dwd jabber org
 - acap://acap.dave.cridland.net/byowner/user/dwd/bookmarks/
 - http://dave.cridland.net/
Infotrope Polymer - ACAP, IMAP, ESMTP, and Lemonade
# ct1.py: test driver that exercises newcache.modcache against an IMAP mailbox.
import infotrope

# Look up an IMAP connection object through infotrope, keyed by URL.
# NOTE(review): trace=True presumably turns on protocol tracing -- confirm
# against the infotrope library.
sm = infotrope.sm(trace=True)
imap = sm['imap://dwd turner dave cridland net/']

import newcache

# The cache keeps its block files in the current working directory.
c = newcache.modcache('.')

# Issue SELECT INBOX, then wait on the returned command tag.
# NOTE(review): imap.wait() is assumed to block until the command completes.
t,r,s = imap.send('SELECT','INBOX')
imap.wait(t)

print ""
print " Ready"
print ""

# Message count taken from the connection's mailbox_info mapping.
exists = int(imap.mailbox_info['EXISTS'])

print "%d messages." % exists

class foo:
    """
    Command handler that copies search results into a cache.
    """
    def __init__(self, c):
        # Object whose set() method receives the results.
        self.c = c

    def _handle_search(self, t, r, s):
        """
        Store the payload s in the cache, starting at the position recorded
        on the command tag as t.search_base, and hand the three arguments
        back unchanged.
        """
        cache = self.c
        base = t.search_base
        cache.set(base, s)
        return (t, r, s)

# Visit every message index in random order, so the cache is filled out of
# sequence rather than front to back.
l = range(exists)
import random
random.shuffle(l)

for i in l:
    # i is 0-based; the cache is queried with i+1.
    feed = c.block_needed(i+1, exists)
    if feed is not None:
        # block_needed() can report 0 as the lower bound; the range sent to
        # the server is made to start at 1 instead.
        if feed[0] == 0:
            feed = (1,feed[1])
        t,r,s = imap.send('UID SEARCH', '%d:%d' % feed)
        # Record where the requested range starts; foo._handle_search reads
        # this attribute to know where to store the results.
        t.search_base = feed[0]
        handler = foo(c)
        imap.register(t,handler)
        imap.wait(t)
    #print "Message %d, UID %d" % (i+1, c[i+1][0])

# Finally, let the cache merge its block files.
c.consolidate()

import bisect
import mmap
import os
import struct

# Filename prefix of every block file; the sequence number the block starts
# at is appended in decimal.
BLOCKID = 'blockid-'
# Nominal number of records per block.  insert() fills blocks up to this
# size; set() and consolidate() allow up to four times as many.
BLOCKSZ = 500


class modcache:
    """
    Sequence-number indexed cache of fixed-size records, stored in block files.

    Each block file is named BLOCKID followed by the sequence number of its
    first record and holds consecutive records back to back.  The first
    field of every record is the UID; a record whose UID is 0 is treated as
    a hole (see __getitem__).  Block files are read through mmap and written
    through ordinary file I/O after dropping the affected mapping.
    """

    def __init__(self, dirname, fmt=None):
        """
        Initialize a modcache.
        The first argument is a directory name.
        The second optional argument is the struct format
        of each record. The first member of the struct MUST be the UID,
        although the actual format of it doesn't matter.
        """
        self.__dirname = dirname
        # {block start seqno: number of records}; None until scan() has run.
        self.__blocks = None
        self.__record_fmt = fmt or 'L4xQQ'
        self.__record_size = struct.calcsize(self.__record_fmt)
        # Values for every field after the UID in a freshly created record,
        # derived from an all-zero record so that any format works.
        self.__blank_tail = struct.unpack(
            self.__record_fmt, b'\0' * self.__record_size)[1:]
        self.__maps = {}
        # Mapped block starts, least recently used first.  Nothing is
        # evicted on the basis of this list yet.
        self.__map_lru = []

    def _ensure_scanned(self):
        """Load the block table from disk if that has not happened yet."""
        if self.__blocks is None:
            self.scan()

    def _pack_uids(self, uids):
        """Return the bytes of one record per UID, other fields zeroed."""
        fmt = self.__record_fmt
        tail = self.__blank_tail
        return b''.join([struct.pack(fmt, u, *tail) for u in uids])

    def _write_at(self, b, offset, payload):
        """
        Write payload (whole records) into block b at record offset,
        creating the block if needed, zero-filling any gap between the
        current end of the block and offset, and updating the block table.
        """
        size = self.__record_size
        self.unmap(b)
        have = self.__blocks.get(b)
        if have is None:
            mode, have = 'wb', 0
        else:
            mode = 'rb+'
        with open(self.filename(b), mode) as f:
            start = min(offset, have)
            f.seek(start * size)
            f.write(b'\0' * ((offset - start) * size) + payload)
        self.__blocks[b] = max(have, offset + len(payload) // size)

    def filename(self, b):
        """
        Return a filename for the mapfile for this block.
        """
        return os.path.join(self.__dirname, '%s%d' % (BLOCKID, b))

    def scan(self):
        """
        Scan the directory for existing mapfiles.
        Files whose name does not end in a number are ignored.
        """
        # Existing mappings may not match what is on disk any more.
        self.unmap_all()
        self.__blocks = {}
        for name in os.listdir(self.__dirname):
            if not name.startswith(BLOCKID):
                continue
            try:
                b = int(name[len(BLOCKID):])
            except ValueError:
                continue
            # The size must be taken from the file inside dirname, which is
            # not necessarily the current working directory.
            path = os.path.join(self.__dirname, name)
            l = os.path.getsize(path) // self.__record_size
            self.__blocks[b] = l
            print("Found block from %d, length %d" % (b, l))

    def map(self, b):
        """
        mmap a particular block.
        """
        if b in self.__maps:
            self.__map_lru.remove(b)
            self.__map_lru.append(b)
            return
        self._ensure_scanned()
        # mmap keeps its own duplicate of the descriptor, so the file object
        # can be closed as soon as the mapping exists.
        with open(self.filename(b), 'rb+') as f:
            m = mmap.mmap(f.fileno(), self.__blocks[b] * self.__record_size)
        self.__map_lru.append(b)
        self.__maps[b] = m

    def unmap(self, b):
        """
        Unmap a single block, if it is mapped.
        """
        m = self.__maps.pop(b, None)
        if m is not None:
            m.close()
        if b in self.__map_lru:
            self.__map_lru.remove(b)

    def unmap_all(self):
        """
        Unmap all blocks, needed for certain cases.
        """
        for m in self.__maps.values():
            m.close()
        self.__maps = {}
        self.__map_lru = []

    def __getitem__(self, seq):
        """
        Return the unpacked record at this sequence number, if it exists.
        Raises KeyError when no block covers seq or the record there has a
        zero UID.
        """
        self._ensure_scanned()
        size = self.__record_size
        for b, l in self.__blocks.items():
            if b <= seq < b + l:
                self.map(b)
                start = (seq - b) * size
                tmp = struct.unpack(self.__record_fmt,
                                    self.__maps[b][start:start + size])
                if tmp[0]:
                    return tmp
        raise KeyError(seq)

    def __setitem__(self, seq, data):
        """
        Replace the existing record at this point.

        data is a tuple or list with one value per field of the record
        format (a bare value is accepted for single-field formats).  If seq
        lies no more than BLOCKSZ records past the start of the nearest
        block below it, that block is extended; otherwise a new block is
        started on a BLOCKSZ boundary.  Records skipped over in either case
        are zero-filled, i.e. left as holes.
        """
        self._ensure_scanned()
        if isinstance(data, (tuple, list)):
            record = struct.pack(self.__record_fmt, *data)
        else:
            record = struct.pack(self.__record_fmt, data)
        start = (seq // BLOCKSZ) * BLOCKSZ
        lower = [b for b in self.__blocks if b <= seq]
        if lower:
            # Only the nearest block below seq can contain or adjoin it.
            b = max(lower)
            offset = seq - b
            if offset < self.__blocks[b] or offset <= BLOCKSZ:
                self._write_at(b, offset, record)
                return
            # Never let the new block reach back into an existing one.
            start = max(start, b + self.__blocks[b])
        self._write_at(start, seq - start, record)

    def consolidate(self):
        """
        Merge each block into the block directly before it when the two are
        exactly adjacent and together hold fewer than 4*BLOCKSZ records.
        """
        self._ensure_scanned()
        starts = sorted(self.__blocks)
        if not starts:
            print("No blocks to consolidate")
            return
        self.unmap_all()
        size = self.__record_size
        last_block = starts[0]
        for b in starts[1:]:
            l = self.__blocks[b]
            print("Last block is %s" % (last_block,))
            # Only exact adjacency can be merged by appending; a gap or an
            # overlap would misalign every appended record.
            if last_block + self.__blocks[last_block] == b:
                print("Candidate")
                # Test the accumulated length, so a chain of merges cannot
                # grow a block past the limit.
                if self.__blocks[last_block] + l < 4 * BLOCKSZ:
                    print("Not too long")
                    with open(self.filename(b), 'rb') as src:
                        payload = src.read(l * size)
                    with open(self.filename(last_block), 'rb+') as dst:
                        # Position by record count rather than appending, so
                        # stray bytes past the last record are overwritten.
                        dst.seek(self.__blocks[last_block] * size)
                        dst.write(payload)
                        dst.truncate()
                    os.remove(self.filename(b))
                    self.__blocks[last_block] += l
                    del self.__blocks[b]
                    continue
            last_block = b

    def set(self, seq_start, uids):
        """
        Store records for consecutive sequence numbers starting at
        seq_start, one per UID supplied; all other fields are zeroed.

        The UIDs are cut short at the start of the next existing block.
        If seq_start falls inside, or directly after, an existing block the
        records are written into that block (up to 4*BLOCKSZ records per
        block); whatever remains goes into new blocks.
        """
        self._ensure_scanned()
        uids = list(uids)
        later = [b for b in self.__blocks if b > seq_start]
        if later:
            uids = uids[:min(later) - seq_start]
        # Write into the block that contains or adjoins seq_start, if any.
        lower = [b for b in self.__blocks if b <= seq_start]
        if lower:
            host = max(lower)
            offset = seq_start - host
            if offset <= self.__blocks[host]:
                capacity = max(4 * BLOCKSZ, self.__blocks[host])
                chunk = uids[:max(0, capacity - offset)]
                if chunk:
                    self._write_at(host, offset, self._pack_uids(chunk))
                uids = uids[len(chunk):]
                seq_start += len(chunk)
        # Now create new blocks
        while uids:
            chunk = uids[:4 * BLOCKSZ]
            self._write_at(seq_start, 0, self._pack_uids(chunk))
            uids = uids[len(chunk):]
            seq_start += len(chunk)

    def insert(self, seq_start, uids):
        """
        Insert records containing only the UIDs supplied at this point.
        Typically, you'd do this for either a FETCH response you got a UID in,
        or else a UID SEARCH result.

        Every cached record at or after seq_start moves up by len(uids).
        Whole blocks are moved by renaming their files; a block that
        contains seq_start is split in two first.
        """
        self._ensure_scanned()
        uids = list(uids)
        if not uids:
            return
        count = len(uids)
        size = self.__record_size
        self.unmap_all()
        # Split a block that has seq_start strictly inside it, so that its
        # tail can be shifted like any other block.
        for b, l in list(self.__blocks.items()):
            if b < seq_start < b + l:
                keep = seq_start - b
                with open(self.filename(b), 'rb+') as f:
                    f.seek(keep * size)
                    tail = f.read((l - keep) * size)
                    f.truncate(keep * size)
                with open(self.filename(seq_start), 'wb') as f:
                    f.write(tail)
                self.__blocks[b] = keep
                self.__blocks[seq_start] = l - keep
                break
        # Move all subsequent blocks, highest first, so that a rename never
        # lands on a block that has yet to be moved.
        for b in sorted(self.__blocks, reverse=True):
            if b >= seq_start:
                os.rename(self.filename(b), self.filename(b + count))
                self.__blocks[b + count] = self.__blocks.pop(b)
        # Add some UIDs to any near block
        for b, l in list(self.__blocks.items()):
            if b < seq_start and b + l == seq_start:
                chunk = uids[:max(0, BLOCKSZ - l)]
                if chunk:
                    self._write_at(b, l, self._pack_uids(chunk))
                uids = uids[len(chunk):]
                seq_start += len(chunk)
                break
        # Now create new blocks
        while uids:
            chunk = uids[:BLOCKSZ]
            self._write_at(seq_start, 0, self._pack_uids(chunk))
            uids = uids[len(chunk):]
            seq_start += len(chunk)

    def block_needed(self, seq, mango_max=None):
        """
        Given a sequence number, find the start and end of a good range
        to get from the server. Typically, you'd use UID SEARCH RETURN ()
        If no range is needed, it will return None.
        Note that it may return a seqno higher than the number of messages,
        so you'll need to trim it down, unless you supply a message count.
        The lower bound returned is 0 when nothing is cached below seq.
        """
        low = None
        high = None
        self._ensure_scanned()
        for b, l in self.__blocks.items():
            if b <= seq:
                if l > (seq - b):
                    # We already have this seqno, no need to fetch anything.
                    return None
                if low is None or b > low:
                    low = b
            elif high is None or b < high:
                high = b
        if low is None:
            low = 0
        if high is None:
            if mango_max is not None:
                high = mango_max + 1
            else:
                high = seq + BLOCKSZ
        # Start just past the end of the nearest block below seq and stop
        # just before the next block (or the end of the mailbox).
        low += self.__blocks.get(low, 0)
        high -= 1
        if high - low < 2 * BLOCKSZ:
            return (low, high)
        # The gap is large: take a window of BLOCKSZ either side of seq,
        # slid so that it stays within the gap.
        clow = seq - BLOCKSZ
        chigh = seq + BLOCKSZ
        if chigh > high:
            clow -= (chigh - high)
            chigh = high
        if clow < low:
            chigh += (low - clow)
            clow = low
        return clow, chigh

    def kill(self, seqs):
        """
        Remove the records at the given sequence numbers and renumber
        everything after them.

        All numbers in seqs refer to the numbering in force before the
        call.  Blocks that only need to move are renamed; blocks that lose
        records are rewritten; blocks left empty are deleted.
        """
        if not seqs:
            return
        self._ensure_scanned()
        self.unmap_all()
        size = self.__record_size
        doomed = sorted(set(seqs))
        # Lowest block first: every block moves down, into space that the
        # blocks before it have already vacated.
        for b in sorted(self.__blocks):
            l = self.__blocks[b]
            # Number of removed records before this block.
            shift = bisect.bisect_left(doomed, b)
            inside = doomed[shift:bisect.bisect_left(doomed, b + l)]
            remaining = l - len(inside)
            path = self.filename(b)
            if remaining == 0:
                os.remove(path)
                del self.__blocks[b]
                continue
            if inside:
                gone = set(inside)
                with open(path, 'rb') as f:
                    data = f.read(l * size)
                kept = [data[i * size:(i + 1) * size]
                        for i in range(l) if (b + i) not in gone]
                with open(path, 'wb') as f:
                    f.write(b''.join(kept))
            if shift:
                os.rename(path, self.filename(b - shift))
                del self.__blocks[b]
            self.__blocks[b - shift] = remaining


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]