332 lines
11 KiB
Python
332 lines
11 KiB
Python
# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.
|
|
import errno
|
|
import os
|
|
import sys
|
|
from threading import Lock
|
|
from shutil import copyfileobj
|
|
|
|
try:
|
|
import mmap
|
|
except ImportError:
|
|
mmap = None
|
|
|
|
from whoosh.compat import BytesIO, memoryview_
|
|
from whoosh.filedb.structfile import BufferFile, StructFile
|
|
from whoosh.filedb.filestore import FileStorage, StorageError
|
|
from whoosh.system import emptybytes
|
|
from whoosh.util import random_name
|
|
|
|
|
|
class CompoundStorage(FileStorage):
    """Read-only storage backed by a single "compound" file.

    On-disk layout, as read by ``__init__`` and written by ``assemble`` /
    ``write_dir``:

    * at ``basepos``: a long holding the directory position, then an int
      holding the directory length;
    * the contents of the packed files;
    * at the directory position: a pickled dict mapping each file name to an
      info dict with ``"offset"`` and ``"length"`` keys (``assemble`` also
      stores ``"modified"``), followed by a pickled options dict.

    Offsets in the directory are positions as reported by ``dbfile.tell()``
    when the file was written.
    """

    readonly = True

    def __init__(self, dbfile, use_mmap=True, basepos=0):
        """
        :param dbfile: an open file object for the compound file. It must
            provide ``seek``, ``tell``, ``read_long``, ``read_int``,
            ``read_pickle`` and ``close``.
        :param use_mmap: if True, try to memory-map the file. The map is only
            attempted when the ``mmap`` module is available, ``dbfile`` has a
            ``fileno`` attribute and the file is smaller than ``sys.maxsize``.
        :param basepos: position in ``dbfile`` at which the compound header
            (directory position and length) starts.
        """

        self._file = dbfile
        # __repr__ reports self._name; it used to be referenced without ever
        # being assigned. NOTE(review): assumes the file object exposes its
        # name as ``name`` -- falls back to None when it does not.
        self._name = getattr(dbfile, "name", None)
        self.is_closed = False

        # Seek to the end to get total file size (to check if mmap is OK)
        dbfile.seek(0, os.SEEK_END)
        filesize = self._file.tell()
        dbfile.seek(basepos)

        self._diroffset = self._file.read_long()
        self._dirlength = self._file.read_int()
        self._file.seek(self._diroffset)
        self._dir = self._file.read_pickle()
        self._options = self._file.read_pickle()
        self._locks = {}
        self._source = None

        use_mmap = (
            use_mmap
            and hasattr(self._file, "fileno")  # check file is a real file
            and filesize < sys.maxsize  # check fit on 32-bit Python
        )
        if mmap and use_mmap:
            # Try to open the entire segment as a memory-mapped object
            try:
                fileno = self._file.fileno()
                self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
            except (mmap.error, OSError) as e:
                # If we got an error because there wasn't enough memory to
                # open the map, ignore it and fall through, we'll just use the
                # (slower) "sub-file" implementation
                if e.errno != errno.ENOMEM:
                    raise
            else:
                # If that worked, we can close the file handle we were given
                self._file.close()
                self._file = None

    def __repr__(self):
        return "<%s (%s)>" % (self.__class__.__name__,
                              getattr(self, "_name", None))

    def close(self):
        """Release the memory map and/or the underlying file.

        :raises Exception: if the storage was already closed.
        """

        if self.is_closed:
            raise Exception("Already closed")
        self.is_closed = True

        # Always drop our reference to the map, but keep the attribute itself
        # (previously it was deleted on BufferError, leaving the object
        # without a ``_source`` attribute).
        source = self._source
        self._source = None
        if source is not None:
            try:
                source.close()
            except BufferError:
                # Buffers exported from the map are still alive, so it cannot
                # be closed explicitly; we have already let go of it.
                pass
        if self._file:
            self._file.close()

    def range(self, name):
        """Return the ``(offset, length)`` recorded for ``name``.

        :raises NameError: if ``name`` is not in the directory.
        """

        try:
            fileinfo = self._dir[name]
        except KeyError:
            raise NameError("Unknown file %r" % (name,))
        return fileinfo["offset"], fileinfo["length"]

    def open_file(self, name, *args, **kwargs):
        """Return a file-like object over the bytes stored for ``name``.

        Extra positional and keyword arguments are accepted and ignored.

        :raises StorageError: if the storage has been closed.
        :raises NameError: if ``name`` is not in the directory.
        """

        if self.is_closed:
            raise StorageError("Storage was closed")

        offset, length = self.range(name)
        if self._source is not None:
            # Create a memoryview/buffer from the mmap
            buf = memoryview_(self._source, offset, length)
            return BufferFile(buf, name=name)
        if hasattr(self._file, "subset"):
            return self._file.subset(offset, length, name=name)
        return StructFile(SubFile(self._file, offset, length), name=name)

    def list(self):
        """Return the names of the packed files as a list."""

        return list(self._dir.keys())

    def file_exists(self, name):
        """Return True if ``name`` is in the directory."""

        return name in self._dir

    def file_length(self, name):
        """Return the stored length of ``name`` (KeyError if unknown)."""

        return self._dir[name]["length"]

    def file_modified(self, name):
        """Return the stored ``"modified"`` value of ``name``.

        Raises KeyError if ``name`` is unknown or its directory entry has no
        ``"modified"`` key.
        """

        return self._dir[name]["modified"]

    def lock(self, name):
        """Return the ``threading.Lock`` for ``name``, creating it on first
        use; the same object is returned on later calls.
        """

        if name not in self._locks:
            self._locks[name] = Lock()
        return self._locks[name]

    @staticmethod
    def assemble(dbfile, store, names, **options):
        """Copy the files ``names`` from ``store`` into ``dbfile`` as a
        compound file, then write the directory and close ``dbfile``.

        :param dbfile: output file; must provide ``tell``, ``write``,
            ``write_long``, ``write_int`` plus whatever ``write_dir`` needs.
        :param store: storage providing ``file_length``, ``file_modified``
            and ``open_file`` for every name.
        :param names: non-empty collection of file names to pack. Names
            ending in ``.toc`` or ``.seg`` are rejected with ``Exception``.
        :param options: stored as the options dict after the directory.
        """

        assert names, names

        directory = {}
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory position
        dbfile.write_int(0)  # Directory length

        # Validate every name before copying anything
        for name in names:
            if name.endswith(".toc") or name.endswith(".seg"):
                raise Exception(name)

        # Copy the files into the compound file
        for name in names:
            offset = dbfile.tell()
            length = store.file_length(name)
            modified = store.file_modified(name)
            directory[name] = {"offset": offset, "length": length,
                               "modified": modified}
            f = store.open_file(name)
            try:
                copyfileobj(f, dbfile)
            finally:
                # Don't leak the source handle if the copy fails part-way
                f.close()

        CompoundStorage.write_dir(dbfile, basepos, directory, options)

    @staticmethod
    def write_dir(dbfile, basepos, directory, options=None):
        """Append the pickled ``directory`` and ``options`` at the current
        position, patch the header at ``basepos`` with the directory position
        and length, and close ``dbfile``.
        """

        options = options or {}

        dirpos = dbfile.tell()  # Remember the start of the directory
        dbfile.write_pickle(directory)  # Write the directory
        dbfile.write_pickle(options)
        endpos = dbfile.tell()  # Remember the end of the directory
        dbfile.flush()
        dbfile.seek(basepos)  # Seek back to the start
        dbfile.write_long(dirpos)  # Directory position
        dbfile.write_int(endpos - dirpos)  # Directory length

        dbfile.close()
