# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

from __future__ import with_statement
import os
from multiprocessing import Process, Queue, cpu_count

from whoosh.compat import queue, xrange, iteritems, pickle
from whoosh.codec import base
from whoosh.writing import PostingPool, SegmentWriter
from whoosh.externalsort import imerge
from whoosh.util import random_name


def finish_subsegment(writer, k=64):
    # Tell the pool to finish up the current file
    writer.pool.save()
    # Tell the pool to merge any and all runs in the pool until there
    # is only one run remaining. "k" is an optional parameter passed
    # from the parent which sets the maximum number of files to open
    # while reducing.
    writer.pool.reduce_to(1, k)

    # The filename of the single remaining run
    runname = writer.pool.runs[0]
    # The indexed field names
    fieldnames = writer.pool.fieldnames
    # The segment object (parent can use this to re-open the files created
    # by the sub-writer)
    segment = writer._partial_segment()

    return runname, fieldnames, segment


# Multiprocessing Writer

class SubWriterTask(Process):
    # This is a Process object that takes "jobs" off a job Queue, processes
    # them, and when it's done, puts a summary of its work on a results Queue

    def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs,
                 multisegment):
        Process.__init__(self)
        self.storage = storage
        self.indexname = indexname
        self.jobqueue = jobqueue
        self.resultqueue = resultqueue
        self.kwargs = kwargs
        self.multisegment = multisegment
        self.running = True

    def run(self):
        # This is the main loop of the process. OK, so the way this works is
        # kind of brittle and stupid, but I had to figure out how to use the
        # multiprocessing module, work around bugs, and address performance
        # issues, so there is at least some reasoning behind some of this.

        # The "parent" task farms individual documents out to the subtasks for
        # indexing. You could pickle the actual documents and put them in the
        # queue, but that is not very performant. Instead, we assume the tasks
        # share a filesystem and use that to pass the information around. The
        # parent task writes a certain number of documents to a file, then
        # puts the filename on the "job queue". A subtask gets the filename
        # off the queue and reads through the file, processing the documents.

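        # For reference, each record in a job file is just a pickled
        # (command_code, fields_dict) tuple written with pickle protocol 2.
        # A rough sketch of one round trip (the field name here is
        # illustrative; see MpWriter._enqueue() and _process_file() below for
        # the real code):
        #
        #     dump((0, {"title": u"Hello"}), f, 2)   # parent side
        #     code, args = load(f)                   # sub-task side
        #     assert code == 0                       # 0 means add_document
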
        jobqueue = self.jobqueue
        resultqueue = self.resultqueue
        multisegment = self.multisegment

        # Open a placeholder object representing the index
        ix = self.storage.open_index(self.indexname)
        # Open a writer for the index. The _lk=False parameter means to not
        # try to lock the index (the parent object that started me takes care
        # of locking the index)
        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)

        # If the parent task calls cancel() on me, it will set self.running to
        # False, so I'll notice the next time through the loop
        while self.running:
            # Take an object off the job queue
            jobinfo = jobqueue.get()
            # If the object is None, it means the parent task wants me to
            # finish up
            if jobinfo is None:
                break
            # The object from the queue is a tuple of (filename,
            # number_of_docs_in_file). Pass those two pieces of information as
            # arguments to _process_file().
            self._process_file(*jobinfo)
            # jobqueue.task_done()

        if not self.running:
            # I was cancelled, so I'll cancel my underlying writer
            writer.cancel()
        else:
            if multisegment:
                # Actually finish the segment and return it with no run
                runname = None
                fieldnames = writer.pool.fieldnames
                segment = writer._finalize_segment()
            else:
                # Merge all runs in the writer's pool into one run, close the
                # segment, and return the run name and the segment
                k = self.kwargs.get("k", 64)
                runname, fieldnames, segment = finish_subsegment(writer, k)

            # Put the results (the run filename and the segment object) on the
            # result queue
            resultqueue.put((runname, fieldnames, segment), timeout=5)

    def _process_file(self, filename, doc_count):
        # This method processes a "job file" written out by the parent task. A
        # job file is a series of pickled (code, arguments) tuples. Currently
        # the only command code is 0 (add_document).

        writer = self.writer
        tempstorage = writer.temp_storage()

        load = pickle.load
        with tempstorage.open_file(filename).raw_file() as f:
            for _ in xrange(doc_count):
                # Load the next pickled tuple from the file
                code, args = load(f)
                assert code == 0
                writer.add_document(**args)
        # Remove the job file
        tempstorage.delete_file(filename)

    def cancel(self):
        self.running = False


class MpWriter(SegmentWriter):
    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
                 multisegment=False, **kwargs):
        # This is the "main" writer that will aggregate the results created by
        # the sub-tasks
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        # The maximum number of documents in each job file submitted to the
        # sub-tasks
        self.batchsize = batchsize
        # You can use keyword arguments or the "subargs" argument to pass
        # keyword arguments to the sub-writers
        self.subargs = subargs if subargs else kwargs
        # If multisegment is True, don't merge the segments created by the
        # sub-writers, just add them directly to the TOC
        self.multisegment = multisegment

        # A list to hold the sub-task Process objects
        self.tasks = []
        # A queue to pass the filenames of job files to the sub-tasks
        self.jobqueue = Queue(self.procs * 4)
        # A queue to get back the final results of the sub-tasks
        self.resultqueue = Queue()
        # A buffer for documents before they are flushed to a job file
        self.docbuffer = []

        self._grouping = 0
        self._added_sub = False

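    # A rough usage sketch (the directory name and the field values here are
    # illustrative, not part of this module):
    #
    #     from whoosh.index import open_dir
    #
    #     ix = open_dir("indexdir")
    #     writer = MpWriter(ix, procs=4, batchsize=256)
    #     writer.add_document(title=u"Hello", content=u"...")
    #     writer.commit()
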
    def _new_task(self):
        task = SubWriterTask(self.storage, self.indexname,
                             self.jobqueue, self.resultqueue, self.subargs,
                             self.multisegment)
        self.tasks.append(task)
        task.start()
        return task

    def _enqueue(self):
        # Flush the documents stored in self.docbuffer to a file and put the
        # filename on the job queue
        docbuffer = self.docbuffer
        dump = pickle.dump
        length = len(docbuffer)

        filename = "%s.doclist" % random_name()
        with self.temp_storage().create_file(filename).raw_file() as f:
            for item in docbuffer:
                dump(item, f, 2)

        if len(self.tasks) < self.procs:
            self._new_task()
        jobinfo = (filename, length)
        self.jobqueue.put(jobinfo)
        self.docbuffer = []

    def cancel(self):
        try:
            for task in self.tasks:
                task.cancel()
        finally:
            SegmentWriter.cancel(self)

    def start_group(self):
        self._grouping += 1

    def end_group(self):
        if not self._grouping:
            raise Exception("Unbalanced end_group")
        self._grouping -= 1

    def add_document(self, **fields):
        # Add the document to the docbuffer
        self.docbuffer.append((0, fields))
        # If the buffer is full, flush it to the job queue
        if not self._grouping and len(self.docbuffer) >= self.batchsize:
            self._enqueue()
        self._added_sub = True

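    # Grouping sketch: while _grouping is nonzero, add_document() above will
    # not flush the buffer, so every document added inside the group lands in
    # the same job file and therefore in the same sub-writer/segment. Assuming
    # the base writer's group() context manager (which calls start_group() and
    # end_group()), with illustrative field names:
    #
    #     with writer.group():
    #         writer.add_document(kind=u"class", name=u"Index")
    #         writer.add_document(kind=u"method", name=u"add_document")
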
    def _read_and_renumber_run(self, path, offset):
        # Note that SortingPool._read_run() automatically deletes the run file
        # when it's finished

        gen = self.pool._read_run(path)
        # If offset is 0, just return the items unchanged
        if not offset:
            return gen
        else:
            # Otherwise, add the offset to each docnum
            return ((fname, text, docnum + offset, weight, value)
                    for fname, text, docnum, weight, value in gen)

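    # For example (illustrative values): if this writer's docnum is 100 when a
    # sub-segment is merged in, the run item
    # (u"title", u"hello", 3, 1.0, value) comes back as
    # (u"title", u"hello", 103, 1.0, value), so the sub-segment's document
    # numbers follow the documents already in this writer.
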
    def commit(self, mergetype=None, optimize=None, merge=None):
        if self._added_sub:
            # If documents have been added to sub-writers, use the parallel
            # merge commit code
            self._commit(mergetype, optimize, merge)
        else:
            # Otherwise, just do a regular-old commit
            SegmentWriter.commit(self, mergetype=mergetype, optimize=optimize,
                                 merge=merge)

    def _commit(self, mergetype, optimize, merge):
        # Index the remaining documents in the doc buffer
        if self.docbuffer:
            self._enqueue()
        # Tell the tasks to finish
        for task in self.tasks:
            self.jobqueue.put(None)

        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)

        # Wait for the subtasks to finish
        for task in self.tasks:
            task.join()

        # Pull a (run_file_name, fieldnames, segment) tuple off the result
        # queue for each sub-task, representing the final results of the task
        results = []
        for _ in self.tasks:
            try:
                results.append(self.resultqueue.get(timeout=1))
            except queue.Empty:
                pass

        if self.multisegment:
            # If we're not merging the segments, we don't care about the run
            # names and fieldnames in the results... just pull out the
            # segments and add them to the list of final segments
            finalsegments += [s for _, _, s in results]
            if self._added:
                finalsegments.append(self._finalize_segment())
            else:
                self._close_segment()
            assert self.perdocwriter.is_closed
        else:
            # Merge the posting sources from the sub-writers and my
            # postings into this writer
            self._merge_subsegments(results, mergetype)
            self._close_segment()
            self._assemble_segment()
            finalsegments.append(self.get_segment())
            assert self.perdocwriter.is_closed

        self._commit_toc(finalsegments)
        self._finish()

    def _merge_subsegments(self, results, mergetype):
        schema = self.schema
        schemanames = set(schema.names())
        storage = self.storage
        codec = self.codec
        sources = []

        # If information was added to this writer the conventional way (e.g.
        # through add_reader() or merging segments), add it as an extra source
        if self._added:
            sources.append(self.pool.iter_postings())

        pdrs = []
        for runname, fieldnames, segment in results:
            fieldnames = set(fieldnames) | schemanames
            pdr = codec.per_document_reader(storage, segment)
            pdrs.append(pdr)
            basedoc = self.docnum
            docmap = self.write_per_doc(fieldnames, pdr)
            assert docmap is None

            items = self._read_and_renumber_run(runname, basedoc)
            sources.append(items)

        # Create a MultiPerDocumentReader combining the per-document readers
        # from this writer and the sub-task segments
        self.perdocwriter.close()
        pdrs.insert(0, self.per_document_reader())
        mpdr = base.MultiPerDocumentReader(pdrs)

        try:
            # Merge the iterators into the field writer
            self.fieldwriter.add_postings(schema, mpdr, imerge(sources))
        finally:
            mpdr.close()
        self._added = True


class SerialMpWriter(MpWriter):
    # A non-parallel version of the MpWriter for testing purposes

    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        self.batchsize = batchsize
        self.subargs = subargs if subargs else kwargs
        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
                      for _ in xrange(self.procs)]
        self.pointer = 0
        self._added_sub = False

    def add_document(self, **fields):
        self.tasks[self.pointer].add_document(**fields)
        self.pointer = (self.pointer + 1) % len(self.tasks)
        self._added_sub = True

    def _commit(self, mergetype, optimize, merge):
        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)

        # There is no result queue here; instead, finish each in-process
        # sub-writer directly to get its (run_file_name, fieldnames, segment)
        # tuple, representing the final results of that sub-writer
        results = []
        for writer in self.tasks:
            results.append(finish_subsegment(writer))

        self._merge_subsegments(results, mergetype)
        self._close_segment()
        self._assemble_segment()
        finalsegments.append(self.get_segment())

        self._commit_toc(finalsegments)
        self._finish()


# For compatibility with old multiproc module
class MultiSegmentWriter(MpWriter):
    def __init__(self, *args, **kwargs):
        MpWriter.__init__(self, *args, **kwargs)
        self.multisegment = True
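
# Note (usage sketch; argument values are illustrative): because the
# constructor above forces multisegment to True, MultiSegmentWriter(ix,
# procs=4) behaves like MpWriter(ix, procs=4, multisegment=True): each
# sub-process finishes its own segment and the segments are added to the TOC
# without being merged into one.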