From ab59f78341f1dd188aaf4c30526f6295c63438b1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 20:03:09 +0200 Subject: Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock --- lib/git/async/__init__.py | 1 + lib/git/async/channel.py | 108 ++++++++++++++++++++++++ lib/git/async/pool.py | 116 ++++++++++++++++++++++++++ lib/git/async/thread.py | 203 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 428 insertions(+) create mode 100644 lib/git/async/__init__.py create mode 100644 lib/git/async/channel.py create mode 100644 lib/git/async/pool.py create mode 100644 lib/git/async/thread.py (limited to 'lib/git/async') diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py new file mode 100644 index 00000000..89b9eb47 --- /dev/null +++ b/lib/git/async/__init__.py @@ -0,0 +1 @@ +"""Initialize the multi-processing package""" diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py new file mode 100644 index 00000000..c9cbfb87 --- /dev/null +++ b/lib/git/async/channel.py @@ -0,0 +1,108 @@ +"""Contains a queue based channel implementation""" +from Queue import ( + Queue, + Empty, + Full + ) + +#{ Classes +class Channel(object): + """A channel is similar to a system pipe. It has a write end as well as one or + more read ends. If Data is in the channel, it can be read, if not the read operation + will block until data becomes available. + If the channel is closed, any read operation will result in an exception + + This base class is not instantiated directly, but instead serves as constructor + for RWChannel pairs. 
+ + Create a new channel """ + __slots__ = tuple() + + def __new__(cls, *args): + if cls is Channel: + max_items = 0 + if len(args) == 1: + max_items = args[0] + if len(args) > 1: + raise ValueError("Specify not more than the number of items the channel should take") + wc = WChannel(max_items) + rc = RChannel(wc) + return wc, rc + # END constructor mode + return object.__new__(cls) + + +class WChannel(Channel): + """The write end of a channel""" + __slots__ = ('_closed', '_queue') + + def __init__(self, max_items=0): + """initialize this instance, able to hold max_items at once + Write calls will block if the channel is full, until someone reads from it""" + self._closed = False + self._queue = Queue(max_items) + + + #{ Interface + def write(self, item, block=True, timeout=None): + """Send an item into the channel, it can be read from the read end of the + channel accordingly + :param item: Item to send + :param block: If True, the call will block until there is free space in the + channel + :param timeout: timeout in seconds for blocking calls. + :raise IOError: when writing into closed file or when writing into a non-blocking + full channel + :note: may block if the channel has a limited capacity""" + if self._closed: + raise IOError("Cannot write to a closed channel") + + try: + self._queue.put(item, block, timeout) + except Full: + raise IOError("Capacity of the channel was exeeded") + # END exception handling + + def close(self): + """Close the channel. 
Multiple close calls on a closed channel are no + an error""" + self._closed = True + + @property + def closed(self): + """:return: True if the channel was closed""" + return self._closed + #} END interface + + +class RChannel(Channel): + """The read-end of a corresponding write channel""" + __slots__ = '_wc' + + def __init__(self, wchannel): + """Initialize this instance from its parent write channel""" + self._wc = wchannel + + + #{ Interface + + def read(self, block=True, timeout=None): + """:return: an item read from the channel + :param block: if True, the call will block until an item is available + :param timeout: if positive and block is True, it will block only for the + given amount of seconds. + :raise IOError: When reading from an empty channel ( if non-blocking, or + if the channel is still empty after the timeout""" + # if the channel is closed for writing, we never block + if self._wc.closed: + block = False + + try: + return self._wc._queue.get(block, timeout) + except Empty: + raise IOError("Error reading from an empty channel") + # END handle reading + + #} END interface + +#} END classes diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py new file mode 100644 index 00000000..f9f7880b --- /dev/null +++ b/lib/git/async/pool.py @@ -0,0 +1,116 @@ +"""Implementation of a thread-pool working with channels""" +from thread import WorkerThread +from channel import ( + Channel, + WChannel, + RChannel + ) + +class Node(object): + """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. 
+ Its not designed to support big graphs, and sports only the functionality + we need""" + __slots__ = ('in_nodes', 'out_nodes') + + +class Graph(object): + """A simple graph implementation, keeping nodes and providing basic access and + editing functions""" + __slots__ = "nodes" + + def add_node(self, node): + pass + + def del_node(self, node): + pass + + def visit_input_depth_first(self, node, visitor=lambda n: True ): + """Visit all input nodes of the given node, depth first, calling visitor + for each node on our way. If the function returns False, the traversal + will not go any deeper, but continue at the next branch""" + pass + + +class TaskNode(Node): + """Couples an input channel, an output channel, as well as a processing function + together. + It may contain additional information on how to handel read-errors from the + input channel""" + __slots__ = ('in_rc', 'out_wc', 'fun') + + def is_done(self): + """:return: True if we are finished processing""" + return self.out_wc.closed + + +class RPoolChannel(RChannel): + """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call + before and after an item is to be read. + + It acts like a handle to the underlying task""" + __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + + def set_post_cb(self, fun = lambda item: item): + """Install a callback to call after the item has been read. The function + returns a possibly changed item. If it raises, the exception will be propagated + in an IOError, indicating read-failure + If a function is not provided, the call is effectively uninstalled.""" + + def set_pre_cb(self, fun = lambda : None): + """Install a callback to call before an item is read from the channel. 
+ If it fails, the read will fail with an IOError + If a function is not provided, the call is effectively uninstalled.""" + + def read(block=False, timeout=None): + """Read an item that was processed by one of our threads + :note: Triggers task dependency handling needed to provide the necessary + input""" + + #{ Internal + def _read(self, block=False, timeout=None): + """Calls the underlying channel's read directly, without triggering + the pool""" + return RChannel.read(self, block, timeout) + + #} END internal + + +class PoolWorker(WorkerThread): + """A worker thread which gets called to deal with Tasks. Tasks provide channls + with actual work, whose result will be send to the tasks output channel""" + + @classmethod + def perform_task(cls, task): + # note : when getting the input channel, be sure not to trigger + # RPoolChannel + pass + + +class ThreadPool(Graph): + """A thread pool maintains a set of one or more worker threads, but supports + a fully serial mode in which case the amount of threads is zero. + + Work is distributed via Channels, which form a dependency graph. The evaluation + is lazy, as work will only be done once an output is requested. + + :note: the current implementation returns channels which are meant to be + used only from the main thread""" + __slots__ = ( '_workers', # list of worker threads + '_queue', # master queue for tasks + '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel + ) + + def del_node(self, task): + """Delete the node ( being a task ), but delete the entries in our output channel + cache as well""" + + + def set_pool_size(self, size=0): + """Set the amount of workers to use in this pool. + :param size: if 0, the pool will do all work itself in the calling thread, + otherwise the work will be distributed among the given amount of threads""" + + def add_task(self, task): + """Add a new task to be processed. + :return: your task instance with its output channel set. 
It can be used + to retrieve processed items""" diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py new file mode 100644 index 00000000..3938666a --- /dev/null +++ b/lib/git/async/thread.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +"""Module with threading utilities""" +__docformat__ = "restructuredtext" +import threading +import inspect +import Queue + +#{ Decorators + +def do_terminate_threads(whitelist=list()): + """Simple function which terminates all of our threads + :param whitelist: If whitelist is given, only the given threads will be terminated""" + for t in threading.enumerate(): + if not isinstance(t, TerminatableThread): + continue + if whitelist and t not in whitelist: + continue + if isinstance(t, WorkerThread): + t.inq.put(t.quit) + # END worker special handling + t.stop_and_join() + # END for each thread + +def terminate_threads( func ): + """Kills all worker threads the method has created by sending the quit signal. + This takes over in case of an error in the main function""" + def wrapper(*args, **kwargs): + cur_threads = set(threading.enumerate()) + try: + return func(*args, **kwargs) + finally: + do_terminate_threads(set(threading.enumerate()) - cur_threads) + # END finally shutdown threads + # END wrapper + wrapper.__name__ = func.__name__ + return wrapper + +#} END decorators + +#{ Classes + +class TerminatableThread(threading.Thread): + """A simple thread able to terminate itself on behalf of the user. + + Terminate a thread as follows: + + t.stop_and_join() + + Derived classes call _should_terminate() to determine whether they should + abort gracefully + """ + __slots__ = '_terminate' + + def __init__(self): + super(TerminatableThread, self).__init__() + self._terminate = False + + + #{ Subclass Interface + def _should_terminate(self): + """:return: True if this thread should terminate its operation immediately""" + return self._terminate + + def _terminated(self): + """Called once the thread terminated. 
Its called in the main thread + and may perform cleanup operations""" + pass + + def start(self): + """Start the thread and return self""" + super(TerminatableThread, self).start() + return self + + #} END subclass interface + + #{ Interface + + def stop_and_join(self): + """Ask the thread to stop its operation and wait for it to terminate + :note: Depending on the implenetation, this might block a moment""" + self._terminate = True + self.join() + self._terminated() + #} END interface + + +class WorkerThread(TerminatableThread): + """ + This base allows to call functions on class instances natively and retrieve + their results asynchronously using a queue. + The thread runs forever unless it receives the terminate signal using + its task queue. + + Tasks could be anything, but should usually be class methods and arguments to + allow the following: + + inq = Queue() + outq = Queue() + w = WorkerThread(inq, outq) + w.start() + inq.put((WorkerThread., args, kwargs)) + res = outq.get() + + finally we call quit to terminate asap. 
+ + alternatively, you can make a call more intuitively - the output is the output queue + allowing you to get the result right away or later + w.call(arg, kwarg='value').get() + + inq.put(WorkerThread.quit) + w.join() + + You may provide the following tuples as task: + t[0] = class method, function or instance method + t[1] = optional, tuple or list of arguments to pass to the routine + t[2] = optional, dictionary of keyword arguments to pass to the routine + """ + __slots__ = ('inq', 'outq') + + class InvalidRoutineError(Exception): + """Class sent as return value in case of an error""" + + def __init__(self, inq = None, outq = None): + super(WorkerThread, self).__init__() + self.inq = inq or Queue.Queue() + self.outq = outq or Queue.Queue() + + def call(self, function, *args, **kwargs): + """Method that makes the call to the worker using the input queue, + returning our output queue + + :param funciton: can be a standalone function unrelated to this class, + a class method of this class or any instance method. 
+ If it is a string, it will be considered a function residing on this instance + :param args: arguments to pass to function + :parma **kwargs: kwargs to pass to function""" + self.inq.put((function, args, kwargs)) + return self.outq + + def wait_until_idle(self): + """wait until the input queue is empty, in the meanwhile, take all + results off the output queue.""" + while not self.inq.empty(): + try: + self.outq.get(False) + except Queue.Empty: + continue + # END while there are tasks on the queue + + def run(self): + """Process input tasks until we receive the quit signal""" + while True: + if self._should_terminate(): + break + # END check for stop request + routine = self.__class__.quit + args = tuple() + kwargs = dict() + tasktuple = self.inq.get() + + if isinstance(tasktuple, (tuple, list)): + if len(tasktuple) == 3: + routine, args, kwargs = tasktuple + elif len(tasktuple) == 2: + routine, args = tasktuple + elif len(tasktuple) == 1: + routine = tasktuple[0] + # END tasktuple length check + elif inspect.isroutine(tasktuple): + routine = tasktuple + # END tasktuple handling + + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, *args, **kwargs) + else: + rval = routine(*args, **kwargs) + elif inspect.isroutine(routine): + rval = routine(*args, **kwargs) + elif isinstance(routine, basestring) and hasattr(self, routine): + rval = getattr(self, routine)(*args, **kwargs) + else: + # ignore unknown items + print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + self.outq.put(self.InvalidRoutineError(routine)) + break + # END make routine call + self.outq.put(rval) + except StopIteration: + break + except Exception,e: + print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + self.outq.put(e) + # END routine exception handling + # END endless loop + + def quit(self): + raise StopIteration + + +#} END classes -- cgit v1.2.3 From 
b72e2704022d889f116e49abf3e1e5d3e3192d3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 01:00:12 +0200 Subject: Improved pool design and started rough implementation, top down to learn while going. Tests will be written soon for verification, its still quite theoretical --- lib/git/async/channel.py | 49 ++++++--- lib/git/async/graph.py | 36 +++++++ lib/git/async/pool.py | 273 +++++++++++++++++++++++++++++++++++++---------- 3 files changed, 290 insertions(+), 68 deletions(-) create mode 100644 lib/git/async/graph.py (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index c9cbfb87..70daed24 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -7,7 +7,7 @@ from Queue import ( #{ Classes class Channel(object): - """A channel is similar to a system pipe. It has a write end as well as one or + """A channel is similar to a file like object. It has a write end as well as one or more read ends. If Data is in the channel, it can be read, if not the read operation will block until data becomes available. If the channel is closed, any read operation will result in an exception @@ -51,8 +51,8 @@ class WChannel(Channel): :param block: If True, the call will block until there is free space in the channel :param timeout: timeout in seconds for blocking calls. 
- :raise IOError: when writing into closed file or when writing into a non-blocking - full channel + :raise IOError: when writing into closed file + :raise EOFError: when writing into a non-blocking full channel :note: may block if the channel has a limited capacity""" if self._closed: raise IOError("Cannot write to a closed channel") @@ -60,9 +60,14 @@ class WChannel(Channel): try: self._queue.put(item, block, timeout) except Full: - raise IOError("Capacity of the channel was exeeded") + raise EOFError("Capacity of the channel was exeeded") # END exception handling + def size(self): + """:return: approximate number of items that could be read from the read-ends + of this channel""" + return self._queue.qsize() + def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" @@ -86,22 +91,42 @@ class RChannel(Channel): #{ Interface - def read(self, block=True, timeout=None): - """:return: an item read from the channel + def read(self, count=0, block=True, timeout=None): + """read a list of items read from the channel. The list, as a sequence + of items, is similar to the string of characters returned when reading from + file like objects. + :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the given amount of seconds. - :raise IOError: When reading from an empty channel ( if non-blocking, or - if the channel is still empty after the timeout""" + :return: single item in a list if count is 1, or a list of count items. + If the channel was empty and count was 1, an empty list will be returned. + If count was greater 1, a list with less than count items will be + returned. 
+ If count was < 1, a list with all items that could be read will be + returned.""" # if the channel is closed for writing, we never block if self._wc.closed: block = False - + + out = list() try: - return self._wc._queue.get(block, timeout) + if count == 1: + out.append(self._wc._queue.get(block, timeout)) + elif count < 1: + while True: + out.append(self._wc._queue.get(block, timeout)) + # END for each item + return out + else: + for i in xrange(count): + out.append(self._wc._queue.get(block, timeout)) + # END for each item + # END handle count except Empty: - raise IOError("Error reading from an empty channel") - # END handle reading + pass + # END handle exceptions + return out #} END interface diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py new file mode 100644 index 00000000..0c0a2137 --- /dev/null +++ b/lib/git/async/graph.py @@ -0,0 +1,36 @@ +"""Simplistic implementation of a graph""" + +class Node(object): + """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. + Its not designed to support big graphs, and sports only the functionality + we need""" + __slots__ = ('in_nodes', 'out_nodes') + + +class Graph(object): + """A simple graph implementation, keeping nodes and providing basic access and + editing functions""" + __slots__ = "nodes" + + def __init__(self): + self.nodes = list() + + def add_node(self, node): + """Add a new node to the graph""" + raise NotImplementedError() + + def del_node(self, node): + """Delete a node from the graph""" + raise NotImplementedError() + + def add_edge(self, u, v): + """Add an undirected edge between the given nodes u and v. + :raise ValueError: If the new edge would create a cycle""" + raise NotImplementedError() + + def visit_input_depth_first(self, node, visitor=lambda n: True ): + """Visit all input nodes of the given node, depth first, calling visitor + for each node on our way. 
If the function returns False, the traversal + will not go any deeper, but continue at the next branch""" + raise NotImplementedError() + diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index f9f7880b..7798d3d4 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,92 +1,146 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from Queue import Queue + +from graph import ( + Graph, + Node + ) + from channel import ( Channel, WChannel, RChannel ) -class Node(object): - """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. - Its not designed to support big graphs, and sports only the functionality - we need""" - __slots__ = ('in_nodes', 'out_nodes') - - -class Graph(object): - """A simple graph implementation, keeping nodes and providing basic access and - editing functions""" - __slots__ = "nodes" - - def add_node(self, node): - pass - - def del_node(self, node): - pass - - def visit_input_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch""" - pass - +import weakref +import sys class TaskNode(Node): """Couples an input channel, an output channel, as well as a processing function together. 
It may contain additional information on how to handel read-errors from the input channel""" - __slots__ = ('in_rc', 'out_wc', 'fun') + __slots__ = ( 'in_rc', # input read channel + '_out_wc', # output write channel + '_pool_ref', # ref to our pool + '_exc', # exception caught + 'fun', # function to call with items read from in_rc + 'max_chunksize', # maximium amount of items to process per process call + 'apply_single' # apply single items even if multiple where read + ) + + def __init__(self, in_rc, fun, apply_single=True): + self.in_rc = in_rc + self._out_wc = None + self._pool_ref = None + self._exc = None + self.fun = fun + self.max_chunksize = 0 # note set + self.apply_single = apply_single def is_done(self): """:return: True if we are finished processing""" - return self.out_wc.closed + return self._out_wc.closed + + def set_done(self): + """Set ourselves to being done, has we have completed the processing""" + self._out_wc.close() + + def error(self): + """:return: Exception caught during last processing or None""" + return self._exc + def process(self, count=1): + """Process count items and send the result individually to the output channel""" + if self._out_wc is None: + raise IOError("Cannot work in uninitialized task") + + read = self.in_rc.read + if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref(): + read = self.in_rc._read + items = read(count) + + try: + if self.apply_single: + for item in items: + self._out_wc.write(self.fun(item)) + # END for each item + else: + self._out_wc.write(self.fun(items)) + # END handle single apply + except Exception, e: + self._exc = e + self.set_done() + # END exception handling + + # if we didn't get all demanded items, which is also the case if count is 0 + # we have depleted the input channel and are done + if len(items) != count: + self.set_done() + # END handle done state + #{ Configuration + class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, 
but it provides slots to call before and after an item is to be read. - It acts like a handle to the underlying task""" + It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the item has been read. The function - returns a possibly changed item. If it raises, the exception will be propagated - in an IOError, indicating read-failure - If a function is not provided, the call is effectively uninstalled.""" + def __init__(self, wchannel, task, pool): + RChannel.__init__(self, wchannel) + self._task = task + self._pool = pool + self._pre_cb = None + self._post_cb = None - def set_pre_cb(self, fun = lambda : None): - """Install a callback to call before an item is read from the channel. + def __del__(self): + """Assures that our task will be deleted if we were the last reader""" + del(self._wc) # decrement ref-count + self._pool._del_task_if_orphaned(self._task) + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. If it fails, the read will fail with an IOError If a function is not provided, the call is effectively uninstalled.""" + self._pre_cb = fun + + def set_post_cb(self, fun = lambda item: item): + """Install a callback to call after the items were read. The function + returns a possibly changed item list. If it raises, the exception will be propagated. 
+ If a function is not provided, the call is effectively uninstalled.""" + self._post_cb = fun - def read(block=False, timeout=None): + def read(self, count=1, block=False, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" + if self._pre_cb: + self._pre_cb() + # END pre callback + + ################################################## + self._pool._prepare_processing(self._task, count) + ################################################## + + items = RChannel.read(self, count, block, timeout) + if self._post_cb: + items = self._post_cb(items) #{ Internal - def _read(self, block=False, timeout=None): + def _read(self, count=1, block=False, timeout=None): """Calls the underlying channel's read directly, without triggering the pool""" - return RChannel.read(self, block, timeout) + return RChannel.read(self, count, block, timeout) #} END internal - -class PoolWorker(WorkerThread): - """A worker thread which gets called to deal with Tasks. Tasks provide channls - with actual work, whose result will be send to the tasks output channel""" - - @classmethod - def perform_task(cls, task): - # note : when getting the input channel, be sure not to trigger - # RPoolChannel - pass -class ThreadPool(Graph): +class ThreadPool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. @@ -94,23 +148,130 @@ class ThreadPool(Graph): is lazy, as work will only be done once an output is requested. 
:note: the current implementation returns channels which are meant to be - used only from the main thread""" - __slots__ = ( '_workers', # list of worker threads + used only from the main thread, hence you cannot consume their results + from multiple threads unless you use a task for it.""" + __slots__ = ( '_tasks', # a graph of tasks + '_consumed_tasks', # a list with tasks that are done or had an error + '_workers', # list of worker threads '_queue', # master queue for tasks - '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel + '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list ) - def del_node(self, task): - """Delete the node ( being a task ), but delete the entries in our output channel - cache as well""" + def __init__(self, size=0): + self._tasks = Graph() + self._consumed_tasks = list() + self._workers = list() + self._queue = Queue() + self._ordered_tasks_cache = dict() + + def __del__(self): + raise NotImplementedError("TODO: Proper cleanup") + + #{ Internal + def _queue_feeder_visitor(self, task, count): + """Walk the graph and find tasks that are done for later cleanup, and + queue all others for processing by our worker threads ( if available ).""" + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + + # if the task does not have the required output on its queue, schedule + # it for processing. If we should process all, we don't care about the + # amount as it should process until its all done. + if self._workers: + if count < 1 or task._out_wc.size() < count: + # respect the chunk size, and split the task up if we want + # to process too much. 
This can be defined per task + queue = self._queue + if task.max_chunksize: + chunksize = count / task.max_chunksize + remainder = count - (chunksize * task.max_chunksize) + for i in xrange(chunksize): + queue.put((task.process, chunksize)) + if remainder: + queue.put((task.process, remainder)) + else: + self._queue.put((task.process, count)) + # END handle chunksize + # END handle queuing + else: + # no workers, so we have to do the work ourselves + task.process(count) + # END handle serial mode + + # always walk the whole graph, we want to find consumed tasks + return True + + def _prepare_processing(self, task, count): + """Process the tasks which depend on the given one to be sure the input + channels are filled with data once we process the actual task + + Tasks have two important states: either they are done, or they are done + and have an error, so they are likely not to have finished all their work. + + Either way, we will put them onto a list of tasks to delete them, providng + information about the failed ones. + + Tasks which are not done will be put onto the queue for processing, which + is fine as we walked them depth-first.""" + self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + + # delete consumed tasks to cleanup + for task in self._consumed_tasks: + self.del_task(task) + # END for each task to delete + del(self._consumed_tasks[:]) + + def _del_task_if_orphaned(self, task): + """Check the task, and delete it if it is orphaned""" + if sys.getrefcount(task._out_wc) < 3: + self.del_task(task) + #} END internal + + #{ Interface + + def del_task(self, task): + """Delete the task + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume + its items.""" + # now delete our actual node - must set it done os it closes its channels. + # Otherwise further reads of output tasks will block. 
+ # Actually they may still block if anyone wants to read all ... without + # a timeout + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes + task.set_done() + self._tasks.del_node(task) + for t in in_tasks + self._del_task_if_orphaned(t) + # END handle orphans recursively def set_pool_size(self, size=0): """Set the amount of workers to use in this pool. :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads""" + raise NotImplementedError() def add_task(self, task): """Add a new task to be processed. - :return: your task instance with its output channel set. It can be used - to retrieve processed items""" + :return: a read channel to retrieve processed items. If that handle is lost, + the task will be considered orphaned and will be deleted on the next + occasion.""" + # create a write channel for it + wc, rc = Channel() + rc = RPoolChannel(wc, task, self) + task._out_wc = wc + task._pool_ref = weakref.ref(self) + + self._tasks.add_node(task) + + # If the input channel is one of our read channels, we add the relation + ic = task.in_rc + if isinstance(ic, RPoolChannel) and ic._pool is self: + self._tasks.add_edge(ic._task, task) + # END add task relation + + return rc + + #} END interface -- cgit v1.2.3 From ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 12:48:25 +0200 Subject: thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system graph: implemented it including test according to the pools requirements pool: implemented set_pool_size --- lib/git/async/graph.py | 84 +++++++++++++++++++++++++++++++++++++++++++------ lib/git/async/pool.py | 53 ++++++++++++++++++++++++++++--- lib/git/async/thread.py | 27 +++------------- 3 files changed, 127 insertions(+), 37 deletions(-) (limited to 'lib/git/async') diff --git 
a/lib/git/async/graph.py b/lib/git/async/graph.py index 0c0a2137..b4d6aa00 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -6,31 +6,95 @@ class Node(object): we need""" __slots__ = ('in_nodes', 'out_nodes') + def __init__(self): + self.in_nodes = list() + self.out_nodes = list() + class Graph(object): """A simple graph implementation, keeping nodes and providing basic access and - editing functions""" + editing functions. The performance is only suitable for small graphs of not + more than 10 nodes !""" __slots__ = "nodes" def __init__(self): self.nodes = list() def add_node(self, node): - """Add a new node to the graph""" - raise NotImplementedError() + """Add a new node to the graph + :return: the newly added node""" + self.nodes.append(node) + return node def del_node(self, node): - """Delete a node from the graph""" - raise NotImplementedError() + """Delete a node from the graph + :return: self""" + # clear connections + for outn in node.out_nodes: + del(outn.in_nodes[outn.in_nodes.index(node)]) + for inn in node.in_nodes: + del(inn.out_nodes[inn.out_nodes.index(node)]) + del(self.nodes[self.nodes.index(node)]) + return self def add_edge(self, u, v): """Add an undirected edge between the given nodes u and v. + + return: self :raise ValueError: If the new edge would create a cycle""" - raise NotImplementedError() + if u is v: + raise ValueError("Cannot connect a node with itself") + + # are they already connected ? 
+ if u in v.in_nodes and v in u.out_nodes or \ + v in u.in_nodes and u in v.out_nodes: + return self + # END handle connection exists + + # cycle check - if we can reach any of the two by following either ones + # history, its a cycle + for start, end in ((u, v), (v,u)): + if not start.in_nodes: + continue + nodes = start.in_nodes[:] + seen = set() + # depth first search - its faster + while nodes: + n = nodes.pop() + if n in seen: + continue + seen.add(n) + if n is end: + raise ValueError("Connecting u with v would create a cycle") + nodes.extend(n.in_nodes) + # END while we are searching + # END for each direction to look + + # connection is valid, set it up + u.out_nodes.append(v) + v.in_nodes.append(u) + + return self - def visit_input_depth_first(self, node, visitor=lambda n: True ): + def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ): """Visit all input nodes of the given node, depth first, calling visitor for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch""" - raise NotImplementedError() - + will not go any deeper, but continue at the next branch + It will return the actual input node in the end !""" + nodes = node.in_nodes[:] + seen = set() + + # depth first + while nodes: + n = nodes.pop() + if n in seen: + continue + seen.add(n) + + # only proceed in that direction if visitor is fine with it + if visitor(n): + nodes.extend(n.in_nodes) + # END call visitor + # END while walking + visitor(node) + diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7798d3d4..9a24cbc5 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -25,7 +25,8 @@ class TaskNode(Node): '_out_wc', # output write channel '_pool_ref', # ref to our pool '_exc', # exception caught - 'fun', # function to call with items read from in_rc + 'fun', # function to call with items read from in_rc + 'min_count', # minimum amount of items to produce, None means no override 
'max_chunksize', # maximium amount of items to process per process call 'apply_single' # apply single items even if multiple where read ) @@ -36,6 +37,7 @@ class TaskNode(Node): self._pool_ref = None self._exc = None self.fun = fun + self.min_count = None self.max_chunksize = 0 # note set self.apply_single = apply_single @@ -174,6 +176,12 @@ class ThreadPool(object): if task.error() or task.is_done(): self._consumed_tasks.append(task) + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None: + count = task.min_count + # END handle min-count + # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. @@ -213,7 +221,7 @@ class ThreadPool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) # delete consumed tasks to cleanup for task in self._consumed_tasks: @@ -233,7 +241,9 @@ class ThreadPool(object): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume - its items.""" + its items. + + :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. # Actually they may still block if anyone wants to read all ... without @@ -246,12 +256,45 @@ class ThreadPool(object): for t in in_tasks self._del_task_if_orphaned(t) # END handle orphans recursively + + return self def set_pool_size(self, size=0): - """Set the amount of workers to use in this pool. + """Set the amount of workers to use in this pool. 
When reducing the size, + the call may block as it waits for threads to finish. + When reducing the size to zero, this thread will process all remaining + items on the queue. + + :return: self :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads""" - raise NotImplementedError() + # either start new threads, or kill existing ones. + # If we end up with no threads, we process the remaining chunks on the queue + # ourselves + cur_count = len(self._workers) + if cur_count < size: + for i in range(size - cur_count): + worker = WorkerThread(self._queue) + self._workers.append(worker) + # END for each new worker to create + elif cur_count > size: + del_count = cur_count - size + for i in range(del_count): + self._workers[i].stop_and_join() + # END for each thread to stop + del(self._workers[:del_count]) + # END handle count + + if size == 0: + while not self._queue.empty(): + try: + taskmethod, count = self._queue.get(False) + taskmethod(count) + except Queue.Empty: + continue + # END while there are tasks on the queue + # END process queue + return self def add_task(self, task): """Add a new task to be processed. diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 3938666a..7ca93c86 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -85,9 +85,9 @@ class TerminatableThread(threading.Thread): class WorkerThread(TerminatableThread): - """ - This base allows to call functions on class instances natively and retrieve - their results asynchronously using a queue. + """ This base allows to call functions on class instances natively. + As it is meant to work with a pool, the result of the call must be + handled by the callee. The thread runs forever unless it receives the terminate signal using its task queue. 
@@ -95,11 +95,9 @@ class WorkerThread(TerminatableThread): allow the following: inq = Queue() - outq = Queue() - w = WorkerThread(inq, outq) + w = WorkerThread(inq) w.start() inq.put((WorkerThread., args, kwargs)) - res = outq.get() finally we call quit to terminate asap. @@ -120,10 +118,9 @@ class WorkerThread(TerminatableThread): class InvalidRoutineError(Exception): """Class sent as return value in case of an error""" - def __init__(self, inq = None, outq = None): + def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() - self.outq = outq or Queue.Queue() def call(self, function, *args, **kwargs): """Method that makes the call to the worker using the input queue, @@ -135,17 +132,6 @@ class WorkerThread(TerminatableThread): :param args: arguments to pass to function :parma **kwargs: kwargs to pass to function""" self.inq.put((function, args, kwargs)) - return self.outq - - def wait_until_idle(self): - """wait until the input queue is empty, in the meanwhile, take all - results off the output queue.""" - while not self.inq.empty(): - try: - self.outq.get(False) - except Queue.Empty: - continue - # END while there are tasks on the queue def run(self): """Process input tasks until we receive the quit signal""" @@ -184,15 +170,12 @@ class WorkerThread(TerminatableThread): else: # ignore unknown items print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - self.outq.put(self.InvalidRoutineError(routine)) break # END make routine call - self.outq.put(rval) except StopIteration: break except Exception,e: print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) - self.outq.put(e) # END routine exception handling # END endless loop -- cgit v1.2.3 From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new 
module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- lib/git/async/graph.py | 16 +++-- lib/git/async/pool.py | 164 ++++++++++++++++++------------------------------ lib/git/async/task.py | 144 ++++++++++++++++++++++++++++++++++++++++++ lib/git/async/thread.py | 9 ++- lib/git/async/util.py | 24 +++++++ 5 files changed, 246 insertions(+), 111 deletions(-) create mode 100644 lib/git/async/task.py create mode 100644 lib/git/async/util.py (limited to 'lib/git/async') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index b4d6aa00..d817eeb4 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -1,14 +1,20 @@ """Simplistic implementation of a graph""" class Node(object): - """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. - Its not designed to support big graphs, and sports only the functionality - we need""" - __slots__ = ('in_nodes', 'out_nodes') + """A Node in the graph. They know their neighbours, and have an id which should + resolve into a string""" + __slots__ = ('in_nodes', 'out_nodes', 'id') - def __init__(self): + def __init__(self, id=None): + self.id = id self.in_nodes = list() self.out_nodes = list() + + def __str__(self): + return str(self.id) + + def __repr__(self): + return "%s(%s)" % (type(self).__name__, self.id) class Graph(object): diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 9a24cbc5..2efc862b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,10 +1,10 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from task import InputChannelTask from Queue import Queue from graph import ( Graph, - Node ) from channel import ( @@ -16,73 +16,6 @@ from channel import ( import weakref import sys -class TaskNode(Node): - """Couples an input channel, an output channel, as well as a processing function - together. 
- It may contain additional information on how to handel read-errors from the - input channel""" - __slots__ = ( 'in_rc', # input read channel - '_out_wc', # output write channel - '_pool_ref', # ref to our pool - '_exc', # exception caught - 'fun', # function to call with items read from in_rc - 'min_count', # minimum amount of items to produce, None means no override - 'max_chunksize', # maximium amount of items to process per process call - 'apply_single' # apply single items even if multiple where read - ) - - def __init__(self, in_rc, fun, apply_single=True): - self.in_rc = in_rc - self._out_wc = None - self._pool_ref = None - self._exc = None - self.fun = fun - self.min_count = None - self.max_chunksize = 0 # note set - self.apply_single = apply_single - - def is_done(self): - """:return: True if we are finished processing""" - return self._out_wc.closed - - def set_done(self): - """Set ourselves to being done, has we have completed the processing""" - self._out_wc.close() - - def error(self): - """:return: Exception caught during last processing or None""" - return self._exc - - def process(self, count=1): - """Process count items and send the result individually to the output channel""" - if self._out_wc is None: - raise IOError("Cannot work in uninitialized task") - - read = self.in_rc.read - if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref(): - read = self.in_rc._read - items = read(count) - - try: - if self.apply_single: - for item in items: - self._out_wc.write(self.fun(item)) - # END for each item - else: - self._out_wc.write(self.fun(items)) - # END handle single apply - except Exception, e: - self._exc = e - self.set_done() - # END exception handling - - # if we didn't get all demanded items, which is also the case if count is 0 - # we have depleted the input channel and are done - if len(items) != count: - self.set_done() - # END handle done state - #{ Configuration - class RPoolChannel(RChannel): """ A read-only pool 
channel may not be wrapped or derived from, but it provides slots to call @@ -116,7 +49,7 @@ class RPoolChannel(RChannel): If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - def read(self, count=1, block=False, timeout=None): + def read(self, count=0, block=False, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -131,9 +64,11 @@ class RPoolChannel(RChannel): items = RChannel.read(self, count, block, timeout) if self._post_cb: items = self._post_cb(items) + + return items #{ Internal - def _read(self, count=1, block=False, timeout=None): + def _read(self, count=0, block=False, timeout=None): """Calls the underlying channel's read directly, without triggering the pool""" return RChannel.read(self, count, block, timeout) @@ -141,7 +76,6 @@ class RPoolChannel(RChannel): #} END internal - class ThreadPool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. @@ -149,6 +83,15 @@ class ThreadPool(object): Work is distributed via Channels, which form a dependency graph. The evaluation is lazy, as work will only be done once an output is requested. + The thread pools inherent issue is the global interpreter lock that it will hit, + which gets worse considering a few c extensions specifically lock their part + globally as well. The only way this will improve is if custom c extensions + are written which do some bulk work, but release the GIL once they have acquired + their resources. + + Due to the nature of having multiple objects in git, its easy to distribute + that work cleanly among threads. 
+ :note: the current implementation returns channels which are meant to be used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" @@ -156,7 +99,6 @@ class ThreadPool(object): '_consumed_tasks', # a list with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks - '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list ) def __init__(self, size=0): @@ -164,10 +106,10 @@ class ThreadPool(object): self._consumed_tasks = list() self._workers = list() self._queue = Queue() - self._ordered_tasks_cache = dict() + self.set_size(size) def __del__(self): - raise NotImplementedError("TODO: Proper cleanup") + self.set_size(0) #{ Internal def _queue_feeder_visitor(self, task, count): @@ -175,7 +117,7 @@ class ThreadPool(object): queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): self._consumed_tasks.append(task) - + # allow min-count override. This makes sure we take at least min-count # items off the input queue ( later ) if task.min_count is not None: @@ -236,30 +178,11 @@ class ThreadPool(object): #} END internal #{ Interface + def size(self): + """:return: amount of workers in the pool""" + return len(self._workers) - def del_task(self, task): - """Delete the task - Additionally we will remove orphaned tasks, which can be identified if their - output channel is only held by themselves, so no one will ever consume - its items. - - :return: self""" - # now delete our actual node - must set it done os it closes its channels. - # Otherwise further reads of output tasks will block. - # Actually they may still block if anyone wants to read all ... 
without - # a timeout - # keep its input nodes as we check whether they were orphaned - in_tasks = task.in_nodes - task.set_done() - self._tasks.del_node(task) - - for t in in_tasks - self._del_task_if_orphaned(t) - # END handle orphans recursively - - return self - - def set_pool_size(self, size=0): + def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, the call may block as it waits for threads to finish. When reducing the size to zero, this thread will process all remaining @@ -275,6 +198,7 @@ class ThreadPool(object): if cur_count < size: for i in range(size - cur_count): worker = WorkerThread(self._queue) + worker.start() self._workers.append(worker) # END for each new worker to create elif cur_count > size: @@ -295,7 +219,33 @@ class ThreadPool(object): # END while there are tasks on the queue # END process queue return self - + + def num_tasks(self): + """:return: amount of tasks""" + return len(self._tasks.nodes) + + def del_task(self, task): + """Delete the task + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume + its items. + + :return: self""" + # now delete our actual node - must set it done os it closes its channels. + # Otherwise further reads of output tasks will block. + # Actually they may still block if anyone wants to read all ... without + # a timeout + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes + task.set_done() + self._tasks.del_node(task) + + for t in in_tasks: + self._del_task_if_orphaned(t) + # END handle orphans recursively + + return self + def add_task(self, task): """Add a new task to be processed. :return: a read channel to retrieve processed items. 
If that handle is lost, @@ -305,15 +255,21 @@ class ThreadPool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task._out_wc = wc - task._pool_ref = weakref.ref(self) + + has_input_channel = isinstance(task, InputChannelTask) + if has_input_channel: + task._pool_ref = weakref.ref(self) + # END init input channel task self._tasks.add_node(task) # If the input channel is one of our read channels, we add the relation - ic = task.in_rc - if isinstance(ic, RPoolChannel) and ic._pool is self: - self._tasks.add_edge(ic._task, task) - # END add task relation + if has_input_channel: + ic = task.in_rc + if isinstance(ic, RPoolChannel) and ic._pool is self: + self._tasks.add_edge(ic._task, task) + # END add task relation + # END handle input channels for connections return rc diff --git a/lib/git/async/task.py b/lib/git/async/task.py new file mode 100644 index 00000000..d2422773 --- /dev/null +++ b/lib/git/async/task.py @@ -0,0 +1,144 @@ +from graph import Node +import threading +import new + +class OutputChannelTask(Node): + """Abstracts a named task as part of a set of interdependent tasks, which contains + additional information on how the task should be queued and processed. 
+ + Results of the item processing are sent to an output channel, which is to be + set by the creator""" + __slots__ = ( '_read', # method to yield items to process + '_out_wc', # output write channel + '_exc', # exception caught + 'fun', # function to call with items read + 'min_count', # minimum amount of items to produce, None means no override + 'max_chunksize', # maximium amount of items to process per process call + 'apply_single' # apply single items even if multiple where read + ) + + def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0): + Node.__init__(self, id) + self._read = None # to be set by subclasss + self._out_wc = None # to be set later + self._exc = None + self.fun = fun + self.min_count = None + self.max_chunksize = 0 # note set + self.apply_single = apply_single + + def is_done(self): + """:return: True if we are finished processing""" + return self._out_wc.closed + + def set_done(self): + """Set ourselves to being done, has we have completed the processing""" + self._out_wc.close() + + def error(self): + """:return: Exception caught during last processing or None""" + return self._exc + + def process(self, count=0): + """Process count items and send the result individually to the output channel""" + items = self._read(count) + + try: + if self.apply_single: + for item in items: + self._out_wc.write(self.fun(item)) + # END for each item + else: + self._out_wc.write(self.fun(items)) + # END handle single apply + except Exception, e: + self._exc = e + self.set_done() + # END exception handling + + # if we didn't get all demanded items, which is also the case if count is 0 + # we have depleted the input channel and are done + if len(items) != count: + self.set_done() + # END handle done state + #{ Configuration + + +class ThreadTaskBase(object): + """Describes tasks which can be used with theaded pools""" + pass + + +class InputIteratorTaskBase(OutputChannelTask): + """Implements a task which processes items from an 
iterable in a multi-processing + safe manner""" + __slots__ = ('_iterator', '_lock') + # the type of the lock to use when reading from the iterator + lock_type = None + + def __init__(self, iterator, *args, **kwargs): + OutputChannelTask.__init__(self, *args, **kwargs) + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iterator = iterator + self._lock = self.lock_type() + self._read = self.__read + + def __read(self, count=0): + """Read count items from the iterator, and return them""" + self._lock.acquire() + try: + if count == 0: + return list(self._iterator) + else: + out = list() + it = self._iterator + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + + +class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): + """An input iterator for threaded pools""" + lock_type = threading.Lock + + +class InputChannelTask(OutputChannelTask): + """Uses an input channel as source for reading items + For instantiation, it takes all arguments of its base, the first one needs + to be the input channel to read from though.""" + __slots__ = ( + 'in_rc', # channel to read items from + '_pool_ref' # to be set by Pool + ) + + def __init__(self, in_rc, *args, **kwargs): + OutputChannelTask.__init__(self, *args, **kwargs) + self._in_rc = in_rc + + def process(self, count=1): + """Verify our setup, and do some additional checking, before the + base implementation can permanently perform all operations""" + self._read = self._in_rc.read + # make sure we don't trigger the pool if we read from a pool channel which + # belongs to our own pool. 
Channels from different pools are fine though, + # there we want to trigger its computation + if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): + self._read = self._in_rc._read + + # permanently install our base for processing + self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self)) + + # and call it + return OutputChannelTask.process(self, count) + #{ Configuration + diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 7ca93c86..82acbd8f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -139,10 +139,15 @@ class WorkerThread(TerminatableThread): if self._should_terminate(): break # END check for stop request - routine = self.__class__.quit + routine = None args = tuple() kwargs = dict() - tasktuple = self.inq.get() + # don't wait too long, instead check for the termination request more often + try: + tasktuple = self.inq.get(True, 1) + except Queue.Empty: + continue + # END get task with timeout if isinstance(tasktuple, (tuple, list)): if len(tasktuple) == 3: diff --git a/lib/git/async/util.py b/lib/git/async/util.py new file mode 100644 index 00000000..dabd8a42 --- /dev/null +++ b/lib/git/async/util.py @@ -0,0 +1,24 @@ +"""Module with utilities related to async operations""" + +import sys +import os + +def cpu_count(): + """:return:number of CPUs in the system + :note: inspired by multiprocessing""" + num = 0 + try: + if sys.platform == 'win32': + num = int(os.environ['NUMBER_OF_PROCESSORS']) + elif 'bsd' in sys.platform or sys.platform == 'darwin': + num = int(os.popen('sysctl -n hw.ncpu').read()) + else: + num = os.sysconf('SC_NPROCESSORS_ONLN') + except (ValueError, KeyError, OSError, AttributeError): + pass + # END exception handling + + if num == 0: + raise NotImplementedError('cannot determine number of cpus') + + return num -- cgit v1.2.3 From 1b27292936c81637f6b9a7141dafaad1126f268e Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 
21:15:13 +0200 Subject: Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing --- lib/git/async/graph.py | 7 +++++- lib/git/async/pool.py | 66 ++++++++++++++++++++++++++++++++++++++------------ lib/git/async/task.py | 2 +- 3 files changed, 58 insertions(+), 17 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index d817eeb4..6386cbaa 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -35,12 +35,17 @@ class Graph(object): def del_node(self, node): """Delete a node from the graph :return: self""" + try: + del(self.nodes[self.nodes.index(node)]) + except ValueError: + return self + # END ignore if it doesn't exist + # clear connections for outn in node.out_nodes: del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) - del(self.nodes[self.nodes.index(node)]) return self def add_edge(self, u, v): diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 2efc862b..620e2258 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -117,36 +117,72 @@ class ThreadPool(object): queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): self._consumed_tasks.append(task) + return True + # END stop processing # allow min-count override. This makes sure we take at least min-count # items off the input queue ( later ) - if task.min_count is not None: + if task.min_count is not None and count != 0 and count < task.min_count: count = task.min_count # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. 
- if self._workers: - if count < 1 or task._out_wc.size() < count: + if count < 1 or task._out_wc.size() < count: + numchunks = 1 + chunksize = count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and count > task.max_chunksize: + numchunks = count / task.max_chunksize + chunksize = task.max_chunksize + remainder = count - (numchunks * chunksize) + # END handle chunking + + print count, numchunks, chunksize, remainder + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + if self._workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task queue = self._queue - if task.max_chunksize: - chunksize = count / task.max_chunksize - remainder = count - (chunksize * task.max_chunksize) - for i in xrange(chunksize): + if numchunks > 1: + for i in xrange(numchunks): queue.put((task.process, chunksize)) - if remainder: - queue.put((task.process, remainder)) + # END for each chunk to put + else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): + task.process(chunksize) + # END for each chunk to put else: - self._queue.put((task.process, count)) + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) # END handle chunksize - # END handle queuing - else: - # no workers, so we have to do the work ourselves - task.process(count) - # END handle serial mode + + # as we are serial, we can check for 
consumption right away + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # END handle serial mode + # END handle queuing # always walk the whole graph, we want to find consumed tasks return True diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d2422773..ec650237 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -58,7 +58,7 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done - if len(items) != count: + if not items or len(items) != count: self.set_done() # END handle done state #{ Configuration -- cgit v1.2.3 From 867129e2950458ab75523b920a5e227e3efa8bbc Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:08:06 +0200 Subject: channel.read: enhanced to be sure we don't run into non-atomicity issues related to our channel closed flag, which is the only way not to block forever on read(0) channels which were closed by a thread 'in the meanwhile' --- lib/git/async/channel.py | 89 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 17 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 70daed24..0a1db26b 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -5,6 +5,9 @@ from Queue import ( Full ) +from time import time +import sys + #{ Classes class Channel(object): """A channel is similar to a file like object. 
It has a write end as well as one or @@ -106,26 +109,78 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block - if self._wc.closed: + if self._wc.closed or timeout == 0: block = False - + + # in non-blocking mode, its all not a problem out = list() - try: - if count == 1: - out.append(self._wc._queue.get(block, timeout)) - elif count < 1: - while True: - out.append(self._wc._queue.get(block, timeout)) - # END for each item - return out - else: - for i in xrange(count): - out.append(self._wc._queue.get(block, timeout)) - # END for each item + queue = self._wc._queue + if not block: + # be as fast as possible in non-blocking mode, hence + # its a bit 'unrolled' + try: + if count == 1: + out.append(queue.get(False)) + elif count < 1: + while True: + out.append(queue.get(False)) + # END for each item + else: + for i in xrange(count): + out.append(queue.get(False)) + # END for each item + # END handle count + except Empty: + pass + # END handle exceptions + else: + # if we have really bad timing, the source of the channel + # marks itself closed, but before setting it, the thread + # switches to us. We read it, read False, and try to fetch + # something, and never return. The whole closed channel thing + # is not atomic ( of course ) + # This is why we never block for long, to get a chance to recheck + # for closed channels. 
+ # We blend this into the timeout of the user + ourtimeout = 0.25 # the smaller, the more responsive, but the slower + wc = self._wc + timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it + assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe + if timeout and ourtimeout > timeout: + ourtimeout = timeout + # END setup timeout + + # to get everything into one loop, we set the count accordingly + if count == 0: + count = sys.maxint # END handle count - except Empty: - pass - # END handle exceptions + + for i in xrange(count): + have_timeout = False + st = time() + while True: + try: + if wc.closed: + have_timeout = True + break + # END don't continue on closed channels + + # END abort reading if it was closed ( in the meanwhile ) + out.append(queue.get(block, ourtimeout)) + break # breakout right away + except Empty: + if timeout - (time() - st) <= 0: + # hitting timeout + have_timeout = True + break + # END abort if the user wants no more time spent here + # END handle timeout + # END endless timer loop + if have_timeout: + break + # END stop on timeout + # END for each item + # END handle blocking return out #} END interface -- cgit v1.2.3 From 6a252661c3bf4202a4d571f9c41d2afa48d9d75f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:41:20 +0200 Subject: pool: First version which works as expected in async mode. 
It's just using a single task for now, but next up are dependent tasks --- lib/git/async/channel.py | 10 +++++++++- lib/git/async/pool.py | 44 +++++++++++++++++++++++++++----------------- lib/git/async/task.py | 12 ++++++++++-- lib/git/async/thread.py | 43 +++++++++---------------------------------- 4 files changed, 55 insertions(+), 54 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 0a1db26b..2add9478 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -162,7 +162,15 @@ class RChannel(Channel): try: if wc.closed: have_timeout = True - break + # its about the 'in the meanwhile' :) - get everything + # we can in non-blocking mode. This will raise + try: + while True: + out.append(queue.get(False)) + # END until it raises Empty + except Empty: + break + # END finally, out of here # END don't continue on closed channels # END abort reading if it was closed ( in the meanwhile ) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 620e2258..fcb0f442 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -49,7 +49,7 @@ class RPoolChannel(RChannel): If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - def read(self, count=0, block=False, timeout=None): + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -57,14 +57,21 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback - ################################################## - self._pool._prepare_processing(self._task, count) - ################################################## + ########## prepare ############################## + self._pool._prepare_channel_read(self._task, count) + + ######### read data ###### + # read actual items, tasks were setup to put their output into our channel ( as well ) items = RChannel.read(self, 
count, block, timeout) + if self._post_cb: items = self._post_cb(items) + + ####### Finalize ######## + self._pool._post_channel_read(self._task) + return items #{ Internal @@ -119,17 +126,17 @@ class ThreadPool(object): self._consumed_tasks.append(task) return True # END stop processing - - # allow min-count override. This makes sure we take at least min-count - # items off the input queue ( later ) - if task.min_count is not None and count != 0 and count < task.min_count: - count = task.min_count - # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. if count < 1 or task._out_wc.size() < count: + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None and 0 < count < task.min_count: + count = task.min_count + # END handle min-count + numchunks = 1 chunksize = count remainder = 0 @@ -144,10 +151,10 @@ class ThreadPool(object): remainder = count - (numchunks * chunksize) # END handle chunking - print count, numchunks, chunksize, remainder # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower + print count, numchunks, chunksize, remainder, task._out_wc.size() if self._workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task @@ -176,18 +183,13 @@ class ThreadPool(object): if remainder: task.process(remainder) # END handle chunksize - - # as we are serial, we can check for consumption right away - if task.error() or task.is_done(): - self._consumed_tasks.append(task) - # END handle consumption # END handle serial mode # END handle queuing # always walk the whole graph, we want to find consumed tasks return True - def _prepare_processing(self, task, count): + def _prepare_channel_read(self, task, count): """Process the tasks which depend on the given one to be sure the input channels are filled with data once we process the actual task @@ -201,10 +203,18 @@ class ThreadPool(object): is fine as we walked them depth-first.""" self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + def _post_channel_read(self, task): + """Called after we processed a read to cleanup""" + # check whether we consumed the task, and schedule it for deletion + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # delete consumed tasks to cleanup for task in self._consumed_tasks: self.del_task(task) # END for each task to delete + del(self._consumed_tasks[:]) def _del_task_if_orphaned(self, task): diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ec650237..3137746c 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -7,7 +7,13 @@ class OutputChannelTask(Node): additional information on how the task should be queued and processed. Results of the item processing are sent to an output channel, which is to be - set by the creator""" + set by the creator + + * **min_count** assures that not less than min_count items will be processed per call. + * **max_chunksize** assures that multi-threading is happening in smaller chunks. If + someone wants all items to be processed, using read(0), the whole task would go to + one worker, as well as dependent tasks. 
If you want finer granularity , you can + specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught @@ -42,7 +48,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - try: if self.apply_single: for item in items: @@ -58,6 +63,9 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done + # We could check our output channel for how many items we have and put that + # into the equation, but whats important is that we were asked to produce + # count items. if not items or len(items) != count: self.set_done() # END handle done state diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 82acbd8f..0292289d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -115,33 +115,17 @@ class WorkerThread(TerminatableThread): """ __slots__ = ('inq', 'outq') - class InvalidRoutineError(Exception): - """Class sent as return value in case of an error""" - def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() - def call(self, function, *args, **kwargs): - """Method that makes the call to the worker using the input queue, - returning our output queue - - :param funciton: can be a standalone function unrelated to this class, - a class method of this class or any instance method. 
- If it is a string, it will be considered a function residing on this instance - :param args: arguments to pass to function - :parma **kwargs: kwargs to pass to function""" - self.inq.put((function, args, kwargs)) - def run(self): """Process input tasks until we receive the quit signal""" while True: if self._should_terminate(): break # END check for stop request - routine = None - args = tuple() - kwargs = dict() + # don't wait too long, instead check for the termination request more often try: tasktuple = self.inq.get(True, 1) @@ -149,29 +133,19 @@ class WorkerThread(TerminatableThread): continue # END get task with timeout - if isinstance(tasktuple, (tuple, list)): - if len(tasktuple) == 3: - routine, args, kwargs = tasktuple - elif len(tasktuple) == 2: - routine, args = tasktuple - elif len(tasktuple) == 1: - routine = tasktuple[0] - # END tasktuple length check - elif inspect.isroutine(tasktuple): - routine = tasktuple - # END tasktuple handling + # needing exactly one function, and one arg + assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" + routine, arg = tasktuple try: rval = None if inspect.ismethod(routine): if routine.im_self is None: - rval = routine(self, *args, **kwargs) + rval = routine(self, arg) else: - rval = routine(*args, **kwargs) + rval = routine(arg) elif inspect.isroutine(routine): - rval = routine(*args, **kwargs) - elif isinstance(routine, basestring) and hasattr(self, routine): - rval = getattr(self, routine)(*args, **kwargs) + rval = routine(arg) else: # ignore unknown items print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) @@ -180,7 +154,8 @@ class WorkerThread(TerminatableThread): except StopIteration: break except Exception,e: - print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + break # abort ... # END routine exception handling # END endless loop -- cgit v1.2.3 From a8a448b7864e21db46184eab0f0a21d7725d074f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 10:38:22 +0200 Subject: pool.consumed_tasks: is now a queue to be thread safe, in preparation for multiple connected pools Reduced waiting time in tests to make them complete faster --- lib/git/async/pool.py | 29 ++++++++++++++++++----------- lib/git/async/thread.py | 5 +++++ 2 files changed, 23 insertions(+), 11 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fcb0f442..5518e37e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,7 +1,7 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread from task import InputChannelTask -from Queue import Queue +from Queue import Queue, Empty from graph import ( Graph, @@ -103,14 +103,14 @@ class ThreadPool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a list with tasks that are done or had an error + '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks ) def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = list() + self._consumed_tasks = Queue() # make sure its threadsafe self._workers = list() self._queue = Queue() self.set_size(size) @@ -123,7 +123,7 @@ class ThreadPool(object): """Walk the graph and find tasks that are done for later cleanup, and queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): - self._consumed_tasks.append(task) + self._consumed_tasks.put(task) return True # END stop processing @@ -206,16 +206,21 @@ class ThreadPool(object): def 
_post_channel_read(self, task): """Called after we processed a read to cleanup""" # check whether we consumed the task, and schedule it for deletion + # This could have happend after the read returned ( even though the pre-read + # checks it as well ) if task.error() or task.is_done(): - self._consumed_tasks.append(task) + self._consumed_tasks.put(task) # END handle consumption # delete consumed tasks to cleanup - for task in self._consumed_tasks: - self.del_task(task) - # END for each task to delete - - del(self._consumed_tasks[:]) + try: + while True: + ct = self._consumed_tasks.get(False) + self.del_task(ct) + # END for each task to delete + except Empty: + pass + # END pop queue empty def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" @@ -236,7 +241,9 @@ class ThreadPool(object): :return: self :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads""" + otherwise the work will be distributed among the given amount of threads + + :note: currently NOT threadsafe !""" # either start new threads, or kill existing ones. 
# If we end up with no threads, we process the remaining chunks on the queue # ourselves diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 0292289d..2ed002e9 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -115,6 +115,11 @@ class WorkerThread(TerminatableThread): """ __slots__ = ('inq', 'outq') + + # define how often we should check for a shutdown request in case our + # taskqueue is empty + shutdown_check_time_s = 0.5 + def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() -- cgit v1.2.3 From 619662a9138fd78df02c52cae6dc89db1d70a0e5 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 12:10:56 +0200 Subject: changed scheduling and chunksize calculation in respect to the task.min_count, to fix theoretical option for a deadlock in serial mode, and unnecessary blocking in async mode --- lib/git/async/pool.py | 213 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 137 insertions(+), 76 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 5518e37e..009096f2 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,5 +1,6 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from threading import Lock from task import InputChannelTask from Queue import Queue, Empty @@ -83,7 +84,7 @@ class RPoolChannel(RChannel): #} END internal -class ThreadPool(object): +class Pool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. 
@@ -106,88 +107,35 @@ class ThreadPool(object): '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks + '_taskgraph_lock', # lock for accessing the task graph ) + # CONFIGURATION + # The type of worker to create - its expected to provide the Thread interface, + # taking the taskqueue as only init argument + # as well as a method called stop_and_join() to terminate it + WorkerCls = None + + # The type of lock to use to protect critical sections, providing the + # threading.Lock interface + LockCls = None + + # the type of the task queue to use - it must provide the Queue interface + TaskQueueCls = None + + def __init__(self, size=0): self._tasks = Graph() self._consumed_tasks = Queue() # make sure its threadsafe self._workers = list() - self._queue = Queue() + self._queue = self.TaskQueueCls() + self._taskgraph_lock = self.LockCls() self.set_size(size) def __del__(self): self.set_size(0) #{ Internal - def _queue_feeder_visitor(self, task, count): - """Walk the graph and find tasks that are done for later cleanup, and - queue all others for processing by our worker threads ( if available ).""" - if task.error() or task.is_done(): - self._consumed_tasks.put(task) - return True - # END stop processing - - # if the task does not have the required output on its queue, schedule - # it for processing. If we should process all, we don't care about the - # amount as it should process until its all done. - if count < 1 or task._out_wc.size() < count: - # allow min-count override. 
This makes sure we take at least min-count - # items off the input queue ( later ) - if task.min_count is not None and 0 < count < task.min_count: - count = task.min_count - # END handle min-count - - numchunks = 1 - chunksize = count - remainder = 0 - - # we need the count set for this - can't chunk up unlimited items - # In serial mode we could do this by checking for empty input channels, - # but in dispatch mode its impossible ( == not easily possible ) - # Only try it if we have enough demand - if task.max_chunksize and count > task.max_chunksize: - numchunks = count / task.max_chunksize - chunksize = task.max_chunksize - remainder = count - (numchunks * chunksize) - # END handle chunking - - # the following loops are kind of unrolled - code duplication - # should make things execute faster. Putting the if statements - # into the loop would be less code, but ... slower - print count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: - # respect the chunk size, and split the task up if we want - # to process too much. 
This can be defined per task - queue = self._queue - if numchunks > 1: - for i in xrange(numchunks): - queue.put((task.process, chunksize)) - # END for each chunk to put - else: - queue.put((task.process, chunksize)) - # END try efficient looping - - if remainder: - queue.put((task.process, remainder)) - # END handle chunksize - else: - # no workers, so we have to do the work ourselves - if numchunks > 1: - for i in xrange(numchunks): - task.process(chunksize) - # END for each chunk to put - else: - task.process(chunksize) - # END try efficient looping - - if remainder: - task.process(remainder) - # END handle chunksize - # END handle serial mode - # END handle queuing - - # always walk the whole graph, we want to find consumed tasks - return True def _prepare_channel_read(self, task, count): """Process the tasks which depend on the given one to be sure the input @@ -201,7 +149,98 @@ class ThreadPool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + dfirst_tasks = list() + # for the walk, we must make sure the ordering does not change + # Note: the result of this could be cached + self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + + # check the min count on all involved tasks, and be sure that we don't + # have any task which produces less than the maximum min-count of all tasks + # The actual_count is used when chunking tasks up for the queue, whereas + # the count is usued to determine whether we still have enough output + # on the queue, checking qsize ( ->revise ) + # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces + # at least 10, T-1 goes with 1, then T will block after 1 item, which + # is read by the client. 
On the next read of 1 item, we would find T's + # queue empty and put in another 10, which could put another thread into + # blocking state. T-1 produces one more item, which is consumed right away + # by the two threads running T. Although this works in the end, it leaves + # many threads blocking and waiting for input, which is not desired. + # Setting the min-count to the max of the mincount of all tasks assures + # we have enough items for all. + # Addition: in serial mode, we would enter a deadlock if one task would + # ever wait for items ! + actual_count = count + min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks) + min_count = reduce(lambda m1, m2: max(m1, m2), min_counts) + if 0 < count < min_count: + actual_count = min_count + # END set actual count + + # the list includes our tasks - the first one to evaluate first, the + # requested one last + for task in dfirst_tasks: + if task.error() or task.is_done(): + self._consumed_tasks.put(task) + continue + # END skip processing + + # if the task does not have the required output on its queue, schedule + # it for processing. If we should process all, we don't care about the + # amount as it should process until its all done. + # NOTE: revise this for multi-tasking - checking qsize doesnt work there ! 
+ if count < 1 or task._out_wc.size() < count: + # but we continue to use the actual count to produce the output + numchunks = 1 + chunksize = actual_count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and actual_count > task.max_chunksize: + numchunks = actual_count / task.max_chunksize + chunksize = task.max_chunksize + remainder = actual_count - (numchunks * chunksize) + # END handle chunking + + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + if self._workers: + # respect the chunk size, and split the task up if we want + # to process too much. 
This can be defined per task + queue = self._queue + if numchunks > 1: + for i in xrange(numchunks): + queue.put((task.process, chunksize)) + # END for each chunk to put + else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): + task.process(chunksize) + # END for each chunk to put + else: + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) + # END handle chunksize + # END handle serial mode + # END handle queuing + # END for each task to process + def _post_channel_read(self, task): """Called after we processed a read to cleanup""" @@ -250,7 +289,7 @@ class ThreadPool(object): cur_count = len(self._workers) if cur_count < size: for i in range(size - cur_count): - worker = WorkerThread(self._queue) + worker = self.WorkerCls(self._queue) worker.start() self._workers.append(worker) # END for each new worker to create @@ -291,7 +330,12 @@ class ThreadPool(object): # keep its input nodes as we check whether they were orphaned in_tasks = task.in_nodes task.set_done() - self._tasks.del_node(task) + self._taskgraph_lock.acquire() + try: + self._tasks.del_node(task) + finally: + self._taskgraph_lock.release() + # END locked deletion for t in in_tasks: self._del_task_if_orphaned(t) @@ -314,16 +358,33 @@ class ThreadPool(object): task._pool_ref = weakref.ref(self) # END init input channel task - self._tasks.add_node(task) + self._taskgraph_lock.acquire() + try: + self._tasks.add_node(task) + finally: + self._taskgraph_lock.release() + # END sync task addition # If the input channel is one of our read channels, we add the relation if has_input_channel: ic = task.in_rc if isinstance(ic, RPoolChannel) and ic._pool is self: - self._tasks.add_edge(ic._task, task) + self._taskgraph_lock.acquire() + try: + 
self._tasks.add_edge(ic._task, task) + finally: + self._taskgraph_lock.release() + # END handle edge-adding # END add task relation # END handle input channels for connections return rc #} END interface + + +class ThreadPool(Pool): + """A pool using threads as worker""" + WorkerCls = WorkerThread + LockCls = Lock + TaskQueueCls = Queue -- cgit v1.2.3 From 8c3c271b0d6b5f56b86e3f177caf3e916b509b52 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 13:05:35 +0200 Subject: Added task order cache, and a lock to prevent us walking the graph while changing tasks Now processing more items to test performance, in dual-threaded mode as well, and its rather bad, have to figure out the reason for this, probably gil, but queues could help --- lib/git/async/pool.py | 26 +++++++++++++++++++++----- lib/git/async/thread.py | 3 ++- 2 files changed, 23 insertions(+), 6 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 009096f2..26a6a182 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -107,6 +107,7 @@ class Pool(object): '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks + '_taskorder_cache', # map task id -> ordered dependent tasks '_taskgraph_lock', # lock for accessing the task graph ) @@ -130,6 +131,7 @@ class Pool(object): self._workers = list() self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() + self._taskorder_cache = dict() self.set_size(size) def __del__(self): @@ -149,10 +151,21 @@ class Pool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - dfirst_tasks = list() - # for the walk, we must make sure the ordering does not change - # Note: the result of this could be cached - self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + # for the walk, we must make sure the 
ordering does not change. Even + # when accessing the cache, as it is related to graph changes + self._taskgraph_lock.acquire() + try: + try: + dfirst_tasks = self._taskorder_cache[id(task)] + except KeyError: + # have to retrieve the list from the graph + dfirst_tasks = list() + self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + self._taskorder_cache[id(task)] = dfirst_tasks + # END handle cached order retrieval + finally: + self._taskgraph_lock.release() + # END handle locking # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks @@ -208,7 +221,8 @@ class Pool(object): # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower - print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + # DEBUG + # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() if self._workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task @@ -332,6 +346,7 @@ class Pool(object): task.set_done() self._taskgraph_lock.acquire() try: + self._taskorder_cache.clear() self._tasks.del_node(task) finally: self._taskgraph_lock.release() @@ -360,6 +375,7 @@ class Pool(object): self._taskgraph_lock.acquire() try: + self._taskorder_cache.clear() self._tasks.add_node(task) finally: self._taskgraph_lock.release() diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 2ed002e9..f875f094 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -141,7 +141,8 @@ class WorkerThread(TerminatableThread): # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - + # DEBUG + # print "%s: picked up: %s(%s)" % (self.name, routine, arg) try: rval = None if inspect.ismethod(routine): -- cgit v1.2.3 From edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 17:16:48 +0200 Subject: added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. 
There is still a sync bug in regard to closed channels to be fixed, as the Task.set_done handling is incorrect --- lib/git/async/pool.py | 241 +++++++++++++++++++++++++++++++++++++----------- lib/git/async/thread.py | 3 - 2 files changed, 186 insertions(+), 58 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 26a6a182..30291835 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,8 +1,16 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread -from threading import Lock + +from threading import ( + Lock, + _Condition, + _sleep, + _time, + ) + from task import InputChannelTask from Queue import Queue, Empty +from collections import deque from graph import ( Graph, @@ -18,6 +26,96 @@ import weakref import sys +#{ Utilities + +class SyncQueue(deque): + """Adapter to allow using a deque like a queue, without locking""" + def get(self, block=True, timeout=None): + try: + return self.pop() + except IndexError: + raise Empty + # END raise empty + + def empty(self): + return len(self) == 0 + + put = deque.append + + +class HSCondition(_Condition): + """An attempt to make conditions less blocking, which gains performance + in return by sleeping less""" + delay = 0.00002 # reduces wait times, but increases overhead + + def wait(self, timeout=None): + waiter = Lock() + waiter.acquire() + self.__dict__['_Condition__waiters'].append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). 
+ endtime = _time() + timeout + delay = self.delay + acquire = waiter.acquire + while True: + gotit = acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + # END endless loop + if not gotit: + try: + self.__dict__['_Condition__waiters'].remove(waiter) + except ValueError: + pass + # END didn't ever get it + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + __waiters = self.__dict__['_Condition__waiters'] + if not __waiters: + return + if n == 1: + __waiters[0].release() + try: + __waiters.pop(0) + except IndexError: + pass + else: + waiters = __waiters[:n] + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + # END handle n = 1 case faster + +class PerfQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +#} END utilities + class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. @@ -49,7 +147,7 @@ class RPoolChannel(RChannel): returns a possibly changed item list. If it raises, the exception will be propagated. 
If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary @@ -58,8 +156,18 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback + # if we have count items, don't do any queue preparation - if someone + # depletes the queue in the meanwhile, the channel will close and + # we will unblock naturally + have_enough = False + if count > 0: + # explicitly > count, as we want a certain safe range + have_enough = self._wc._queue.qsize() > count + # END risky game + ########## prepare ############################## - self._pool._prepare_channel_read(self._task, count) + if not have_enough: + self._pool._prepare_channel_read(self._task, count) ######### read data ###### @@ -127,9 +235,9 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = Queue() # make sure its threadsafe + self._consumed_tasks = None self._workers = list() - self._queue = self.TaskQueueCls() + self._queue = SyncQueue() # start with a sync queue self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) @@ -201,58 +309,60 @@ class Pool(object): # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. - # NOTE: revise this for multi-tasking - checking qsize doesnt work there ! 
- if count < 1 or task._out_wc.size() < count: - # but we continue to use the actual count to produce the output - numchunks = 1 - chunksize = actual_count - remainder = 0 - - # we need the count set for this - can't chunk up unlimited items - # In serial mode we could do this by checking for empty input channels, - # but in dispatch mode its impossible ( == not easily possible ) - # Only try it if we have enough demand - if task.max_chunksize and actual_count > task.max_chunksize: - numchunks = actual_count / task.max_chunksize - chunksize = task.max_chunksize - remainder = actual_count - (numchunks * chunksize) - # END handle chunking - - # the following loops are kind of unrolled - code duplication - # should make things execute faster. Putting the if statements - # into the loop would be less code, but ... slower - # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: - # respect the chunk size, and split the task up if we want - # to process too much. 
This can be defined per task - queue = self._queue - if numchunks > 1: - for i in xrange(numchunks): - queue.put((task.process, chunksize)) - # END for each chunk to put - else: + #if count > 1 and task._out_wc.size() >= count: + # continue + # END skip if we have enough + + # but use the actual count to produce the output, we may produce + # more than requested + numchunks = 1 + chunksize = actual_count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and actual_count > task.max_chunksize: + numchunks = actual_count / task.max_chunksize + chunksize = task.max_chunksize + remainder = actual_count - (numchunks * chunksize) + # END handle chunking + + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + # DEBUG + # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + if self._workers: + # respect the chunk size, and split the task up if we want + # to process too much. 
This can be defined per task + queue = self._queue + if numchunks > 1: + for i in xrange(numchunks): queue.put((task.process, chunksize)) - # END try efficient looping - - if remainder: - queue.put((task.process, remainder)) - # END handle chunksize + # END for each chunk to put else: - # no workers, so we have to do the work ourselves - if numchunks > 1: - for i in xrange(numchunks): - task.process(chunksize) - # END for each chunk to put - else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): task.process(chunksize) - # END try efficient looping - - if remainder: - task.process(remainder) - # END handle chunksize - # END handle serial mode - # END handle queuing + # END for each chunk to put + else: + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) + # END handle chunksize + # END handle serial mode # END for each task to process @@ -297,11 +407,22 @@ class Pool(object): otherwise the work will be distributed among the given amount of threads :note: currently NOT threadsafe !""" + assert size > -1, "Size cannot be negative" + # either start new threads, or kill existing ones. 
# If we end up with no threads, we process the remaining chunks on the queue # ourselves cur_count = len(self._workers) if cur_count < size: + # make sure we have a real queue, and can store our consumed tasks properly + if not isinstance(self._queue, self.TaskQueueCls): + if self._queue is not None and not self._queue.empty(): + raise AssertionError("Expected empty queue when switching the queue type") + # END safety check + self._queue = self.TaskQueueCls() + self._consumed_tasks = Queue() + # END init queue + for i in range(size - cur_count): worker = self.WorkerCls(self._queue) worker.start() @@ -323,6 +444,16 @@ class Pool(object): except Queue.Empty: continue # END while there are tasks on the queue + + # use a serial queue, its faster + if not isinstance(self._queue, SyncQueue): + self._queue = SyncQueue() + # END handle queue type + + if self._consumed_tasks and not self._consumed_tasks.empty(): + self._post_channel_read(self._consumed_tasks.pop()) + # END assure consumed tasks are empty + self._consumed_tasks = SyncQueue() # END process queue return self @@ -403,4 +534,4 @@ class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread LockCls = Lock - TaskQueueCls = Queue + TaskQueueCls = PerfQueue diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index f875f094..f7f0c978 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -15,9 +15,6 @@ def do_terminate_threads(whitelist=list()): continue if whitelist and t not in whitelist: continue - if isinstance(t, WorkerThread): - t.inq.put(t.quit) - # END worker special handling t.stop_and_join() # END for each thread -- cgit v1.2.3 From 583cd8807259a69fc01874b798f657c1f9ab7828 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 19:12:44 +0200 Subject: Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state --- lib/git/async/pool.py | 154 +++++++++++------------------------------------- 
lib/git/async/task.py | 26 +++++++- lib/git/async/thread.py | 16 ++--- lib/git/async/util.py | 106 +++++++++++++++++++++++++++++++++ 4 files changed, 176 insertions(+), 126 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 30291835..227cabfc 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,121 +1,28 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from threading import Lock -from threading import ( - Lock, - _Condition, - _sleep, - _time, +from util import ( + SyncQueue, + AsyncQueue, ) from task import InputChannelTask -from Queue import Queue, Empty -from collections import deque - -from graph import ( - Graph, +from Queue import ( + Queue, + Empty ) +from graph import Graph from channel import ( Channel, WChannel, RChannel ) -import weakref import sys -#{ Utilities - -class SyncQueue(deque): - """Adapter to allow using a deque like a queue, without locking""" - def get(self, block=True, timeout=None): - try: - return self.pop() - except IndexError: - raise Empty - # END raise empty - - def empty(self): - return len(self) == 0 - - put = deque.append - - -class HSCondition(_Condition): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead - - def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() - try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). 
- endtime = _time() + timeout - delay = self.delay - acquire = waiter.acquire - while True: - gotit = acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - # END endless loop - if not gotit: - try: - self.__dict__['_Condition__waiters'].remove(waiter) - except ValueError: - pass - # END didn't ever get it - finally: - self._acquire_restore(saved_state) - - def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: - return - if n == 1: - __waiters[0].release() - try: - __waiters.pop(0) - except IndexError: - pass - else: - waiters = __waiters[:n] - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - # END handle n = 1 case faster - -class PerfQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - - -#} END utilities - class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. 
@@ -237,7 +144,7 @@ class Pool(object): self._tasks = Graph() self._consumed_tasks = None self._workers = list() - self._queue = SyncQueue() # start with a sync queue + self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) @@ -375,7 +282,10 @@ class Pool(object): self._consumed_tasks.put(task) # END handle consumption - # delete consumed tasks to cleanup + self._handle_consumed_tasks() + + def _handle_consumed_tasks(self): + """Remove all consumed tasks from our queue by deleting them""" try: while True: ct = self._consumed_tasks.get(False) @@ -384,7 +294,7 @@ class Pool(object): except Empty: pass # END pop queue empty - + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" if sys.getrefcount(task._out_wc) < 3: @@ -415,11 +325,7 @@ class Pool(object): cur_count = len(self._workers) if cur_count < size: # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._queue, self.TaskQueueCls): - if self._queue is not None and not self._queue.empty(): - raise AssertionError("Expected empty queue when switching the queue type") - # END safety check - self._queue = self.TaskQueueCls() + if not isinstance(self._consumed_tasks, self.TaskQueueCls): self._consumed_tasks = Queue() # END init queue @@ -445,13 +351,8 @@ class Pool(object): continue # END while there are tasks on the queue - # use a serial queue, its faster - if not isinstance(self._queue, SyncQueue): - self._queue = SyncQueue() - # END handle queue type - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._post_channel_read(self._consumed_tasks.pop()) + self._handle_consumed_tasks() # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue @@ -467,6 +368,8 @@ class Pool(object): output channel is only held by themselves, so no one will ever consume its items. 
+ This method blocks until all tasks to be removed have been processed, if + they are currently being processed. :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. @@ -478,6 +381,21 @@ class Pool(object): self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() + # before we can delete the task, make sure its write channel + # is closed, otherwise people might still be waiting for its result. + # If a channel is not closed, this could also mean its not yet fully + # processed, but more importantly, there must be no task being processed + # right now. + # TODO: figure this out + for worker in self._workers: + r = worker.routine() + if r and r.im_self is task: + raise NotImplementedError("todo") + # END handle running task + # END check for in-progress routine + + # its done, close the channel for writing + task.close() self._tasks.del_node(task) finally: self._taskgraph_lock.release() @@ -497,11 +415,11 @@ class Pool(object): # create a write channel for it wc, rc = Channel() rc = RPoolChannel(wc, task, self) - task._out_wc = wc + task.set_wc(wc) has_input_channel = isinstance(task, InputChannelTask) if has_input_channel: - task._pool_ref = weakref.ref(self) + task.set_pool(self) # END init input channel task self._taskgraph_lock.acquire() @@ -534,4 +452,4 @@ class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread LockCls = Lock - TaskQueueCls = PerfQueue + TaskQueueCls = AsyncQueue diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 3137746c..f106c381 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node + import threading +import weakref import new class OutputChannelTask(Node): @@ -17,6 +19,7 @@ class OutputChannelTask(Node): __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught + '_done', # True if we are 
done 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -28,6 +31,7 @@ class OutputChannelTask(Node): self._read = None # to be set by subclasss self._out_wc = None # to be set later self._exc = None + self._done = False self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -35,12 +39,28 @@ class OutputChannelTask(Node): def is_done(self): """:return: True if we are finished processing""" - return self._out_wc.closed + return self._done def set_done(self): """Set ourselves to being done, has we have completed the processing""" + self._done = True + self.close() + + def set_wc(self, wc): + """Set the write channel to the given one + :note: resets it done state in order to allow proper queue handling""" + self._done = False + self._out_wc = wc + + def close(self): + """A closed task will close its channel to assure the readers will wake up + :note: its safe to call this method multiple times""" self._out_wc.close() + def is_closed(self): + """:return: True if the task's write channel is closed""" + return self._out_wc.closed + def error(self): """:return: Exception caught during last processing or None""" return self._exc @@ -148,5 +168,9 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) #{ Configuration diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index f7f0c978..4240a664 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -110,7 +110,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', 'outq') + __slots__ = ('inq', '_current_routine') # 
define how often we should check for a shutdown request in case our @@ -120,10 +120,12 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() + self._current_routine = None # routine we execute right now def run(self): """Process input tasks until we receive the quit signal""" while True: + self._current_routine = None if self._should_terminate(): break # END check for stop request @@ -138,8 +140,9 @@ class WorkerThread(TerminatableThread): # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - # DEBUG - # print "%s: picked up: %s(%s)" % (self.name, routine, arg) + + self._current_routine = routine + try: rval = None if inspect.ismethod(routine): @@ -154,16 +157,15 @@ class WorkerThread(TerminatableThread): print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call - except StopIteration: - break except Exception,e: print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) break # abort ... 
# END routine exception handling # END endless loop - def quit(self): - raise StopIteration + def routine(self): + """:return: routine we are currently executing, or None if we have no task""" + return self._current_routine #} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dabd8a42..432d1736 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -1,8 +1,23 @@ """Module with utilities related to async operations""" +from threading import ( + Lock, + _Condition, + _sleep, + _time, + ) + +from Queue import ( + Queue, + Empty, + ) + +from collections import deque import sys import os +#{ Routines + def cpu_count(): """:return:number of CPUs in the system :note: inspired by multiprocessing""" @@ -22,3 +37,94 @@ def cpu_count(): raise NotImplementedError('cannot determine number of cpus') return num + +#} END routines + + +class SyncQueue(deque): + """Adapter to allow using a deque like a queue, without locking""" + def get(self, block=True, timeout=None): + try: + return self.pop() + except IndexError: + raise Empty + # END raise empty + + def empty(self): + return len(self) == 0 + + put = deque.append + + +class HSCondition(_Condition): + """An attempt to make conditions less blocking, which gains performance + in return by sleeping less""" + delay = 0.00002 # reduces wait times, but increases overhead + + def wait(self, timeout=None): + waiter = Lock() + waiter.acquire() + self.__dict__['_Condition__waiters'].append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). 
+ endtime = _time() + timeout + delay = self.delay + acquire = waiter.acquire + while True: + gotit = acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + # END endless loop + if not gotit: + try: + self.__dict__['_Condition__waiters'].remove(waiter) + except ValueError: + pass + # END didn't ever get it + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + __waiters = self.__dict__['_Condition__waiters'] + if not __waiters: + return + if n == 1: + __waiters[0].release() + try: + __waiters.pop(0) + except IndexError: + pass + else: + waiters = __waiters[:n] + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + # END handle n = 1 case faster + +class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +#} END utilities -- cgit v1.2.3 From 654e54d200135e665e07e9f0097d913a77f169da Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 20:01:02 +0200 Subject: task: Fixed incorrect handling of channel closure. 
Performance is alright for up to 2 threads, but 4 are killing the queue --- lib/git/async/pool.py | 40 ++++++++++++++++++---------------------- lib/git/async/task.py | 24 +++++++++++++++++++++--- 2 files changed, 39 insertions(+), 25 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 227cabfc..3de98777 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,6 +21,7 @@ from channel import ( ) import sys +from time import sleep class RPoolChannel(RChannel): @@ -371,32 +372,27 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - # now delete our actual node - must set it done os it closes its channels. - # Otherwise further reads of output tasks will block. - # Actually they may still block if anyone wants to read all ... without - # a timeout - # keep its input nodes as we check whether they were orphaned - in_tasks = task.in_nodes - task.set_done() self._taskgraph_lock.acquire() try: - self._taskorder_cache.clear() - # before we can delete the task, make sure its write channel - # is closed, otherwise people might still be waiting for its result. - # If a channel is not closed, this could also mean its not yet fully - # processed, but more importantly, there must be no task being processed - # right now. - # TODO: figure this out - for worker in self._workers: - r = worker.routine() - if r and r.im_self is task: - raise NotImplementedError("todo") - # END handle running task - # END check for in-progress routine + # it can be that the task is already deleted, but its chunk was on the + # queue until now, so its marked consumed again + if not task in self._tasks.nodes: + return self + # END early abort + + # the task we are currently deleting could also be processed by + # a thread right now. 
We don't care about it as its taking care about + # its write channel itself, and sends everything it can to it. + # For it it doesn't matter that its not part of our task graph anymore. + + # now delete our actual node - be sure its done to prevent further + # processing in case there are still client reads on their way. + task.set_done() - # its done, close the channel for writing - task.close() + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes self._tasks.del_node(task) + self._taskorder_cache.clear() finally: self._taskgraph_lock.release() # END locked deletion diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f106c381..b282e371 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -2,6 +2,7 @@ from graph import Node import threading import weakref +import sys import new class OutputChannelTask(Node): @@ -44,7 +45,6 @@ class OutputChannelTask(Node): def set_done(self): """Set ourselves to being done, has we have completed the processing""" self._done = True - self.close() def set_wc(self, wc): """Set the write channel to the given one @@ -69,17 +69,25 @@ class OutputChannelTask(Node): """Process count items and send the result individually to the output channel""" items = self._read(count) try: + # increase the ref-count - we use this to determine whether anyone else + # is currently handling our output channel. As this method runs asynchronously, + # we have to make sure that the channel is closed by the last finishing task, + # which is not necessarily the one which determines that he is done + # as he couldn't read anymore items. + # The refcount will be dropped in the moment we get out of here. 
+ wc = self._out_wc if self.apply_single: for item in items: - self._out_wc.write(self.fun(item)) + wc.write(self.fun(item)) # END for each item else: - self._out_wc.write(self.fun(items)) + wc.write(self.fun(items)) # END handle single apply except Exception, e: self._exc = e self.set_done() # END exception handling + del(wc) # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -89,6 +97,16 @@ class OutputChannelTask(Node): if not items or len(items) != count: self.set_done() # END handle done state + + # If we appear to be the only one left with our output channel, and are + # closed ( this could have been set in another thread as well ), make + # sure to close the output channel. + # The count is: 1 = wc itself, 2 = first reader channel, and we have only + # one, 3 is ours + x for every thread having its copy on the stack + # + 1 for the instance we provide to refcount + if self.is_done() and sys.getrefcount(self._out_wc) < 5: + self.close() + # END handle channel closure #{ Configuration -- cgit v1.2.3 From be06e87433685b5ea9cfcc131ab89c56cf8292f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 22:00:47 +0200 Subject: improved testing to test the actual async handling of the pool. 
there are still inconsistencies that need to be fixed, but it already improved, especially the 4-thread performance which now is as fast as the dual-threaded performance --- lib/git/async/pool.py | 18 +++++++++++++++--- lib/git/async/task.py | 11 +++++++++++ lib/git/async/util.py | 6 ++++-- 3 files changed, 30 insertions(+), 5 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 3de98777..19fc9f6e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -67,10 +67,20 @@ class RPoolChannel(RChannel): # if we have count items, don't do any queue preparation - if someone # depletes the queue in the meanwhile, the channel will close and # we will unblock naturally + # PROBLEM: If there are multiple consumer of this channel, we might + # run out of items without being replenished == block forever in the + # worst case. task.min_count could have triggered to produce more ... + # usually per read with n items, we put n items on to the queue, + # so we wouldn't check this + # Even if we have just one consumer ( we could determine that with + # the reference count ), it could be that in one moment we don't yet + # have an item, but its currently being produced by some worker. 
+ # This is why we: + # * make no assumptions if there are multiple consumers + # * have_enough = False if count > 0: - # explicitly > count, as we want a certain safe range - have_enough = self._wc._queue.qsize() > count + have_enough = self._wc._queue.qsize() >= count # END risky game ########## prepare ############################## @@ -78,9 +88,11 @@ class RPoolChannel(RChannel): self._pool._prepare_channel_read(self._task, count) - ######### read data ###### + ####### read data ######## + ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) items = RChannel.read(self, count, block, timeout) + ########################## if self._post_cb: items = self._post_cb(items) diff --git a/lib/git/async/task.py b/lib/git/async/task.py index b282e371..4e8aef54 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -178,6 +178,17 @@ class InputChannelTask(OutputChannelTask): # make sure we don't trigger the pool if we read from a pool channel which # belongs to our own pool. Channels from different pools are fine though, # there we want to trigger its computation + # PROBLEM: if the user keeps an end, but decides to put the same end into + # a task of this pool, then all items might deplete without new ones being + # produced, causing a deadlock. Just triggering the pool would be better, + # but cost's more, unnecessarily if there is just one consumer, which is + # the user. + # * could encode usage in the channel type, and fail if the refcount on + # the read-pool channel is too high + # * maybe keep track of the elements that are requested or in-production + # for each task, which would allow to precisely determine whether + # the pool as to be triggered, and bail out early. 
Problem would + # be the if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 432d1736..85d44694 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -59,7 +59,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.00005 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() @@ -85,7 +85,9 @@ class HSCondition(_Condition): remaining = endtime - _time() if remaining <= 0: break - delay = min(delay * 2, remaining, .05) + # this makes 4 threads working as good as two, but of course + # it causes more frequent micro-sleeping + #delay = min(delay * 2, remaining, .05) _sleep(delay) # END endless loop if not gotit: -- cgit v1.2.3 From def0f73989047c4ddf9b11da05ad2c9c8e387331 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:20:37 +0200 Subject: introduced a new counter keeping track of the scheduled tasks - this prevent unnecessary tasks to be scheduled as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performns well in multithreaded mode. 
Performance of the master queue is still a huge issue, its currently the limiting factor, as bypassing the master queue in serial moode gives 15x performance, wich is what I would need --- lib/git/async/pool.py | 15 +++++++++++++-- lib/git/async/task.py | 47 +++++++++++++++++++++++++++++++++++++++++++++-- lib/git/async/util.py | 2 +- 3 files changed, 59 insertions(+), 5 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 19fc9f6e..4c97feb0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,12 +80,13 @@ class RPoolChannel(RChannel): # * have_enough = False if count > 0: - have_enough = self._wc._queue.qsize() >= count - # END risky game + have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # END ########## prepare ############################## if not have_enough: self._pool._prepare_channel_read(self._task, count) + # END prepare pool scheduling ####### read data ######## @@ -260,26 +261,33 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): + # schedule them as early as we know about them + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): + task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: + task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -348,6 +356,9 @@ class Pool(object): self._workers.append(worker) # END for each new worker to create 
elif cur_count > size: + # we can safely increase the size, even from serial mode, as we would + # only be able to do this if the serial ( sync ) mode finished processing. + # Just adding more workers is not a problem at all. del_count = cur_count - size for i in range(del_count): self._workers[i].stop_and_join() diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 4e8aef54..cf486f48 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -21,6 +21,8 @@ class OutputChannelTask(Node): '_out_wc', # output write channel '_exc', # exception caught '_done', # True if we are done + '_scheduled_items', # amount of scheduled items that will be processed in total + '_slock', # lock for scheduled items 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -33,6 +35,8 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False + self._scheduled_items = 0 + self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -50,6 +54,7 @@ class OutputChannelTask(Node): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" self._done = False + self._scheduled_items = 0 self._out_wc = wc def close(self): @@ -65,6 +70,21 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc + def add_scheduled_items(self, count): + """Add the given amount of scheduled items to this task""" + self._slock.acquire() + self._scheduled_items += count + self._slock.release() + + def scheduled_item_count(self): + """:return: amount of scheduled items for this task""" + self._slock.acquire() + try: + return self._scheduled_items + finally: + self._slock.release() + # END threadsafe return + def process(self, count=0): """Process count items and 
send the result individually to the output channel""" items = self._read(count) @@ -78,14 +98,33 @@ class OutputChannelTask(Node): wc = self._out_wc if self.apply_single: for item in items: - wc.write(self.fun(item)) + rval = self.fun(item) + # decrement afterwards, the its unscheduled once its produced + self._slock.acquire() + self._scheduled_items -= 1 + self._slock.release() + wc.write(rval) # END for each item else: - wc.write(self.fun(items)) + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + for rval in rvals: + wc.write(rval) # END handle single apply except Exception, e: self._exc = e self.set_done() + # unschedule all, we don't know how many have been produced actually + # but only if we don't apply single please + if not self.apply_single: + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + # END unschedule all # END exception handling del(wc) @@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask): # for each task, which would allow to precisely determine whether # the pool as to be triggered, and bail out early. Problem would # be the + # * Perhaps one shouldn't seek the perfect solution , but instead + # document whats working and what not, or under which conditions. + # The whole system is simple, but gets more complicated the + # smarter it wants to be. 
if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 85d44694..6bd8a4e8 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -59,7 +59,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00005 # reduces wait times, but increases overhead + delay = 0.0001 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() -- cgit v1.2.3 From e825f8b69760e269218b1bf1991018baf3c16b04 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:38:08 +0200 Subject: Channel now uses the AsyncQueue, boosting performance by factor 4, its a start --- lib/git/async/channel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2add9478..2d5ab79c 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -1,10 +1,10 @@ """Contains a queue based channel implementation""" from Queue import ( - Queue, Empty, Full ) +from util import AsyncQueue from time import time import sys @@ -43,7 +43,7 @@ class WChannel(Channel): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = Queue(max_items) + self._queue = AsyncQueue(max_items) #{ Interface -- cgit v1.2.3 From 898d47d1711accdfded8ee470520fdb96fb12d46 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:47:06 +0200 Subject: Task scheduled items lock now uses a dummy lock in serial mode, improving its performance considerably. 
Channels now use the AsyncQueue, boosting their throughput to about 5k items / s - this is something one can work with, considering the runtime of each item should be large enough to keep the threads busy. This could be a basis, further testing needed --- lib/git/async/pool.py | 6 ++++++ lib/git/async/util.py | 14 +++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 4c97feb0..d6b5711d 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -5,6 +5,7 @@ from threading import Lock from util import ( SyncQueue, AsyncQueue, + DummyLock ) from task import InputChannelTask @@ -462,6 +463,11 @@ class Pool(object): # END add task relation # END handle input channels for connections + # fix locks - in serial mode, the task does not need real locks + if self.size() == 0: + task._slock = DummyLock() + # END improve locks + return rc #} END interface diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 6bd8a4e8..55766579 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -41,6 +41,18 @@ def cpu_count(): #} END routines + +class DummyLock(object): + """An object providing a do-nothing lock interface for use in sync mode""" + __slots__ = tuple() + + def acquire(self): + pass + + def release(self): + pass + + class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): @@ -59,7 +71,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.0001 # reduces wait times, but increases overhead + delay = 0.00002 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() -- cgit v1.2.3 From 5d996892ac76199886ba3e2754ff9c9fac2456d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 00:32:33 +0200 Subject: test 
implementation of async-queue with everything stripped from it that didn't seem necessary - its a failure, something is wrong - performance not much better than the original one, its depending on the condition performance actually, which I don't get faster --- lib/git/async/util.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 55766579..e3556c05 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,12 +133,55 @@ class HSCondition(_Condition): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" + __slots__ = ('mutex', 'not_empty', 'queue') + def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - + self.queue = deque() + self.mutex = Lock() self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - + def qsize(self): + self.mutex.acquire() + try: + return len(self.queue) + finally: + self.mutex.release() + + def empty(self): + self.mutex.acquire() + try: + return not len(self.queue) + finally: + self.mutex.release() + + def put(self, item, block=True, timeout=None): + self.mutex.acquire() + self.queue.append(item) + self.mutex.release() + self.not_empty.notify() + + def get(self, block=True, timeout=None): + self.not_empty.acquire() + q = self.queue + try: + if not block: + if not len(q): + raise Empty + elif timeout is None: + while not len(q): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not len(q): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + return q.popleft() + finally: + self.not_empty.release() + + #} END utilities -- cgit v1.2.3 From 09c3f39ceb545e1198ad7a3f470d4ec896ce1add Mon Sep 17 00:00:00 
2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 10:45:14 +0200 Subject: both versions of the async queue still have trouble in certain situations, at least with my totally overwritten version of the condition - the previous one was somewhat more stable it seems. Nonetheless, this is the fastest version so far --- lib/git/async/util.py | 77 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 21 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index e3556c05..fb63ccaa 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,6 +2,8 @@ from threading import ( Lock, + current_thread, + _allocate_lock, _Condition, _sleep, _time, @@ -57,7 +59,7 @@ class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): try: - return self.pop() + return self.popleft() except IndexError: raise Empty # END raise empty @@ -67,26 +69,45 @@ class SyncQueue(deque): put = deque.append - -class HSCondition(_Condition): + +class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" + __slots__ = ("acquire", "release", "_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead + def __init__(self, lock=None): + if lock is None: + lock = Lock() + self._lock = lock + self.acquire = lock.acquire + self.release = lock.release + self._waiters = list() + + def __release(self): + return self._lock.release() + + def __acquire(self, block=None): + if block is None: + self._lock.acquire() + else: + return self._lock.acquire(block) + def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() + waiter = _allocate_lock() + waiter.acquire() # get it the first time, no blocking + self._waiters.append(waiter) + + # in the momemnt we release our lock, 
someone else might actually resume + self.release() try: # restore state no matter what (e.g., KeyboardInterrupt) + # now we block, as we hold the lock already if timeout is None: waiter.acquire() else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). + # Balancing act: We can't afford a pure busy loop, because of the + # GIL, so we have to sleep + # We try to sleep only tiny amounts of time though to be very responsive endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -104,34 +125,48 @@ class HSCondition(_Condition): # END endless loop if not gotit: try: - self.__dict__['_Condition__waiters'].remove(waiter) + self._waiters.remove(waiter) except ValueError: pass # END didn't ever get it finally: - self._acquire_restore(saved_state) + # reacquire the lock + self.acquire() def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: + if not self._waiters: return + waiters = self._waiters if n == 1: - __waiters[0].release() + waiters[0].release() try: - __waiters.pop(0) + waiters.pop(0) except IndexError: pass else: - waiters = __waiters[:n] - for waiter in waiters: + for waiter in waiters[:n]: waiter.release() try: - __waiters.remove(waiter) + waiters.remove(waiter) except ValueError: pass # END handle n = 1 case faster + def notify_all(self): + self.notify(len(self._waiters)) + + class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +class _AsyncQueue(Queue): """A queue using different condition objects to 
gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') -- cgit v1.2.3 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- lib/git/async/channel.py | 108 ++++++++++++++++++++++++----------------------- lib/git/async/task.py | 6 +-- lib/git/async/util.py | 32 ++++++++------ 3 files changed, 79 insertions(+), 67 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2d5ab79c..655024fe 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -4,7 +4,11 @@ from Queue import ( Full ) -from util import AsyncQueue +from util import ( + AsyncQueue, + DummyLock + ) + from time import time import sys @@ -23,12 +27,9 @@ class Channel(object): def __new__(cls, *args): if cls is Channel: - max_items = 0 - if len(args) == 1: - max_items = args[0] - if len(args) > 1: - raise ValueError("Specify not more than the number of items the channel should take") - wc = WChannel(max_items) + if len(args) > 0: + raise ValueError("Cannot take any arguments when creating a new channel") + wc = WChannel() rc = RChannel(wc) return wc, rc # END constructor mode @@ -39,11 +40,11 @@ class WChannel(Channel): """The write end of a channel""" __slots__ = ('_closed', '_queue') - def __init__(self, max_items=0): + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = AsyncQueue(max_items) + self._queue = AsyncQueue() #{ Interface @@ -74,7 +75,21 @@ class WChannel(Channel): def close(self): 
"""Close the channel. Multiple close calls on a closed channel are no an error""" + mutex = self._queue.mutex + mutex.acquire() + # this is atomic already, due to the GIL - no need to get the queue's mutex + print "channel.close()" self._closed = True + # now make sure that the people waiting for an item are released now + # As we it could be that some readers are already on their way to initiate + # a blocking get, we must make sure that locks never block before that happens + + # now we are the only one accessing the queue, so change it + self._queue.mutex = DummyLock() + print self._queue.not_empty._waiters + self._queue.not_empty.notify_all() + print self._queue.not_empty._waiters + mutex.release() @property def closed(self): @@ -134,58 +149,47 @@ class RChannel(Channel): pass # END handle exceptions else: - # if we have really bad timing, the source of the channel - # marks itself closed, but before setting it, the thread - # switches to us. We read it, read False, and try to fetch - # something, and never return. The whole closed channel thing - # is not atomic ( of course ) - # This is why we never block for long, to get a chance to recheck - # for closed channels. 
- # We blend this into the timeout of the user - ourtimeout = 0.25 # the smaller, the more responsive, but the slower - wc = self._wc - timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it - assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe - if timeout and ourtimeout > timeout: - ourtimeout = timeout - # END setup timeout - # to get everything into one loop, we set the count accordingly if count == 0: count = sys.maxint # END handle count + endtime = sys.maxint # allows timeout for whole operation + if timeout is not None: + endtime = time() + timeout + # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): - have_timeout = False - st = time() - while True: + try: + print "about to read", i, count, block, timeout + out.append(queue.get(block, timeout)) + print "got one" + except Empty: + pass + # END ignore empty + + # if we have been unblocked because the closed state changed + # in the meanwhile, stop trying + # NOTE: must NOT cache _wc + if self._wc.closed: + # its racing time - all threads waiting for the queue + # are awake now, and we actually can't be sure its empty + # Hence we pop it empty without blocking, getting as much + # as we can. This effectively lets us race ( with mutexes ) + # of the other threads. + print "stopped because it was closed" try: - if wc.closed: - have_timeout = True - # its about the 'in the meanwhile' :) - get everything - # we can in non-blocking mode. 
This will raise - try: - while True: - out.append(queue.get(False)) - # END until it raises Empty - except Empty: - break - # END finally, out of here - # END don't continue on closed channels - - # END abort reading if it was closed ( in the meanwhile ) - out.append(queue.get(block, ourtimeout)) - break # breakout right away + while True: + out.append(queue.get(False)) + # END pop it empty except Empty: - if timeout - (time() - st) <= 0: - # hitting timeout - have_timeout = True - break - # END abort if the user wants no more time spent here - # END handle timeout - # END endless timer loop - if have_timeout: + pass + # END ignore emptyness, we have all + break + # END handle cloased + + if time() >= endtime: + break # END stop on timeout # END for each item # END handle blocking diff --git a/lib/git/async/task.py b/lib/git/async/task.py index cf486f48..ce701c86 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -140,10 +140,10 @@ class OutputChannelTask(Node): # If we appear to be the only one left with our output channel, and are # closed ( this could have been set in another thread as well ), make # sure to close the output channel. 
- # The count is: 1 = wc itself, 2 = first reader channel, and we have only - # one, 3 is ours + x for every thread having its copy on the stack + # The count is: 1 = wc itself, 2 = first reader channel, + x for every + # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and sys.getrefcount(self._out_wc) < 5: + if self.is_done() and sys.getrefcount(self._out_wc) < 4: self.close() # END handle channel closure #{ Configuration diff --git a/lib/git/async/util.py b/lib/git/async/util.py index fb63ccaa..01073f6d 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -73,21 +73,22 @@ class SyncQueue(deque): class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - __slots__ = ("acquire", "release", "_lock", '_waiters') + # __slots__ = ("acquire", "release", "_lock", '_waiters') + __slots__ = ("_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - self.acquire = lock.acquire - self.release = lock.release + #self.acquire = lock.acquire + #self.release = lock.release self._waiters = list() - def __release(self): + def release(self): return self._lock.release() - def __acquire(self, block=None): + def acquire(self, block=None): if block is None: self._lock.acquire() else: @@ -156,7 +157,7 @@ class HSCondition(object): self.notify(len(self._waiters)) -class AsyncQueue(Queue): +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" def __init__(self, maxsize=0): Queue.__init__(self, maxsize) @@ -166,7 +167,7 @@ class AsyncQueue(Queue): self.all_tasks_done = HSCondition(self.mutex) -class _AsyncQueue(Queue): +class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') @@ -194,9 +195,9 @@ class 
_AsyncQueue(Queue): self.queue.append(item) self.mutex.release() self.not_empty.notify() - + def get(self, block=True, timeout=None): - self.not_empty.acquire() + self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: if not block: @@ -205,16 +206,23 @@ class _AsyncQueue(Queue): elif timeout is None: while not len(q): self.not_empty.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") else: + print "with timeout", timeout + import traceback + traceback.print_stack() endtime = _time() + timeout while not len(q): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - return q.popleft() + # END handle block + # can happen if someone else woke us up + try: + return q.popleft() + except IndexError: + raise Empty + # END handle unblocking reason finally: self.not_empty.release() -- cgit v1.2.3 From 53152a824f5186452504f0b68306d10ebebee416 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 14:23:58 +0200 Subject: queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable. --- lib/git/async/channel.py | 44 ++++++++++--------------------- lib/git/async/util.py | 68 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 50 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 655024fe..08323582 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,7 +6,6 @@ from Queue import ( from util import ( AsyncQueue, - DummyLock ) from time import time @@ -56,15 +55,13 @@ class WChannel(Channel): channel :param timeout: timeout in seconds for blocking calls. 
:raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel - :note: may block if the channel has a limited capacity""" - if self._closed: - raise IOError("Cannot write to a closed channel") - + :raise EOFError: when writing into a non-blocking full channel""" + # let the queue handle the 'closed' attribute, we write much more often + # to an open channel than to a closed one, saving a few cycles try: self._queue.put(item, block, timeout) - except Full: - raise EOFError("Capacity of the channel was exeeded") + except ReadOnly: + raise IOError("Cannot write to a closed channel") # END exception handling def size(self): @@ -75,21 +72,10 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - mutex = self._queue.mutex - mutex.acquire() - # this is atomic already, due to the GIL - no need to get the queue's mutex - print "channel.close()" + # yes, close it a little too early, better than having anyone put + # additional items self._closed = True - # now make sure that the people waiting for an item are released now - # As we it could be that some readers are already on their way to initiate - # a blocking get, we must make sure that locks never block before that happens - - # now we are the only one accessing the queue, so change it - self._queue.mutex = DummyLock() - print self._queue.not_empty._waiters - self._queue.not_empty.notify_all() - print self._queue.not_empty._waiters - mutex.release() + self._queue.set_writable(False) @property def closed(self): @@ -124,6 +110,7 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block + # NOTE: is handled by the queue if self._wc.closed or timeout == 0: block = False @@ -160,9 +147,7 @@ class RChannel(Channel): # could be improved by a separate: no-endtime branch, saving the time calls for i in 
xrange(count): try: - print "about to read", i, count, block, timeout out.append(queue.get(block, timeout)) - print "got one" except Empty: pass # END ignore empty @@ -176,7 +161,6 @@ class RChannel(Channel): # Hence we pop it empty without blocking, getting as much # as we can. This effectively lets us race ( with mutexes ) # of the other threads. - print "stopped because it was closed" try: while True: out.append(queue.get(False)) @@ -186,11 +170,11 @@ class RChannel(Channel): # END ignore emptyness, we have all break - # END handle cloased - - if time() >= endtime: - break - # END stop on timeout + # END handle channel cloased + + if time() >= endtime: + break + # END stop operation on timeout # END for each item # END handle blocking return out diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 01073f6d..51219cc4 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -166,15 +166,21 @@ class _AsyncQueue(Queue): self.not_full = HSCondition(self.mutex) self.all_tasks_done = HSCondition(self.mutex) - + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - __slots__ = ('mutex', 'not_empty', 'queue') + """A queue using different condition objects to gain multithreading performance. 
+ Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) + self._writable = True def qsize(self): self.mutex.acquire() @@ -183,6 +189,29 @@ class AsyncQueue(Queue): finally: self.mutex.release() + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + self.mutex.release() + # END handle locking + def empty(self): self.mutex.acquire() try: @@ -192,6 +221,9 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() + if not self._writable: + raise ReadOnly + # END handle read-only self.queue.append(item) self.mutex.release() self.not_empty.notify() @@ -200,24 +232,20 @@ class AsyncQueue(Queue): self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: - if not block: - if not len(q): - raise Empty - elif timeout is None: - while not len(q): - self.not_empty.wait() - else: - print "with timeout", timeout - import traceback - traceback.print_stack() - endtime = _time() + timeout - while not len(q): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # 
END handle timeout mode # END handle block - # can happen if someone else woke us up + # can happen if we woke up because we are not writable anymore try: return q.popleft() except IndexError: -- cgit v1.2.3 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: It's getting better already - intermediate commit before further changing the task class --- lib/git/async/channel.py | 35 ++++++++++++++++++++--------------- lib/git/async/pool.py | 7 ++++--- lib/git/async/task.py | 2 ++ lib/git/async/thread.py | 3 ++- lib/git/async/util.py | 25 ++++++++++--------------- 5 files changed, 38 insertions(+), 34 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 08323582..5c52b1dc 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + ReadOnly ) from time import time @@ -59,6 +60,7 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: + print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -74,6 +76,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items + print "closing channel", self self._closed = True self._queue.set_writable(False) @@ -102,7 +105,7 @@ class RChannel(Channel): :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the - given amount of seconds. + given amount of seconds, returning the items it received so far. :return: single item in a list if count is 1, or a list of count items. 
If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -149,27 +152,29 @@ class RChannel(Channel): try: out.append(queue.get(block, timeout)) except Empty: - pass + # here we are only if there is nothing on the queue, + # and if we are blocking. If we are not blocking, this + # indiccates that the queue was set unwritable in the meanwhile. + # hence we can abort now to prevent reading (possibly) forever + # Besides, this is racy as all threads will rip on the channel + # without waiting until its empty + if not block: + break # END ignore empty # if we have been unblocked because the closed state changed # in the meanwhile, stop trying # NOTE: must NOT cache _wc if self._wc.closed: - # its racing time - all threads waiting for the queue - # are awake now, and we actually can't be sure its empty - # Hence we pop it empty without blocking, getting as much - # as we can. This effectively lets us race ( with mutexes ) - # of the other threads. - try: - while True: - out.append(queue.get(False)) - # END pop it empty - except Empty: - pass - # END ignore emptyness, we have all + # If we were closed, we drop out even if there might still + # be items. Now its time to get these items, according to + # our count. Just switch to unblocking mode. 
+ # If we are to read unlimited items, this would run forever, + # but the EmptyException handler takes care of this + block = False - break + # we don't continue, but let the timer decide whether + # it wants to abort # END handle channel cloased if time() >= endtime: diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index d6b5711d..cf1c2199 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,8 +80,8 @@ class RPoolChannel(RChannel): # * make no assumptions if there are multiple consumers # * have_enough = False - if count > 0: - have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + #if count > 0: + # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count # END ########## prepare ############################## @@ -319,6 +319,7 @@ class Pool(object): def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" + # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: self.del_task(task) #} END internal @@ -403,7 +404,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - + print "deleting ", id(task) # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ce701c86..97521cae 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -88,6 +88,7 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) + print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
As this method runs asynchronously, @@ -117,6 +118,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e + print str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4240a664..5faad4f8 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -124,6 +124,7 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + gettask = self.inq.get while True: self._current_routine = None if self._should_terminate(): @@ -132,7 +133,7 @@ class WorkerThread(TerminatableThread): # don't wait too long, instead check for the termination request more often try: - tasktuple = self.inq.get(True, 1) + tasktuple = gettask(True, 0.25) except Queue.Empty: continue # END get task with timeout diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 51219cc4..6d09de59 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -63,7 +63,7 @@ class SyncQueue(deque): except IndexError: raise Empty # END raise empty - + def empty(self): return len(self) == 0 @@ -86,13 +86,13 @@ class HSCondition(object): self._waiters = list() def release(self): - return self._lock.release() + self._lock.release() def acquire(self, block=None): if block is None: self._lock.acquire() else: - return self._lock.acquire(block) + self._lock.acquire(block) def wait(self, timeout=None): waiter = _allocate_lock() @@ -145,6 +145,7 @@ class HSCondition(object): except IndexError: pass else: + print "notify", waiters, n for waiter in waiters[:n]: waiter.release() try: @@ -156,16 +157,6 @@ class HSCondition(object): def notify_all(self): self.notify(len(self._waiters)) - -class _AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, 
maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" @@ -205,11 +196,12 @@ class AsyncQueue(Queue): self._writable = state return old finally: + self.mutex.release() + # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() # END tell everyone - self.mutex.release() # END handle locking def empty(self): @@ -222,6 +214,7 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() if not self._writable: + self.mutex.release() raise ReadOnly # END handle read-only self.queue.append(item) @@ -245,7 +238,9 @@ class AsyncQueue(Queue): self.not_empty.wait(remaining) # END handle timeout mode # END handle block - # can happen if we woke up because we are not writable anymore + + # can throw if we woke up because we are not writable anymore + print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.3 From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 17:25:43 +0200 Subject: task: now deletes itself once its done - for the test this doesn't change a thing as the task deletes itself too late - its time for a paradigm change, the task should be deleted with its RPoolChannel or explicitly by the user. 
The test needs to adapt, and shouldn't assume anything unless the RPoolChannel is gone --- lib/git/async/pool.py | 57 ++++++++++++++------------------------------------- lib/git/async/task.py | 22 ++++++++++++++------ 2 files changed, 31 insertions(+), 48 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index cf1c2199..fce5e424 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -78,14 +78,17 @@ class RPoolChannel(RChannel): # have an item, but its currently being produced by some worker. # This is why we: # * make no assumptions if there are multiple consumers - # * - have_enough = False + # * + + # if the user tries to use us to read from a done task, we will never + # compute as all produced items are already in the channel + skip_compute = self._task.is_done() or self._task.error() #if count > 0: - # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count # END ########## prepare ############################## - if not have_enough: + if not skip_compute: self._pool._prepare_channel_read(self._task, count) # END prepare pool scheduling @@ -134,7 +137,6 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks @@ -157,7 +159,6 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = None self._workers = list() self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() @@ -224,8 +225,10 @@ class Pool(object): # requested one last for task in dfirst_tasks: if task.error() or 
task.is_done(): - self._consumed_tasks.put(task) - continue + # in theory, the should never be consumed task in the pool, right ? + # They delete themselves once they are done. + raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") + #continue # END skip processing # if the task does not have the required output on its queue, schedule @@ -297,26 +300,8 @@ class Pool(object): def _post_channel_read(self, task): """Called after we processed a read to cleanup""" - # check whether we consumed the task, and schedule it for deletion - # This could have happend after the read returned ( even though the pre-read - # checks it as well ) - if task.error() or task.is_done(): - self._consumed_tasks.put(task) - # END handle consumption - - self._handle_consumed_tasks() - - def _handle_consumed_tasks(self): - """Remove all consumed tasks from our queue by deleting them""" - try: - while True: - ct = self._consumed_tasks.get(False) - self.del_task(ct) - # END for each task to delete - except Empty: - pass - # END pop queue empty - + pass + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call @@ -347,11 +332,6 @@ class Pool(object): # ourselves cur_count = len(self._workers) if cur_count < size: - # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._consumed_tasks, self.TaskQueueCls): - self._consumed_tasks = Queue() - # END init queue - for i in range(size - cur_count): worker = self.WorkerCls(self._queue) worker.start() @@ -377,9 +357,6 @@ class Pool(object): continue # END while there are tasks on the queue - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._handle_consumed_tasks() - # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue return self @@ -437,11 +414,7 @@ class Pool(object): wc, rc = Channel() rc = 
RPoolChannel(wc, task, self) task.set_wc(wc) - - has_input_channel = isinstance(task, InputChannelTask) - if has_input_channel: - task.set_pool(self) - # END init input channel task + task.set_pool(self) self._taskgraph_lock.acquire() try: @@ -452,7 +425,7 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if has_input_channel: + if isinstance(task, InputChannelTask): ic = task.in_rc if isinstance(ic, RPoolChannel) and ic._pool is self: self._taskgraph_lock.acquire() diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 97521cae..dc207c33 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -23,6 +23,7 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items + '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -84,6 +85,10 @@ class OutputChannelTask(Node): finally: self._slock.release() # END threadsafe return + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) def process(self, count=0): """Process count items and send the result individually to the output channel""" @@ -147,6 +152,16 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount if self.is_done() and sys.getrefcount(self._out_wc) < 4: self.close() + # additionally, remove ourselves from the pool, this is thread-safe + # Previously the pool collected done tasks and removed them, + # but this could happen after a read finished, potentially + # leaving them on the queue until the read-handle was dropped. + # This should assure its more in-time. + # I don't like this back-ref. 
+ pool = self._pool_ref() + if pool: + pool.del_task(self) + # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask): For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" __slots__ = ( - 'in_rc', # channel to read items from - '_pool_ref' # to be set by Pool + 'in_rc' # channel to read items from ) def __init__(self, in_rc, *args, **kwargs): @@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) - - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) #{ Configuration -- cgit v1.2.3 From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 18:20:12 +0200 Subject: Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. 
It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user --- lib/git/async/pool.py | 25 ++++++++++++++++---- lib/git/async/task.py | 64 +++++++++------------------------------------------ 2 files changed, 31 insertions(+), 58 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fce5e424..a915f7b0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -41,8 +41,18 @@ class RPoolChannel(RChannel): def __del__(self): """Assures that our task will be deleted if we were the last reader""" - del(self._wc) # decrement ref-count - self._pool._del_task_if_orphaned(self._task) + del(self._wc) # decrement ref-count early + # now, if this is the last reader to the wc we just handled, there + # is no way anyone will ever read from the task again. If so, + # delete the task in question, it will take care of itself and orphans + # it might leave + # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which + # I can't explain, but appears to be normal in the destructor + # On the caller side, getrefcount returns 2, as expected + if sys.getrefcount(self) < 6: + print "__del__" + self._pool.del_task(self._task) + print "done" def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -105,7 +115,7 @@ class RPoolChannel(RChannel): ####### Finalize ######## self._pool._post_channel_read(self._task) - + return items #{ Internal @@ -227,6 +237,7 @@ class Pool(object): if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? # They delete themselves once they are done. 
+ # TODO: remove this check for performance later raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") #continue # END skip processing @@ -363,7 +374,11 @@ class Pool(object): def num_tasks(self): """:return: amount of tasks""" - return len(self._tasks.nodes) + self._taskgraph_lock.acquire() + try: + return len(self._tasks.nodes) + finally: + self._taskgraph_lock.release() def del_task(self, task): """Delete the task @@ -374,6 +389,7 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" + print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -414,7 +430,6 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - task.set_pool(self) self._taskgraph_lock.acquire() try: diff --git a/lib/git/async/task.py b/lib/git/async/task.py index dc207c33..5edd40bb 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,10 +1,11 @@ from graph import Node import threading -import weakref import sys import new +getrefcount = sys.getrefcount + class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. 
@@ -23,7 +24,6 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items - '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -54,7 +54,7 @@ class OutputChannelTask(Node): def set_wc(self, wc): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" - self._done = False + self._done = False # TODO : fix this, this is a side-effect self._scheduled_items = 0 self._out_wc = wc @@ -86,10 +86,6 @@ class OutputChannelTask(Node): self._slock.release() # END threadsafe return - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) - def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -123,7 +119,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print str(e) # TODO: REMOVE DEBUG, or make it use logging + print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -150,18 +146,8 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and sys.getrefcount(self._out_wc) < 4: + if self.is_done() and getrefcount(self._out_wc) < 4: self.close() - # additionally, remove ourselves from the pool, this is thread-safe - # Previously the pool collected done tasks and removed them, - # but this could happen after a read finished, potentially - # 
leaving them on the queue until the read-handle was dropped. - # This should assure its more in-time. - # I don't like this back-ref. - pool = self._pool_ref() - if pool: - pool.del_task(self) - # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -218,43 +204,15 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" - __slots__ = ( - 'in_rc' # channel to read items from - ) def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._in_rc = in_rc - + self._read = in_rc.read + def process(self, count=1): - """Verify our setup, and do some additional checking, before the - base implementation can permanently perform all operations""" - self._read = self._in_rc.read - # make sure we don't trigger the pool if we read from a pool channel which - # belongs to our own pool. Channels from different pools are fine though, - # there we want to trigger its computation - # PROBLEM: if the user keeps an end, but decides to put the same end into - # a task of this pool, then all items might deplete without new ones being - # produced, causing a deadlock. Just triggering the pool would be better, - # but cost's more, unnecessarily if there is just one consumer, which is - # the user. - # * could encode usage in the channel type, and fail if the refcount on - # the read-pool channel is too high - # * maybe keep track of the elements that are requested or in-production - # for each task, which would allow to precisely determine whether - # the pool as to be triggered, and bail out early. Problem would - # be the - # * Perhaps one shouldn't seek the perfect solution , but instead - # document whats working and what not, or under which conditions. - # The whole system is simple, but gets more complicated the - # smarter it wants to be. 
- if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): - self._read = self._in_rc._read - - # permanently install our base for processing - self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self)) - - # and call it - return OutputChannelTask.process(self, count) + # for now, just blindly read our input, could trigger a pool, even + # ours, but why not ? It should be able to handle this + # TODO: remove this method + super(InputChannelTask, self).process(count) #{ Configuration -- cgit v1.2.3 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, which boosts performance into bright green levels --- lib/git/async/channel.py | 3 +-- lib/git/async/pool.py | 56 ++++++++++++++++++++++++++++-------------------- lib/git/async/task.py | 2 +- lib/git/async/thread.py | 24 +++++++++++++++------ lib/git/async/util.py | 1 - 5 files changed, 52 insertions(+), 34 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 5c52b1dc..c05f7383 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -60,7 +60,6 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: - print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -76,7 +75,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - print "closing channel", self + # print "closing channel", self self._closed = True self._queue.set_writable(False) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index a915f7b0..1767c61c
100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,5 +1,8 @@ """Implementation of a thread-pool working with channels""" -from thread import WorkerThread +from thread import ( + WorkerThread, + StopProcessing, + ) from threading import Lock from util import ( @@ -147,7 +150,7 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_workers', # list of worker threads + '_num_workers', # list of workers '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks '_taskgraph_lock', # lock for accessing the task graph @@ -169,7 +172,7 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._workers = list() + self._num_workers = 0 self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() @@ -270,7 +273,7 @@ class Pool(object): # into the loop would be less code, but ... slower # DEBUG # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: + if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task queue = self._queue @@ -323,7 +326,7 @@ class Pool(object): #{ Interface def size(self): """:return: amount of workers in the pool""" - return len(self._workers) + return self._num_workers def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, @@ -341,34 +344,41 @@ class Pool(object): # either start new threads, or kill existing ones. 
# If we end up with no threads, we process the remaining chunks on the queue # ourselves - cur_count = len(self._workers) + cur_count = self._num_workers if cur_count < size: - for i in range(size - cur_count): - worker = self.WorkerCls(self._queue) - worker.start() - self._workers.append(worker) - # END for each new worker to create - elif cur_count > size: # we can safely increase the size, even from serial mode, as we would # only be able to do this if the serial ( sync ) mode finished processing. # Just adding more workers is not a problem at all. + add_count = size - cur_count + for i in range(add_count): + print "Add worker" + self.WorkerCls(self._queue).start() + # END for each new worker to create + self._num_workers += add_count + elif cur_count > size: + # We don't care which thread exactly gets hit by our stop request + # On their way, they will consume remaining tasks, but new ones + # could be added as we speak. del_count = cur_count - size for i in range(del_count): - self._workers[i].stop_and_join() + print "stop worker" + self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop - del(self._workers[:del_count]) + self._num_workers -= del_count # END handle count if size == 0: - while not self._queue.empty(): - try: - taskmethod, count = self._queue.get(False) - taskmethod(count) - except Queue.Empty: - continue - # END while there are tasks on the queue - - self._consumed_tasks = SyncQueue() + # NOTE: we do not preocess any tasks still on the queue, as we ill + # naturally do that once we read the next time, only on the tasks + # that are actually required. The queue will keep the tasks, + # and once we are deleted, they will vanish without additional + # time spend on them. If there shouldn't be any consumers anyway. + # If we should reenable some workers again, they will continue on the + # remaining tasks, probably with nothing to do. 
+ # We can't clear the task queue if we have removed workers + # as they will receive the termination signal through it, and if + # we had added workers, we wouldn't be here ;). + pass # END process queue return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5edd40bb..f9536a45 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -89,7 +89,7 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - print "task read", len(items) + # print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 5faad4f8..556b7e92 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -80,6 +80,10 @@ class TerminatableThread(threading.Thread): self._terminated() #} END interface + +class StopProcessing(Exception): + """If thrown in a function processed by a WorkerThread, it will terminate""" + class WorkerThread(TerminatableThread): """ This base allows to call functions on class instances natively. 
@@ -122,6 +126,11 @@ class WorkerThread(TerminatableThread): self.inq = inq or Queue.Queue() self._current_routine = None # routine we execute right now + @classmethod + def stop(cls, *args): + """If send via the inq of the thread, it will stop once it processed the function""" + raise StopProcessing + def run(self): """Process input tasks until we receive the quit signal""" gettask = self.inq.get @@ -131,12 +140,8 @@ class WorkerThread(TerminatableThread): break # END check for stop request - # don't wait too long, instead check for the termination request more often - try: - tasktuple = gettask(True, 0.25) - except Queue.Empty: - continue - # END get task with timeout + # we wait and block - to terminate, send the 'stop' method + tasktuple = gettask() # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" @@ -158,6 +163,8 @@ class WorkerThread(TerminatableThread): print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call + except StopProcessing: + break except Exception,e: print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) break # abort ... 
@@ -168,5 +175,8 @@ class WorkerThread(TerminatableThread): """:return: routine we are currently executing, or None if we have no task""" return self._current_routine - + def stop_and_join(self): + """Send stop message to ourselves""" + self.inq.put((self.stop, None)) + super(WorkerThread, self).stop_and_join() #} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 6d09de59..f3213ed6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -240,7 +240,6 @@ class AsyncQueue(Queue): # END handle block # can throw if we woke up because we are not writable anymore - print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.3 From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 10:34:12 +0200 Subject: queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently. 
Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined --- lib/git/async/channel.py | 1 - lib/git/async/pool.py | 30 ++++++++++++++--------------- lib/git/async/task.py | 3 +-- lib/git/async/thread.py | 9 ++++++--- lib/git/async/util.py | 50 ++++++++++++++++++++++++++++++------------------ 5 files changed, 52 insertions(+), 41 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index c05f7383..58c35f96 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -75,7 +75,6 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - # print "closing channel", self self._closed = True self._queue.set_writable(False) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1767c61c..7bddf7da 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -53,9 +53,8 @@ class RPoolChannel(RChannel): # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected if sys.getrefcount(self) < 6: - print "__del__" - self._pool.del_task(self._task) - print "done" + self._pool.remove_task(self._task) + # END handle refcount based removal of task def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -237,12 +236,14 @@ class Pool(object): # the list includes our tasks - the first one to evaluate first, the # requested one last for task in dfirst_tasks: - if task.error() or task.is_done(): + # if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? - # They delete themselves once they are done. 
- # TODO: remove this check for performance later - raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") - #continue + # They delete themselves once they are done. But as we run asynchronously, + # It can be that someone reads, while a task realizes its done, and + # we get here to prepare the read although it already is done. + # Its not a problem though, the task wiill not do anything. + # Hence we don't waste our time with checking for it + # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing # if the task does not have the required output on its queue, schedule @@ -316,11 +317,11 @@ class Pool(object): """Called after we processed a read to cleanup""" pass - def _del_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: - self.del_task(task) + self.remove_task(task) #} END internal #{ Interface @@ -351,7 +352,6 @@ class Pool(object): # Just adding more workers is not a problem at all. add_count = size - cur_count for i in range(add_count): - print "Add worker" self.WorkerCls(self._queue).start() # END for each new worker to create self._num_workers += add_count @@ -361,7 +361,6 @@ class Pool(object): # could be added as we speak. 
del_count = cur_count - size for i in range(del_count): - print "stop worker" self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop self._num_workers -= del_count @@ -390,7 +389,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def del_task(self, task): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -399,7 +398,6 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -407,7 +405,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - print "deleting ", id(task) + # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. @@ -426,7 +424,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._del_task_if_orphaned(t) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f9536a45..f1448f96 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -89,7 +89,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - # print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
As this method runs asynchronously, @@ -119,7 +118,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 556b7e92..cd964f1c 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -5,6 +5,8 @@ import threading import inspect import Queue +import sys + #{ Decorators def do_terminate_threads(whitelist=list()): @@ -160,14 +162,15 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call except StopProcessing: + print self.name, "stops processing" break except Exception,e: - print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) - break # abort ... + print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + continue # just continue # END routine exception handling # END endless loop diff --git a/lib/git/async/util.py b/lib/git/async/util.py index f3213ed6..dff38f58 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -71,18 +71,15 @@ class SyncQueue(deque): class HSCondition(object): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - # __slots__ = ("acquire", "release", "_lock", '_waiters') + """Cleaned up code of the original condition object in order + to make it run and respond faster.""" __slots__ = ("_lock", '_waiters') - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - #self.acquire = lock.acquire - #self.release = lock.release self._waiters = list() def release(self): @@ -109,6 +106,8 @@ class HSCondition(object): # Balancing act: We can't afford a pure busy loop, because of the # GIL, so we have to sleep # We try to sleep only tiny amounts of time though to be very responsive + # NOTE: this branch is not used by the async system anyway, but + # will be hit when the user reads with timeout endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -133,25 +132,36 @@ class HSCondition(object): finally: # reacquire the lock self.acquire() + # END assure release lock def notify(self, n=1): + """Its vital that this method is threadsafe - to be fast we don'd get a lock, + but instead rely on pseudo-atomic operations that come with the GIL. + Hence we use pop in the n=1 case to be truly atomic. 
+ In the multi-notify case, we acquire a lock just for safety, as otherwise + we might pop too much of someone else notifies n waiters as well, which + would in the worst case lead to double-releases of locks.""" if not self._waiters: return - waiters = self._waiters if n == 1: - waiters[0].release() + # so here we assume this is thead-safe ! It wouldn't be in any other + # language, but python it is. try: - waiters.pop(0) + self._waiters.pop(0).release() except IndexError: pass else: - print "notify", waiters, n - for waiter in waiters[:n]: - waiter.release() - try: - waiters.remove(waiter) - except ValueError: - pass + self.acquire() + # once the waiter resumes, he will want to acquire the lock + # and waits again, but only until we are done, which is important + # to do that in a thread-safe fashion + try: + for i in range(min(n, len(self._waiters))): + self._waiters.pop(0).release() + # END for each waiter to resume + finally: + self.release() + # END assure we release our lock # END handle n = 1 case faster def notify_all(self): @@ -164,7 +174,8 @@ class ReadOnly(Exception): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here.""" + that there is nothing more to get here. 
+ All default-queue code was cleaned up for performance.""" __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): @@ -222,7 +233,7 @@ class AsyncQueue(Queue): self.not_empty.notify() def get(self, block=True, timeout=None): - self.not_empty.acquire() # == self.mutex.acquire in that case + self.mutex.acquire() q = self.queue try: if block: @@ -246,7 +257,8 @@ class AsyncQueue(Queue): raise Empty # END handle unblocking reason finally: - self.not_empty.release() + self.mutex.release() + # END assure lock is released #} END utilities -- cgit v1.2.3 From f2c8d26d3b25b864ad48e6de018757266b59f708 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:28:37 +0200 Subject: thread: fixed initialization problem if an empty iterable was handed in queue: Queue now derives from deque directly, which safes one dict lookup as the queue does not need to be accessed through self anymore pool test improved to better verify threads are started correctly --- lib/git/async/pool.py | 5 ++--- lib/git/async/task.py | 23 ++++++++++++++++++++++- lib/git/async/thread.py | 8 ++++++-- lib/git/async/util.py | 19 ++++++++----------- 4 files changed, 38 insertions(+), 17 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7bddf7da..7ed6fd8e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -331,9 +331,8 @@ class Pool(object): def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, - the call may block as it waits for threads to finish. - When reducing the size to zero, this thread will process all remaining - items on the queue. + threads will continue with their work until they are done before effectively + being removed. 
:return: self :param size: if 0, the pool will do all work itself in the calling thread, diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f1448f96..dd2bd351 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@ from graph import Node +from util import ReadOnly import threading import sys @@ -117,8 +118,9 @@ class OutputChannelTask(Node): wc.write(rval) # END handle single apply except Exception, e: - self._exc = e print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + + # be sure our task is not scheduled again self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -127,6 +129,25 @@ class OutputChannelTask(Node): self._scheduled_items -= len(items) self._slock.release() # END unschedule all + + # PROBLEM: We have failed to create at least one item, hence its not + # garantueed that enough items will be produced for a possibly blocking + # client on the other end. This is why we have no other choice but + # to close the channel, preventing the possibility of blocking. + # This implies that dependent tasks will go down with us, but that is + # just the right thing to do of course - one loose link in the chain ... + # Other chunks of our kind currently being processed will then + # fail to write to the channel and fail as well + # self.close() + + # If some other chunk of our Task had an error, the channel will be closed + # This is not an issue, just be sure we don't overwrite the original + # exception with the ReadOnly error that would be emitted in that case. 
+ # We imply that ReadOnly is exclusive to us, as it won't be an error + # if the user emits it + if not isinstance(e, ReadOnly): + self._exc = e + # END set error flag # END exception handling del(wc) diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index cd964f1c..faeda04f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -125,7 +125,9 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() - self.inq = inq or Queue.Queue() + self.inq = inq + if inq is None: + self.inq = Queue.Queue() self._current_routine = None # routine we execute right now @classmethod @@ -135,6 +137,8 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + print self.name, "starts processing" # DEBUG + gettask = self.inq.get while True: self._current_routine = None @@ -166,7 +170,7 @@ class WorkerThread(TerminatableThread): break # END make routine call except StopProcessing: - print self.name, "stops processing" + print self.name, "stops processing" # DEBUG break except Exception,e: print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dff38f58..b5e1a0c0 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -10,7 +10,6 @@ from threading import ( ) from Queue import ( - Queue, Empty, ) @@ -171,15 +170,14 @@ class HSCondition(object): class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" -class AsyncQueue(Queue): +class AsyncQueue(deque): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers that there is nothing more to get here. 
All default-queue code was cleaned up for performance.""" - __slots__ = ('mutex', 'not_empty', 'queue', '_writable') + __slots__ = ('mutex', 'not_empty', '_writable') def __init__(self, maxsize=0): - self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) self._writable = True @@ -187,7 +185,7 @@ class AsyncQueue(Queue): def qsize(self): self.mutex.acquire() try: - return len(self.queue) + return len(self) finally: self.mutex.release() @@ -218,7 +216,7 @@ class AsyncQueue(Queue): def empty(self): self.mutex.acquire() try: - return not len(self.queue) + return not len(self) finally: self.mutex.release() @@ -228,21 +226,20 @@ class AsyncQueue(Queue): self.mutex.release() raise ReadOnly # END handle read-only - self.queue.append(item) + self.append(item) self.mutex.release() self.not_empty.notify() def get(self, block=True, timeout=None): self.mutex.acquire() - q = self.queue try: if block: if timeout is None: - while not len(q) and self._writable: + while not len(self) and self._writable: self.not_empty.wait() else: endtime = _time() + timeout - while not len(q) and self._writable: + while not len(self) and self._writable: remaining = endtime - _time() if remaining <= 0.0: raise Empty @@ -252,7 +249,7 @@ class AsyncQueue(Queue): # can throw if we woke up because we are not writable anymore try: - return q.popleft() + return self.popleft() except IndexError: raise Empty # END handle unblocking reason -- cgit v1.2.3 From 2054561da184955c4be4a92f0b4fa5c5c1c01350 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:31:24 +0200 Subject: HSCondition: using a deck to store waiters, for further speedup --- lib/git/async/util.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index b5e1a0c0..2c18a1b9 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -79,7 +79,7 @@ class HSCondition(object): if lock is None: lock = 
Lock() self._lock = lock - self._waiters = list() + self._waiters = deque() def release(self): self._lock.release() @@ -146,7 +146,7 @@ class HSCondition(object): # so here we assume this is thead-safe ! It wouldn't be in any other # language, but python it is. try: - self._waiters.pop(0).release() + self._waiters.popleft().release() except IndexError: pass else: @@ -156,7 +156,7 @@ class HSCondition(object): # to do that in a thread-safe fashion try: for i in range(min(n, len(self._waiters))): - self._waiters.pop(0).release() + self._waiters.popleft().release() # END for each waiter to resume finally: self.release() -- cgit v1.2.3 From 1090701721888474d34f8a4af28ee1bb1c3fdaaa Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:35:41 +0200 Subject: HSCondition: now deriving from deque, as the AsyncQeue does, to elimitate one more level of indirection. Clearly this not good from a design standpoint, as a Condition is no Deque, but it helps speeding things up which is what this is about. 
Could make it a hidden class to indicate how 'special' it is --- lib/git/async/util.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 2c18a1b9..ffdb14a2 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -69,17 +69,16 @@ class SyncQueue(deque): put = deque.append -class HSCondition(object): +class HSCondition(deque): """Cleaned up code of the original condition object in order to make it run and respond faster.""" - __slots__ = ("_lock", '_waiters') + __slots__ = ("_lock") delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - self._waiters = deque() def release(self): self._lock.release() @@ -93,7 +92,7 @@ class HSCondition(object): def wait(self, timeout=None): waiter = _allocate_lock() waiter.acquire() # get it the first time, no blocking - self._waiters.append(waiter) + self.append(waiter) # in the momemnt we release our lock, someone else might actually resume self.release() @@ -124,7 +123,7 @@ class HSCondition(object): # END endless loop if not gotit: try: - self._waiters.remove(waiter) + self.remove(waiter) except ValueError: pass # END didn't ever get it @@ -140,13 +139,13 @@ class HSCondition(object): In the multi-notify case, we acquire a lock just for safety, as otherwise we might pop too much of someone else notifies n waiters as well, which would in the worst case lead to double-releases of locks.""" - if not self._waiters: + if not self: return if n == 1: # so here we assume this is thead-safe ! It wouldn't be in any other # language, but python it is. 
try: - self._waiters.popleft().release() + self.popleft().release() except IndexError: pass else: @@ -155,8 +154,8 @@ class HSCondition(object): # and waits again, but only until we are done, which is important # to do that in a thread-safe fashion try: - for i in range(min(n, len(self._waiters))): - self._waiters.popleft().release() + for i in range(min(n, len(self))): + self.popleft().release() # END for each waiter to resume finally: self.release() @@ -164,7 +163,7 @@ class HSCondition(object): # END handle n = 1 case faster def notify_all(self): - self.notify(len(self._waiters)) + self.notify(len(self)) class ReadOnly(Exception): -- cgit v1.2.3 From a988e6985849e4f6a561b4a5468d525c25ce74fe Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:45:25 +0200 Subject: HSCondition: now gets a lock even in the single-notify case, as it was required due to the non-atomiciy of the invovled operation. Removed one level of indirection for the lock, by refraining from calling my own 'wrapper' methods, which brought it back to the performance it had before the locking was introduced for the n==1 case --- lib/git/async/util.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index ffdb14a2..008e60a3 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -95,7 +95,7 @@ class HSCondition(deque): self.append(waiter) # in the momemnt we release our lock, someone else might actually resume - self.release() + self._lock.release() try: # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already if timeout is None: @@ -129,7 +129,7 @@ class HSCondition(deque): # END didn't ever get it finally: # reacquire the lock - self.acquire() + self._lock.acquire() # END assure release lock def notify(self, n=1): @@ -144,12 +144,23 @@ class HSCondition(deque): if n == 1: # so here we assume this is thead-safe 
! It wouldn't be in any other # language, but python it is. + # But ... its two objects here - first the popleft, then the relasecall. + # If the timing is really really bad, and that happens if you let it + # run often enough ( its a matter of statistics ), this will fail, + # which is why we lock it. + # And yes, this causes some slow down, as single notifications happen + # alot + self._lock.acquire() try: - self.popleft().release() - except IndexError: - pass + try: + self.popleft().release() + except IndexError: + pass + finally: + self._lock.release() + # END assure lock is released else: - self.acquire() + self._lock.acquire() # once the waiter resumes, he will want to acquire the lock # and waits again, but only until we are done, which is important # to do that in a thread-safe fashion @@ -158,7 +169,7 @@ class HSCondition(deque): self.popleft().release() # END for each waiter to resume finally: - self.release() + self._lock.release() # END assure we release our lock # END handle n = 1 case faster -- cgit v1.2.3 From 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:01:51 +0200 Subject: HSCondition: Fixed terrible bug which it inherited from its default python Condition implementation, related to the notify method not being treadsafe. Although I was aware of it, I missed the first check which tests for the size - the result could be incorrect if the whole method wasn't locked. Testing runs stable now, allowing to move on \! 
--- lib/git/async/pool.py | 7 +++++-- lib/git/async/task.py | 2 +- lib/git/async/util.py | 47 +++++++++++++++-------------------------------- 3 files changed, 21 insertions(+), 35 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7ed6fd8e..66a2a105 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -58,14 +58,17 @@ class RPoolChannel(RChannel): def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any - item is actually read from the channel. + item is actually read from the channel. The call must be threadsafe if + the channel is passed to more than one tasks. If it fails, the read will fail with an IOError If a function is not provided, the call is effectively uninstalled.""" self._pre_cb = fun def set_post_cb(self, fun = lambda item: item): """Install a callback to call after the items were read. The function - returns a possibly changed item list. If it raises, the exception will be propagated. + returns a possibly changed item list.The call must be threadsafe if + the channel is passed to more than one tasks. + If it raises, the exception will be propagated. If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun diff --git a/lib/git/async/task.py b/lib/git/async/task.py index dd2bd351..d18cedca 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -138,7 +138,7 @@ class OutputChannelTask(Node): # just the right thing to do of course - one loose link in the chain ... 
# Other chunks of our kind currently being processed will then # fail to write to the channel and fail as well - # self.close() + self.close() # If some other chunk of our Task had an error, the channel will be closed # This is not an issue, just be sure we don't overwrite the original diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 008e60a3..2f46d55f 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,45 +133,28 @@ class HSCondition(deque): # END assure release lock def notify(self, n=1): - """Its vital that this method is threadsafe - to be fast we don'd get a lock, - but instead rely on pseudo-atomic operations that come with the GIL. - Hence we use pop in the n=1 case to be truly atomic. - In the multi-notify case, we acquire a lock just for safety, as otherwise - we might pop too much of someone else notifies n waiters as well, which - would in the worst case lead to double-releases of locks.""" - if not self: - return - if n == 1: - # so here we assume this is thead-safe ! It wouldn't be in any other - # language, but python it is. - # But ... its two objects here - first the popleft, then the relasecall. - # If the timing is really really bad, and that happens if you let it - # run often enough ( its a matter of statistics ), this will fail, - # which is why we lock it. - # And yes, this causes some slow down, as single notifications happen - # alot - self._lock.acquire() - try: + """Its vital that this method is threadsafe - we absolutely have to + get a lock at the beginning of this method to be sure we get the + correct amount of waiters back. 
If we bail out, although a waiter + is about to be added, it will miss its wakeup notification, and block + forever (possibly)""" + self._lock.acquire() + try: + if not self: # len(self) == 0, but this should be faster + return + if n == 1: try: self.popleft().release() except IndexError: pass - finally: - self._lock.release() - # END assure lock is released - else: - self._lock.acquire() - # once the waiter resumes, he will want to acquire the lock - # and waits again, but only until we are done, which is important - # to do that in a thread-safe fashion - try: + else: for i in range(min(n, len(self))): self.popleft().release() # END for each waiter to resume - finally: - self._lock.release() - # END assure we release our lock - # END handle n = 1 case faster + # END handle n = 1 case faster + finally: + self._lock.release() + # END assure lock is released def notify_all(self): self.notify(len(self)) -- cgit v1.2.3 From 0974f8737a3c56a7c076f9d0b757c6cb106324fb Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:47:41 +0200 Subject: Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode --- lib/git/async/channel.py | 85 +++++++++++++++++++++--------------------------- lib/git/async/task.py | 2 +- 2 files changed, 38 insertions(+), 49 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 58c35f96..3a277e7e 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -38,12 +38,11 @@ class Channel(object): class WChannel(Channel): """The write end of a channel""" - __slots__ = ('_closed', '_queue') + __slots__ = ('_queue') def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" - self._closed = False self._queue = AsyncQueue() @@ -55,15 +54,10 @@ class 
WChannel(Channel): :param block: If True, the call will block until there is free space in the channel :param timeout: timeout in seconds for blocking calls. - :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel""" + :raise ReadOnly: when writing into closed channel""" # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles - try: - self._queue.put(item, block, timeout) - except ReadOnly: - raise IOError("Cannot write to a closed channel") - # END exception handling + self._queue.put(item, block, timeout) def size(self): """:return: approximate number of items that could be read from the read-ends @@ -73,15 +67,11 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - # yes, close it a little too early, better than having anyone put - # additional items - self._closed = True self._queue.set_writable(False) - @property def closed(self): """:return: True if the channel was closed""" - return self._closed + return not self._queue.writable() #} END interface @@ -104,6 +94,7 @@ class RChannel(Channel): :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the given amount of seconds, returning the items it received so far. + The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. 
If count was greater 1, a list with less than count items will be @@ -112,9 +103,11 @@ class RChannel(Channel): returned.""" # if the channel is closed for writing, we never block # NOTE: is handled by the queue - if self._wc.closed or timeout == 0: - block = False - + # We don't check for a closed state here has it costs time - most of + # the time, it will not be closed, and will bail out automatically once + # it gets closed + + # in non-blocking mode, its all not a problem out = list() queue = self._wc._queue @@ -142,42 +135,38 @@ class RChannel(Channel): count = sys.maxint # END handle count - endtime = sys.maxint # allows timeout for whole operation - if timeout is not None: - endtime = time() + timeout - # could be improved by a separate: no-endtime branch, saving the time calls - for i in xrange(count): + i = 0 + while i < count: try: out.append(queue.get(block, timeout)) + i += 1 except Empty: - # here we are only if there is nothing on the queue, - # and if we are blocking. If we are not blocking, this - # indiccates that the queue was set unwritable in the meanwhile. - # hence we can abort now to prevent reading (possibly) forever - # Besides, this is racy as all threads will rip on the channel - # without waiting until its empty - if not block: - break - # END ignore empty - - # if we have been unblocked because the closed state changed - # in the meanwhile, stop trying - # NOTE: must NOT cache _wc - if self._wc.closed: - # If we were closed, we drop out even if there might still - # be items. Now its time to get these items, according to - # our count. Just switch to unblocking mode. 
- # If we are to read unlimited items, this would run forever, - # but the EmptyException handler takes care of this - block = False + # here we are only if + # someone woke us up to inform us about the queue that changed + # its writable state + # The following branch checks for closed channels, and pulls + # as many items as we need and as possible, before + # leaving the loop. + if not queue.writable(): + try: + while i < count: + out.append(queue.get(False, None)) + i += 1 + # END count loop + except Empty: + break # out of count loop + # END handle absolutely empty queue + # END handle closed channel - # we don't continue, but let the timer decide whether - # it wants to abort - # END handle channel cloased - - if time() >= endtime: + # if we are here, we woke up and the channel is not closed + # Either the queue became writable again, which currently shouldn't + # be able to happen in the channel, or someone read with a timeout + # that actually timed out. + # As it timed out, which is the only reason we are here, + # we have to abort break - # END stop operation on timeout + # END ignore empty + # END for each item # END handle blocking return out diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d18cedca..539b240f 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -66,7 +66,7 @@ class OutputChannelTask(Node): def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed + return self._out_wc.closed() def error(self): """:return: Exception caught during last processing or None""" -- cgit v1.2.3 From 57a4e09294230a36cc874a6272c71757c48139f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 15:29:47 +0200 Subject: Channel: removed pseudoconstructor, which clearly improves the design and makes it easier to constomize pool: in serial mode, created channels will be serial-only, which brings 15% of performance --- lib/git/async/channel.py | 36 
+++++++++++++++++++++++------------- lib/git/async/pool.py | 36 +++++++++++++++++++++++------------- lib/git/async/util.py | 9 ++++++++- 3 files changed, 54 insertions(+), 27 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 3a277e7e..bb118f30 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + SyncQueue, ReadOnly ) @@ -24,27 +25,19 @@ class Channel(object): Create a new channel """ __slots__ = tuple() - - def __new__(cls, *args): - if cls is Channel: - if len(args) > 0: - raise ValueError("Cannot take any arguments when creating a new channel") - wc = WChannel() - rc = RChannel(wc) - return wc, rc - # END constructor mode - return object.__new__(cls) class WChannel(Channel): - """The write end of a channel""" + """The write end of a channel - it is thread-safe""" __slots__ = ('_queue') + # The queue to use to store the actual data + QueueCls = AsyncQueue + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" - self._queue = AsyncQueue() - + self._queue = self.QueueCls() #{ Interface def write(self, item, block=True, timeout=None): @@ -75,6 +68,12 @@ class WChannel(Channel): #} END interface +class SerialWChannel(WChannel): + """A slightly faster version of a WChannel, which sacrificed thead-safety for + performance""" + QueueCls = SyncQueue + + class RChannel(Channel): """The read-end of a corresponding write channel""" __slots__ = '_wc' @@ -174,3 +173,14 @@ class RChannel(Channel): #} END interface #} END classes + +#{ Constructors +def mkchannel(wctype = WChannel, rctype = RChannel): + """Create a channel, which consists of one write end and one read end + :return: tuple(write_channel, read_channel) + :param wctype: The type of the write channel to instantiate + :param rctype: The type of the read channel to 
instantiate""" + wc = wctype() + rc = rctype(wc) + return wc, rc +#} END constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 66a2a105..549c801e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -6,7 +6,6 @@ from thread import ( from threading import Lock from util import ( - SyncQueue, AsyncQueue, DummyLock ) @@ -19,8 +18,9 @@ from Queue import ( from graph import Graph from channel import ( - Channel, + mkchannel, WChannel, + SerialWChannel, RChannel ) @@ -329,7 +329,8 @@ class Pool(object): #{ Interface def size(self): - """:return: amount of workers in the pool""" + """:return: amount of workers in the pool + :note: method is not threadsafe !""" return self._num_workers def set_size(self, size=0): @@ -339,7 +340,9 @@ class Pool(object): :return: self :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads + otherwise the work will be distributed among the given amount of threads. + If the size is 0, newly added tasks will use channels which are NOT + threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" assert size > -1, "Size cannot be negative" @@ -437,17 +440,29 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wc, rc = Channel() - rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + wctype = WChannel self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() self._tasks.add_node(task) + + # fix locks - in serial mode, the task does not need real locks + # Additionally, use a non-threadsafe queue + # This brings about 15% more performance, but sacrifices thread-safety + # when reading from multiple threads. 
+ if self.size() == 0: + task._slock = DummyLock() + wctype = SerialWChannel + # END improve locks + + # setup the tasks channel + wc = wctype() + rc = RPoolChannel(wc, task, self) + task.set_wc(wc) finally: self._taskgraph_lock.release() - # END sync task addition + # END sync task addition # If the input channel is one of our read channels, we add the relation if isinstance(task, InputChannelTask): @@ -462,11 +477,6 @@ class Pool(object): # END add task relation # END handle input channels for connections - # fix locks - in serial mode, the task does not need real locks - if self.size() == 0: - task._slock = DummyLock() - # END improve locks - return rc #} END interface diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 2f46d55f..00d0dbab 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -66,7 +66,14 @@ class SyncQueue(deque): def empty(self): return len(self) == 0 - put = deque.append + def set_writable(self, state): + pass + + def writable(self): + return True + + def put(self, item, block=True, timeout=None): + self.append(item) class HSCondition(deque): -- cgit v1.2.3 From 07996a1a1e53ffdd2680d4bfbc2f4059687859a5 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 15:40:51 +0200 Subject: task: removed scheduled task support, which at some point was introduced to improve performance, but which now hinders performance, besides being unnecessary ;) --- lib/git/async/pool.py | 43 +++++++++++-------------------------------- lib/git/async/task.py | 33 --------------------------------- 2 files changed, 11 insertions(+), 65 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 549c801e..284c41c7 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,27 +80,21 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback - # if we have count items, don't do any queue preparation - if someone - # depletes the queue in the meanwhile, the channel will 
close and - # we will unblock naturally - # PROBLEM: If there are multiple consumer of this channel, we might - # run out of items without being replenished == block forever in the - # worst case. task.min_count could have triggered to produce more ... - # usually per read with n items, we put n items on to the queue, - # so we wouldn't check this - # Even if we have just one consumer ( we could determine that with - # the reference count ), it could be that in one moment we don't yet - # have an item, but its currently being produced by some worker. - # This is why we: - # * make no assumptions if there are multiple consumers - # * + # NOTE: we always queue the operation that would give us count items + # as tracking the scheduled items or testing the channels size + # is in herently unsafe depending on the design of the task network + # If we put on tasks onto the queue for every request, we are sure + # to always produce enough items, even if the task.min_count actually + # provided enough - its better to have some possibly empty task runs + # than having and empty queue that blocks. + + # NOTE: TODO: that case is only possible if one Task could be connected + # to multiple input channels in a manner known by the system. Currently + # this is not possible, but should be implemented at some point # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel skip_compute = self._task.is_done() or self._task.error() - #if count > 0: - # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count - # END ########## prepare ############################## if not skip_compute: @@ -249,13 +243,6 @@ class Pool(object): # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing - # if the task does not have the required output on its queue, schedule - # it for processing. 
If we should process all, we don't care about the - # amount as it should process until its all done. - #if count > 1 and task._out_wc.size() >= count: - # continue - # END skip if we have enough - # but use the actual count to produce the output, we may produce # more than requested numchunks = 1 @@ -283,33 +270,26 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): - # schedule them as early as we know about them - task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: - task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: - task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): - task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: - task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: - task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -452,7 +432,6 @@ class Pool(object): # This brings about 15% more performance, but sacrifices thread-safety # when reading from multiple threads. 
if self.size() == 0: - task._slock = DummyLock() wctype = SerialWChannel # END improve locks diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 539b240f..be02cfe8 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -23,8 +23,6 @@ class OutputChannelTask(Node): '_out_wc', # output write channel '_exc', # exception caught '_done', # True if we are done - '_scheduled_items', # amount of scheduled items that will be processed in total - '_slock', # lock for scheduled items 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -37,8 +35,6 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False - self._scheduled_items = 0 - self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -72,21 +68,6 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc - def add_scheduled_items(self, count): - """Add the given amount of scheduled items to this task""" - self._slock.acquire() - self._scheduled_items += count - self._slock.release() - - def scheduled_item_count(self): - """:return: amount of scheduled items for this task""" - self._slock.acquire() - try: - return self._scheduled_items - finally: - self._slock.release() - # END threadsafe return - def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -101,19 +82,12 @@ class OutputChannelTask(Node): if self.apply_single: for item in items: rval = self.fun(item) - # decrement afterwards, the its unscheduled once its produced - self._slock.acquire() - self._scheduled_items -= 1 - self._slock.release() wc.write(rval) # END for each item else: # shouldn't apply single be the default anyway ? 
# The task designers should chunk them up in advance rvals = self.fun(items) - self._slock.acquire() - self._scheduled_items -= len(items) - self._slock.release() for rval in rvals: wc.write(rval) # END handle single apply @@ -122,13 +96,6 @@ class OutputChannelTask(Node): # be sure our task is not scheduled again self.set_done() - # unschedule all, we don't know how many have been produced actually - # but only if we don't apply single please - if not self.apply_single: - self._slock.acquire() - self._scheduled_items -= len(items) - self._slock.release() - # END unschedule all # PROBLEM: We have failed to create at least one item, hence its not # garantueed that enough items will be produced for a possibly blocking -- cgit v1.2.3 From ea81f14dafbfb24d70373c74b5f8dabf3f2225d9 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:38:21 +0200 Subject: Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. 
The write channel with callback subclass allows the transformation of the item to be written --- lib/git/async/channel.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/git/async/pool.py | 51 ++++++------------------------------------------ 2 files changed, 57 insertions(+), 45 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index bb118f30..abb31035 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -68,6 +68,32 @@ class WChannel(Channel): #} END interface +class CallbackWChannel(WChannel): + """The write end of a channel which allows you to setup a callback to be + called after an item was written to the channel""" + __slots__ = ('_pre_cb') + + def __init__(self): + WChannel.__init__(self) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda item: item): + """Install a callback to be called before the given item is written. + It returns a possibly altered item which will be written to the channel + instead, making it useful for pre-write item conversions. + Providing None uninstalls the current method. 
+ :return: the previously installed function or None + :note: Must be thread-safe if the channel is used in multiple threads""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def write(self, item, block=True, timeout=None): + if self._pre_cb: + item = self._pre_cb(item) + WChannel.write(self, item, block, timeout) + + class SerialWChannel(WChannel): """A slightly faster version of a WChannel, which sacrificed thead-safety for performance""" @@ -171,7 +197,32 @@ class RChannel(Channel): return out #} END interface + +class CallbackRChannel(RChannel): + """A channel which sends a callback before items are read from the channel""" + __slots__ = "_pre_cb" + + def __init__(self, wc): + RChannel.__init__(self, wc) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. + Exceptions will be propagated. + If a function is not provided, the call is effectively uninstalled. + :return: the previously installed callback or None + :note: The callback must be threadsafe if the channel is used by multiple threads.""" + prev = self._pre_cb + self._pre_cb = fun + return prev + def read(self, count=0, block=True, timeout=None): + if self._pre_cb: + self._pre_cb(count) + return RChannel.read(self, count, block, timeout) + + #} END classes #{ Constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 284c41c7..7d4e96d1 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,26 +21,24 @@ from channel import ( mkchannel, WChannel, SerialWChannel, - RChannel + CallbackRChannel ) import sys from time import sleep -class RPoolChannel(RChannel): +class RPoolChannel(CallbackRChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. 
It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + __slots__ = ('_task', '_pool') def __init__(self, wchannel, task, pool): - RChannel.__init__(self, wchannel) + CallbackRChannel.__init__(self, wchannel) self._task = task self._pool = pool - self._pre_cb = None - self._post_cb = None def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -56,30 +54,10 @@ class RPoolChannel(RChannel): self._pool.remove_task(self._task) # END handle refcount based removal of task - def set_pre_cb(self, fun = lambda count: None): - """Install a callback to call with the item count to be read before any - item is actually read from the channel. The call must be threadsafe if - the channel is passed to more than one tasks. - If it fails, the read will fail with an IOError - If a function is not provided, the call is effectively uninstalled.""" - self._pre_cb = fun - - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the items were read. The function - returns a possibly changed item list.The call must be threadsafe if - the channel is passed to more than one tasks. - If it raises, the exception will be propagated. - If a function is not provided, the call is effectively uninstalled.""" - self._post_cb = fun - def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" - if self._pre_cb: - self._pre_cb() - # END pre callback - # NOTE: we always queue the operation that would give us count items # as tracking the scheduled items or testing the channels size # is in herently unsafe depending on the design of the task network @@ -90,7 +68,7 @@ class RPoolChannel(RChannel): # NOTE: TODO: that case is only possible if one Task could be connected # to multiple input channels in a manner known by the system. 
Currently - # this is not possible, but should be implemented at some point + # this is not possible, but should be implemented at some point. # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel @@ -105,25 +83,12 @@ class RPoolChannel(RChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = RChannel.read(self, count, block, timeout) + items = CallbackRChannel.read(self, count, block, timeout) ########################## - if self._post_cb: - items = self._post_cb(items) - - - ####### Finalize ######## - self._pool._post_channel_read(self._task) return items - #{ Internal - def _read(self, count=0, block=False, timeout=None): - """Calls the underlying channel's read directly, without triggering - the pool""" - return RChannel.read(self, count, block, timeout) - - #} END internal class Pool(object): @@ -296,10 +261,6 @@ class Pool(object): # END for each task to process - def _post_channel_read(self, task): - """Called after we processed a read to cleanup""" - pass - def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call -- cgit v1.2.3 From 365fb14ced88a5571d3287ff1698582ceacd80d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:59:17 +0200 Subject: task: redesigned write channel access to allow the task creator to set own write channels, possibly some with callbacks installed etc.. 
Pool.add_task will respect the user's choice now, but provide defaults which are optimized for performance --- lib/git/async/pool.py | 13 ++++++++----- lib/git/async/task.py | 37 ++++++++++++++++++++++--------------- 2 files changed, 30 insertions(+), 20 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7d4e96d1..f7c1cfe0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -388,18 +388,21 @@ class Pool(object): self._taskorder_cache.clear() self._tasks.add_node(task) - # fix locks - in serial mode, the task does not need real locks - # Additionally, use a non-threadsafe queue + # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety # when reading from multiple threads. if self.size() == 0: wctype = SerialWChannel # END improve locks - # setup the tasks channel - wc = wctype() + # setup the tasks channel - respect the task creators choice though + # if it is set. + wc = task.wchannel() + if wc is None: + wc = wctype() + # END create write channel ifunset rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + task.set_wchannel(wc) finally: self._taskgraph_lock.release() # END sync task addition diff --git a/lib/git/async/task.py b/lib/git/async/task.py index be02cfe8..f98336b2 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@ from graph import Node +from channel import WChannel from util import ReadOnly import threading @@ -11,8 +12,8 @@ class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. - Results of the item processing are sent to an output channel, which is to be - set by the creator + Results of the item processing are sent to a write channel, which is to be + set by the creator using the ``set_wchannel`` method. 
* **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -29,10 +30,11 @@ class OutputChannelTask(Node): 'apply_single' # apply single items even if multiple where read ) - def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0): + def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, + wchannel=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = None # to be set later + self._out_wc = wchannel # to be set later self._exc = None self._done = False self.fun = fun @@ -48,13 +50,21 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wc(self, wc): - """Set the write channel to the given one - :note: resets it done state in order to allow proper queue handling""" - self._done = False # TODO : fix this, this is a side-effect - self._scheduled_items = 0 + def set_wchannel(self, wc): + """Set the write channel to the given one""" self._out_wc = wc + def wchannel(self): + """:return: a proxy to our write channel or None if non is set + :note: you must not hold a reference to our write channel when the + task is being processed. This would cause the write channel never + to be closed as the task will think there is still another instance + being processed which can close the channel once it is done. 
+ In the worst case, this will block your reads.""" + if self._out_wc is None: + return None + return self._out_wc + def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" @@ -128,8 +138,10 @@ class OutputChannelTask(Node): # END handle done state # If we appear to be the only one left with our output channel, and are - # closed ( this could have been set in another thread as well ), make + # done ( this could have been set in another thread as well ), make # sure to close the output channel. + # Waiting with this to be the last one helps to keep the + # write-channel writable longer # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount @@ -196,10 +208,5 @@ class InputChannelTask(OutputChannelTask): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read - def process(self, count=1): - # for now, just blindly read our input, could trigger a pool, even - # ours, but why not ? 
It should be able to handle this - # TODO: remove this method - super(InputChannelTask, self).process(count) #{ Configuration -- cgit v1.2.3 From 257a8a9441fca9a9bc384f673ba86ef5c3f1715d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 21:19:54 +0200 Subject: test: prepared task dependency test, which already helped to find a bug in the reference counting mechanism, causing references to the pool to be kept via cycles --- lib/git/async/pool.py | 55 ++++++++++++++++++++++++++++++++++++--------------- lib/git/async/task.py | 7 +++++-- 2 files changed, 44 insertions(+), 18 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index f7c1cfe0..2ec18f1a 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -25,6 +25,7 @@ from channel import ( ) import sys +import weakref from time import sleep @@ -33,25 +34,37 @@ class RPoolChannel(CallbackRChannel): before and after an item is to be read. It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task', '_pool') + __slots__ = ('_task_ref', '_pool_ref') def __init__(self, wchannel, task, pool): CallbackRChannel.__init__(self, wchannel) - self._task = task - self._pool = pool + self._task_ref = weakref.ref(task) + self._pool_ref = weakref.ref(pool) def __del__(self): """Assures that our task will be deleted if we were the last reader""" - del(self._wc) # decrement ref-count early - # now, if this is the last reader to the wc we just handled, there + task = self._task_ref() + if task is None: + return + + pool = self._pool_ref() + if pool is None: + return + + # if this is the last reader to the wc we just handled, there # is no way anyone will ever read from the task again. 
If so, # delete the task in question, it will take care of itself and orphans # it might leave # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected + # When just calling remove_task, + # it has no way of knowing that the write channel is about to diminsh. + # which is why we pass the info as a private kwarg - not nice, but + # okay for now + # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - self._pool.remove_task(self._task) + pool.remove_task(task, _from_destructor_=True) # END handle refcount based removal of task def read(self, count=0, block=True, timeout=None): @@ -72,11 +85,16 @@ class RPoolChannel(CallbackRChannel): # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel - skip_compute = self._task.is_done() or self._task.error() + task = self._task_ref() + if task is None: + return list() + # END abort if task was deleted + + skip_compute = task.is_done() or task.error() ########## prepare ############################## if not skip_compute: - self._pool._prepare_channel_read(self._task, count) + self._pool_ref()._prepare_channel_read(task, count) # END prepare pool scheduling @@ -261,11 +279,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call - if sys.getrefcount(task._out_wc) < 3: - self.remove_task(task) + # If we are getting here from the destructor of an RPool channel, + # its totally valid to virtually decrement the refcount by 1 as + # we can expect it to drop once the destructor completes, which is when + # we finish all recursive calls + max_ref_count = 3 + from_destructor + if 
sys.getrefcount(task.wchannel()) < max_ref_count: + self.remove_task(task, from_destructor) #} END internal #{ Interface @@ -335,7 +358,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task): + def remove_task(self, task, _from_destructor_=False): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -370,7 +393,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t) + self._remove_task_if_orphaned(t, _from_destructor_) # END handle orphans recursively return self @@ -409,11 +432,11 @@ class Pool(object): # If the input channel is one of our read channels, we add the relation if isinstance(task, InputChannelTask): - ic = task.in_rc - if isinstance(ic, RPoolChannel) and ic._pool is self: + ic = task.rchannel() + if isinstance(ic, RPoolChannel) and ic._pool_ref() is self: self._taskgraph_lock.acquire() try: - self._tasks.add_edge(ic._task, task) + self._tasks.add_edge(ic._task_ref(), task) finally: self._taskgraph_lock.release() # END handle edge-adding diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f98336b2..03b40492 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -208,5 +208,8 @@ class InputChannelTask(OutputChannelTask): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read - #{ Configuration - + def rchannel(self): + """:return: input channel from which we read""" + # the instance is bound in its instance method - lets use this to keep + # the refcount at one ( per consumer ) + return self._read.im_self -- cgit v1.2.3 From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. 
Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be --- lib/git/async/graph.py | 23 ++++++++++++----------- lib/git/async/pool.py | 6 +++--- lib/git/async/task.py | 5 ++++- lib/git/async/util.py | 8 +++++++- 4 files changed, 26 insertions(+), 16 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 6386cbaa..e3999cdc 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -87,25 +87,26 @@ class Graph(object): return self - def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch - It will return the actual input node in the end !""" - nodes = node.in_nodes[:] + def input_inclusive_dfirst_reversed(self, node): + """Return all input nodes of the given node, depth first, + It will return the actual input node last, as it is required + like that by the pool""" + stack = [node] seen = set() # depth first - while nodes: - n = nodes.pop() + out = list() + while stack: + n = stack.pop() if n in seen: continue seen.add(n) + out.append(n) # only proceed in that direction if visitor is fine with it - if visitor(n): - nodes.extend(n.in_nodes) + stack.extend(n.in_nodes) # END call visitor # END while walking - visitor(node) + out.reverse() + return out diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 2ec18f1a..5ebc3655 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -182,14 +182,13 @@ class Pool(object): dfirst_tasks = self._taskorder_cache[id(task)] except KeyError: # have to retrieve the list from the graph - dfirst_tasks = list() - self._tasks.visit_input_inclusive_depth_first(task, lambda n: 
dfirst_tasks.append(n)) + dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task) self._taskorder_cache[id(task)] = dfirst_tasks # END handle cached order retrieval finally: self._taskgraph_lock.release() # END handle locking - + print dfirst_tasks # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks # The actual_count is used when chunking tasks up for the queue, whereas @@ -309,6 +308,7 @@ class Pool(object): threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" + print "set_size", size assert size > -1, "Size cannot be negative" # either start new threads, or kill existing ones. diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 03b40492..57dd285d 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -80,7 +80,9 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" + print "%r: reading %i" % (self.id, count) items = self._read(count) + print "%r: done reading" % self.id try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
As this method runs asynchronously, @@ -102,7 +104,7 @@ class OutputChannelTask(Node): wc.write(rval) # END handle single apply except Exception, e: - print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again self.set_done() @@ -146,6 +148,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount if self.is_done() and getrefcount(self._out_wc) < 4: + print "Closing channel of %r" % self.id self.close() # END handle channel closure #{ Configuration diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 00d0dbab..b7750b0b 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -206,7 +206,6 @@ class AsyncQueue(deque): return old finally: self.mutex.release() - # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() @@ -222,6 +221,13 @@ class AsyncQueue(deque): def put(self, item, block=True, timeout=None): self.mutex.acquire() + # NOTE: we explicitly do NOT check for our writable state + # Its just used as a notification signal, and we need to be able + # to continue writing to prevent threads ( easily ) from failing + # to write their computed results, which we want in fact + # NO: we want them to fail and stop processing, as the one who caused + # the channel to close had a reason and wants the threads to + # stop on the task as soon as possible if not self._writable: self.mutex.release() raise ReadOnly -- cgit v1.2.3 From cfb278d74ad01f3f1edf5e0ad113974a9555038d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 10:14:32 +0200 Subject: InputChannelTask now has interface for properly handling the reading from the same and different pools --- lib/git/async/pool.py | 43 +++++++++++++++++++++++++++++++++++++++---- lib/git/async/task.py | 26 
+++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 5 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 5ebc3655..1b3c2748 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -10,7 +10,6 @@ from util import ( DummyLock ) -from task import InputChannelTask from Queue import ( Queue, Empty @@ -66,6 +65,24 @@ class RPoolChannel(CallbackRChannel): if sys.getrefcount(self) < 6: pool.remove_task(task, _from_destructor_=True) # END handle refcount based removal of task + + #{ Internal + def _read(self, count=0, block=True, timeout=None): + """Direct read, bypassing the pool handling""" + return CallbackRChannel.read(self, count, block, timeout) + #} END internal + + #{ Interface + + def pool_ref(self): + """:return: reference to the pool we belong to""" + return self._pool_ref + + def task_ref(self): + """:return: reference to the task producing our items""" + return self._task_ref + + #} END interface def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads @@ -188,7 +205,7 @@ class Pool(object): finally: self._taskgraph_lock.release() # END handle locking - print dfirst_tasks + # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks # The actual_count is used when chunking tasks up for the queue, whereas @@ -406,6 +423,18 @@ class Pool(object): # create a write channel for it wctype = WChannel + # adjust the task with our pool ref, if it has the slot and is empty + # For now, we don't allow tasks to be used in multiple pools, except + # for by their channels + if hasattr(task, 'pool'): + their_pool = task.pool() + if their_pool is None: + task.set_pool(self) + elif their_pool is not self: + raise ValueError("Task %r is already registered to another pool" % task.id) + # END handle pool exclusivity + # END handle pool aware tasks + 
self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() @@ -431,12 +460,18 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if isinstance(task, InputChannelTask): + if hasattr(task, 'rchannel'): ic = task.rchannel() - if isinstance(ic, RPoolChannel) and ic._pool_ref() is self: + if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: self._tasks.add_edge(ic._task_ref(), task) + + # additionally, bypass ourselves when reading from the + # task, if possible + if hasattr(ic, '_read'): + task.set_read(ic._read) + # END handle read bypass finally: self._taskgraph_lock.release() # END handle edge-adding diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 57dd285d..d5b45609 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,8 +1,12 @@ from graph import Node -from channel import WChannel from util import ReadOnly +from channel import ( + WChannel, + CallbackRChannel + ) import threading +import weakref import sys import new @@ -147,6 +151,7 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount + # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: print "Closing channel of %r" % self.id self.close() @@ -206,13 +211,32 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" + __slots__ = "_pool_ref" def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read + self._pool_ref = None + + #{ Internal Interface def rchannel(self): """:return: input channel from which we read""" # the instance is bound in its instance 
method - lets use this to keep # the refcount at one ( per consumer ) return self._read.im_self + + def set_read(self, read): + """Adjust the read method to the given one""" + self._read = read + + def set_pool(self, pool): + self._pool_ref = weakref.ref(pool) + + def pool(self): + """:return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface -- cgit v1.2.3 From 01eac1a959c1fa5894a86bf11e6b92f96762bdd8 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 12:06:16 +0200 Subject: Added more dependency task tests, especially the single-reads are not yet fully deterministic as tasks still run into the problem that they try to write into a closed channel, it was closed by one of their task-mates who didn't know someone else was still computing --- lib/git/async/task.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d5b45609..0b1d0666 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -84,9 +84,9 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - print "%r: reading %i" % (self.id, count) + # print "%r: reading %i" % (self.id, count) items = self._read(count) - print "%r: done reading" % self.id + # print "%r: done reading %i items" % (self.id, len(items)) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
As this method runs asynchronously, @@ -109,7 +109,6 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging - # be sure our task is not scheduled again self.set_done() @@ -153,7 +152,7 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: - print "Closing channel of %r" % self.id + # print "Closing channel of %r" % self.id self.close() # END handle channel closure #{ Configuration -- cgit v1.2.3 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a reader is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefits of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- lib/git/async/channel.py | 102 ++++++++++++++++++++++------------------------- lib/git/async/pool.py | 76 +++++++++++++++++------------------ lib/git/async/task.py | 54 +++++++++++-------------- 3 files changed, 107 insertions(+), 125 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index abb31035..9b019707 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -21,61 +21,57 @@ class Channel(object): If the channel is closed, any read operation will result in an exception This base class is not instantiated directly, but instead serves as constructor - for RWChannel pairs. + for Rwriter pairs. 
Create a new channel """ - __slots__ = tuple() - - -class WChannel(Channel): - """The write end of a channel - it is thread-safe""" - __slots__ = ('_queue') + __slots__ = 'queue' # The queue to use to store the actual data QueueCls = AsyncQueue def __init__(self): - """initialize this instance, able to hold max_items at once - Write calls will block if the channel is full, until someone reads from it""" - self._queue = self.QueueCls() - - #{ Interface - def write(self, item, block=True, timeout=None): - """Send an item into the channel, it can be read from the read end of the - channel accordingly - :param item: Item to send - :param block: If True, the call will block until there is free space in the - channel - :param timeout: timeout in seconds for blocking calls. - :raise ReadOnly: when writing into closed channel""" - # let the queue handle the 'closed' attribute, we write much more often - # to an open channel than to a closed one, saving a few cycles - self._queue.put(item, block, timeout) - + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('write', 'channel') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self.write = channel.queue.put + + #{ Interface def size(self): - """:return: approximate number of items that could be read from the read-ends - of this channel""" - return self._queue.qsize() + return self.channel.queue.qsize() def close(self): """Close the channel. 
Multiple close calls on a closed channel are no an error""" - self._queue.set_writable(False) + self.channel.queue.set_writable(False) def closed(self): """:return: True if the channel was closed""" - return not self._queue.writable() + return not self.channel.queue.writable() #} END interface -class CallbackWChannel(WChannel): +class CallbackWriter(Writer): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') - def __init__(self): - WChannel.__init__(self) + def __init__(self, channel): + Writer.__init__(self, channel) self._pre_cb = None + self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -88,25 +84,19 @@ class CallbackWChannel(WChannel): self._pre_cb = fun return prev - def write(self, item, block=True, timeout=None): + def _write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - WChannel.write(self, item, block, timeout) + self.channel.queue.put(item, block, timeout) - -class SerialWChannel(WChannel): - """A slightly faster version of a WChannel, which sacrificed thead-safety for - performance""" - QueueCls = SyncQueue - -class RChannel(Channel): - """The read-end of a corresponding write channel""" - __slots__ = '_wc' +class Reader(object): + """Allows reading from a channel""" + __slots__ = 'channel' - def __init__(self, wchannel): + def __init__(self, channel): """Initialize this instance from its parent write channel""" - self._wc = wchannel + self.channel = channel #{ Interface @@ -135,7 +125,7 @@ class RChannel(Channel): # in non-blocking mode, its all not a problem out = list() - queue = self._wc._queue + queue = self.channel.queue if not block: # be as fast as possible in non-blocking mode, hence # its a bit 'unrolled' @@ -198,12 +188,12 @@ class RChannel(Channel): #} END interface -class CallbackRChannel(RChannel): +class 
CallbackReader(Reader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" - def __init__(self, wc): - RChannel.__init__(self, wc) + def __init__(self, channel): + Reader.__init__(self, channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -220,18 +210,20 @@ class CallbackRChannel(RChannel): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return RChannel.read(self, count, block, timeout) + return Reader.read(self, count, block, timeout) #} END classes #{ Constructors -def mkchannel(wctype = WChannel, rctype = RChannel): - """Create a channel, which consists of one write end and one read end - :return: tuple(write_channel, read_channel) +def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate :param wctype: The type of the write channel to instantiate :param rctype: The type of the read channel to instantiate""" - wc = wctype() - rc = rctype(wc) + c = ctype() + wc = wtype(c) + rc = rtype(c) return wc, rc #} END constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1b3c2748..68551ea3 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -18,27 +18,28 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - WChannel, - SerialWChannel, - CallbackRChannel + Writer, + Channel, + SerialChannel, + CallbackReader ) import sys import weakref from time import sleep +import new -class RPoolChannel(CallbackRChannel): - """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call - before and after an item is to be read. 
- +class PoolReader(CallbackReader): + """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref') + __slots__ = ('_task_ref', '_pool_ref', '_read') - def __init__(self, wchannel, task, pool): - CallbackRChannel.__init__(self, wchannel) + def __init__(self, channel, task, pool): + CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) + self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader) def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -63,15 +64,9 @@ class RPoolChannel(CallbackRChannel): # okay for now # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task, _from_destructor_=True) + pool.remove_task(task) # END handle refcount based removal of task - #{ Internal - def _read(self, count=0, block=True, timeout=None): - """Direct read, bypassing the pool handling""" - return CallbackRChannel.read(self, count, block, timeout) - #} END internal - #{ Interface def pool_ref(self): @@ -118,7 +113,7 @@ class RPoolChannel(CallbackRChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackRChannel.read(self, count, block, timeout) + items = CallbackReader.read(self, count, block, timeout) ########################## @@ -262,21 +257,21 @@ class Pool(object): # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + # print actual_count, numchunks, chunksize, remainder, task._out_writer.size() if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task - queue = self._queue + qput = self._queue if numchunks > 1: for i in xrange(numchunks): - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END for each chunk to put else: - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END try efficient looping if remainder: - queue.put((task.process, remainder)) + qput((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves @@ -295,16 +290,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task, from_destructor): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" - # 1 as its stored on the task, 1 for the getrefcount call + # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls - max_ref_count = 3 + from_destructor - if sys.getrefcount(task.wchannel()) < max_ref_count: - self.remove_task(task, from_destructor) + max_ref_count = 3 + if sys.getrefcount(task.writer().channel) < max_ref_count: + self.remove_task(task) #} END internal #{ Interface @@ -375,7 +370,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task, _from_destructor_=False): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -410,7 +405,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t, _from_destructor_) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self @@ -421,7 +416,7 @@ class Pool(object): the task will be considered orphaned 
and will be deleted on the next occasion.""" # create a write channel for it - wctype = WChannel + ctype = Channel # adjust the task with our pool ref, if it has the slot and is empty # For now, we don't allow tasks to be used in multiple pools, except @@ -442,26 +437,29 @@ class Pool(object): # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety - # when reading from multiple threads. if self.size() == 0: - wctype = SerialWChannel + ctype = SerialChannel # END improve locks # setup the tasks channel - respect the task creators choice though # if it is set. - wc = task.wchannel() + wc = task.writer() + ch = None if wc is None: - wc = wctype() + ch = ctype() + wc = Writer(ch) + task.set_writer(wc) + else: + ch = wc.channel # END create write channel ifunset - rc = RPoolChannel(wc, task, self) - task.set_wchannel(wc) + rc = PoolReader(ch, task, self) finally: self._taskgraph_lock.release() # END sync task addition # If the input channel is one of our read channels, we add the relation - if hasattr(task, 'rchannel'): - ic = task.rchannel() + if hasattr(task, 'reader'): + ic = task.reader() if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0b1d0666..5a6c1e95 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,23 +1,17 @@ from graph import Node from util import ReadOnly -from channel import ( - WChannel, - CallbackRChannel - ) import threading import weakref import sys import new -getrefcount = sys.getrefcount - class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. Results of the item processing are sent to a write channel, which is to be - set by the creator using the ``set_wchannel`` method. + set by the creator using the ``set_writer`` method. 
* **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -25,9 +19,11 @@ class OutputChannelTask(Node): one worker, as well as dependent tasks. If you want finer granularity , you can specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process - '_out_wc', # output write channel + '_out_writer', # output write channel '_exc', # exception caught '_done', # True if we are done + '_num_writers', # number of concurrent writers + '_wlock', # lock for the above 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -35,12 +31,14 @@ class OutputChannelTask(Node): ) def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, - wchannel=None): + writer=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = wchannel # to be set later + self._out_writer = writer self._exc = None self._done = False + self._num_writers = 0 + self._wlock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -54,29 +52,29 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wchannel(self, wc): + def set_writer(self, writer): """Set the write channel to the given one""" - self._out_wc = wc + self._out_writer = writer - def wchannel(self): + def writer(self): """:return: a proxy to our write channel or None if non is set :note: you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. 
In the worst case, this will block your reads.""" - if self._out_wc is None: + if self._out_writer is None: return None - return self._out_wc + return self._out_writer def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" - self._out_wc.close() + self._out_writer.close() def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed() + return self._out_writer.closed() def error(self): """:return: Exception caught during last processing or None""" @@ -88,24 +86,18 @@ class OutputChannelTask(Node): items = self._read(count) # print "%r: done reading %i items" % (self.id, len(items)) try: - # increase the ref-count - we use this to determine whether anyone else - # is currently handling our output channel. As this method runs asynchronously, - # we have to make sure that the channel is closed by the last finishing task, - # which is not necessarily the one which determines that he is done - # as he couldn't read anymore items. - # The refcount will be dropped in the moment we get out of here. - wc = self._out_wc + write = self._out_writer.write if self.apply_single: for item in items: rval = self.fun(item) - wc.write(rval) + write(rval) # END for each item else: # shouldn't apply single be the default anyway ? 
# The task designers should chunk them up in advance rvals = self.fun(items) for rval in rvals: - wc.write(rval) + write(rval) # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging @@ -131,7 +123,7 @@ class OutputChannelTask(Node): self._exc = e # END set error flag # END exception handling - del(wc) + # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -151,7 +143,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results - if self.is_done() and getrefcount(self._out_wc) < 4: + if self.is_done(): # print "Closing channel of %r" % self.id self.close() # END handle channel closure @@ -212,14 +204,14 @@ class InputChannelTask(OutputChannelTask): to be the input channel to read from though.""" __slots__ = "_pool_ref" - def __init__(self, in_rc, *args, **kwargs): + def __init__(self, in_reader, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._read = in_rc.read + self._read = in_reader.read self._pool_ref = None #{ Internal Interface - def rchannel(self): + def reader(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) -- cgit v1.2.3 From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:38:40 +0200 Subject: Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ... 
--- lib/git/async/pool.py | 24 +++++++++++-------- lib/git/async/task.py | 64 ++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 59 insertions(+), 29 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 68551ea3..3fd99c7b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -33,13 +33,12 @@ import new class PoolReader(CallbackReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref', '_read') + __slots__ = ('_task_ref', '_pool_ref') def __init__(self, channel, task, pool): CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) - self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader) def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -62,11 +61,16 @@ class PoolReader(CallbackReader): # it has no way of knowing that the write channel is about to diminsh. # which is why we pass the info as a private kwarg - not nice, but # okay for now - # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task) + pool.remove_task(task, _from_destructor_ = True) # END handle refcount based removal of task + #{ Internal + def _read(self, count=0, block=True, timeout=None): + return CallbackReader.read(self, count, block, timeout) + + #} END internal + #{ Interface def pool_ref(self): @@ -261,7 +265,7 @@ class Pool(object): if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task - qput = self._queue + qput = self._queue.put if numchunks > 1: for i in xrange(numchunks): qput((task.process, chunksize)) @@ -290,16 +294,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls - max_ref_count = 3 + max_ref_count = 3 + from_destructor if sys.getrefcount(task.writer().channel) < max_ref_count: - self.remove_task(task) + self.remove_task(task, from_destructor) #} END internal #{ Interface @@ -370,7 +374,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task): + def remove_task(self, task, _from_destructor_ = False): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -405,7 +409,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t) + self._remove_task_if_orphaned(t, _from_destructor_) # END handle orphans recursively return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5a6c1e95..ae2532d9 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -82,23 +82,36 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - # print "%r: reading %i" % (self.id, count) + # first thing: increment the writer count + self._wlock.acquire() + self._num_writers += 1 + self._wlock.release() + + #print "%r: reading %i" % (self.id, count) + #if 
hasattr(self, 'reader'): + # print "from", self.reader().channel items = self._read(count) - # print "%r: done reading %i items" % (self.id, len(items)) + #print "%r: done reading %i items" % (self.id, len(items)) try: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply + try: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + finally: + self._wlock.acquire() + self._num_writers -= 1 + self._wlock.release() + # END handle writer count except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again @@ -144,8 +157,13 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results if self.is_done(): - # print "Closing channel of %r" % self.id - self.close() + self._wlock.acquire() + if self._num_writers == 0: + #if not self.is_closed(): # DEBUG + # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel + self.close() + # END handle writers + self._wlock.release() # END handle channel closure #{ Configuration @@ -158,7 +176,7 @@ class ThreadTaskBase(object): class InputIteratorTaskBase(OutputChannelTask): """Implements a task which processes items from an iterable in a multi-processing safe manner""" - __slots__ = ('_iterator', '_lock') + __slots__ = ('_iterator', 
'_lock', '_empty') # the type of the lock to use when reading from the iterator lock_type = None @@ -169,12 +187,19 @@ class InputIteratorTaskBase(OutputChannelTask): self._iterator = iterator self._lock = self.lock_type() self._read = self.__read + self._empty = False def __read(self, count=0): """Read count items from the iterator, and return them""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort self._lock.acquire() try: if count == 0: + self._empty = True return list(self._iterator) else: out = list() @@ -183,6 +208,7 @@ class InputIteratorTaskBase(OutputChannelTask): try: out.append(it.next()) except StopIteration: + self._empty = True break # END handle empty iterator # END for each item to take @@ -198,7 +224,7 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): lock_type = threading.Lock -class InputChannelTask(OutputChannelTask): +class InputChannelTask(OutputChannelTask, ThreadTaskBase): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" -- cgit v1.2.3 From c34343d0b714d2c4657972020afea034a167a682 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:52:32 +0200 Subject: tasks can now terminate faster when no items were read, without neglecting their duty to close the channel if required. 
Code is a little less maintainable now, but faster, it appears --- lib/git/async/task.py | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ae2532d9..a8ba5ac6 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -92,21 +92,24 @@ class OutputChannelTask(Node): # print "from", self.reader().channel items = self._read(count) #print "%r: done reading %i items" % (self.id, len(items)) + try: try: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply + if items: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? 
+ # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + # END if there is anything to do finally: self._wlock.acquire() self._num_writers -= 1 @@ -158,12 +161,14 @@ class OutputChannelTask(Node): # Soft close, so others can continue writing their results if self.is_done(): self._wlock.acquire() - if self._num_writers == 0: - #if not self.is_closed(): # DEBUG - # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel - self.close() - # END handle writers - self._wlock.release() + try: + if self._num_writers == 0: + # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel + self.close() + # END handle writers + finally: + self._wlock.release() + # END assure lock release # END handle channel closure #{ Configuration -- cgit v1.2.3 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. 
There is just a little more left on the feature side, but its nearly there --- lib/git/async/graph.py | 14 +++++++++++++- lib/git/async/pool.py | 2 +- lib/git/async/task.py | 10 ++++++++-- lib/git/async/thread.py | 43 ++++++++++++++++++++++--------------------- 4 files changed, 44 insertions(+), 25 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index e3999cdc..9ee0e891 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -25,14 +25,24 @@ class Graph(object): def __init__(self): self.nodes = list() + + def __del__(self): + """Deletes bidericational dependencies""" + for node in self.nodes: + node.in_nodes = None + node.out_nodes = None + # END cleanup nodes + + # otherwise the nodes would keep floating around + def add_node(self, node): """Add a new node to the graph :return: the newly added node""" self.nodes.append(node) return node - def del_node(self, node): + def remove_node(self, node): """Delete a node from the graph :return: self""" try: @@ -46,6 +56,8 @@ class Graph(object): del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) + node.out_nodes = list() + node.in_nodes = list() return self def add_edge(self, u, v): diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 3fd99c7b..0aad90ae 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -402,7 +402,7 @@ class Pool(object): # keep its input nodes as we check whether they were orphaned in_tasks = task.in_nodes - self._tasks.del_node(task) + self._tasks.remove_node(task) self._taskorder_cache.clear() finally: self._taskgraph_lock.release() diff --git a/lib/git/async/task.py b/lib/git/async/task.py index a8ba5ac6..49e7e7cf 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -82,7 +82,8 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - # first 
thing: increment the writer count + # first thing: increment the writer count - other tasks must be able + # to respond properly ( even if it turns out we don't need it later ) self._wlock.acquire() self._num_writers += 1 self._wlock.release() @@ -191,7 +192,11 @@ class InputIteratorTaskBase(OutputChannelTask): raise ValueError("Iterator %r needs a next() function" % iterator) self._iterator = iterator self._lock = self.lock_type() - self._read = self.__read + + # this is necessary to prevent a cyclic ref, preventing us from + # getting deleted ( and collected ) + weakself = weakref.ref(self) + self._read = lambda count: weakself().__read(count) self._empty = False def __read(self, count=0): @@ -201,6 +206,7 @@ class InputIteratorTaskBase(OutputChannelTask): if self._empty: return list() # END early abort + self._lock.acquire() try: if count == 0: diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index faeda04f..b8d2e418 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -116,7 +116,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', '_current_routine') + __slots__ = ('inq') # define how often we should check for a shutdown request in case our @@ -128,7 +128,6 @@ class WorkerThread(TerminatableThread): self.inq = inq if inq is None: self.inq = Queue.Queue() - self._current_routine = None # routine we execute right now @classmethod def stop(cls, *args): @@ -141,7 +140,6 @@ class WorkerThread(TerminatableThread): gettask = self.inq.get while True: - self._current_routine = None if self._should_terminate(): break # END check for stop request @@ -153,22 +151,27 @@ class WorkerThread(TerminatableThread): assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - self._current_routine = routine - 
try: - rval = None - if inspect.ismethod(routine): - if routine.im_self is None: - rval = routine(self, arg) - else: + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, arg) + else: + rval = routine(arg) + elif inspect.isroutine(routine): rval = routine(arg) - elif inspect.isroutine(routine): - rval = routine(arg) - else: - # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - break - # END make routine call + else: + # ignore unknown items + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + break + # END make routine call + finally: + # make sure we delete the routine to release the reference as soon + # as possible. Otherwise objects might not be destroyed + # while we are waiting + del(routine) + del(tasktuple) except StopProcessing: print self.name, "stops processing" # DEBUG break @@ -176,12 +179,10 @@ class WorkerThread(TerminatableThread): print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) continue # just continue # END routine exception handling + + # END handle routine release # END endless loop - def routine(self): - """:return: routine we are currently executing, or None if we have no task""" - return self._current_routine - def stop_and_join(self): """Send stop message to ourselves""" self.inq.put((self.stop, None)) -- cgit v1.2.3 From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notifyied, staying in a beautysleep. This glitch is probably caused by some detail not treated correctly in the thread python module, which is something we cannot fix. 
It works most of the time as expected though - maybe some cleanup is not done correctly which causes this --- lib/git/async/pool.py | 1 - lib/git/async/thread.py | 2 -- lib/git/async/util.py | 8 +++++--- 3 files changed, 5 insertions(+), 6 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 0aad90ae..dbc201a9 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -324,7 +324,6 @@ class Pool(object): threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" - print "set_size", size assert size > -1, "Size cannot be negative" # either start new threads, or kill existing ones. diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index b8d2e418..4d046a2f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -146,9 +146,7 @@ class WorkerThread(TerminatableThread): # we wait and block - to terminate, send the 'stop' method tasktuple = gettask() - # needing exactly one function, and one arg - assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple try: diff --git a/lib/git/async/util.py b/lib/git/async/util.py index b7750b0b..11ab75a6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -101,10 +101,12 @@ class HSCondition(deque): waiter.acquire() # get it the first time, no blocking self.append(waiter) - # in the momemnt we release our lock, someone else might actually resume - self._lock.release() - try: # restore state no matter what (e.g., KeyboardInterrupt) + + try: + # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already + # in the momemnt we release our lock, someone else might actually resume + self._lock.release() if timeout is None: waiter.acquire() else: -- cgit v1.2.3 From 1873db442dc7511fc2c92fbaeb8d998d3e62723d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:25:27 +0200 Subject: 
Improved shutdown handling - although it's impossible to prevent some stderr printing thanks to the underlying threading implementation, we can at least make sure that the interpreter doesn't block during shutdown. Now it appears to be running smoothly --- lib/git/async/__init__.py | 29 +++++++++++++++++++++++++++++ lib/git/async/thread.py | 25 +++++++++++++++++-------- 2 files changed, 46 insertions(+), 8 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py index 89b9eb47..e212f1b2 100644 --- a/lib/git/async/__init__.py +++ b/lib/git/async/__init__.py @@ -1 +1,30 @@ """Initialize the multi-processing package""" + +#{ Initialization +def _init_atexit(): + """Setup an at-exit job to be sure our workers are shutdown correctly before + the interpreter quits""" + import atexit + import thread + atexit.register(thread.do_terminate_threads) + +def _init_signals(): + """Assure we shutdown our threads correctly when being interrupted""" + import signal + import thread + + prev_handler = signal.getsignal(signal.SIGINT) + def thread_interrupt_handler(signum, frame): + thread.do_terminate_threads() + if callable(prev_handler): + prev_handler(signum, frame) + raise KeyboardInterrupt() + # END call previous handler + # END signal handler + signal.signal(signal.SIGINT, thread_interrupt_handler) + + +#} END init + +_init_atexit() +_init_signals() diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4d046a2f..afe0d79d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -136,16 +136,21 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" - print self.name, "starts processing" # DEBUG - gettask = self.inq.get while True: if self._should_terminate(): break # END check for stop request - # we wait and block - to terminate, send the 'stop' method + # note: during shutdown, this turns None in the middle of waiting + # for an item to be
put onto it - we can't du anything about it - + # even if we catch everything and break gracefully, the parent + # call will think we failed with an empty exception. + # Hence we just don't do anything about it. Alternatively + # we could override the start method to get our own bootstrapping, + # which would mean repeating plenty of code in of the threading module. tasktuple = gettask() + # needing exactly one function, and one arg routine, arg = tasktuple @@ -161,7 +166,7 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) break # END make routine call finally: @@ -171,10 +176,9 @@ class WorkerThread(TerminatableThread): del(routine) del(tasktuple) except StopProcessing: - print self.name, "stops processing" # DEBUG break except Exception,e: - print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) + sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) continue # just continue # END routine exception handling @@ -182,7 +186,12 @@ class WorkerThread(TerminatableThread): # END endless loop def stop_and_join(self): - """Send stop message to ourselves""" + """Send stop message to ourselves - we don't block, the thread will terminate + once it has finished processing its input queue to receive our termination + event""" + # DONT call superclass as it will try to join - join's don't work for + # some reason, as python apparently doesn't switch threads (so often) + # while waiting ... 
I don't know, but the threads respond properly, + # but only if dear python switches to them self.inq.put((self.stop, None)) - super(WorkerThread, self).stop_and_join() #} END classes -- cgit v1.2.3 From e14e3f143e7260de9581aee27e5a9b2645db72de Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:42:09 +0200 Subject: Removed commented-out debug code and additional debug printings. Verified it works on py2.4, 2.5 and 2.6 --- lib/git/async/pool.py | 6 ------ lib/git/async/task.py | 6 ------ lib/git/async/util.py | 1 - 3 files changed, 13 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index dbc201a9..7ee3e8eb 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -95,10 +95,6 @@ class PoolReader(CallbackReader): # provided enough - its better to have some possibly empty task runs # than having and empty queue that blocks. - # NOTE: TODO: that case is only possible if one Task could be connected - # to multiple input channels in a manner known by the system. Currently - # this is not possible, but should be implemented at some point. - # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel task = self._task_ref() @@ -260,8 +256,6 @@ class Pool(object): # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower - # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_writer.size() if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 49e7e7cf..10b22649 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -88,11 +88,7 @@ class OutputChannelTask(Node): self._num_writers += 1 self._wlock.release() - #print "%r: reading %i" % (self.id, count) - #if hasattr(self, 'reader'): - # print "from", self.reader().channel items = self._read(count) - #print "%r: done reading %i items" % (self.id, len(items)) try: try: @@ -117,7 +113,6 @@ class OutputChannelTask(Node): self._wlock.release() # END handle writer count except Exception, e: - print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again self.set_done() @@ -164,7 +159,6 @@ class OutputChannelTask(Node): self._wlock.acquire() try: if self._num_writers == 0: - # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel self.close() # END handle writers finally: diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 11ab75a6..4c4f3929 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,7 +2,6 @@ from threading import ( Lock, - current_thread, _allocate_lock, _Condition, _sleep, -- cgit v1.2.3 From cac6e06cc9ef2903a15e594186445f3baa989a1a Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:58:44 +0200 Subject: test_task: fixed import error, made all modules from x import * safe --- lib/git/async/channel.py | 3 +++ lib/git/async/graph.py | 2 ++ lib/git/async/pool.py | 2 ++ lib/git/async/task.py | 5 ++++- lib/git/async/thread.py | 4 ++++ 5 files changed, 15 insertions(+), 1 deletion(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 9b019707..ae476cda 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -13,6 +13,9 @@ from util import ( from time import time import sys +__all__ = ('Channel', 
'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', + 'CallbackReader', 'mkchannel', 'ReadOnly') + #{ Classes class Channel(object): """A channel is similar to a file like object. It has a write end as well as one or diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 9ee0e891..4e14c81e 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -1,5 +1,7 @@ """Simplistic implementation of a graph""" +__all__ = ('Node', 'Graph') + class Node(object): """A Node in the graph. They know their neighbours, and have an id which should resolve into a string""" diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7ee3e8eb..cf14e47b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -30,6 +30,8 @@ from time import sleep import new +__all__ = ('PoolReader', 'Pool', 'ThreadPool') + class PoolReader(CallbackReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 10b22649..d7f331b7 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -6,8 +6,11 @@ import weakref import sys import new +__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase', + 'InputIteratorThreadTask', 'InputChannelTask') + class OutputChannelTask(Node): - """Abstracts a named task as part of a set of interdependent tasks, which contains + """Abstracts a named task, which contains additional information on how the task should be queued and processed. 
Results of the item processing are sent to a write channel, which is to be diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index afe0d79d..96b4f0c4 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -7,6 +7,10 @@ import Queue import sys +__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', + 'WorkerThread') + + #{ Decorators def do_terminate_threads(whitelist=list()): -- cgit v1.2.3 From a28942bdf01f4ddb9d0b5a0489bd6f4e101dd775 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 20:13:21 +0200 Subject: Added performance test, improved iterator task which will now be usable by default. It shows that there must be the notion of a producer, which can work if there are no items read --- lib/git/async/task.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'lib/git/async') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d7f331b7..0eb4527c 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -196,6 +196,9 @@ class InputIteratorTaskBase(OutputChannelTask): self._read = lambda count: weakself().__read(count) self._empty = False + # defaults to returning our items unchanged + self.fun = lambda item: item + def __read(self, count=0): """Read count items from the iterator, and return them""" # not threadsafe, but worst thing that could happen is that -- cgit v1.2.3 From be8955a0fbb77d673587974b763f17c214904b57 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:19:18 +0200 Subject: Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract. Added IteratorReader, implementing the reader interface from an iterator. 
The implementation moved from the TaskIterator to the channel --- lib/git/async/channel.py | 139 +++++++++++++++++++++++++++++++++++++++-------- lib/git/async/pool.py | 15 ++--- lib/git/async/task.py | 51 ++--------------- 3 files changed, 130 insertions(+), 75 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index ae476cda..79cb5294 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -11,10 +11,12 @@ from util import ( ) from time import time +import threading import sys -__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', - 'CallbackReader', 'mkchannel', 'ReadOnly') +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'IteratorReader') #{ Classes class Channel(object): @@ -43,15 +45,50 @@ class SerialChannel(Channel): class Writer(object): + """A writer is an object providing write access to a possibly blocking reading device""" + __slots__ = tuple() + + #{ Interface + + def __init__(self, device): + """Initialize the instance with the device to write to""" + + def write(self, item, block=True, timeout=None): + """Write the given item into the device + :param block: True if the device may block until space for the item is available + :param timeout: The time in seconds to wait for the device to become ready + in blocking mode""" + raise NotImplementedError() + + def size(self): + """:return: number of items already in the device, they could be read with a reader""" + raise NotImplementedError() + + def close(self): + """Close the channel. 
Multiple close calls on a closed channel are no + an error""" + raise NotImplementedError() + + def closed(self): + """:return: True if the channel was closed""" + raise NotImplementedError() + + #} END interface + + +class ChannelWriter(Writer): """The write end of a channel, a file-like interface for a channel""" - __slots__ = ('write', 'channel') + __slots__ = ('channel', '_put') def __init__(self, channel): """Initialize the writer to use the given channel""" self.channel = channel - self.write = channel.queue.put + self._put = self.channel.queue.put #{ Interface + def write(self, item, block=False, timeout=None): + return self._put(item, block, timeout) + def size(self): return self.channel.queue.qsize() @@ -66,15 +103,14 @@ class Writer(object): #} END interface -class CallbackWriter(Writer): +class CallbackChannelWriter(ChannelWriter): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') def __init__(self, channel): - Writer.__init__(self, channel) + super(CallbackChannelWriter, self).__init__(channel) self._pre_cb = None - self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. 
@@ -87,25 +123,22 @@ class CallbackWriter(Writer): self._pre_cb = fun return prev - def _write(self, item, block=True, timeout=None): + def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - self.channel.queue.put(item, block, timeout) + super(CallbackChannelWriter, self).write(item, block, timeout) class Reader(object): - """Allows reading from a channel""" - __slots__ = 'channel' + """Allows reading from a device""" + __slots__ = tuple() - def __init__(self, channel): - """Initialize this instance from its parent write channel""" - self.channel = channel - - #{ Interface - + def __init__(self, device): + """Initialize the instance with the device to read from""" + def read(self, count=0, block=True, timeout=None): - """read a list of items read from the channel. The list, as a sequence + """read a list of items read from the device. The list, as a sequence of items, is similar to the string of characters returned when reading from file like objects. :param count: given amount of items to read. If < 1, all items will be read @@ -114,11 +147,25 @@ class Reader(object): given amount of seconds, returning the items it received so far. The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. - If the channel was empty and count was 1, an empty list will be returned. + If the device was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be returned. If count was < 1, a list with all items that could be read will be returned.""" + raise NotImplementedError() + + +class ChannelReader(Reader): + """Allows reading from a channel. 
The reader is thread-safe if the channel is as well""" + __slots__ = 'channel' + + def __init__(self, channel): + """Initialize this instance from its parent write channel""" + self.channel = channel + + #{ Interface + + def read(self, count=0, block=True, timeout=None): # if the channel is closed for writing, we never block # NOTE: is handled by the queue # We don't check for a closed state here has it costs time - most of @@ -191,12 +238,12 @@ class Reader(object): #} END interface -class CallbackReader(Reader): +class CallbackChannelReader(ChannelReader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" def __init__(self, channel): - Reader.__init__(self, channel) + super(CallbackChannelReader, self).__init__(channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -213,13 +260,59 @@ class CallbackReader(Reader): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return Reader.read(self, count, block, timeout) + return super(CallbackChannelReader, self).read(count, block, timeout) + +class IteratorReader(Reader): + """A Reader allowing to read items from an iterator, instead of a channel. + Reads will never block. 
Its thread-safe""" + __slots__ = ("_empty", '_iter', '_lock') + + # the type of the lock to use when reading from the iterator + lock_type = threading.Lock + + def __init__(self, iterator): + self._empty = False + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iter = iterator + self._lock = self.lock_type() + + def read(self, count=0, block=True, timeout=None): + """Non-Blocking implementation of read""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + + self._lock.acquire() + try: + if count == 0: + self._empty = True + return list(self._iter) + else: + out = list() + it = self._iter + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + self._empty = True + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + #} END classes #{ Constructors -def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): """Create a channel, with a reader and a writer :return: tuple(reader, writer) :param ctype: Channel to instantiate diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index cf14e47b..8f33a029 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -18,10 +18,10 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - Writer, + ChannelWriter, Channel, SerialChannel, - CallbackReader + CallbackChannelReader ) import sys @@ -32,13 +32,14 @@ import new __all__ = ('PoolReader', 'Pool', 'ThreadPool') -class PoolReader(CallbackReader): + +class PoolReader(CallbackChannelReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task_ref', '_pool_ref') def 
__init__(self, channel, task, pool): - CallbackReader.__init__(self, channel) + CallbackChannelReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) @@ -69,7 +70,7 @@ class PoolReader(CallbackReader): #{ Internal def _read(self, count=0, block=True, timeout=None): - return CallbackReader.read(self, count, block, timeout) + return CallbackChannelReader.read(self, count, block, timeout) #} END internal @@ -115,7 +116,7 @@ class PoolReader(CallbackReader): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackReader.read(self, count, block, timeout) + items = CallbackChannelReader.read(self, count, block, timeout) ########################## @@ -446,7 +447,7 @@ class Pool(object): ch = None if wc is None: ch = ctype() - wc = Writer(ch) + wc = ChannelWriter(ch) task.set_writer(wc) else: ch = wc.channel diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0eb4527c..b7b5e699 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node from util import ReadOnly +from channel import IteratorReader + import threading import weakref @@ -179,56 +181,15 @@ class ThreadTaskBase(object): class InputIteratorTaskBase(OutputChannelTask): """Implements a task which processes items from an iterable in a multi-processing safe manner""" - __slots__ = ('_iterator', '_lock', '_empty') - # the type of the lock to use when reading from the iterator - lock_type = None + __slots__ = tuple() + def __init__(self, iterator, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - if not hasattr(iterator, 'next'): - raise ValueError("Iterator %r needs a next() function" % iterator) - self._iterator = iterator - self._lock = self.lock_type() - - # this is necessary to prevent a cyclic ref, preventing us from - # getting deleted ( and collected ) - weakself = weakref.ref(self) - 
self._read = lambda count: weakself().__read(count) - self._empty = False - + self._read = IteratorReader(iterator).read # defaults to returning our items unchanged self.fun = lambda item: item - - def __read(self, count=0): - """Read count items from the iterator, and return them""" - # not threadsafe, but worst thing that could happen is that - # we try to get items one more time - if self._empty: - return list() - # END early abort - - self._lock.acquire() - try: - if count == 0: - self._empty = True - return list(self._iterator) - else: - out = list() - it = self._iterator - for i in xrange(count): - try: - out.append(it.next()) - except StopIteration: - self._empty = True - break - # END handle empty iterator - # END for each item to take - return out - # END handle count - finally: - self._lock.release() - # END handle locking - + class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): """An input iterator for threaded pools""" -- cgit v1.2.3 From 1d8a577ffc6ad7ce1465001ddebdc157aecc1617 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:41:10 +0200 Subject: channel: cleaned up inheritance hierarchy, adding mixins for callback functionality - previously the callback functionality was bound to channel based readers/writers --- lib/git/async/channel.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 79cb5294..a29ff17c 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -103,13 +103,14 @@ class ChannelWriter(Writer): #} END interface -class CallbackChannelWriter(ChannelWriter): +class CallbackWriterMixin(object): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" - __slots__ = ('_pre_cb') + # slots don't work with mixin's :( + # __slots__ = ('_pre_cb') - def __init__(self, channel): -
super(CallbackChannelWriter, self).__init__(channel) + def __init__(self, *args): + super(CallbackWriterMixin, self).__init__(*args) self._pre_cb = None def set_pre_cb(self, fun = lambda item: item): @@ -126,7 +127,12 @@ class CallbackChannelWriter(ChannelWriter): def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - super(CallbackChannelWriter, self).write(item, block, timeout) + super(CallbackWriterMixin, self).write(item, block, timeout) + + +class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): + """Implements a channel writer with callback functionality""" + pass class Reader(object): @@ -238,12 +244,14 @@ class ChannelReader(Reader): #} END interface -class CallbackChannelReader(ChannelReader): + +class CallbackReaderMixin(object): """A channel which sends a callback before items are read from the channel""" - __slots__ = "_pre_cb" + # unfortunately, slots can only use direct inheritance, have to turn it off :( + # __slots__ = "_pre_cb" - def __init__(self, channel): - super(CallbackChannelReader, self).__init__(channel) + def __init__(self, *args): + super(CallbackReaderMixin, self).__init__(*args) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -260,7 +268,12 @@ class CallbackChannelReader(ChannelReader): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return super(CallbackChannelReader, self).read(count, block, timeout) + return super(CallbackReaderMixin, self).read(count, block, timeout) + + +class CallbackChannelReader(CallbackReaderMixin, ChannelReader): + """Implements a channel reader with callback functionality""" + pass class IteratorReader(Reader): -- cgit v1.2.3 From 7a0b79ee574999ecbc76696506352e4a5a0d7159 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 12:38:02 +0200 Subject: task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread --- 
lib/git/async/task.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index b7b5e699..ac948dc0 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -8,21 +8,27 @@ import weakref import sys import new -__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase', - 'InputIteratorThreadTask', 'InputChannelTask') +__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase', + 'IteratorThreadTask', 'ChannelThreadTask') -class OutputChannelTask(Node): +class Task(Node): """Abstracts a named task, which contains additional information on how the task should be queued and processed. - Results of the item processing are sent to a write channel, which is to be + Results of the item processing are sent to a writer, which is to be set by the creator using the ``set_writer`` method. + Items are read using the internal ``_read`` callable, subclasses are meant to + set this to a callable that supports the Reader interface's read function. + * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If someone wants all items to be processed, using read(0), the whole task would go to one worker, as well as dependent tasks. If you want finer granularity , you can - specify this here, causing chunks to be no larger than max_chunksize""" + specify this here, causing chunks to be no larger than max_chunksize + * **apply_single** if True, default True, individual items will be given to the + worker function. 
If False, a list of possibly multiple items will be passed + instead.""" __slots__ = ( '_read', # method to yield items to process '_out_writer', # output write channel '_exc', # exception caught @@ -178,32 +184,32 @@ class ThreadTaskBase(object): pass -class InputIteratorTaskBase(OutputChannelTask): +class IteratorTaskBase(Task): """Implements a task which processes items from an iterable in a multi-processing safe manner""" __slots__ = tuple() def __init__(self, iterator, *args, **kwargs): - OutputChannelTask.__init__(self, *args, **kwargs) + Task.__init__(self, *args, **kwargs) self._read = IteratorReader(iterator).read # defaults to returning our items unchanged self.fun = lambda item: item -class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): +class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase): """An input iterator for threaded pools""" lock_type = threading.Lock -class InputChannelTask(OutputChannelTask, ThreadTaskBase): +class ChannelThreadTask(Task, ThreadTaskBase): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" __slots__ = "_pool_ref" def __init__(self, in_reader, *args, **kwargs): - OutputChannelTask.__init__(self, *args, **kwargs) + Task.__init__(self, *args, **kwargs) self._read = in_reader.read self._pool_ref = None -- cgit v1.2.3