|
|
|
|
class SubFile(object):
    """File-like, read-only window onto part of a parent file.

    The window covers ``length`` bytes of ``parentfile`` starting at
    ``offset``. Positions reported by ``tell`` and accepted by ``seek`` are
    relative to the start of the window. Every read first seeks the parent to
    the right place, so the parent's own position is not relied upon; no
    locking is done here.
    """

    def __init__(self, parentfile, offset, length, name=None):
        """
        :param parentfile: file object providing ``seek``, ``read`` and
            ``readline``.
        :param offset: position in ``parentfile`` where the window starts.
        :param length: size of the window in bytes.
        :param name: optional name, stored as the ``name`` attribute.
        """

        self._file = parentfile
        self._offset = offset
        self._length = length
        self._end = offset + length
        self._pos = 0

        self.name = name
        self.closed = False

    def close(self):
        """Mark this window as closed. The parent file is not touched."""

        self.closed = True

    def subset(self, position, length, name=None):
        """Return a new SubFile covering ``length`` bytes starting at
        ``position`` within this window, sharing the same parent file.

        The requested range must lie inside this window (checked with
        ``assert``). ``name`` defaults to this window's name.
        """

        start = self._offset + position
        end = start + length
        name = name or self.name
        # The bounds checks used to be written with ``>=``, which rejected
        # every range that was actually inside the window.
        assert self._offset <= start <= self._end
        assert self._offset <= end <= self._end
        return SubFile(self._file, start, length, name=name)

    def read(self, size=None):
        """Read up to ``size`` bytes from the current position.

        ``None`` or a negative ``size`` reads everything up to the end of the
        window, following the usual file-object convention. Never reads past
        the end of the window.
        """

        remaining = self._length - self._pos
        if size is None or size < 0:
            size = remaining
        else:
            size = min(size, remaining)

        if size <= 0:
            # At (or seeked beyond) the end of the window
            return emptybytes

        self._file.seek(self._offset + self._pos)
        data = self._file.read(size)
        # Advance by what the parent really returned, so a short read does
        # not leave the window position ahead of the data handed out.
        self._pos += len(data)
        return data

    def readline(self):
        """Read one line from the current position, truncated at the end of
        the window.
        """

        maxsize = self._length - self._pos
        if maxsize <= 0:
            # Nothing left; also avoids slicing with a negative bound below
            return emptybytes
        self._file.seek(self._offset + self._pos)
        data = self._file.readline()
        if len(data) > maxsize:
            data = data[:maxsize]
        self._pos += len(data)
        return data

    def seek(self, where, whence=0):
        """Move the window position.

        :param where: offset, interpreted according to ``whence``.
        :param whence: 0 = from the start of the window, 1 = relative to the
            current position, 2 = relative to the end of the window (so
            ``where`` is normally zero or negative, as for regular files).
        :raises ValueError: for any other ``whence``.
        """

        if whence == 0:  # Absolute
            pos = where
        elif whence == 1:  # Relative
            pos = self._pos + where
        elif whence == 2:  # From end
            # Was ``length - where``, which put seek(-n, 2) past the end
            pos = self._length + where
        else:
            raise ValueError("Invalid whence: %r" % (whence,))

        self._pos = pos

    def tell(self):
        """Return the current position relative to the start of the window."""

        return self._pos
