479 lines
14 KiB
Python
479 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import bisect
|
|
import struct
|
|
import binascii
|
|
|
|
# Python 2/3 compatibility shims.

# `iterkeys(d)' iterates over the keys of a dict on either major version:
# Python 2 dicts provide `iterkeys()', Python 3 dicts only provide `keys()'.
if hasattr(dict, 'iterkeys'):
    def iterkeys(x):
        return x.iterkeys()
else:
    def iterkeys(x):
        return x.keys()

# `unicode' names the text string type: the builtin `unicode' on Python 2
# and `str' on Python 3.
unicode = type(u'')
|
|
|
|
class BuddyError(Exception):
    """Error raised for invalid buddy files and failed block reads."""
|
|
|
|
class Block(object):
    """An in-memory, file-like view of one fixed-size region of a buddy file.

    The region's bytes are read from the allocator when the block is
    created and kept in a `bytearray'.  Reads and writes happen at a
    cursor position; modifications are only written back to the
    allocator by `flush()' / `close()' (or on leaving a `with' block).
    The block never changes size: `insert()' drops bytes off the end and
    `delete()' pads the end with zero bytes.
    """

    # Types that `read()' treats as a `struct' format string rather than a
    # byte count: (str, unicode) on Python 2 and (bytes, str) on Python 3.
    _FORMAT_TYPES = (bytes, type(u''))

    def __init__(self, allocator, offset, size):
        """Load `size' bytes at `offset' from `allocator'.

        `allocator' must provide `read(offset, size)' and
        `write(offset, data)'.
        """
        self._allocator = allocator
        self._offset = offset
        self._size = size
        self._value = bytearray(allocator.read(offset, size))
        self._pos = 0
        self._dirty = False

    def __len__(self):
        """Return the size of the block in bytes."""
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Pending changes are written back even when an exception is
        # propagating; call `invalidate()' first to discard them.
        self.close()

    def close(self):
        """Write back any pending changes."""
        if self._dirty:
            self.flush()

    def flush(self):
        """Write the buffer back to the allocator if it has been modified."""
        if self._dirty:
            self._dirty = False
            self._allocator.write(self._offset, self._value)

    def invalidate(self):
        """Forget pending changes so they are not written back.

        The in-memory buffer itself is left as it is.
        """
        self._dirty = False

    def zero_fill(self):
        """Zero every byte from the cursor to the end of the block."""
        remaining = self._size - self._pos
        self._value[self._pos:self._size] = b'\0' * remaining
        self._dirty = True

    def tell(self):
        """Return the current cursor position."""
        return self._pos

    def seek(self, pos, whence=os.SEEK_SET):
        """Move the cursor.

        With `os.SEEK_CUR', `pos' is relative to the current position.
        With `os.SEEK_END', `pos' is a distance *back* from the end of the
        block (the new position is `size - pos'), which differs from the
        usual file convention; this is kept for compatibility.

        Raises `ValueError' if the resulting position is outside the block.
        """
        if whence == os.SEEK_CUR:
            pos += self._pos
        elif whence == os.SEEK_END:
            pos = self._size - pos

        if pos < 0 or pos > self._size:
            raise ValueError('Seek out of range in Block instance')

        self._pos = pos

    def read(self, size_or_format):
        """Read from the cursor and advance it.

        `size_or_format' is either a byte count, in which case the raw
        data is returned as a `bytearray', or a `struct' format string,
        in which case the unpacked tuple is returned.

        Raises `BuddyError' if fewer bytes than requested remain.
        """
        if isinstance(size_or_format, self._FORMAT_TYPES):
            size = struct.calcsize(size_or_format)
            fmt = size_or_format
        else:
            size = size_or_format
            fmt = None

        if self._size - self._pos < size:
            raise BuddyError('Unable to read %lu bytes in block' % size)

        data = self._value[self._pos:self._pos + size]
        self._pos += size

        if fmt is None:
            return data

        # Convert to `bytes' first; not every `struct' implementation
        # accepts a `bytearray'.
        return struct.unpack(fmt, bytes(data))

    def write(self, data_or_format, *args):
        """Overwrite bytes at the cursor and advance it.

        With extra arguments, `data_or_format' is a `struct' format and
        the arguments are packed; otherwise it is the data itself.

        Raises `ValueError' if the data would run past the end of the
        block.
        """
        if args:
            data = struct.pack(data_or_format, *args)
        else:
            data = data_or_format

        end = self._pos + len(data)
        if end > self._size:
            raise ValueError('Attempt to write past end of Block')

        self._value[self._pos:end] = data
        self._pos = end

        self._dirty = True

    def insert(self, data_or_format, *args):
        """Insert bytes at the cursor, shifting the following bytes up.

        The block keeps its size, so as many bytes as are inserted fall
        off the end.  Arguments are interpreted as for `write()'.

        Raises `ValueError' if the data does not fit between the cursor
        and the end of the block.
        """
        if args:
            data = struct.pack(data_or_format, *args)
        else:
            data = data_or_format

        count = len(data)
        if self._pos + count > self._size:
            # Without this check the truncation below would remove bytes
            # that lie *before* the cursor and leave it past the end.
            raise ValueError('Attempt to insert past end of Block')

        # `del value[-0:]' would delete the whole buffer, so an empty
        # insert must skip the truncate-and-insert step entirely.
        if count:
            del self._value[-count:]
            self._value[self._pos:self._pos] = data
            self._pos += count

        self._dirty = True

    def delete(self, size):
        """Delete `size' bytes at the cursor, shifting the rest down.

        The end of the block is padded with zero bytes so that its size
        is unchanged.  The cursor does not move.

        Raises `ValueError' if the range runs past the end of the block.
        """
        if self._pos + size > self._size:
            raise ValueError('Attempt to delete past end of Block')
        del self._value[self._pos:self._pos + size]
        self._value += b'\0' * size
        self._dirty = True

    def __str__(self):
        """Return the block's contents as a hexadecimal string."""
        # b2a_hex() returns bytes; __str__ must return text on Python 3.
        return binascii.b2a_hex(self._value).decode('ascii')
