From c69b6b979e3d6bd01ec40e75b92b21f7a391f0ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 15:56:14 +0200 Subject: Added basic channel implementation including test restructured odb tests, they are now in an own module to keep the modules small --- lib/git/odb/channel.py | 106 +++++++++++++++++++++++++++++++++++++++++++++++++ lib/git/odb/db.py | 11 +++-- 2 files changed, 111 insertions(+), 6 deletions(-) create mode 100644 lib/git/odb/channel.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/channel.py b/lib/git/odb/channel.py new file mode 100644 index 00000000..f6469d42 --- /dev/null +++ b/lib/git/odb/channel.py @@ -0,0 +1,106 @@ +"""Contains a queue based channel implementation""" +from Queue import ( + Queue, + Empty, + Full + ) + +#{ Classes +class Channel(object): + """A channel is similar to a system pipe. It has a write end as well as one or + more read ends. If Data is in the channel, it can be read, if not the read operation + will block until data becomes available. + If the channel is closed, any read operation will result in an exception + + This base class is not instantiated directly, but instead serves as constructor + for RWChannel pairs. + + Create a new channel """ + __slots__ = tuple() + def __new__(cls, *args): + if cls is Channel: + max_items = 0 + if len(args) == 1: + max_items = args[0] + if len(args) > 1: + raise ValueError("Specify not more than the number of items the channel should take") + wc = WChannel(max_items) + rc = RChannel(wc) + return wc, rc + # END constructor mode + return object.__new__(cls) + +class WChannel(Channel): + """The write end of a channel""" + __slots__ = ('_closed', '_queue') + + def __init__(self, max_items=0): + """initialize this instance, able to hold max_items at once + Write calls will block if the channel is full, until someone reads from it""" + self._closed = False + self._queue = Queue(max_items) + + + #{ Interface + def write(self, item, block=True, timeout=None): + """Send an item into the channel, it can be read from the read end of the + channel accordingly + :param item: Item to send + :param block: If True, the call will block until there is free space in the + channel + :param timeout: timeout in seconds for blocking calls. + :raise IOError: when writing into closed file or when writing into a non-blocking + full channel + :note: may block if the channel has a limited capacity""" + if self._closed: + raise IOError("Cannot write to a closed channel") + + try: + self._queue.put(item, block, timeout) + except Full: + raise IOError("Capacity of the channel was exeeded") + # END exception handling + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + self._closed = True + + @property + def closed(self): + """:return: True if the channel was closed""" + return self._closed + #} END interface + + +class RChannel(Channel): + """The read-end of a corresponding write channel""" + __slots__ = '_wc' + + def __init__(self, wchannel): + """Initialize this instance from its parent write channel""" + self._wc = wchannel + + + #{ Interface + + def read(self, block=True, timeout=None): + """:return: an item read from the channel + :param block: if True, the call will block until an item is available + :param timeout: if positive and block is True, it will block only for the + given amount of seconds. + :raise IOError: When reading from an empty channel ( if non-blocking, or + if the channel is still empty after the timeout""" + # if the channel is closed for writing, we never block + if self._wc.closed: + block = False + + try: + return self._wc._queue.get(block, timeout) + except Empty: + raise IOError("Error reading from an empty channel") + # END handle reading + + #} END interface + +#} END classes diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index d970b0bf..5d3cc6a3 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -142,11 +142,10 @@ class FileDBBase(object): """Initialize this instance to look for its files at the given root path All subsequent operations will be relative to this path :raise InvalidDBRoot: - :note: The base will perform basic checking for accessability, but the subclass - is required to verify that the root_path contains the database structure it needs""" + :note: The base will not perform any accessablity checking as the base + might not yet be accessible, but become accessible before the first + access.""" super(FileDBBase, self).__init__() - if not os.path.isdir(root_path): - raise InvalidDBRoot(root_path) self._root_path = root_path @@ -333,10 +332,10 @@ class GitObjectDB(LooseObjectDB): def info(self, sha): t = self._git.get_object_header(sha) - return OInfo(t[0], t[1], t[2]) + return OInfo(*t) def stream(self, sha): """For now, all lookup is done by git itself""" t = self._git.stream_object_data(sha) - return OStream(t[0], t[1], t[2], t[3]) + return OStream(*t) -- cgit v1.2.3 From 65c9fe0baa579173afa5a2d463ac198d06ef4993 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 16:07:19 +0200 Subject: A code donation: Donating a worker thread implementation inclduding tests to Git-Python. I have the feeling it can do much good here :) --- lib/git/odb/channel.py | 1 + lib/git/odb/thread.py | 203 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 lib/git/odb/thread.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/channel.py b/lib/git/odb/channel.py index f6469d42..32eef6e1 100644 --- a/lib/git/odb/channel.py +++ b/lib/git/odb/channel.py @@ -30,6 +30,7 @@ class Channel(object): # END constructor mode return object.__new__(cls) + class WChannel(Channel): """The write end of a channel""" __slots__ = ('_closed', '_queue') diff --git a/lib/git/odb/thread.py b/lib/git/odb/thread.py new file mode 100644 index 00000000..3938666a --- /dev/null +++ b/lib/git/odb/thread.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +"""Module with threading utilities""" +__docformat__ = "restructuredtext" +import threading +import inspect +import Queue + +#{ Decorators + +def do_terminate_threads(whitelist=list()): + """Simple function which terminates all of our threads + :param whitelist: If whitelist is given, only the given threads will be terminated""" + for t in threading.enumerate(): + if not isinstance(t, TerminatableThread): + continue + if whitelist and t not in whitelist: + continue + if isinstance(t, WorkerThread): + t.inq.put(t.quit) + # END worker special handling + t.stop_and_join() + # END for each thread + +def terminate_threads( func ): + """Kills all worker threads the method has created by sending the quit signal. + This takes over in case of an error in the main function""" + def wrapper(*args, **kwargs): + cur_threads = set(threading.enumerate()) + try: + return func(*args, **kwargs) + finally: + do_terminate_threads(set(threading.enumerate()) - cur_threads) + # END finally shutdown threads + # END wrapper + wrapper.__name__ = func.__name__ + return wrapper + +#} END decorators + +#{ Classes + +class TerminatableThread(threading.Thread): + """A simple thread able to terminate itself on behalf of the user. + + Terminate a thread as follows: + + t.stop_and_join() + + Derived classes call _should_terminate() to determine whether they should + abort gracefully + """ + __slots__ = '_terminate' + + def __init__(self): + super(TerminatableThread, self).__init__() + self._terminate = False + + + #{ Subclass Interface + def _should_terminate(self): + """:return: True if this thread should terminate its operation immediately""" + return self._terminate + + def _terminated(self): + """Called once the thread terminated. Its called in the main thread + and may perform cleanup operations""" + pass + + def start(self): + """Start the thread and return self""" + super(TerminatableThread, self).start() + return self + + #} END subclass interface + + #{ Interface + + def stop_and_join(self): + """Ask the thread to stop its operation and wait for it to terminate + :note: Depending on the implenetation, this might block a moment""" + self._terminate = True + self.join() + self._terminated() + #} END interface + + +class WorkerThread(TerminatableThread): + """ + This base allows to call functions on class instances natively and retrieve + their results asynchronously using a queue. + The thread runs forever unless it receives the terminate signal using + its task queue. + + Tasks could be anything, but should usually be class methods and arguments to + allow the following: + + inq = Queue() + outq = Queue() + w = WorkerThread(inq, outq) + w.start() + inq.put((WorkerThread., args, kwargs)) + res = outq.get() + + finally we call quit to terminate asap. + + alternatively, you can make a call more intuitively - the output is the output queue + allowing you to get the result right away or later + w.call(arg, kwarg='value').get() + + inq.put(WorkerThread.quit) + w.join() + + You may provide the following tuples as task: + t[0] = class method, function or instance method + t[1] = optional, tuple or list of arguments to pass to the routine + t[2] = optional, dictionary of keyword arguments to pass to the routine + """ + __slots__ = ('inq', 'outq') + + class InvalidRoutineError(Exception): + """Class sent as return value in case of an error""" + + def __init__(self, inq = None, outq = None): + super(WorkerThread, self).__init__() + self.inq = inq or Queue.Queue() + self.outq = outq or Queue.Queue() + + def call(self, function, *args, **kwargs): + """Method that makes the call to the worker using the input queue, + returning our output queue + + :param funciton: can be a standalone function unrelated to this class, + a class method of this class or any instance method. + If it is a string, it will be considered a function residing on this instance + :param args: arguments to pass to function + :parma **kwargs: kwargs to pass to function""" + self.inq.put((function, args, kwargs)) + return self.outq + + def wait_until_idle(self): + """wait until the input queue is empty, in the meanwhile, take all + results off the output queue.""" + while not self.inq.empty(): + try: + self.outq.get(False) + except Queue.Empty: + continue + # END while there are tasks on the queue + + def run(self): + """Process input tasks until we receive the quit signal""" + while True: + if self._should_terminate(): + break + # END check for stop request + routine = self.__class__.quit + args = tuple() + kwargs = dict() + tasktuple = self.inq.get() + + if isinstance(tasktuple, (tuple, list)): + if len(tasktuple) == 3: + routine, args, kwargs = tasktuple + elif len(tasktuple) == 2: + routine, args = tasktuple + elif len(tasktuple) == 1: + routine = tasktuple[0] + # END tasktuple length check + elif inspect.isroutine(tasktuple): + routine = tasktuple + # END tasktuple handling + + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, *args, **kwargs) + else: + rval = routine(*args, **kwargs) + elif inspect.isroutine(routine): + rval = routine(*args, **kwargs) + elif isinstance(routine, basestring) and hasattr(self, routine): + rval = getattr(self, routine)(*args, **kwargs) + else: + # ignore unknown items + print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + self.outq.put(self.InvalidRoutineError(routine)) + break + # END make routine call + self.outq.put(rval) + except StopIteration: + break + except Exception,e: + print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + self.outq.put(e) + # END routine exception handling + # END endless loop + + def quit(self): + raise StopIteration + + +#} END classes -- cgit v1.2.3 From 50e469109eed3a752d9a1b0297f16466ad92f8d2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 18:26:26 +0200 Subject: Initial pool design added, allowing for lazy channel based evaluation of inter-dependent tasks --- lib/git/odb/channel.py | 1 + lib/git/odb/pool.py | 104 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 lib/git/odb/pool.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/channel.py b/lib/git/odb/channel.py index 32eef6e1..c9cbfb87 100644 --- a/lib/git/odb/channel.py +++ b/lib/git/odb/channel.py @@ -17,6 +17,7 @@ class Channel(object): Create a new channel """ __slots__ = tuple() + def __new__(cls, *args): if cls is Channel: max_items = 0 diff --git a/lib/git/odb/pool.py b/lib/git/odb/pool.py new file mode 100644 index 00000000..5c3a7ead --- /dev/null +++ b/lib/git/odb/pool.py @@ -0,0 +1,104 @@ +"""Implementation of a thread-pool working with channels""" +from thread import TerminatableThread +from channel import ( + Channel, + WChannel, + RChannel + ) + +class Node(object): + """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. + Its not designed to support big graphs, and sports only the functionality + we need""" + __slots__('in_nodes', 'out_nodes') + + +class Graph(object): + """A simple graph implementation, keeping nodes and providing basic access and + editing functions""" + __slots__ = "nodes" + + def add_node(self, node): + pass + + def del_node(self, node): + pass + + def visit_input_depth_first(self, node, visitor=lambda n: True ): + """Visit all input nodes of the given node, depth first, calling visitor + for each node on our way. If the function returns False, the traversal + will not go any deeper, but continue at the next branch""" + pass + + +class TaskNode(Node): + """Couples an input channel, an output channel, as well as a processing function + together. + It may contain additional information on how to handel read-errors from the + input channel""" + __slots__ = ('in_rc', 'out_wc', 'fun') + + def is_done(self): + """:return: True if we are finished processing""" + return self.out_wc.closed + + +class PoolChannel(Channel): + """Base class for read and write channels which trigger the pool to evaluate + its tasks, causing the evaluation of the task list effectively assure a read + from actual output channel will not block forever due to task dependencies. + """ + __slots__ = tuple() + + +class RPoolChannel(PoolChannel): + """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call + before and after an item is to be read""" + __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + + def set_post_cb(self, fun = lambda item: item): + """Install a callback to call after the item has been read. The function + returns a possibly changed item. If it raises, the exception will be propagated + in an IOError, indicating read-failure + If a function is not provided, the call is effectively uninstalled.""" + + def set_pre_cb(self, fun = lambda : None): + """Install a callback to call before an item is read from the channel. + If it fails, the read will fail with an IOError + If a function is not provided, the call is effectively uninstalled.""" + + +class PoolWorker(WorkerThread): + """A worker thread which gets called to deal with Tasks. Tasks provide channls + with actual work, whose result will be send to the tasks output channel""" + + @classmethod + def perform_task(cls, task): + pass + + +class ThreadPool(Graph): + """A thread pool maintains a set of one or more worker threads, but supports + a fully serial mode in which case the amount of threads is zero. + + Work is distributed via Channels, which form a dependency graph. The evaluation + is lazy, as work will only be done once an output is requested.""" + __slots__ = ( '_workers', # list of worker threads + '_queue', # master queue for tasks + '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel + ) + + def del_node(self, task): + """Delete the node ( being a task ), but delete the entries in our output channel + cache as well""" + + + def set_pool_size(self, size=0): + """Set the amount of workers to use in this pool. + :param size: if 0, the pool will do all work itself in the calling thread, + otherwise the work will be distributed among the given amount of threads""" + + def add_task(self, task): + """Add a new task to be processed. + :return: your task instance with its output channel set. It can be used + to retrieve processed items""" -- cgit v1.2.3 From 61138f2ece0cb864b933698174315c34a78835d1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 19:59:17 +0200 Subject: Moved multiprocessing modules into own package, as they in fact have nothing to do with the object db. If that really works the way I want, it will become an own project, called async --- lib/git/odb/channel.py | 108 -------------------------- lib/git/odb/pool.py | 104 ------------------------- lib/git/odb/thread.py | 203 ------------------------------------------------- 3 files changed, 415 deletions(-) delete mode 100644 lib/git/odb/channel.py delete mode 100644 lib/git/odb/pool.py delete mode 100644 lib/git/odb/thread.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/channel.py b/lib/git/odb/channel.py deleted file mode 100644 index c9cbfb87..00000000 --- a/lib/git/odb/channel.py +++ /dev/null @@ -1,108 +0,0 @@ -"""Contains a queue based channel implementation""" -from Queue import ( - Queue, - Empty, - Full - ) - -#{ Classes -class Channel(object): - """A channel is similar to a system pipe. It has a write end as well as one or - more read ends. If Data is in the channel, it can be read, if not the read operation - will block until data becomes available. - If the channel is closed, any read operation will result in an exception - - This base class is not instantiated directly, but instead serves as constructor - for RWChannel pairs. - - Create a new channel """ - __slots__ = tuple() - - def __new__(cls, *args): - if cls is Channel: - max_items = 0 - if len(args) == 1: - max_items = args[0] - if len(args) > 1: - raise ValueError("Specify not more than the number of items the channel should take") - wc = WChannel(max_items) - rc = RChannel(wc) - return wc, rc - # END constructor mode - return object.__new__(cls) - - -class WChannel(Channel): - """The write end of a channel""" - __slots__ = ('_closed', '_queue') - - def __init__(self, max_items=0): - """initialize this instance, able to hold max_items at once - Write calls will block if the channel is full, until someone reads from it""" - self._closed = False - self._queue = Queue(max_items) - - - #{ Interface - def write(self, item, block=True, timeout=None): - """Send an item into the channel, it can be read from the read end of the - channel accordingly - :param item: Item to send - :param block: If True, the call will block until there is free space in the - channel - :param timeout: timeout in seconds for blocking calls. - :raise IOError: when writing into closed file or when writing into a non-blocking - full channel - :note: may block if the channel has a limited capacity""" - if self._closed: - raise IOError("Cannot write to a closed channel") - - try: - self._queue.put(item, block, timeout) - except Full: - raise IOError("Capacity of the channel was exeeded") - # END exception handling - - def close(self): - """Close the channel. Multiple close calls on a closed channel are no - an error""" - self._closed = True - - @property - def closed(self): - """:return: True if the channel was closed""" - return self._closed - #} END interface - - -class RChannel(Channel): - """The read-end of a corresponding write channel""" - __slots__ = '_wc' - - def __init__(self, wchannel): - """Initialize this instance from its parent write channel""" - self._wc = wchannel - - - #{ Interface - - def read(self, block=True, timeout=None): - """:return: an item read from the channel - :param block: if True, the call will block until an item is available - :param timeout: if positive and block is True, it will block only for the - given amount of seconds. - :raise IOError: When reading from an empty channel ( if non-blocking, or - if the channel is still empty after the timeout""" - # if the channel is closed for writing, we never block - if self._wc.closed: - block = False - - try: - return self._wc._queue.get(block, timeout) - except Empty: - raise IOError("Error reading from an empty channel") - # END handle reading - - #} END interface - -#} END classes diff --git a/lib/git/odb/pool.py b/lib/git/odb/pool.py deleted file mode 100644 index 5c3a7ead..00000000 --- a/lib/git/odb/pool.py +++ /dev/null @@ -1,104 +0,0 @@ -"""Implementation of a thread-pool working with channels""" -from thread import TerminatableThread -from channel import ( - Channel, - WChannel, - RChannel - ) - -class Node(object): - """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. - Its not designed to support big graphs, and sports only the functionality - we need""" - __slots__('in_nodes', 'out_nodes') - - -class Graph(object): - """A simple graph implementation, keeping nodes and providing basic access and - editing functions""" - __slots__ = "nodes" - - def add_node(self, node): - pass - - def del_node(self, node): - pass - - def visit_input_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch""" - pass - - -class TaskNode(Node): - """Couples an input channel, an output channel, as well as a processing function - together. - It may contain additional information on how to handel read-errors from the - input channel""" - __slots__ = ('in_rc', 'out_wc', 'fun') - - def is_done(self): - """:return: True if we are finished processing""" - return self.out_wc.closed - - -class PoolChannel(Channel): - """Base class for read and write channels which trigger the pool to evaluate - its tasks, causing the evaluation of the task list effectively assure a read - from actual output channel will not block forever due to task dependencies. - """ - __slots__ = tuple() - - -class RPoolChannel(PoolChannel): - """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call - before and after an item is to be read""" - __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') - - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the item has been read. The function - returns a possibly changed item. If it raises, the exception will be propagated - in an IOError, indicating read-failure - If a function is not provided, the call is effectively uninstalled.""" - - def set_pre_cb(self, fun = lambda : None): - """Install a callback to call before an item is read from the channel. - If it fails, the read will fail with an IOError - If a function is not provided, the call is effectively uninstalled.""" - - -class PoolWorker(WorkerThread): - """A worker thread which gets called to deal with Tasks. Tasks provide channls - with actual work, whose result will be send to the tasks output channel""" - - @classmethod - def perform_task(cls, task): - pass - - -class ThreadPool(Graph): - """A thread pool maintains a set of one or more worker threads, but supports - a fully serial mode in which case the amount of threads is zero. - - Work is distributed via Channels, which form a dependency graph. The evaluation - is lazy, as work will only be done once an output is requested.""" - __slots__ = ( '_workers', # list of worker threads - '_queue', # master queue for tasks - '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel - ) - - def del_node(self, task): - """Delete the node ( being a task ), but delete the entries in our output channel - cache as well""" - - - def set_pool_size(self, size=0): - """Set the amount of workers to use in this pool. - :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads""" - - def add_task(self, task): - """Add a new task to be processed. - :return: your task instance with its output channel set. It can be used - to retrieve processed items""" diff --git a/lib/git/odb/thread.py b/lib/git/odb/thread.py deleted file mode 100644 index 3938666a..00000000 --- a/lib/git/odb/thread.py +++ /dev/null @@ -1,203 +0,0 @@ -# -*- coding: utf-8 -*- -"""Module with threading utilities""" -__docformat__ = "restructuredtext" -import threading -import inspect -import Queue - -#{ Decorators - -def do_terminate_threads(whitelist=list()): - """Simple function which terminates all of our threads - :param whitelist: If whitelist is given, only the given threads will be terminated""" - for t in threading.enumerate(): - if not isinstance(t, TerminatableThread): - continue - if whitelist and t not in whitelist: - continue - if isinstance(t, WorkerThread): - t.inq.put(t.quit) - # END worker special handling - t.stop_and_join() - # END for each thread - -def terminate_threads( func ): - """Kills all worker threads the method has created by sending the quit signal. - This takes over in case of an error in the main function""" - def wrapper(*args, **kwargs): - cur_threads = set(threading.enumerate()) - try: - return func(*args, **kwargs) - finally: - do_terminate_threads(set(threading.enumerate()) - cur_threads) - # END finally shutdown threads - # END wrapper - wrapper.__name__ = func.__name__ - return wrapper - -#} END decorators - -#{ Classes - -class TerminatableThread(threading.Thread): - """A simple thread able to terminate itself on behalf of the user. - - Terminate a thread as follows: - - t.stop_and_join() - - Derived classes call _should_terminate() to determine whether they should - abort gracefully - """ - __slots__ = '_terminate' - - def __init__(self): - super(TerminatableThread, self).__init__() - self._terminate = False - - - #{ Subclass Interface - def _should_terminate(self): - """:return: True if this thread should terminate its operation immediately""" - return self._terminate - - def _terminated(self): - """Called once the thread terminated. Its called in the main thread - and may perform cleanup operations""" - pass - - def start(self): - """Start the thread and return self""" - super(TerminatableThread, self).start() - return self - - #} END subclass interface - - #{ Interface - - def stop_and_join(self): - """Ask the thread to stop its operation and wait for it to terminate - :note: Depending on the implenetation, this might block a moment""" - self._terminate = True - self.join() - self._terminated() - #} END interface - - -class WorkerThread(TerminatableThread): - """ - This base allows to call functions on class instances natively and retrieve - their results asynchronously using a queue. - The thread runs forever unless it receives the terminate signal using - its task queue. - - Tasks could be anything, but should usually be class methods and arguments to - allow the following: - - inq = Queue() - outq = Queue() - w = WorkerThread(inq, outq) - w.start() - inq.put((WorkerThread., args, kwargs)) - res = outq.get() - - finally we call quit to terminate asap. - - alternatively, you can make a call more intuitively - the output is the output queue - allowing you to get the result right away or later - w.call(arg, kwarg='value').get() - - inq.put(WorkerThread.quit) - w.join() - - You may provide the following tuples as task: - t[0] = class method, function or instance method - t[1] = optional, tuple or list of arguments to pass to the routine - t[2] = optional, dictionary of keyword arguments to pass to the routine - """ - __slots__ = ('inq', 'outq') - - class InvalidRoutineError(Exception): - """Class sent as return value in case of an error""" - - def __init__(self, inq = None, outq = None): - super(WorkerThread, self).__init__() - self.inq = inq or Queue.Queue() - self.outq = outq or Queue.Queue() - - def call(self, function, *args, **kwargs): - """Method that makes the call to the worker using the input queue, - returning our output queue - - :param funciton: can be a standalone function unrelated to this class, - a class method of this class or any instance method. - If it is a string, it will be considered a function residing on this instance - :param args: arguments to pass to function - :parma **kwargs: kwargs to pass to function""" - self.inq.put((function, args, kwargs)) - return self.outq - - def wait_until_idle(self): - """wait until the input queue is empty, in the meanwhile, take all - results off the output queue.""" - while not self.inq.empty(): - try: - self.outq.get(False) - except Queue.Empty: - continue - # END while there are tasks on the queue - - def run(self): - """Process input tasks until we receive the quit signal""" - while True: - if self._should_terminate(): - break - # END check for stop request - routine = self.__class__.quit - args = tuple() - kwargs = dict() - tasktuple = self.inq.get() - - if isinstance(tasktuple, (tuple, list)): - if len(tasktuple) == 3: - routine, args, kwargs = tasktuple - elif len(tasktuple) == 2: - routine, args = tasktuple - elif len(tasktuple) == 1: - routine = tasktuple[0] - # END tasktuple length check - elif inspect.isroutine(tasktuple): - routine = tasktuple - # END tasktuple handling - - try: - rval = None - if inspect.ismethod(routine): - if routine.im_self is None: - rval = routine(self, *args, **kwargs) - else: - rval = routine(*args, **kwargs) - elif inspect.isroutine(routine): - rval = routine(*args, **kwargs) - elif isinstance(routine, basestring) and hasattr(self, routine): - rval = getattr(self, routine)(*args, **kwargs) - else: - # ignore unknown items - print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - self.outq.put(self.InvalidRoutineError(routine)) - break - # END make routine call - self.outq.put(rval) - except StopIteration: - break - except Exception,e: - print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) - self.outq.put(e) - # END routine exception handling - # END endless loop - - def quit(self): - raise StopIteration - - -#} END classes -- cgit v1.2.3