|
|
|
|
class CompoundWriter(object):
    """Collects several named output streams and later writes them out
    either as one compound file or as separate files.

    Each stream buffers writes in memory; once a write brings the buffered
    amount to ``buffersize`` or more, the data is spilled to a single shared
    temporary file created in ``tempstorage``.
    """

    def __init__(self, tempstorage, buffersize=32 * 1024):
        """
        :param tempstorage: storage object used to create (and later delete)
            the temporary spill file; must provide ``create_file(name,
            mode=...)`` and ``delete_file(name)``.
        :param buffersize: per-stream in-memory buffer threshold in bytes.
        """

        assert isinstance(buffersize, int)
        self._tempstorage = tempstorage
        self._tempname = "%s.ctmp" % random_name()
        self._temp = tempstorage.create_file(self._tempname, mode="w+b")
        self._buffersize = buffersize
        self._streams = {}

    def create_file(self, name):
        """Register a new stream under ``name`` and return it wrapped in a
        ``StructFile``.
        """

        ss = self.SubStream(self._temp, self._buffersize)
        self._streams[name] = ss
        return StructFile(ss)

    def _readback(self):
        """Yield ``(name, blocks)`` for every stream, where ``blocks()``
        returns a generator over that stream's data in write order.

        The temporary file is closed and deleted once iteration finishes
        (including when the generator is closed early or the consumer
        fails), so spilled blocks must be read while iterating.
        """

        temp = self._temp
        try:
            for name, substream in self._streams.items():
                substream.close()

                # Bind the stream as a default argument: a plain closure
                # would look ``substream`` up when called, so a ``blocks``
                # callable kept past its loop iteration would read whichever
                # stream the loop had reached by then.
                def gen(substream=substream):
                    for f, offset, length in substream.blocks:
                        if f is None:
                            # None marks a block spilled to the temp file
                            f = temp
                        f.seek(offset)
                        yield f.read(length)

                yield (name, gen)
        finally:
            temp.close()
            self._tempstorage.delete_file(self._tempname)

    def save_as_compound(self, dbfile):
        """Write all streams into ``dbfile`` in the compound-file layout and
        finish it with ``CompoundStorage.write_dir`` (which closes the file).

        Directory entries written here contain only ``"offset"`` and
        ``"length"``.
        """

        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory offset
        dbfile.write_int(0)  # Directory length

        directory = {}
        for name, blocks in self._readback():
            filestart = dbfile.tell()
            for block in blocks():
                dbfile.write(block)
            directory[name] = {"offset": filestart,
                               "length": dbfile.tell() - filestart}

        CompoundStorage.write_dir(dbfile, basepos, directory)

    def save_as_files(self, storage, name_fn):
        """Write each stream to its own file in ``storage``.

        :param storage: storage providing ``create_file(name)``.
        :param name_fn: callable mapping a stream name to the file name to
            create.
        """

        for name, blocks in self._readback():
            f = storage.create_file(name_fn(name))
            try:
                for block in blocks():
                    f.write(block)
            finally:
                # Close the output even if a write fails
                f.close()

    class SubStream(object):
        """Write-only stream that buffers in memory and spills to a shared
        file.

        ``blocks`` records where the data lives, in write order, as
        ``(fileobj, offset, length)`` tuples; ``fileobj`` is ``None`` for
        data spilled to the shared file and the in-memory buffer for the
        final unspilled tail.
        """

        def __init__(self, dbfile, buffersize):
            self._dbfile = dbfile
            self._buffersize = buffersize
            self._buffer = BytesIO()
            self._spilled = 0  # total bytes already written to _dbfile
            self._closed = False
            self.blocks = []

        def tell(self):
            """Return the total number of bytes written to this stream."""

            # Keep a running total instead of re-summing every block, and do
            # not count the buffered tail twice once close() has recorded it.
            return self._spilled + self._buffer.tell()

        def write(self, inbytes):
            """Append ``inbytes``, spilling to the shared file when the
            buffered data plus ``inbytes`` reaches the buffer size.
            """

            bio = self._buffer
            buflen = bio.tell()
            length = buflen + len(inbytes)
            if length >= self._buffersize:
                offset = self._dbfile.tell()
                # Only the first ``buflen`` bytes are current; anything after
                # that is left over from before an earlier spill.
                self._dbfile.write(bio.getvalue()[:buflen])
                self._dbfile.write(inbytes)

                self.blocks.append((None, offset, length))
                self._spilled += length
                self._buffer.seek(0)
            else:
                bio.write(inbytes)

        def close(self):
            """Record the buffered tail (if any) as the final block.

            Safe to call more than once: only the first call records the
            block, so the tail cannot be duplicated.
            """

            if self._closed:
                return
            self._closed = True

            bio = self._buffer
            length = bio.tell()
            if length:
                self.blocks.append((bio, 0, length))