|
|
|
|
class Allocator(object):
    """A buddy allocator stored in a file with a `Bud1' header.

    The file starts with a header that points at a root block.  The root
    block holds the table of block addresses, a table of contents (TOC)
    mapping names to integer values, and 32 free lists, one per
    power-of-two block width.

    A block address packs the file offset and the width into one integer:
    the low five bits hold the width (the block is `1 << width' bytes
    long) and the remaining bits hold the offset.

    The allocator also acts as a mapping over the TOC.  Keys are stored
    as byte strings; text keys are encoded as Latin-1.
    """

    # String types accepted as TOC keys, file names and `struct' formats:
    # (str, unicode) on Python 2 and (bytes, str) on Python 3.
    _STRING_TYPES = (bytes, type(u''))

    def __init__(self, the_file):
        """Parse the header and root block of `the_file'.

        `the_file' is a seekable binary file object.  Raises `BuddyError'
        if the header is not a valid buddy-file header.
        """
        self._file = the_file
        self._dirty = False

        self._file.seek(0)

        # Read the header.  read() adds four to every offset, so -4
        # addresses the very start of the file.
        magic1, magic2, offset, size, offset2, self._unknown1 \
            = self.read(-4, '>I4sIII16s')

        if magic2 != b'Bud1' or magic1 != 1:
            raise BuddyError('Not a buddy file')

        if offset != offset2:
            raise BuddyError('Root addresses differ')

        self._root = Block(self, offset, size)

        # Read the block offsets; they are stored in groups of 256
        count, self._unknown2 = self._root.read('>II')
        self._offsets = []
        c = (count + 255) & ~255
        while c:
            self._offsets += self._root.read('>256I')
            c -= 256
        self._offsets = self._offsets[:count]

        # Read the TOC
        self._toc = {}
        count = self._root.read('>I')[0]
        for n in range(count):
            nlen = self._root.read('B')[0]
            name = bytes(self._root.read(nlen))
            value = self._root.read('>I')[0]
            self._toc[name] = value

        # Read the free lists, one per block width
        self._free = []
        for n in range(32):
            count = self._root.read('>I')[0]
            self._free.append(list(self._root.read('>%uI' % count)))

    @classmethod
    def open(cls, file_or_name, mode='r+'):
        """Open a buddy file and return an allocator for it.

        `file_or_name' is either a file name or an already-open binary
        file object.  Files opened by name are always opened in binary
        mode.  If `mode' contains `w', the file is truncated and
        initialised as an empty buddy file first.

        If anything fails after this method opened the file itself, the
        file is closed again before the exception propagates.
        """
        opened_here = isinstance(file_or_name, cls._STRING_TYPES)

        if opened_here:
            if 'b' not in mode:
                mode = mode[:1] + 'b' + mode[1:]
            if 'w' in mode and '+' not in mode:
                # The freshly written file is read back by __init__, so a
                # write-only file object could never work here.
                mode += '+'
            f = open(file_or_name, mode)
        else:
            f = file_or_name

        try:
            if 'w' in mode:
                # Create an empty file in this case
                f.truncate()
                cls._write_empty_file(f)

            return cls(f)
        except Exception:
            if opened_here:
                f.close()
            raise

    @staticmethod
    def _write_empty_file(f):
        """Write the header and root block of an empty buddy file to `f'."""
        # An empty root block needs 1264 bytes:
        #
        #     0  4        offset count
        #     4  4        unknown
        #     8  4        root block offset (2048)
        #    12  255 * 4  padding (offsets are in multiples of 256)
        #  1032  4        toc count (0)
        #  1036  228      free list
        #  total 1264

        # The free list will contain the following:
        #
        #     0  5 * 4    no blocks of width less than 5
        #    20  6 * 8    1 block each of widths 5 to 10
        #    68  4        no blocks of width 11 (allocated for the root)
        #    72  19 * 8   1 block each of widths 12 to 30
        #   224  4        no blocks of width 31
        #  total 228
        #
        # (The reason for this layout is that we allocate 2**5 bytes for
        #  the header, which splits the initial 2GB region into every size
        #  below 2**31, including *two* blocks of size 2**5, one of which
        #  we take.  The root block itself then needs a block of size
        #  2**11.  Conveniently, each of these initial blocks will be
        #  located at offset 2**n where n is its width.)

        # Write the header
        header = struct.pack(b'>I4sIII16s',
                             1, b'Bud1',
                             2048, 1264, 2048,
                             b'\x00\x00\x10\x0c'
                             b'\x00\x00\x00\x87'
                             b'\x00\x00\x20\x0b'
                             b'\x00\x00\x00\x00')
        f.write(header)
        f.write(b'\0' * 2016)

        # Write the root block
        free_list = [struct.pack(b'>5I', 0, 0, 0, 0, 0)]
        free_list.extend(struct.pack(b'>II', 1, 2**n)
                         for n in range(5, 11))
        free_list.append(struct.pack(b'>I', 0))
        free_list.extend(struct.pack(b'>II', 1, 2**n)
                         for n in range(12, 31))
        free_list.append(struct.pack(b'>I', 0))

        root = b''.join([struct.pack(b'>III', 1, 0, 2048 | 5),
                         struct.pack(b'>I', 0) * 255,
                         struct.pack(b'>I', 0)] + free_list)
        f.write(root)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Flush pending changes and close the underlying file."""
        self.flush()
        self._file.close()

    def flush(self):
        """Write the root block and header back if anything changed, then
        flush the underlying file."""
        if self._dirty:
            # NOTE(review): the size is computed before allocate() runs,
            # and allocate() may itself change the free lists that are
            # stored in the root block.
            size = self._root_block_size()
            self.allocate(size, 0)
            with self.get_block(0) as rblk:
                self._write_root_block_into(rblk)

            addr = self._offsets[0]
            offset = addr & ~0x1f
            size = 1 << (addr & 0x1f)

            self._file.seek(0, os.SEEK_SET)
            self._file.write(struct.pack(b'>I4sIII16s',
                                         1, b'Bud1',
                                         offset, size, offset,
                                         self._unknown1))

            self._dirty = False

        self._file.flush()

    def read(self, offset, size_or_format):
        """Read data at `offset', or raise an exception.  `size_or_format'
           may either be a byte count, in which case we return raw data,
           or a format string for `struct.unpack', in which case we
           work out the size and unpack the data before returning it.

           A read that runs past the end of the file is padded with zero
           bytes up to the requested size."""
        # N.B. There is a fixed offset of four bytes(!)
        self._file.seek(offset + 4, os.SEEK_SET)

        if isinstance(size_or_format, self._STRING_TYPES):
            size = struct.calcsize(size_or_format)
            fmt = size_or_format
        else:
            size = size_or_format
            fmt = None

        ret = self._file.read(size)
        if len(ret) < size:
            ret += b'\0' * (size - len(ret))

        if fmt is not None:
            ret = struct.unpack(fmt, bytes(ret))

        return ret

    def write(self, offset, data_or_format, *args):
        """Write data at `offset', or raise an exception.  `data_or_format'
           may either be the data to write, or a format string for
           `struct.pack', in which case we pack the additional arguments
           and write the resulting data."""
        # N.B. There is a fixed offset of four bytes(!)
        self._file.seek(offset + 4, os.SEEK_SET)

        if args:
            data = struct.pack(data_or_format, *args)
        else:
            data = data_or_format

        self._file.write(data)

    def get_block(self, block):
        """Return a `Block' for block number `block', or None if the
        number is beyond the end of the offset table."""
        try:
            addr = self._offsets[block]
        except IndexError:
            return None

        offset = addr & ~0x1f
        size = 1 << (addr & 0x1f)

        return Block(self, offset, size)

    def _root_block_size(self):
        """Return the number of bytes required by the root block."""
        # Offsets: count and unknown word, then the table padded to a
        # multiple of 256 entries
        size = 8
        size += 4 * ((len(self._offsets) + 255) & ~255)

        # TOC: count, then (length byte, name, 4-byte value) per entry
        size += 4
        size += sum(5 + len(s) for s in self._toc)

        # Free list: a count followed by the offsets, for each width
        size += sum(4 + 4 * len(fl) for fl in self._free)

        return size

    def _write_root_block_into(self, block):
        """Serialise the offsets, TOC and free lists into `block'."""
        # Offsets
        block.write('>II', len(self._offsets), self._unknown2)
        if self._offsets:
            # With no arguments block.write() would treat the format
            # string itself as the data, so skip an empty table.
            block.write('>%uI' % len(self._offsets), *self._offsets)
        extra = len(self._offsets) & 255
        if extra:
            block.write(b'\0\0\0\0' * (256 - extra))

        # TOC, in sorted key order
        keys = sorted(self._toc)

        block.write('>I', len(keys))
        for k in keys:
            block.write('B', len(k))
            block.write(k)
            block.write('>I', self._toc[k])

        # Free list
        for f in self._free:
            block.write('>I', len(f))
            if f:
                block.write('>%uI' % len(f), *f)

    def _buddy(self, offset, width):
        """Locate the buddy of the block at `offset' with width `width'.

        Returns `(free_list, buddy_offset, index)', where `free_list' is
        the free list for `width' and `index' is the buddy's position in
        it, or None if the buddy is not free.
        """
        f = self._free[width]
        b = offset ^ (1 << width)

        try:
            ndx = f.index(b)
        except ValueError:
            ndx = None

        return (f, b, ndx)

    def _release(self, offset, width):
        """Return the block at `offset' with width `width' to the free
        lists, merging it with its buddy for as long as the buddy is
        free too."""
        # Coalesce
        while True:
            f, b, ndx = self._buddy(offset, width)

            if ndx is None:
                break

            # The merged block starts at the lower of the two offsets
            offset &= b
            width += 1
            del f[ndx]

        # Add to the list (f is the free list for the final width)
        bisect.insort(f, offset)

        # Mark as dirty
        self._dirty = True

    def _alloc(self, width):
        """Take a block of width `width' off the free lists and return
        its offset, splitting a larger block if necessary.

        Raises `IndexError' if no free block is large enough.
        """
        w = width
        while not self._free[w]:
            w += 1
        while w > width:
            # Split the block in two; every list between `width' and the
            # one we found was empty, so it can simply be replaced.
            offset = self._free[w].pop(0)
            w -= 1
            self._free[w] = [offset, offset ^ (1 << w)]
        self._dirty = True
        return self._free[width].pop(0)

    def allocate(self, bytes, block=None):
        """Allocate or reallocate a block such that it has space for at least
        `bytes' bytes.

        If `block' is None, the first unused block number is used (or a
        new one is appended).  Returns the block number.  When an
        existing block changes width it is released and a new region is
        allocated; the old contents are not copied.
        """
        if block is None:
            # Find the first unused block
            try:
                block = self._offsets.index(0)
            except ValueError:
                block = len(self._offsets)
                self._offsets.append(0)

        # Compute block width; 2**5 bytes is the smallest block
        width = max(bytes.bit_length(), 5)

        addr = self._offsets[block]
        offset = addr & ~0x1f

        if addr:
            blkwidth = addr & 0x1f
            if blkwidth == width:
                return block
            # Free the old region using the width it was allocated with,
            # not the newly requested one.
            self._release(offset, blkwidth)
            self._offsets[block] = 0

        offset = self._alloc(width)
        self._offsets[block] = offset | width
        return block

    def release(self, block):
        """Free block number `block'.

        The block's region is returned to the free lists.  If it was the
        last entry in the offset table the entry is removed; otherwise
        it is zeroed so that the number can be reused.

        Raises `IndexError' if `block' is not in the offset table.
        """
        addr = self._offsets[block]

        if addr:
            width = addr & 0x1f
            offset = addr & ~0x1f
            self._release(offset, width)

        if block == len(self._offsets) - 1:
            del self._offsets[block]
            self._dirty = True
        else:
            self._offsets[block] = 0

    @classmethod
    def _encode_key(cls, key):
        """Return TOC key `key' as a byte string.

        Text keys are encoded as Latin-1.  Raises `TypeError' for keys
        that are not strings.
        """
        if not isinstance(key, cls._STRING_TYPES):
            raise TypeError('Keys must be of string type')
        if not isinstance(key, bytes):
            key = key.encode('latin_1')
        return key

    def __len__(self):
        """Return the number of TOC entries."""
        return len(self._toc)

    def __getitem__(self, key):
        return self._toc[self._encode_key(key)]

    def __setitem__(self, key, value):
        self._toc[self._encode_key(key)] = value
        self._dirty = True

    def __delitem__(self, key):
        del self._toc[self._encode_key(key)]
        self._dirty = True

    def iterkeys(self):
        """Iterate over the TOC keys (byte strings)."""
        return iterkeys(self._toc)

    def keys(self):
        """Iterate over the TOC keys (byte strings)."""
        return iterkeys(self._toc)

    def __iter__(self):
        return iterkeys(self._toc)

    def __contains__(self, key):
        # Normalise the key the same way as __getitem__ so that text and
        # byte-string keys agree; anything that cannot be a key is absent.
        try:
            key = self._encode_key(key)
        except (TypeError, UnicodeEncodeError):
            return False
        return key in self._toc
|
|
|