From ab59f78341f1dd188aaf4c30526f6295c63438b1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 20:03:09 +0200 Subject: Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock --- lib/git/async/pool.py | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 lib/git/async/pool.py (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py new file mode 100644 index 00000000..f9f7880b --- /dev/null +++ b/lib/git/async/pool.py @@ -0,0 +1,116 @@ +"""Implementation of a thread-pool working with channels""" +from thread import WorkerThread +from channel import ( + Channel, + WChannel, + RChannel + ) + +class Node(object): + """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. + Its not designed to support big graphs, and sports only the functionality + we need""" + __slots__ = ('in_nodes', 'out_nodes') + + +class Graph(object): + """A simple graph implementation, keeping nodes and providing basic access and + editing functions""" + __slots__ = "nodes" + + def add_node(self, node): + pass + + def del_node(self, node): + pass + + def visit_input_depth_first(self, node, visitor=lambda n: True ): + """Visit all input nodes of the given node, depth first, calling visitor + for each node on our way. If the function returns False, the traversal + will not go any deeper, but continue at the next branch""" + pass + + +class TaskNode(Node): + """Couples an input channel, an output channel, as well as a processing function + together. + It may contain additional information on how to handel read-errors from the + input channel""" + __slots__ = ('in_rc', 'out_wc', 'fun') + + def is_done(self): + """:return: True if we are finished processing""" + return self.out_wc.closed + + +class RPoolChannel(RChannel): + """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call + before and after an item is to be read. + + It acts like a handle to the underlying task""" + __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + + def set_post_cb(self, fun = lambda item: item): + """Install a callback to call after the item has been read. The function + returns a possibly changed item. If it raises, the exception will be propagated + in an IOError, indicating read-failure + If a function is not provided, the call is effectively uninstalled.""" + + def set_pre_cb(self, fun = lambda : None): + """Install a callback to call before an item is read from the channel. + If it fails, the read will fail with an IOError + If a function is not provided, the call is effectively uninstalled.""" + + def read(block=False, timeout=None): + """Read an item that was processed by one of our threads + :note: Triggers task dependency handling needed to provide the necessary + input""" + + #{ Internal + def _read(self, block=False, timeout=None): + """Calls the underlying channel's read directly, without triggering + the pool""" + return RChannel.read(self, block, timeout) + + #} END internal + + +class PoolWorker(WorkerThread): + """A worker thread which gets called to deal with Tasks. 
Tasks provide channls + with actual work, whose result will be send to the tasks output channel""" + + @classmethod + def perform_task(cls, task): + # note : when getting the input channel, be sure not to trigger + # RPoolChannel + pass + + +class ThreadPool(Graph): + """A thread pool maintains a set of one or more worker threads, but supports + a fully serial mode in which case the amount of threads is zero. + + Work is distributed via Channels, which form a dependency graph. The evaluation + is lazy, as work will only be done once an output is requested. + + :note: the current implementation returns channels which are meant to be + used only from the main thread""" + __slots__ = ( '_workers', # list of worker threads + '_queue', # master queue for tasks + '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel + ) + + def del_node(self, task): + """Delete the node ( being a task ), but delete the entries in our output channel + cache as well""" + + + def set_pool_size(self, size=0): + """Set the amount of workers to use in this pool. + :param size: if 0, the pool will do all work itself in the calling thread, + otherwise the work will be distributed among the given amount of threads""" + + def add_task(self, task): + """Add a new task to be processed. + :return: your task instance with its output channel set. It can be used + to retrieve processed items""" -- cgit v1.2.3 From b72e2704022d889f116e49abf3e1e5d3e3192d3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 01:00:12 +0200 Subject: Improved pool design and started rough implementation, top down to learn while going. Tests will be written soon for verification, its still quite theoretical --- lib/git/async/pool.py | 273 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 217 insertions(+), 56 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index f9f7880b..7798d3d4 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,92 +1,146 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from Queue import Queue + +from graph import ( + Graph, + Node + ) + from channel import ( Channel, WChannel, RChannel ) -class Node(object): - """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. - Its not designed to support big graphs, and sports only the functionality - we need""" - __slots__ = ('in_nodes', 'out_nodes') - - -class Graph(object): - """A simple graph implementation, keeping nodes and providing basic access and - editing functions""" - __slots__ = "nodes" - - def add_node(self, node): - pass - - def del_node(self, node): - pass - - def visit_input_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch""" - pass - +import weakref +import sys class TaskNode(Node): """Couples an input channel, an output channel, as well as a processing function together. 
It may contain additional information on how to handel read-errors from the input channel""" - __slots__ = ('in_rc', 'out_wc', 'fun') + __slots__ = ( 'in_rc', # input read channel + '_out_wc', # output write channel + '_pool_ref', # ref to our pool + '_exc', # exception caught + 'fun', # function to call with items read from in_rc + 'max_chunksize', # maximium amount of items to process per process call + 'apply_single' # apply single items even if multiple where read + ) + + def __init__(self, in_rc, fun, apply_single=True): + self.in_rc = in_rc + self._out_wc = None + self._pool_ref = None + self._exc = None + self.fun = fun + self.max_chunksize = 0 # note set + self.apply_single = apply_single def is_done(self): """:return: True if we are finished processing""" - return self.out_wc.closed + return self._out_wc.closed + + def set_done(self): + """Set ourselves to being done, has we have completed the processing""" + self._out_wc.close() + + def error(self): + """:return: Exception caught during last processing or None""" + return self._exc + def process(self, count=1): + """Process count items and send the result individually to the output channel""" + if self._out_wc is None: + raise IOError("Cannot work in uninitialized task") + + read = self.in_rc.read + if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref(): + read = self.in_rc._read + items = read(count) + + try: + if self.apply_single: + for item in items: + self._out_wc.write(self.fun(item)) + # END for each item + else: + self._out_wc.write(self.fun(items)) + # END handle single apply + except Exception, e: + self._exc = e + self.set_done() + # END exception handling + + # if we didn't get all demanded items, which is also the case if count is 0 + # we have depleted the input channel and are done + if len(items) != count: + self.set_done() + # END handle done state + #{ Configuration + class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. - It acts like a handle to the underlying task""" + It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the item has been read. The function - returns a possibly changed item. If it raises, the exception will be propagated - in an IOError, indicating read-failure - If a function is not provided, the call is effectively uninstalled.""" + def __init__(self, wchannel, task, pool): + RChannel.__init__(self, wchannel) + self._task = task + self._pool = pool + self._pre_cb = None + self._post_cb = None - def set_pre_cb(self, fun = lambda : None): - """Install a callback to call before an item is read from the channel. + def __del__(self): + """Assures that our task will be deleted if we were the last reader""" + del(self._wc) # decrement ref-count + self._pool._del_task_if_orphaned(self._task) + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. If it fails, the read will fail with an IOError If a function is not provided, the call is effectively uninstalled.""" + self._pre_cb = fun + + def set_post_cb(self, fun = lambda item: item): + """Install a callback to call after the items were read. The function + returns a possibly changed item list. If it raises, the exception will be propagated. 
+ If a function is not provided, the call is effectively uninstalled.""" + self._post_cb = fun - def read(block=False, timeout=None): + def read(self, count=1, block=False, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" + if self._pre_cb: + self._pre_cb() + # END pre callback + + ################################################## + self._pool._prepare_processing(self._task, count) + ################################################## + + items = RChannel.read(self, count, block, timeout) + if self._post_cb: + items = self._post_cb(items) #{ Internal - def _read(self, block=False, timeout=None): + def _read(self, count=1, block=False, timeout=None): """Calls the underlying channel's read directly, without triggering the pool""" - return RChannel.read(self, block, timeout) + return RChannel.read(self, count, block, timeout) #} END internal - -class PoolWorker(WorkerThread): - """A worker thread which gets called to deal with Tasks. Tasks provide channls - with actual work, whose result will be send to the tasks output channel""" - - @classmethod - def perform_task(cls, task): - # note : when getting the input channel, be sure not to trigger - # RPoolChannel - pass -class ThreadPool(Graph): +class ThreadPool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. @@ -94,23 +148,130 @@ class ThreadPool(Graph): is lazy, as work will only be done once an output is requested. :note: the current implementation returns channels which are meant to be - used only from the main thread""" - __slots__ = ( '_workers', # list of worker threads + used only from the main thread, hence you cannot consume their results + from multiple threads unless you use a task for it.""" + __slots__ = ( '_tasks', # a graph of tasks + '_consumed_tasks', # a list with tasks that are done or had an error + '_workers', # list of worker threads '_queue', # master queue for tasks - '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel + '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list ) - def del_node(self, task): - """Delete the node ( being a task ), but delete the entries in our output channel - cache as well""" + def __init__(self, size=0): + self._tasks = Graph() + self._consumed_tasks = list() + self._workers = list() + self._queue = Queue() + self._ordered_tasks_cache = dict() + + def __del__(self): + raise NotImplementedError("TODO: Proper cleanup") + + #{ Internal + def _queue_feeder_visitor(self, task, count): + """Walk the graph and find tasks that are done for later cleanup, and + queue all others for processing by our worker threads ( if available ).""" + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + + # if the task does not have the required output on its queue, schedule + # it for processing. If we should process all, we don't care about the + # amount as it should process until its all done. + if self._workers: + if count < 1 or task._out_wc.size() < count: + # respect the chunk size, and split the task up if we want + # to process too much. 
This can be defined per task + queue = self._queue + if task.max_chunksize: + chunksize = count / task.max_chunksize + remainder = count - (chunksize * task.max_chunksize) + for i in xrange(chunksize): + queue.put((task.process, chunksize)) + if remainder: + queue.put((task.process, remainder)) + else: + self._queue.put((task.process, count)) + # END handle chunksize + # END handle queuing + else: + # no workers, so we have to do the work ourselves + task.process(count) + # END handle serial mode + + # always walk the whole graph, we want to find consumed tasks + return True + + def _prepare_processing(self, task, count): + """Process the tasks which depend on the given one to be sure the input + channels are filled with data once we process the actual task + + Tasks have two important states: either they are done, or they are done + and have an error, so they are likely not to have finished all their work. + + Either way, we will put them onto a list of tasks to delete them, providng + information about the failed ones. + + Tasks which are not done will be put onto the queue for processing, which + is fine as we walked them depth-first.""" + self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + + # delete consumed tasks to cleanup + for task in self._consumed_tasks: + self.del_task(task) + # END for each task to delete + del(self._consumed_tasks[:]) + + def _del_task_if_orphaned(self, task): + """Check the task, and delete it if it is orphaned""" + if sys.getrefcount(task._out_wc) < 3: + self.del_task(task) + #} END internal + + #{ Interface + + def del_task(self, task): + """Delete the task + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume + its items.""" + # now delete our actual node - must set it done os it closes its channels. + # Otherwise further reads of output tasks will block. + # Actually they may still block if anyone wants to read all ... without + # a timeout + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes + task.set_done() + self._tasks.del_node(task) + for t in in_tasks + self._del_task_if_orphaned(t) + # END handle orphans recursively def set_pool_size(self, size=0): """Set the amount of workers to use in this pool. :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads""" + raise NotImplementedError() def add_task(self, task): """Add a new task to be processed. - :return: your task instance with its output channel set. It can be used - to retrieve processed items""" + :return: a read channel to retrieve processed items. 
If that handle is lost, + the task will be considered orphaned and will be deleted on the next + occasion.""" + # create a write channel for it + wc, rc = Channel() + rc = RPoolChannel(wc, task, self) + task._out_wc = wc + task._pool_ref = weakref.ref(self) + + self._tasks.add_node(task) + + # If the input channel is one of our read channels, we add the relation + ic = task.in_rc + if isinstance(ic, RPoolChannel) and ic._pool is self: + self._tasks.add_edge(ic._task, task) + # END add task relation + + return rc + + #} END interface -- cgit v1.2.3 From ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 12:48:25 +0200 Subject: thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system graph: implemented it including test according to the pools requirements pool: implemented set_pool_size --- lib/git/async/pool.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7798d3d4..9a24cbc5 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -25,7 +25,8 @@ class TaskNode(Node): '_out_wc', # output write channel '_pool_ref', # ref to our pool '_exc', # exception caught - 'fun', # function to call with items read from in_rc + 'fun', # function to call with items read from in_rc + 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call 'apply_single' # apply single items even if multiple where read ) @@ -36,6 +37,7 @@ class TaskNode(Node): self._pool_ref = None self._exc = None self.fun = fun + self.min_count = None self.max_chunksize = 0 # note set self.apply_single = apply_single @@ -174,6 +176,12 @@ class ThreadPool(object): if task.error() or task.is_done(): self._consumed_tasks.append(task) + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None: + count = task.min_count + # END handle min-count + # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. @@ -213,7 +221,7 @@ class ThreadPool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) # delete consumed tasks to cleanup for task in self._consumed_tasks: @@ -233,7 +241,9 @@ class ThreadPool(object): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume - its items.""" + its items. + + :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. # Actually they may still block if anyone wants to read all ... without @@ -246,12 +256,45 @@ class ThreadPool(object): for t in in_tasks self._del_task_if_orphaned(t) # END handle orphans recursively + + return self def set_pool_size(self, size=0): - """Set the amount of workers to use in this pool. + """Set the amount of workers to use in this pool. 
When reducing the size, + the call may block as it waits for threads to finish. + When reducing the size to zero, this thread will process all remaining + items on the queue. + + :return: self :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads""" - raise NotImplementedError() + # either start new threads, or kill existing ones. + # If we end up with no threads, we process the remaining chunks on the queue + # ourselves + cur_count = len(self._workers) + if cur_count < size: + for i in range(size - cur_count): + worker = WorkerThread(self._queue) + self._workers.append(worker) + # END for each new worker to create + elif cur_count > size: + del_count = cur_count - size + for i in range(del_count): + self._workers[i].stop_and_join() + # END for each thread to stop + del(self._workers[:del_count]) + # END handle count + + if size == 0: + while not self._queue.empty(): + try: + taskmethod, count = self._queue.get(False) + taskmethod(count) + except Queue.Empty: + continue + # END while there are tasks on the queue + # END process queue + return self def add_task(self, task): """Add a new task to be processed. -- cgit v1.2.3 From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- lib/git/async/pool.py | 164 ++++++++++++++++++-------------------------------- 1 file changed, 60 insertions(+), 104 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 9a24cbc5..2efc862b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,10 +1,10 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from task import InputChannelTask from Queue import Queue from graph import ( Graph, - Node ) from channel import ( @@ -16,73 +16,6 @@ from channel import ( import weakref import sys -class TaskNode(Node): - """Couples an input channel, an output channel, as well as a processing function - together. 
- It may contain additional information on how to handel read-errors from the - input channel""" - __slots__ = ( 'in_rc', # input read channel - '_out_wc', # output write channel - '_pool_ref', # ref to our pool - '_exc', # exception caught - 'fun', # function to call with items read from in_rc - 'min_count', # minimum amount of items to produce, None means no override - 'max_chunksize', # maximium amount of items to process per process call - 'apply_single' # apply single items even if multiple where read - ) - - def __init__(self, in_rc, fun, apply_single=True): - self.in_rc = in_rc - self._out_wc = None - self._pool_ref = None - self._exc = None - self.fun = fun - self.min_count = None - self.max_chunksize = 0 # note set - self.apply_single = apply_single - - def is_done(self): - """:return: True if we are finished processing""" - return self._out_wc.closed - - def set_done(self): - """Set ourselves to being done, has we have completed the processing""" - self._out_wc.close() - - def error(self): - """:return: Exception caught during last processing or None""" - return self._exc - - def process(self, count=1): - """Process count items and send the result individually to the output channel""" - if self._out_wc is None: - raise IOError("Cannot work in uninitialized task") - - read = self.in_rc.read - if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref(): - read = self.in_rc._read - items = read(count) - - try: - if self.apply_single: - for item in items: - self._out_wc.write(self.fun(item)) - # END for each item - else: - self._out_wc.write(self.fun(items)) - # END handle single apply - except Exception, e: - self._exc = e - self.set_done() - # END exception handling - - # if we didn't get all demanded items, which is also the case if count is 0 - # we have depleted the input channel and are done - if len(items) != count: - self.set_done() - # END handle done state - #{ Configuration - class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call @@ -116,7 +49,7 @@ class RPoolChannel(RChannel): If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - def read(self, count=1, block=False, timeout=None): + def read(self, count=0, block=False, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -131,9 +64,11 @@ class RPoolChannel(RChannel): items = RChannel.read(self, count, block, timeout) if self._post_cb: items = self._post_cb(items) + + return items #{ Internal - def _read(self, count=1, block=False, timeout=None): + def _read(self, count=0, block=False, timeout=None): """Calls the underlying channel's read directly, without triggering the pool""" return RChannel.read(self, count, block, timeout) @@ -141,7 +76,6 @@ class RPoolChannel(RChannel): #} END internal - class ThreadPool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. @@ -149,6 +83,15 @@ class ThreadPool(object): Work is distributed via Channels, which form a dependency graph. The evaluation is lazy, as work will only be done once an output is requested. + The thread pools inherent issue is the global interpreter lock that it will hit, + which gets worse considering a few c extensions specifically lock their part + globally as well. 
The only way this will improve is if custom c extensions + are written which do some bulk work, but release the GIL once they have acquired + their resources. + + Due to the nature of having multiple objects in git, its easy to distribute + that work cleanly among threads. + :note: the current implementation returns channels which are meant to be used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" @@ -156,7 +99,6 @@ class ThreadPool(object): '_consumed_tasks', # a list with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks - '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list ) def __init__(self, size=0): @@ -164,10 +106,10 @@ class ThreadPool(object): self._consumed_tasks = list() self._workers = list() self._queue = Queue() - self._ordered_tasks_cache = dict() + self.set_size(size) def __del__(self): - raise NotImplementedError("TODO: Proper cleanup") + self.set_size(0) #{ Internal def _queue_feeder_visitor(self, task, count): @@ -175,7 +117,7 @@ class ThreadPool(object): queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): self._consumed_tasks.append(task) - + # allow min-count override. This makes sure we take at least min-count # items off the input queue ( later ) if task.min_count is not None: @@ -236,30 +178,11 @@ class ThreadPool(object): #} END internal #{ Interface + def size(self): + """:return: amount of workers in the pool""" + return len(self._workers) - def del_task(self, task): - """Delete the task - Additionally we will remove orphaned tasks, which can be identified if their - output channel is only held by themselves, so no one will ever consume - its items. - - :return: self""" - # now delete our actual node - must set it done os it closes its channels. - # Otherwise further reads of output tasks will block. - # Actually they may still block if anyone wants to read all ... without - # a timeout - # keep its input nodes as we check whether they were orphaned - in_tasks = task.in_nodes - task.set_done() - self._tasks.del_node(task) - - for t in in_tasks - self._del_task_if_orphaned(t) - # END handle orphans recursively - - return self - - def set_pool_size(self, size=0): + def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, the call may block as it waits for threads to finish. When reducing the size to zero, this thread will process all remaining @@ -275,6 +198,7 @@ class ThreadPool(object): if cur_count < size: for i in range(size - cur_count): worker = WorkerThread(self._queue) + worker.start() self._workers.append(worker) # END for each new worker to create elif cur_count > size: @@ -295,7 +219,33 @@ class ThreadPool(object): # END while there are tasks on the queue # END process queue return self - + + def num_tasks(self): + """:return: amount of tasks""" + return len(self._tasks.nodes) + + def del_task(self, task): + """Delete the task + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume + its items. + + :return: self""" + # now delete our actual node - must set it done os it closes its channels. + # Otherwise further reads of output tasks will block. + # Actually they may still block if anyone wants to read all ... 
without + # a timeout + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes + task.set_done() + self._tasks.del_node(task) + + for t in in_tasks: + self._del_task_if_orphaned(t) + # END handle orphans recursively + + return self + def add_task(self, task): """Add a new task to be processed. :return: a read channel to retrieve processed items. If that handle is lost, @@ -305,15 +255,21 @@ class ThreadPool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task._out_wc = wc - task._pool_ref = weakref.ref(self) + + has_input_channel = isinstance(task, InputChannelTask) + if has_input_channel: + task._pool_ref = weakref.ref(self) + # END init input channel task self._tasks.add_node(task) # If the input channel is one of our read channels, we add the relation - ic = task.in_rc - if isinstance(ic, RPoolChannel) and ic._pool is self: - self._tasks.add_edge(ic._task, task) - # END add task relation + if has_input_channel: + ic = task.in_rc + if isinstance(ic, RPoolChannel) and ic._pool is self: + self._tasks.add_edge(ic._task, task) + # END add task relation + # END handle input channels for connections return rc -- cgit v1.2.3 From 1b27292936c81637f6b9a7141dafaad1126f268e Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 21:15:13 +0200 Subject: Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing --- lib/git/async/pool.py | 66 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 15 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 2efc862b..620e2258 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -117,36 +117,72 @@ class ThreadPool(object): queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): self._consumed_tasks.append(task) + return True + # END stop processing # allow min-count override. This makes sure we take at least min-count # items off the input queue ( later ) - if task.min_count is not None: + if task.min_count is not None and count != 0 and count < task.min_count: count = task.min_count # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. - if self._workers: - if count < 1 or task._out_wc.size() < count: + if count < 1 or task._out_wc.size() < count: + numchunks = 1 + chunksize = count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and count > task.max_chunksize: + numchunks = count / task.max_chunksize + chunksize = task.max_chunksize + remainder = count - (numchunks * chunksize) + # END handle chunking + + print count, numchunks, chunksize, remainder + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + if self._workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task queue = self._queue - if task.max_chunksize: - chunksize = count / task.max_chunksize - remainder = count - (chunksize * task.max_chunksize) - for i in xrange(chunksize): + if numchunks > 1: + for i in xrange(numchunks): queue.put((task.process, chunksize)) - if remainder: - queue.put((task.process, remainder)) + # END for each chunk to put + else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): + task.process(chunksize) + # END for each chunk to put else: - self._queue.put((task.process, count)) + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) # END handle chunksize - # END handle queuing - else: - # no workers, so we have to do the work ourselves - task.process(count) - # END handle serial mode + + # as we are serial, we can check for consumption right away + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # END handle serial mode + # END handle queuing # always walk the whole graph, we want to find consumed tasks return True -- cgit v1.2.3 From 6a252661c3bf4202a4d571f9c41d2afa48d9d75f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:41:20 +0200 Subject: pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks --- lib/git/async/pool.py | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 620e2258..fcb0f442 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -49,7 +49,7 @@ class RPoolChannel(RChannel): If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - def read(self, count=0, block=False, timeout=None): + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -57,14 +57,21 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback - ################################################## - self._pool._prepare_processing(self._task, count) - ################################################## + ########## prepare ############################## + self._pool._prepare_channel_read(self._task, count) + + ######### read data ###### + # read actual items, tasks were setup to put their output into our channel ( as well ) items = RChannel.read(self, count, block, timeout) + if self._post_cb: items = self._post_cb(items) + + ####### Finalize ######## + self._pool._post_channel_read(self._task) + return items #{ Internal @@ -119,17 +126,17 @@ class ThreadPool(object): self._consumed_tasks.append(task) return True # END stop processing - - # allow min-count override. This makes sure we take at least min-count - # items off the input queue ( later ) - if task.min_count is not None and count != 0 and count < task.min_count: - count = task.min_count - # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. 
if count < 1 or task._out_wc.size() < count: + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None and 0 < count < task.min_count: + count = task.min_count + # END handle min-count + numchunks = 1 chunksize = count remainder = 0 @@ -144,10 +151,10 @@ class ThreadPool(object): remainder = count - (numchunks * chunksize) # END handle chunking - print count, numchunks, chunksize, remainder # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower + print count, numchunks, chunksize, remainder, task._out_wc.size() if self._workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task @@ -176,18 +183,13 @@ class ThreadPool(object): if remainder: task.process(remainder) # END handle chunksize - - # as we are serial, we can check for consumption right away - if task.error() or task.is_done(): - self._consumed_tasks.append(task) - # END handle consumption # END handle serial mode # END handle queuing # always walk the whole graph, we want to find consumed tasks return True - def _prepare_processing(self, task, count): + def _prepare_channel_read(self, task, count): """Process the tasks which depend on the given one to be sure the input channels are filled with data once we process the actual task @@ -201,10 +203,18 @@ class ThreadPool(object): is fine as we walked them depth-first.""" self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + def _post_channel_read(self, task): + """Called after we processed a read to cleanup""" + # check whether we consumed the task, and schedule it for deletion + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # delete consumed tasks to cleanup for task in self._consumed_tasks: self.del_task(task) # END for each task to delete + del(self._consumed_tasks[:]) def _del_task_if_orphaned(self, task): -- cgit v1.2.3 From a8a448b7864e21db46184eab0f0a21d7725d074f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 10:38:22 +0200 Subject: pool.consumed_tasks: is now a queue to be thread safe, in preparation for multiple connected pools Reduced waiting time in tests to make them complete faster --- lib/git/async/pool.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fcb0f442..5518e37e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,7 +1,7 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread from task import InputChannelTask -from Queue import Queue +from Queue import Queue, Empty from graph import ( Graph, @@ -103,14 +103,14 @@ class ThreadPool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a list with tasks that are done or had an error + '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks ) def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = list() + self._consumed_tasks = Queue() # make sure its threadsafe self._workers = list() 
self._queue = Queue() self.set_size(size) @@ -123,7 +123,7 @@ class ThreadPool(object): """Walk the graph and find tasks that are done for later cleanup, and queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): - self._consumed_tasks.append(task) + self._consumed_tasks.put(task) return True # END stop processing @@ -206,16 +206,21 @@ class ThreadPool(object): def _post_channel_read(self, task): """Called after we processed a read to cleanup""" # check whether we consumed the task, and schedule it for deletion + # This could have happend after the read returned ( even though the pre-read + # checks it as well ) if task.error() or task.is_done(): - self._consumed_tasks.append(task) + self._consumed_tasks.put(task) # END handle consumption # delete consumed tasks to cleanup - for task in self._consumed_tasks: - self.del_task(task) - # END for each task to delete - - del(self._consumed_tasks[:]) + try: + while True: + ct = self._consumed_tasks.get(False) + self.del_task(ct) + # END for each task to delete + except Empty: + pass + # END pop queue empty def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" @@ -236,7 +241,9 @@ class ThreadPool(object): :return: self :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads""" + otherwise the work will be distributed among the given amount of threads + + :note: currently NOT threadsafe !""" # either start new threads, or kill existing ones. # If we end up with no threads, we process the remaining chunks on the queue # ourselves -- cgit v1.2.3 From 619662a9138fd78df02c52cae6dc89db1d70a0e5 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 12:10:56 +0200 Subject: changed scheduling and chunksize calculation in respect to the task.min_count, to fix theoretical option for a deadlock in serial mode, and unnecessary blocking in async mode --- lib/git/async/pool.py | 213 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 137 insertions(+), 76 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 5518e37e..009096f2 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,5 +1,6 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from threading import Lock from task import InputChannelTask from Queue import Queue, Empty @@ -83,7 +84,7 @@ class RPoolChannel(RChannel): #} END internal -class ThreadPool(object): +class Pool(object): """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. 
@@ -106,88 +107,35 @@ class ThreadPool(object): '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks + '_taskgraph_lock', # lock for accessing the task graph ) + # CONFIGURATION + # The type of worker to create - its expected to provide the Thread interface, + # taking the taskqueue as only init argument + # as well as a method called stop_and_join() to terminate it + WorkerCls = None + + # The type of lock to use to protect critical sections, providing the + # threading.Lock interface + LockCls = None + + # the type of the task queue to use - it must provide the Queue interface + TaskQueueCls = None + + def __init__(self, size=0): self._tasks = Graph() self._consumed_tasks = Queue() # make sure its threadsafe self._workers = list() - self._queue = Queue() + self._queue = self.TaskQueueCls() + self._taskgraph_lock = self.LockCls() self.set_size(size) def __del__(self): self.set_size(0) #{ Internal - def _queue_feeder_visitor(self, task, count): - """Walk the graph and find tasks that are done for later cleanup, and - queue all others for processing by our worker threads ( if available ).""" - if task.error() or task.is_done(): - self._consumed_tasks.put(task) - return True - # END stop processing - - # if the task does not have the required output on its queue, schedule - # it for processing. If we should process all, we don't care about the - # amount as it should process until its all done. - if count < 1 or task._out_wc.size() < count: - # allow min-count override. This makes sure we take at least min-count - # items off the input queue ( later ) - if task.min_count is not None and 0 < count < task.min_count: - count = task.min_count - # END handle min-count - - numchunks = 1 - chunksize = count - remainder = 0 - - # we need the count set for this - can't chunk up unlimited items - # In serial mode we could do this by checking for empty input channels, - # but in dispatch mode its impossible ( == not easily possible ) - # Only try it if we have enough demand - if task.max_chunksize and count > task.max_chunksize: - numchunks = count / task.max_chunksize - chunksize = task.max_chunksize - remainder = count - (numchunks * chunksize) - # END handle chunking - - # the following loops are kind of unrolled - code duplication - # should make things execute faster. Putting the if statements - # into the loop would be less code, but ... slower - print count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: - # respect the chunk size, and split the task up if we want - # to process too much. 
This can be defined per task - queue = self._queue - if numchunks > 1: - for i in xrange(numchunks): - queue.put((task.process, chunksize)) - # END for each chunk to put - else: - queue.put((task.process, chunksize)) - # END try efficient looping - - if remainder: - queue.put((task.process, remainder)) - # END handle chunksize - else: - # no workers, so we have to do the work ourselves - if numchunks > 1: - for i in xrange(numchunks): - task.process(chunksize) - # END for each chunk to put - else: - task.process(chunksize) - # END try efficient looping - - if remainder: - task.process(remainder) - # END handle chunksize - # END handle serial mode - # END handle queuing - - # always walk the whole graph, we want to find consumed tasks - return True def _prepare_channel_read(self, task, count): """Process the tasks which depend on the given one to be sure the input @@ -201,7 +149,98 @@ class ThreadPool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + dfirst_tasks = list() + # for the walk, we must make sure the ordering does not change + # Note: the result of this could be cached + self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + + # check the min count on all involved tasks, and be sure that we don't + # have any task which produces less than the maximum min-count of all tasks + # The actual_count is used when chunking tasks up for the queue, whereas + # the count is usued to determine whether we still have enough output + # on the queue, checking qsize ( ->revise ) + # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces + # at least 10, T-1 goes with 1, then T will block after 1 item, which + # is read by the client. On the next read of 1 item, we would find T's + # queue empty and put in another 10, which could put another thread into + # blocking state. T-1 produces one more item, which is consumed right away + # by the two threads running T. Although this works in the end, it leaves + # many threads blocking and waiting for input, which is not desired. + # Setting the min-count to the max of the mincount of all tasks assures + # we have enough items for all. + # Addition: in serial mode, we would enter a deadlock if one task would + # ever wait for items ! + actual_count = count + min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks) + min_count = reduce(lambda m1, m2: max(m1, m2), min_counts) + if 0 < count < min_count: + actual_count = min_count + # END set actual count + + # the list includes our tasks - the first one to evaluate first, the + # requested one last + for task in dfirst_tasks: + if task.error() or task.is_done(): + self._consumed_tasks.put(task) + continue + # END skip processing + + # if the task does not have the required output on its queue, schedule + # it for processing. If we should process all, we don't care about the + # amount as it should process until its all done. + # NOTE: revise this for multi-tasking - checking qsize doesnt work there ! 
+ if count < 1 or task._out_wc.size() < count: + # but we continue to use the actual count to produce the output + numchunks = 1 + chunksize = actual_count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and actual_count > task.max_chunksize: + numchunks = actual_count / task.max_chunksize + chunksize = task.max_chunksize + remainder = actual_count - (numchunks * chunksize) + # END handle chunking + + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + if self._workers: + # respect the chunk size, and split the task up if we want + # to process too much. This can be defined per task + queue = self._queue + if numchunks > 1: + for i in xrange(numchunks): + queue.put((task.process, chunksize)) + # END for each chunk to put + else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): + task.process(chunksize) + # END for each chunk to put + else: + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) + # END handle chunksize + # END handle serial mode + # END handle queuing + # END for each task to process + def _post_channel_read(self, task): """Called after we processed a read to cleanup""" @@ -250,7 +289,7 @@ class ThreadPool(object): cur_count = len(self._workers) if cur_count < size: for i in range(size - cur_count): - worker = WorkerThread(self._queue) + worker = self.WorkerCls(self._queue) worker.start() self._workers.append(worker) # END for each new worker to create @@ -291,7 +330,12 @@ class ThreadPool(object): # keep its input nodes as we check whether they were orphaned in_tasks = task.in_nodes task.set_done() - self._tasks.del_node(task) + self._taskgraph_lock.acquire() + try: + self._tasks.del_node(task) + finally: + self._taskgraph_lock.release() + # END locked deletion for t in in_tasks: self._del_task_if_orphaned(t) @@ -314,16 +358,33 @@ class ThreadPool(object): task._pool_ref = weakref.ref(self) # END init input channel task - self._tasks.add_node(task) + self._taskgraph_lock.acquire() + try: + self._tasks.add_node(task) + finally: + self._taskgraph_lock.release() + # END sync task addition # If the input channel is one of our read channels, we add the relation if has_input_channel: ic = task.in_rc if isinstance(ic, RPoolChannel) and ic._pool is self: - self._tasks.add_edge(ic._task, task) + self._taskgraph_lock.acquire() + try: + self._tasks.add_edge(ic._task, task) + finally: + self._taskgraph_lock.release() + # END handle edge-adding # END add task relation # END handle input channels for connections return rc #} END interface + + +class ThreadPool(Pool): + """A pool using threads as worker""" + WorkerCls = WorkerThread + LockCls = Lock + TaskQueueCls = Queue -- cgit v1.2.3 From 8c3c271b0d6b5f56b86e3f177caf3e916b509b52 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 13:05:35 +0200 Subject: Added task order cache, and a lock to prevent us walking the graph while 
changing tasks Now processing more items to test performance, in dual-threaded mode as well, and its rather bad, have to figure out the reason for this, probably gil, but queues could help --- lib/git/async/pool.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 009096f2..26a6a182 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -107,6 +107,7 @@ class Pool(object): '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks + '_taskorder_cache', # map task id -> ordered dependent tasks '_taskgraph_lock', # lock for accessing the task graph ) @@ -130,6 +131,7 @@ class Pool(object): self._workers = list() self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() + self._taskorder_cache = dict() self.set_size(size) def __del__(self): @@ -149,10 +151,21 @@ class Pool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - dfirst_tasks = list() - # for the walk, we must make sure the ordering does not change - # Note: the result of this could be cached - self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + # for the walk, we must make sure the ordering does not change. Even + # when accessing the cache, as it is related to graph changes + self._taskgraph_lock.acquire() + try: + try: + dfirst_tasks = self._taskorder_cache[id(task)] + except KeyError: + # have to retrieve the list from the graph + dfirst_tasks = list() + self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + self._taskorder_cache[id(task)] = dfirst_tasks + # END handle cached order retrieval + finally: + self._taskgraph_lock.release() + # END handle locking # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks @@ -208,7 +221,8 @@ class Pool(object): # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower - print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + # DEBUG + # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() if self._workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task @@ -332,6 +346,7 @@ class Pool(object): task.set_done() self._taskgraph_lock.acquire() try: + self._taskorder_cache.clear() self._tasks.del_node(task) finally: self._taskgraph_lock.release() @@ -360,6 +375,7 @@ class Pool(object): self._taskgraph_lock.acquire() try: + self._taskorder_cache.clear() self._tasks.add_node(task) finally: self._taskgraph_lock.release() -- cgit v1.2.3 From edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 17:16:48 +0200 Subject: added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. 
There is still an sync bug in regard to closed channels to be fixed, as the Task.set_done handling is incorrecft --- lib/git/async/pool.py | 241 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 186 insertions(+), 55 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 26a6a182..30291835 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,8 +1,16 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread -from threading import Lock + +from threading import ( + Lock, + _Condition, + _sleep, + _time, + ) + from task import InputChannelTask from Queue import Queue, Empty +from collections import deque from graph import ( Graph, @@ -18,6 +26,96 @@ import weakref import sys +#{ Utilities + +class SyncQueue(deque): + """Adapter to allow using a deque like a queue, without locking""" + def get(self, block=True, timeout=None): + try: + return self.pop() + except IndexError: + raise Empty + # END raise empty + + def empty(self): + return len(self) == 0 + + put = deque.append + + +class HSCondition(_Condition): + """An attempt to make conditions less blocking, which gains performance + in return by sleeping less""" + delay = 0.00002 # reduces wait times, but increases overhead + + def wait(self, timeout=None): + waiter = Lock() + waiter.acquire() + self.__dict__['_Condition__waiters'].append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = self.delay + acquire = waiter.acquire + while True: + gotit = acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + # END endless loop + if not gotit: + try: + self.__dict__['_Condition__waiters'].remove(waiter) + except ValueError: + pass + # END didn't ever get it + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + __waiters = self.__dict__['_Condition__waiters'] + if not __waiters: + return + if n == 1: + __waiters[0].release() + try: + __waiters.pop(0) + except IndexError: + pass + else: + waiters = __waiters[:n] + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + # END handle n = 1 case faster + +class PerfQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +#} END utilities + class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. @@ -49,7 +147,7 @@ class RPoolChannel(RChannel): returns a possibly changed item list. If it raises, the exception will be propagated. 
If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary @@ -58,8 +156,18 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback + # if we have count items, don't do any queue preparation - if someone + # depletes the queue in the meanwhile, the channel will close and + # we will unblock naturally + have_enough = False + if count > 0: + # explicitly > count, as we want a certain safe range + have_enough = self._wc._queue.qsize() > count + # END risky game + ########## prepare ############################## - self._pool._prepare_channel_read(self._task, count) + if not have_enough: + self._pool._prepare_channel_read(self._task, count) ######### read data ###### @@ -127,9 +235,9 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = Queue() # make sure its threadsafe + self._consumed_tasks = None self._workers = list() - self._queue = self.TaskQueueCls() + self._queue = SyncQueue() # start with a sync queue self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) @@ -201,58 +309,60 @@ class Pool(object): # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. - # NOTE: revise this for multi-tasking - checking qsize doesnt work there ! - if count < 1 or task._out_wc.size() < count: - # but we continue to use the actual count to produce the output - numchunks = 1 - chunksize = actual_count - remainder = 0 - - # we need the count set for this - can't chunk up unlimited items - # In serial mode we could do this by checking for empty input channels, - # but in dispatch mode its impossible ( == not easily possible ) - # Only try it if we have enough demand - if task.max_chunksize and actual_count > task.max_chunksize: - numchunks = actual_count / task.max_chunksize - chunksize = task.max_chunksize - remainder = actual_count - (numchunks * chunksize) - # END handle chunking - - # the following loops are kind of unrolled - code duplication - # should make things execute faster. Putting the if statements - # into the loop would be less code, but ... slower - # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: - # respect the chunk size, and split the task up if we want - # to process too much. 
This can be defined per task - queue = self._queue - if numchunks > 1: - for i in xrange(numchunks): - queue.put((task.process, chunksize)) - # END for each chunk to put - else: + #if count > 1 and task._out_wc.size() >= count: + # continue + # END skip if we have enough + + # but use the actual count to produce the output, we may produce + # more than requested + numchunks = 1 + chunksize = actual_count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and actual_count > task.max_chunksize: + numchunks = actual_count / task.max_chunksize + chunksize = task.max_chunksize + remainder = actual_count - (numchunks * chunksize) + # END handle chunking + + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + # DEBUG + # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + if self._workers: + # respect the chunk size, and split the task up if we want + # to process too much. This can be defined per task + queue = self._queue + if numchunks > 1: + for i in xrange(numchunks): queue.put((task.process, chunksize)) - # END try efficient looping - - if remainder: - queue.put((task.process, remainder)) - # END handle chunksize + # END for each chunk to put else: - # no workers, so we have to do the work ourselves - if numchunks > 1: - for i in xrange(numchunks): - task.process(chunksize) - # END for each chunk to put - else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): task.process(chunksize) - # END try efficient looping - - if remainder: - task.process(remainder) - # END handle chunksize - # END handle serial mode - # END handle queuing + # END for each chunk to put + else: + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) + # END handle chunksize + # END handle serial mode # END for each task to process @@ -297,11 +407,22 @@ class Pool(object): otherwise the work will be distributed among the given amount of threads :note: currently NOT threadsafe !""" + assert size > -1, "Size cannot be negative" + # either start new threads, or kill existing ones. 
# If we end up with no threads, we process the remaining chunks on the queue # ourselves cur_count = len(self._workers) if cur_count < size: + # make sure we have a real queue, and can store our consumed tasks properly + if not isinstance(self._queue, self.TaskQueueCls): + if self._queue is not None and not self._queue.empty(): + raise AssertionError("Expected empty queue when switching the queue type") + # END safety check + self._queue = self.TaskQueueCls() + self._consumed_tasks = Queue() + # END init queue + for i in range(size - cur_count): worker = self.WorkerCls(self._queue) worker.start() @@ -323,6 +444,16 @@ class Pool(object): except Queue.Empty: continue # END while there are tasks on the queue + + # use a serial queue, its faster + if not isinstance(self._queue, SyncQueue): + self._queue = SyncQueue() + # END handle queue type + + if self._consumed_tasks and not self._consumed_tasks.empty(): + self._post_channel_read(self._consumed_tasks.pop()) + # END assure consumed tasks are empty + self._consumed_tasks = SyncQueue() # END process queue return self @@ -403,4 +534,4 @@ class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread LockCls = Lock - TaskQueueCls = Queue + TaskQueueCls = PerfQueue -- cgit v1.2.3 From 583cd8807259a69fc01874b798f657c1f9ab7828 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 19:12:44 +0200 Subject: Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state --- lib/git/async/pool.py | 154 ++++++++++++-------------------------------------- 1 file changed, 36 insertions(+), 118 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 30291835..227cabfc 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,121 +1,28 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from threading import Lock -from threading import ( - Lock, - _Condition, - _sleep, - _time, +from util import ( + SyncQueue, + AsyncQueue, ) from task import InputChannelTask -from Queue import Queue, Empty -from collections import deque - -from graph import ( - Graph, +from Queue import ( + Queue, + Empty ) +from graph import Graph from channel import ( Channel, WChannel, RChannel ) -import weakref import sys -#{ Utilities - -class SyncQueue(deque): - """Adapter to allow using a deque like a queue, without locking""" - def get(self, block=True, timeout=None): - try: - return self.pop() - except IndexError: - raise Empty - # END raise empty - - def empty(self): - return len(self) == 0 - - put = deque.append - - -class HSCondition(_Condition): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead - - def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() - try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). 
- endtime = _time() + timeout - delay = self.delay - acquire = waiter.acquire - while True: - gotit = acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - # END endless loop - if not gotit: - try: - self.__dict__['_Condition__waiters'].remove(waiter) - except ValueError: - pass - # END didn't ever get it - finally: - self._acquire_restore(saved_state) - - def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: - return - if n == 1: - __waiters[0].release() - try: - __waiters.pop(0) - except IndexError: - pass - else: - waiters = __waiters[:n] - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - # END handle n = 1 case faster - -class PerfQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - - -#} END utilities - class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. @@ -237,7 +144,7 @@ class Pool(object): self._tasks = Graph() self._consumed_tasks = None self._workers = list() - self._queue = SyncQueue() # start with a sync queue + self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) @@ -375,7 +282,10 @@ class Pool(object): self._consumed_tasks.put(task) # END handle consumption - # delete consumed tasks to cleanup + self._handle_consumed_tasks() + + def _handle_consumed_tasks(self): + """Remove all consumed tasks from our queue by deleting them""" try: while True: ct = self._consumed_tasks.get(False) @@ -384,7 +294,7 @@ class Pool(object): except Empty: pass # END pop queue empty - + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" if sys.getrefcount(task._out_wc) < 3: @@ -415,11 +325,7 @@ class Pool(object): cur_count = len(self._workers) if cur_count < size: # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._queue, self.TaskQueueCls): - if self._queue is not None and not self._queue.empty(): - raise AssertionError("Expected empty queue when switching the queue type") - # END safety check - self._queue = self.TaskQueueCls() + if not isinstance(self._consumed_tasks, self.TaskQueueCls): self._consumed_tasks = Queue() # END init queue @@ -445,13 +351,8 @@ class Pool(object): continue # END while there are tasks on the queue - # use a serial queue, its faster - if not isinstance(self._queue, SyncQueue): - self._queue = SyncQueue() - # END handle queue type - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._post_channel_read(self._consumed_tasks.pop()) + self._handle_consumed_tasks() # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue @@ -467,6 +368,8 @@ class Pool(object): output channel is only held by themselves, so no one will ever consume its items. + This method blocks until all tasks to be removed have been processed, if + they are currently being processed. :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. 
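An aside on the HSCondition code being relocated to util here: its core is the
wait loop, which never parks on the waiter lock for the full timeout but polls
it with an exponentially growing sleep, capped at 0.05s and at the time left.
Reduced to a standalone helper (standard library only, names illustrative):

	from time import time, sleep
	
	def acquire_with_backoff(lock, timeout, start_delay=0.00002, max_delay=0.05):
		"""Poll a lock with non-blocking acquires, sleeping in doubling increments.
		:return: True if the lock was acquired before the timeout expired"""
		endtime = time() + timeout
		delay = start_delay
		while not lock.acquire(False):		# non-blocking attempt
			remaining = endtime - time()
			if remaining <= 0:
				return False
			# double the sleep, but never overshoot the cap or the time left
			delay = min(delay * 2, remaining, max_delay)
			sleep(delay)
		# END poll loop
		return True

This trades a little CPU at the start of each wait for much lower wakeup
latency than a plain blocking acquire with timeout, which is where the
'high-speed' in these locking facilities comes from.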
@@ -478,6 +381,21 @@ class Pool(object): self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() + # before we can delete the task, make sure its write channel + # is closed, otherwise people might still be waiting for its result. + # If a channel is not closed, this could also mean its not yet fully + # processed, but more importantly, there must be no task being processed + # right now. + # TODO: figure this out + for worker in self._workers: + r = worker.routine() + if r and r.im_self is task: + raise NotImplementedError("todo") + # END handle running task + # END check for in-progress routine + + # its done, close the channel for writing + task.close() self._tasks.del_node(task) finally: self._taskgraph_lock.release() @@ -497,11 +415,11 @@ class Pool(object): # create a write channel for it wc, rc = Channel() rc = RPoolChannel(wc, task, self) - task._out_wc = wc + task.set_wc(wc) has_input_channel = isinstance(task, InputChannelTask) if has_input_channel: - task._pool_ref = weakref.ref(self) + task.set_pool(self) # END init input channel task self._taskgraph_lock.acquire() @@ -534,4 +452,4 @@ class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread LockCls = Lock - TaskQueueCls = PerfQueue + TaskQueueCls = AsyncQueue -- cgit v1.2.3 From 654e54d200135e665e07e9f0097d913a77f169da Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 20:01:02 +0200 Subject: task: Fixed incorrect handling of channel closure. Performance is alright for up to 2 threads, but 4 are killing the queue --- lib/git/async/pool.py | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 227cabfc..3de98777 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,6 +21,7 @@ from channel import ( ) import sys +from time import sleep class RPoolChannel(RChannel): @@ -371,32 +372,27 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - # now delete our actual node - must set it done os it closes its channels. - # Otherwise further reads of output tasks will block. - # Actually they may still block if anyone wants to read all ... without - # a timeout - # keep its input nodes as we check whether they were orphaned - in_tasks = task.in_nodes - task.set_done() self._taskgraph_lock.acquire() try: - self._taskorder_cache.clear() - # before we can delete the task, make sure its write channel - # is closed, otherwise people might still be waiting for its result. - # If a channel is not closed, this could also mean its not yet fully - # processed, but more importantly, there must be no task being processed - # right now. - # TODO: figure this out - for worker in self._workers: - r = worker.routine() - if r and r.im_self is task: - raise NotImplementedError("todo") - # END handle running task - # END check for in-progress routine + # it can be that the task is already deleted, but its chunk was on the + # queue until now, so its marked consumed again + if not task in self._tasks.nodes: + return self + # END early abort + + # the task we are currently deleting could also be processed by + # a thread right now. We don't care about it as its taking care about + # its write channel itself, and sends everything it can to it. + # For it it doesn't matter that its not part of our task graph anymore. 
+			
+			# now delete our actual node - be sure its done to prevent further
+			# processing in case there are still client reads on their way.
+			task.set_done()
 			
-			# its done, close the channel for writing
-			task.close()
+			# keep its input nodes as we check whether they were orphaned
+			in_tasks = task.in_nodes
 			self._tasks.del_node(task)
+			self._taskorder_cache.clear()
 		finally:
 			self._taskgraph_lock.release()
-- 
cgit v1.2.3


From be06e87433685b5ea9cfcc131ab89c56cf8292f2 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Mon, 7 Jun 2010 22:00:47 +0200
Subject: improved testing to test the actual async handling of the pool.
 There are still inconsistencies that need to be fixed, but it already
 improved, especially the 4-thread performance which now is as fast as the
 dual-threaded performance
---
 lib/git/async/pool.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 3de98777..19fc9f6e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -67,10 +67,20 @@ class RPoolChannel(RChannel):
 		# if we have count items, don't do any queue preparation - if someone
 		# depletes the queue in the meanwhile, the channel will close and
 		# we will unblock naturally
+		# PROBLEM: If there are multiple consumer of this channel, we might
+		# run out of items without being replenished == block forever in the
+		# worst case. task.min_count could have triggered to produce more ...
+		# usually per read with n items, we put n items on to the queue, 
+		# so we wouldn't check this
+		# Even if we have just one consumer ( we could determine that with 
+		# the reference count ), it could be that in one moment we don't yet
+		# have an item, but its currently being produced by some worker.
+		# This is why we:
+		# * make no assumptions if there are multiple consumers
+		# * 
 		have_enough = False
 		if count > 0:
-			# explicitly > count, as we want a certain safe range
-			have_enough = self._wc._queue.qsize() > count
+			have_enough = self._wc._queue.qsize() >= count
 		# END risky game
@@ -78,9 +88,11 @@ class RPoolChannel(RChannel):
 			self._pool._prepare_channel_read(self._task, count)
 		
 		
-		######### read data ######
+		####### read data ########
+		##########################
 		# read actual items, tasks were setup to put their output into our channel ( as well )
 		items = RChannel.read(self, count, block, timeout)
+		##########################
 		
 		if self._post_cb:
 			items = self._post_cb(items)
-- 
cgit v1.2.3


From def0f73989047c4ddf9b11da05ad2c9c8e387331 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Mon, 7 Jun 2010 23:20:37 +0200
Subject: introduced a new counter keeping track of the scheduled tasks - this
 prevents unnecessary tasks from being scheduled as we keep track of how many
 items will be produced for the task at hand. This introduces additional
 locking, but performs well in multithreaded mode.
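The counter this commit describes follows a simple protocol: every chunk is
announced to its task right before it goes onto the queue, so a later read can
tell how many items are already on their way. A sketch assuming the
add_scheduled_items/scheduled_item_count interface visible in the diff below:

	from threading import Lock
	
	class ScheduledItemCounter(object):
		"""Lock-guarded count of items queued for production but not yet delivered"""
		def __init__(self, lock=None):
			self._lock = lock or Lock()		# a no-op lock suffices in serial mode
			self._count = 0
		
		def add_scheduled_items(self, n):
			"""Announce n items right before their chunk is put onto the queue"""
			self._lock.acquire()
			try:
				self._count += n
			finally:
				self._lock.release()
		
		def scheduled_item_count(self):
			self._lock.acquire()
			try:
				return self._count
			finally:
				self._lock.release()

A read for c items may then skip scheduling work entirely once
scheduled_item_count() or the output queue size already covers c - exactly the
extended have_enough test in the first hunk below.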
Performance of the master queue is still a huge issue; it's currently the
limiting factor, as bypassing the master queue in serial mode gives 15x
performance, which is what I would need
---
 lib/git/async/pool.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 19fc9f6e..4c97feb0 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,12 +80,13 @@ class RPoolChannel(RChannel):
 		# * 
 		have_enough = False
 		if count > 0:
-			have_enough = self._wc._queue.qsize() >= count
-		# END risky game
+			have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+		# END 
 		
 		########## prepare ##############################
 		if not have_enough:
 			self._pool._prepare_channel_read(self._task, count)
+		# END prepare pool scheduling
 		
 		
 		####### read data ########
@@ -260,26 +261,33 @@ class Pool(object):
 				queue = self._queue
 				if numchunks > 1:
 					for i in xrange(numchunks):
+						# schedule them as early as we know about them
+						task.add_scheduled_items(chunksize)
 						queue.put((task.process, chunksize))
 					# END for each chunk to put
 				else:
+					task.add_scheduled_items(chunksize)
 					queue.put((task.process, chunksize))
 				# END try efficient looping
 				
 				if remainder:
+					task.add_scheduled_items(remainder)
 					queue.put((task.process, remainder))
 				# END handle chunksize
 			else:
 				# no workers, so we have to do the work ourselves
 				if numchunks > 1:
 					for i in xrange(numchunks):
+						task.add_scheduled_items(chunksize)
 						task.process(chunksize)
 					# END for each chunk to put
 				else:
+					task.add_scheduled_items(chunksize)
 					task.process(chunksize)
 				# END try efficient looping
 				
 				if remainder:
+					task.add_scheduled_items(remainder)
 					task.process(remainder)
 				# END handle chunksize
 			# END handle serial mode
@@ -348,6 +356,9 @@ class Pool(object):
 			self._workers.append(worker)
 			# END for each new worker to create
 		elif cur_count > size:
+			# we can safely increase the size, even from serial mode, as we would
+			# only be able to do this if the serial ( sync ) mode finished processing.
+			# Just adding more workers is not a problem at all.
 			del_count = cur_count - size
 			for i in range(del_count):
 				self._workers[i].stop_and_join()
 			# END for each thread to stop
-- 
cgit v1.2.3


From 898d47d1711accdfded8ee470520fdb96fb12d46 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Mon, 7 Jun 2010 23:47:06 +0200
Subject: Task scheduled items lock now uses a dummy lock in serial mode,
 improving its performance considerably.

Channels now use the AsyncQueue, boosting their throughput to about 5k
items / s - this is something one can work with, considering the runtime of
each item should be large enough to keep the threads busy.
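The dummy lock mentioned here is the cheapest optimization possible: it keeps
the two-method interface of threading.Lock but synchronizes nothing, which is
only valid while the pool has zero workers. The real class lives in util.py,
outside this series, but it plausibly amounts to no more than this:

	class DummyLock(object):
		"""No-op lock - safe only while a single thread owns all the data"""
		def acquire(self, blocking=1):
			return True
		
		def release(self):
			pass

Swapping it in turns every acquire/release pair on the serial code path into
two trivial method calls, which is what makes the difference 'considerable'.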
This could be a basis, further testing needed
---
 lib/git/async/pool.py | 6 ++++++
 1 file changed, 6 insertions(+)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 4c97feb0..d6b5711d 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -5,6 +5,7 @@ from threading import Lock
 from util import (
 		SyncQueue,
 		AsyncQueue,
+		DummyLock
 	)
 
 from task import InputChannelTask
@@ -462,6 +463,11 @@ class Pool(object):
 			# END add task relation
 		# END handle input channels for connections
 		
+		# fix locks - in serial mode, the task does not need real locks
+		if self.size() == 0:
+			task._slock = DummyLock()
+		# END improve locks
+		
 		return rc
 	
 	#} END interface
-- 
cgit v1.2.3


From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 16:47:48 +0200
Subject: It's getting better already - intermediate commit before further
 changing the task class
---
 lib/git/async/pool.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index d6b5711d..cf1c2199 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,8 +80,8 @@ class RPoolChannel(RChannel):
 		# * make no assumptions if there are multiple consumers
 		# * 
 		have_enough = False
-		if count > 0:
-			have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+		#if count > 0:
+		#	have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
 		# END 
 		
 		########## prepare ##############################
@@ -319,6 +319,7 @@ class Pool(object):
 	
 	def _del_task_if_orphaned(self, task):
 		"""Check the task, and delete it if it is orphaned"""
+		# 1 as its stored on the task, 1 for the getrefcount call
 		if sys.getrefcount(task._out_wc) < 3:
 			self.del_task(task)
 	#} END internal
@@ -403,7 +404,7 @@ class Pool(object):
 			if not task in self._tasks.nodes:
 				return self
 			# END early abort
-			
+			print "deleting ", id(task)
 			# the task we are currently deleting could also be processed by
 			# a thread right now. We don't care about it as its taking care about
 			# its write channel itself, and sends everything it can to it.
-- 
cgit v1.2.3


From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 17:25:43 +0200
Subject: task: now deletes itself once it's done - for the test this doesn't
 change a thing as the task deletes itself too late - it's time for a paradigm
 change, the task should be deleted with its RPoolChannel or explicitly by the
 user. The test needs to adapt, and shouldn't assume anything unless the
 RPoolChannel is gone
---
 lib/git/async/pool.py | 57 ++++++++++++++-------------------------------------
 1 file changed, 15 insertions(+), 42 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index cf1c2199..fce5e424 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -78,14 +78,17 @@ class RPoolChannel(RChannel):
 		# have an item, but its currently being produced by some worker.
# This is why we: # * make no assumptions if there are multiple consumers - # * - have_enough = False + # * + + # if the user tries to use us to read from a done task, we will never + # compute as all produced items are already in the channel + skip_compute = self._task.is_done() or self._task.error() #if count > 0: - # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count # END ########## prepare ############################## - if not have_enough: + if not skip_compute: self._pool._prepare_channel_read(self._task, count) # END prepare pool scheduling @@ -134,7 +137,6 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks @@ -157,7 +159,6 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = None self._workers = list() self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() @@ -224,8 +225,10 @@ class Pool(object): # requested one last for task in dfirst_tasks: if task.error() or task.is_done(): - self._consumed_tasks.put(task) - continue + # in theory, the should never be consumed task in the pool, right ? + # They delete themselves once they are done. + raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") + #continue # END skip processing # if the task does not have the required output on its queue, schedule @@ -297,26 +300,8 @@ class Pool(object): def _post_channel_read(self, task): """Called after we processed a read to cleanup""" - # check whether we consumed the task, and schedule it for deletion - # This could have happend after the read returned ( even though the pre-read - # checks it as well ) - if task.error() or task.is_done(): - self._consumed_tasks.put(task) - # END handle consumption - - self._handle_consumed_tasks() - - def _handle_consumed_tasks(self): - """Remove all consumed tasks from our queue by deleting them""" - try: - while True: - ct = self._consumed_tasks.get(False) - self.del_task(ct) - # END for each task to delete - except Empty: - pass - # END pop queue empty - + pass + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call @@ -347,11 +332,6 @@ class Pool(object): # ourselves cur_count = len(self._workers) if cur_count < size: - # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._consumed_tasks, self.TaskQueueCls): - self._consumed_tasks = Queue() - # END init queue - for i in range(size - cur_count): worker = self.WorkerCls(self._queue) worker.start() @@ -377,9 +357,6 @@ class Pool(object): continue # END while there are tasks on the queue - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._handle_consumed_tasks() - # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue return self @@ -437,11 +414,7 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - - has_input_channel = isinstance(task, InputChannelTask) - if has_input_channel: - 
task.set_pool(self) - # END init input channel task + task.set_pool(self) self._taskgraph_lock.acquire() try: @@ -452,7 +425,7 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if has_input_channel: + if isinstance(task, InputChannelTask): ic = task.in_rc if isinstance(ic, RPoolChannel) and ic._pool is self: self._taskgraph_lock.acquire() -- cgit v1.2.3 From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 18:20:12 +0200 Subject: Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user --- lib/git/async/pool.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fce5e424..a915f7b0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -41,8 +41,18 @@ class RPoolChannel(RChannel): def __del__(self): """Assures that our task will be deleted if we were the last reader""" - del(self._wc) # decrement ref-count - self._pool._del_task_if_orphaned(self._task) + del(self._wc) # decrement ref-count early + # now, if this is the last reader to the wc we just handled, there + # is no way anyone will ever read from the task again. If so, + # delete the task in question, it will take care of itself and orphans + # it might leave + # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which + # I can't explain, but appears to be normal in the destructor + # On the caller side, getrefcount returns 2, as expected + if sys.getrefcount(self) < 6: + print "__del__" + self._pool.del_task(self._task) + print "done" def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -105,7 +115,7 @@ class RPoolChannel(RChannel): ####### Finalize ######## self._pool._post_channel_read(self._task) - + return items #{ Internal @@ -227,6 +237,7 @@ class Pool(object): if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? # They delete themselves once they are done. + # TODO: remove this check for performance later raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") #continue # END skip processing @@ -363,7 +374,11 @@ class Pool(object): def num_tasks(self): """:return: amount of tasks""" - return len(self._tasks.nodes) + self._taskgraph_lock.acquire() + try: + return len(self._tasks.nodes) + finally: + self._taskgraph_lock.release() def del_task(self, task): """Delete the task @@ -374,6 +389,7 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. 
:return: self""" + print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -414,7 +430,6 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - task.set_pool(self) self._taskgraph_lock.acquire() try: -- cgit v1.2.3 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- lib/git/async/pool.py | 56 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 23 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index a915f7b0..1767c61c 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,5 +1,8 @@ """Implementation of a thread-pool working with channels""" -from thread import WorkerThread +from thread import ( + WorkerThread, + StopProcessing, + ) from threading import Lock from util import ( @@ -147,7 +150,7 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_workers', # list of worker threads + '_num_workers', # list of workers '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks '_taskgraph_lock', # lock for accessing the task graph @@ -169,7 +172,7 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._workers = list() + self._num_workers = 0 self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() @@ -270,7 +273,7 @@ class Pool(object): # into the loop would be less code, but ... slower # DEBUG # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: + if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task queue = self._queue @@ -323,7 +326,7 @@ class Pool(object): #{ Interface def size(self): """:return: amount of workers in the pool""" - return len(self._workers) + return self._num_workers def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, @@ -341,34 +344,41 @@ class Pool(object): # either start new threads, or kill existing ones. # If we end up with no threads, we process the remaining chunks on the queue # ourselves - cur_count = len(self._workers) + cur_count = self._num_workers if cur_count < size: - for i in range(size - cur_count): - worker = self.WorkerCls(self._queue) - worker.start() - self._workers.append(worker) - # END for each new worker to create - elif cur_count > size: # we can safely increase the size, even from serial mode, as we would # only be able to do this if the serial ( sync ) mode finished processing. # Just adding more workers is not a problem at all. + add_count = size - cur_count + for i in range(add_count): + print "Add worker" + self.WorkerCls(self._queue).start() + # END for each new worker to create + self._num_workers += add_count + elif cur_count > size: + # We don't care which thread exactly gets hit by our stop request + # On their way, they will consume remaining tasks, but new ones + # could be added as we speak. 
 			del_count = cur_count - size
 			for i in range(del_count):
-				self._workers[i].stop_and_join()
+				self._queue.put((self.WorkerCls.stop, True))	# arg doesn't matter
 			# END for each thread to stop
-			del(self._workers[:del_count])
+			self._num_workers -= del_count
 		# END handle count
 		
 		if size == 0:
-			while not self._queue.empty():
-				try:
-					taskmethod, count = self._queue.get(False)
-					taskmethod(count)
-				except Queue.Empty:
-					continue
-			# END while there are tasks on the queue
-			
-			self._consumed_tasks = SyncQueue()
+			# NOTE: we do not process any tasks still on the queue, as we will
+			# naturally do that once we read the next time, only on the tasks
+			# that are actually required. The queue will keep the tasks, 
+			# and once we are deleted, they will vanish without additional
+			# time spent on them. If there shouldn't be any consumers anyway.
+			# If we should re-enable some workers again, they will continue on the 
+			# remaining tasks, probably with nothing to do.
+			# We can't clear the task queue if we have removed workers 
+			# as they will receive the termination signal through it, and if
+			# we had added workers, we wouldn't be here ;).
+			pass 
 		# END process queue
 		return self
-- 
cgit v1.2.3


From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 10:34:12 +0200
Subject: queue: fixed critical bug in the notify method, as it was not at all
 thread-safe, causing locks to be released multiple times. Now it runs very
 fast, and very stable apparently.

Now it's about putting previous features back in, and studying their results,
before more complex task graphs can be examined
---
 lib/git/async/pool.py | 30 ++++++++++++++----------------
 1 file changed, 14 insertions(+), 16 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1767c61c..7bddf7da 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -53,9 +53,8 @@ class RPoolChannel(RChannel):
 		# I can't explain, but appears to be normal in the destructor
 		# On the caller side, getrefcount returns 2, as expected
 		if sys.getrefcount(self) < 6:
-			print "__del__"
-			self._pool.del_task(self._task)
-			print "done"
+			self._pool.remove_task(self._task)
+		# END handle refcount based removal of task
 	
 	def set_pre_cb(self, fun = lambda count: None):
 		"""Install a callback to call with the item count to be read before any
@@ -237,12 +235,14 @@ class Pool(object):
 		# the list includes our tasks - the first one to evaluate first, the 
 		# requested one last
 		for task in dfirst_tasks:
-			if task.error() or task.is_done():
+			# if task.error() or task.is_done():
 				# in theory, the should never be consumed task in the pool, right ?
-				# They delete themselves once they are done.
+				# They delete themselves once they are done. But as we run asynchronously, 
+				# It can be that someone reads, while a task realizes its done, and 
+				# we get here to prepare the read although it already is done.
+				# It's not a problem though, the task will not do anything.
+ # Hence we don't waste our time with checking for it + # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing # if the task does not have the required output on its queue, schedule @@ -316,11 +317,11 @@ class Pool(object): """Called after we processed a read to cleanup""" pass - def _del_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: - self.del_task(task) + self.remove_task(task) #} END internal #{ Interface @@ -351,7 +352,6 @@ class Pool(object): # Just adding more workers is not a problem at all. add_count = size - cur_count for i in range(add_count): - print "Add worker" self.WorkerCls(self._queue).start() # END for each new worker to create self._num_workers += add_count @@ -361,7 +361,6 @@ class Pool(object): # could be added as we speak. del_count = cur_count - size for i in range(del_count): - print "stop worker" self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop self._num_workers -= del_count @@ -390,7 +389,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def del_task(self, task): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -399,7 +398,6 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -407,7 +405,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - print "deleting ", id(task) + # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. @@ -426,7 +424,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._del_task_if_orphaned(t) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self -- cgit v1.2.3 From f2c8d26d3b25b864ad48e6de018757266b59f708 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:28:37 +0200 Subject: thread: fixed initialization problem if an empty iterable was handed in queue: Queue now derives from deque directly, which safes one dict lookup as the queue does not need to be accessed through self anymore pool test improved to better verify threads are started correctly --- lib/git/async/pool.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7bddf7da..7ed6fd8e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -331,9 +331,8 @@ class Pool(object): def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, - the call may block as it waits for threads to finish. - When reducing the size to zero, this thread will process all remaining - items on the queue. + threads will continue with their work until they are done before effectively + being removed. 
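Shutdown-through-the-queue, as introduced for the worker threads two commits
back, reduces to the following pattern - a rough sketch using the plain Queue
module this codebase targets (renamed queue in Python 3), not the actual
WorkerThread class:

	from Queue import Queue
	from threading import Thread
	
	class StopProcessing(Exception):
		"""Raised by the sentinel routine to terminate exactly one worker"""
	
	def stop(arg):
		raise StopProcessing
	
	def worker_loop(queue):
		while True:
			routine, arg = queue.get()		# blocking get, no timeout polling
			try:
				routine(arg)
			except StopProcessing:
				return		# only the worker that dequeued the sentinel dies
		# END dispatch loop
	
	q = Queue()
	workers = [Thread(target=worker_loop, args=(q,)) for i in range(2)]
	for w in workers:
		w.start()
	# ... q.put((task.process, chunksize)) chunks, as the pool does ...
	for w in workers:
		q.put((stop, True))		# arg doesn't matter, mirroring set_size

Since any thread may dequeue a sentinel, callers cannot choose which worker
dies - which is fine here, as the surrounding docstring notes: threads finish
their current work before being removed.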
 		:return: self
 		:param size: if 0, the pool will do all work itself in the calling thread, 
 			otherwise the work will be distributed among the given amount of threads
-- 
cgit v1.2.3


From 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 14:01:51 +0200
Subject: HSCondition: Fixed terrible bug which it inherited from its default
 python Condition implementation, related to the notify method not being
 threadsafe. Although I was aware of it, I missed the first check which tests
 for the size - the result could be incorrect if the whole method wasn't
 locked. Testing runs stable now, allowing to move on!
---
 lib/git/async/pool.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7ed6fd8e..66a2a105 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -58,14 +58,17 @@ class RPoolChannel(RChannel):
 	
 	def set_pre_cb(self, fun = lambda count: None):
 		"""Install a callback to call with the item count to be read before any
-		item is actually read from the channel.
+		item is actually read from the channel. The call must be threadsafe if
+		the channel is passed to more than one tasks.
 		If it fails, the read will fail with an IOError
 		If a function is not provided, the call is effectively uninstalled."""
 		self._pre_cb = fun
 	
 	def set_post_cb(self, fun = lambda item: item):
 		"""Install a callback to call after the items were read. The function
-		returns a possibly changed item list. If it raises, the exception will be propagated.
+		returns a possibly changed item list.The call must be threadsafe if
+		the channel is passed to more than one tasks.
+		If it raises, the exception will be propagated.
 		If a function is not provided, the call is effectively uninstalled."""
 		self._post_cb = fun
-- 
cgit v1.2.3


From 57a4e09294230a36cc874a6272c71757c48139f2 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 15:29:47 +0200
Subject: Channel: removed pseudoconstructor, which clearly improves the design
 and makes it easier to customize
 pool: in serial mode, created channels will be serial-only, which brings 15%
 of performance
---
 lib/git/async/pool.py | 36 +++++++++++++++++++++++-------------
 1 file changed, 23 insertions(+), 13 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 66a2a105..549c801e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -6,7 +6,6 @@ from thread import (
 from threading import Lock
 
 from util import (
-		SyncQueue,
 		AsyncQueue,
 		DummyLock
 	)
@@ -19,8 +18,9 @@ from graph import Graph
 from channel import (
-		Channel,
+		mkchannel,
 		WChannel, 
+		SerialWChannel,
 		RChannel
 	)
@@ -329,7 +329,8 @@ class Pool(object):
 	
 	#{ Interface 
 	def size(self):
-		""":return: amount of workers in the pool"""
+		""":return: amount of workers in the pool
+		:note: method is not threadsafe !"""
 		return self._num_workers
 	
 	def set_size(self, size=0):
 		"""Set the amount of workers to use in this pool. When reducing the size, 
 		threads will continue with their work until they are done before effectively
 		being removed.
 		
 		:return: self
 		:param size: if 0, the pool will do all work itself in the calling thread, 
-			otherwise the work will be distributed among the given amount of threads
+			otherwise the work will be distributed among the given amount of threads.
+			If the size is 0, newly added tasks will use channels which are NOT 
+			threadsafe to optimize item throughput.
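Putting set_size and add_task together, typical use of the pool ends up
looking roughly like this - a sketch only, with a hypothetical task class,
since the concrete Task types live in task.py outside this series:

	pool = ThreadPool(size=0)		# serial mode: reads drive the work directly
	rc = pool.add_task(SomeTask())	# returns an RPoolChannel handle
	first = rc.read(count=10)		# lazily schedules just enough processing
	
	pool.set_size(4)				# hand the remaining work to four threads
	rest = rc.read()				# count=0: process until everything is done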
:note: currently NOT threadsafe !""" assert size > -1, "Size cannot be negative" @@ -437,17 +440,29 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wc, rc = Channel() - rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + wctype = WChannel self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() self._tasks.add_node(task) + + # fix locks - in serial mode, the task does not need real locks + # Additionally, use a non-threadsafe queue + # This brings about 15% more performance, but sacrifices thread-safety + # when reading from multiple threads. + if self.size() == 0: + task._slock = DummyLock() + wctype = SerialWChannel + # END improve locks + + # setup the tasks channel + wc = wctype() + rc = RPoolChannel(wc, task, self) + task.set_wc(wc) finally: self._taskgraph_lock.release() - # END sync task addition + # END sync task addition # If the input channel is one of our read channels, we add the relation if isinstance(task, InputChannelTask): @@ -462,11 +477,6 @@ class Pool(object): # END add task relation # END handle input channels for connections - # fix locks - in serial mode, the task does not need real locks - if self.size() == 0: - task._slock = DummyLock() - # END improve locks - return rc #} END interface -- cgit v1.2.3 From 07996a1a1e53ffdd2680d4bfbc2f4059687859a5 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 15:40:51 +0200 Subject: task: removed scheduled task support, which at some point was introduced to improve performance, but which now hinders performance, besides being unnecessary ;) --- lib/git/async/pool.py | 43 +++++++++++-------------------------------- 1 file changed, 11 insertions(+), 32 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 549c801e..284c41c7 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,27 +80,21 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback - # if we have count items, don't do any queue preparation - if someone - # depletes the queue in the meanwhile, the channel will close and - # we will unblock naturally - # PROBLEM: If there are multiple consumer of this channel, we might - # run out of items without being replenished == block forever in the - # worst case. task.min_count could have triggered to produce more ... - # usually per read with n items, we put n items on to the queue, - # so we wouldn't check this - # Even if we have just one consumer ( we could determine that with - # the reference count ), it could be that in one moment we don't yet - # have an item, but its currently being produced by some worker. - # This is why we: - # * make no assumptions if there are multiple consumers - # * + # NOTE: we always queue the operation that would give us count items + # as tracking the scheduled items or testing the channels size + # is in herently unsafe depending on the design of the task network + # If we put on tasks onto the queue for every request, we are sure + # to always produce enough items, even if the task.min_count actually + # provided enough - its better to have some possibly empty task runs + # than having and empty queue that blocks. + + # NOTE: TODO: that case is only possible if one Task could be connected + # to multiple input channels in a manner known by the system. 
Currently + # this is not possible, but should be implemented at some point # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel skip_compute = self._task.is_done() or self._task.error() - #if count > 0: - # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count - # END ########## prepare ############################## if not skip_compute: @@ -249,13 +243,6 @@ class Pool(object): # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing - # if the task does not have the required output on its queue, schedule - # it for processing. If we should process all, we don't care about the - # amount as it should process until its all done. - #if count > 1 and task._out_wc.size() >= count: - # continue - # END skip if we have enough - # but use the actual count to produce the output, we may produce # more than requested numchunks = 1 @@ -283,33 +270,26 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): - # schedule them as early as we know about them - task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: - task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: - task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): - task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: - task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: - task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -452,7 +432,6 @@ class Pool(object): # This brings about 15% more performance, but sacrifices thread-safety # when reading from multiple threads. if self.size() == 0: - task._slock = DummyLock() wctype = SerialWChannel # END improve locks -- cgit v1.2.3 From ea81f14dafbfb24d70373c74b5f8dabf3f2225d9 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:38:21 +0200 Subject: Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. The write channel with callback subclass allows the transformation of the item to be written --- lib/git/async/pool.py | 51 ++++++--------------------------------------------- 1 file changed, 6 insertions(+), 45 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 284c41c7..7d4e96d1 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,26 +21,24 @@ from channel import ( mkchannel, WChannel, SerialWChannel, - RChannel + CallbackRChannel ) import sys from time import sleep -class RPoolChannel(RChannel): +class RPoolChannel(CallbackRChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. 
It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') + __slots__ = ('_task', '_pool') def __init__(self, wchannel, task, pool): - RChannel.__init__(self, wchannel) + CallbackRChannel.__init__(self, wchannel) self._task = task self._pool = pool - self._pre_cb = None - self._post_cb = None def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -56,30 +54,10 @@ class RPoolChannel(RChannel): self._pool.remove_task(self._task) # END handle refcount based removal of task - def set_pre_cb(self, fun = lambda count: None): - """Install a callback to call with the item count to be read before any - item is actually read from the channel. The call must be threadsafe if - the channel is passed to more than one tasks. - If it fails, the read will fail with an IOError - If a function is not provided, the call is effectively uninstalled.""" - self._pre_cb = fun - - def set_post_cb(self, fun = lambda item: item): - """Install a callback to call after the items were read. The function - returns a possibly changed item list.The call must be threadsafe if - the channel is passed to more than one tasks. - If it raises, the exception will be propagated. - If a function is not provided, the call is effectively uninstalled.""" - self._post_cb = fun - def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" - if self._pre_cb: - self._pre_cb() - # END pre callback - # NOTE: we always queue the operation that would give us count items # as tracking the scheduled items or testing the channels size # is in herently unsafe depending on the design of the task network @@ -90,7 +68,7 @@ class RPoolChannel(RChannel): # NOTE: TODO: that case is only possible if one Task could be connected # to multiple input channels in a manner known by the system. Currently - # this is not possible, but should be implemented at some point + # this is not possible, but should be implemented at some point. 
# if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel @@ -105,25 +83,12 @@ class RPoolChannel(RChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = RChannel.read(self, count, block, timeout) + items = CallbackRChannel.read(self, count, block, timeout) ########################## - if self._post_cb: - items = self._post_cb(items) - - - ####### Finalize ######## - self._pool._post_channel_read(self._task) return items - #{ Internal - def _read(self, count=0, block=False, timeout=None): - """Calls the underlying channel's read directly, without triggering - the pool""" - return RChannel.read(self, count, block, timeout) - - #} END internal class Pool(object): @@ -296,10 +261,6 @@ class Pool(object): # END for each task to process - def _post_channel_read(self, task): - """Called after we processed a read to cleanup""" - pass - def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call -- cgit v1.2.3 From 365fb14ced88a5571d3287ff1698582ceacd80d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:59:17 +0200 Subject: task: redesigned write channel access to allow the task creator to set own write channels, possibly some with callbacks installed etc.. Pool.add_task will respect the users choice now, but provide defaults which are optimized for performance --- lib/git/async/pool.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7d4e96d1..f7c1cfe0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -388,18 +388,21 @@ class Pool(object): self._taskorder_cache.clear() self._tasks.add_node(task) - # fix locks - in serial mode, the task does not need real locks - # Additionally, use a non-threadsafe queue + # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety # when reading from multiple threads. if self.size() == 0: wctype = SerialWChannel # END improve locks - # setup the tasks channel - wc = wctype() + # setup the tasks channel - respect the task creators choice though + # if it is set. + wc = task.wchannel() + if wc is None: + wc = wctype() + # END create write channel ifunset rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + task.set_wchannel(wc) finally: self._taskgraph_lock.release() # END sync task addition -- cgit v1.2.3 From 257a8a9441fca9a9bc384f673ba86ef5c3f1715d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 21:19:54 +0200 Subject: test: prepared task dependency test, which already helped to find bug in the reference counting mechanism, causing references to the pool to be kepts via cycles --- lib/git/async/pool.py | 55 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 16 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index f7c1cfe0..2ec18f1a 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -25,6 +25,7 @@ from channel import ( ) import sys +import weakref from time import sleep @@ -33,25 +34,37 @@ class RPoolChannel(CallbackRChannel): before and after an item is to be read. 
From 257a8a9441fca9a9bc384f673ba86ef5c3f1715d Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 21:19:54 +0200
Subject: test: prepared task dependency test, which already helped to find a bug in the reference-counting mechanism, causing references to the pool to be kept alive via cycles

---
 lib/git/async/pool.py | 55 ++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 39 insertions(+), 16 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index f7c1cfe0..2ec18f1a 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -25,6 +25,7 @@ from channel import (
 	)

 import sys
+import weakref
 from time import sleep


@@ -33,25 +34,37 @@ class RPoolChannel(CallbackRChannel):
 	before and after an item is to be read.
 	
 	It acts like a handle to the underlying task in the pool."""
-	__slots__ = ('_task', '_pool')
+	__slots__ = ('_task_ref', '_pool_ref')
 	
 	def __init__(self, wchannel, task, pool):
 		CallbackRChannel.__init__(self, wchannel)
-		self._task = task
-		self._pool = pool
+		self._task_ref = weakref.ref(task)
+		self._pool_ref = weakref.ref(pool)
 		
 	def __del__(self):
 		"""Assures that our task will be deleted if we were the last reader"""
-		del(self._wc)		# decrement ref-count early
-		# now, if this is the last reader to the wc we just handled, there 
+		task = self._task_ref()
+		if task is None:
+			return
+		
+		pool = self._pool_ref()
+		if pool is None:
+			return
+		
+		# if this is the last reader to the wc we just handled, there 
 		# is no way anyone will ever read from the task again. If so, 
 		# delete the task in question, it will take care of itself and orphans
 		# it might leave
 		# 1 is ourselves, + 1 for the call + 1, and 3 magical ones which 
 		# I can't explain, but appears to be normal in the destructor
 		# On the caller side, getrefcount returns 2, as expected
+		# When just calling remove_task, 
+		# it has no way of knowing that the write channel is about to diminish,
+		# which is why we pass the info as a private kwarg - not nice, but 
+		# okay for now
+		# TODO: Fix this - private/public method
 		if sys.getrefcount(self) < 6:
-			self._pool.remove_task(self._task)
+			pool.remove_task(task, _from_destructor_=True)
 		# END handle refcount based removal of task
 		
 	def read(self, count=0, block=True, timeout=None):
@@ -72,11 +85,16 @@ class RPoolChannel(CallbackRChannel):
 		
 		# if the user tries to use us to read from a done task, we will never 
 		# compute as all produced items are already in the channel
-		skip_compute = self._task.is_done() or self._task.error()
+		task = self._task_ref()
+		if task is None:
+			return list()
+		# END abort if task was deleted
+		
+		skip_compute = task.is_done() or task.error()
 		
 		########## prepare ##############################
 		if not skip_compute:
-			self._pool._prepare_channel_read(self._task, count)
+			self._pool_ref()._prepare_channel_read(task, count)
 		# END prepare pool scheduling
 		
 		
@@ -261,11 +279,16 @@ class Pool(object):
 			# END for each task to process
 			
 	
-	def _remove_task_if_orphaned(self, task):
+	def _remove_task_if_orphaned(self, task, from_destructor):
 		"""Check the task, and delete it if it is orphaned"""
 		# 1 as its stored on the task, 1 for the getrefcount call
-		if sys.getrefcount(task._out_wc) < 3:
-			self.remove_task(task)
+		# If we are getting here from the destructor of an RPool channel, 
+		# it's totally valid to virtually decrement the refcount by 1 as 
+		# we can expect it to drop once the destructor completes, which is when
+		# we finish all recursive calls
+		max_ref_count = 3 + from_destructor
+		if sys.getrefcount(task.wchannel()) < max_ref_count:
+			self.remove_task(task, from_destructor)
 	
 	#} END internal
 	
 	#{ Interface 
@@ -335,7 +358,7 @@ class Pool(object):
 		finally:
 			self._taskgraph_lock.release()
 		
-	def remove_task(self, task):
+	def remove_task(self, task, _from_destructor_=False):
 		"""Delete the task
 		Additionally we will remove orphaned tasks, which can be identified if their 
 		output channel is only held by themselves, so no one will ever consume 
@@ -370,7 +393,7 @@ class Pool(object):
 		# END locked deletion
 		
 		for t in in_tasks:
-			self._remove_task_if_orphaned(t)
+			self._remove_task_if_orphaned(t, _from_destructor_)
 		# END handle orphans recursively
 		
 		return self
@@ -409,11 +432,11 @@ class Pool(object):
 		
 		# If the input channel is one of our read channels, we add the relation
 		if isinstance(task, InputChannelTask):
-			ic = task.in_rc
-			if isinstance(ic, RPoolChannel) and ic._pool is self:
+			ic = task.rchannel()
+			if isinstance(ic, RPoolChannel) and ic._pool_ref() is self:
 				self._taskgraph_lock.acquire()
 				try:
-					self._tasks.add_edge(ic._task, task)
+					self._tasks.add_edge(ic._task_ref(), task)
 				finally:
 					self._taskgraph_lock.release()
 			# END handle edge-adding
-- cgit v1.2.3
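The weak references introduced above are what break the reader -> pool -> task -> reader cycle the commit message mentions; with strong references, dropping the last reader could never trigger cleanup. The idiom in isolation - a self-contained sketch, not this module's classes:

    import weakref

    class Pool(object):
        def remove_task(self, task):
            print "removing", task

    class Task(object):
        pass

    class Reader(object):
        def __init__(self, task, pool):
            # weak references: holding a Reader must not keep pool or task alive
            self._task_ref = weakref.ref(task)
            self._pool_ref = weakref.ref(pool)

        def __del__(self):
            task = self._task_ref()     # dereference; None if already collected
            pool = self._pool_ref()
            if task is not None and pool is not None:
                pool.remove_task(task)

    pool, task = Pool(), Task()
    r = Reader(task, pool)
    del r   # refcount drops to zero, __del__ runs, remove_task is called;
            # deleting the pool before the reader would be equally safe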
From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 00:24:49 +0200
Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be

---
 lib/git/async/pool.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 2ec18f1a..5ebc3655 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -182,14 +182,13 @@ class Pool(object):
 				dfirst_tasks = self._taskorder_cache[id(task)]
 			except KeyError:
 				# have to retrieve the list from the graph
-				dfirst_tasks = list()
-				self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+				dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task)
 				self._taskorder_cache[id(task)] = dfirst_tasks
 			# END handle cached order retrieval
 		finally:
 			self._taskgraph_lock.release()
 		# END handle locking
-		
+		print dfirst_tasks
 		# check the min count on all involved tasks, and be sure that we don't 
 		# have any task which produces less than the maximum min-count of all tasks
 		# The actual_count is used when chunking tasks up for the queue, whereas 
@@ -309,6 +308,7 @@ class Pool(object):
 		threadsafe to optimize item throughput.
 		
 		:note: currently NOT threadsafe !"""
+		print "set_size", size
 		assert size > -1, "Size cannot be negative"
 		
 		# either start new threads, or kill existing ones.
-- cgit v1.2.3
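For reference, the shape of a depth-first, input-inclusive traversal like the one input_inclusive_dfirst_reversed performs (the real method presumably lives in lib/git/async/graph.py; this is only a sketch assuming each node keeps an in_nodes list): dependencies are emitted before the node that consumes them, so iterating the result schedules producers first.

    def evaluation_order(node):
        """Depth-first over inputs, emitting dependencies before consumers."""
        seen = set()
        order = list()
        def visit(n):
            if id(n) in seen:
                return
            seen.add(id(n))
            for parent in n.in_nodes:
                visit(parent)
            order.append(n)      # post-order: all inputs are already listed
        visit(node)
        return order

    class N(object):
        def __init__(self, name, in_nodes=()):
            self.name, self.in_nodes = name, list(in_nodes)

    a = N('a'); b = N('b', [a]); c = N('c', [a, b])
    print [n.name for n in evaluation_order(c)]    # ['a', 'b', 'c']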
From cfb278d74ad01f3f1edf5e0ad113974a9555038d Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 10:14:32 +0200
Subject: InputChannelTask now has an interface for properly handling reading from the same and from different pools

---
 lib/git/async/pool.py | 43 +++++++++++++++++++++++++++++++++++++++----
 1 file changed, 39 insertions(+), 4 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 5ebc3655..1b3c2748 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -10,7 +10,6 @@ from util import (
 		DummyLock
 	)

-from task import InputChannelTask
 from Queue import (
 	Queue, 
 	Empty
 	)
@@ -66,6 +65,24 @@ class RPoolChannel(CallbackRChannel):
 		if sys.getrefcount(self) < 6:
 			pool.remove_task(task, _from_destructor_=True)
 		# END handle refcount based removal of task
+	
+	#{ Internal
+	def _read(self, count=0, block=True, timeout=None):
+		"""Direct read, bypassing the pool handling"""
+		return CallbackRChannel.read(self, count, block, timeout)
+	#} END internal
+	
+	#{ Interface
+	
+	def pool_ref(self):
+		""":return: reference to the pool we belong to"""
+		return self._pool_ref
+		
+	def task_ref(self):
+		""":return: reference to the task producing our items"""
+		return self._task_ref
+	
+	#} END interface
 	
 	def read(self, count=0, block=True, timeout=None):
 		"""Read an item that was processed by one of our threads
@@ -188,7 +205,7 @@ class Pool(object):
 		finally:
 			self._taskgraph_lock.release()
 		# END handle locking
-		print dfirst_tasks
+		
 		# check the min count on all involved tasks, and be sure that we don't 
 		# have any task which produces less than the maximum min-count of all tasks
 		# The actual_count is used when chunking tasks up for the queue, whereas 
@@ -406,6 +423,18 @@ class Pool(object):
 		# create a write channel for it
 		wctype = WChannel
 		
+		# adjust the task with our pool ref, if it has the slot and is empty
+		# For now, we don't allow tasks to be used in multiple pools, except
+		# via their channels
+		if hasattr(task, 'pool'):
+			their_pool = task.pool()
+			if their_pool is None:
+				task.set_pool(self)
+			elif their_pool is not self:
+				raise ValueError("Task %r is already registered to another pool" % task.id)
+			# END handle pool exclusivity
+		# END handle pool aware tasks
+		
 		self._taskgraph_lock.acquire()
 		try:
 			self._taskorder_cache.clear()
@@ -431,12 +460,18 @@ class Pool(object):
 		# END sync task addition
 		
 		# If the input channel is one of our read channels, we add the relation
-		if isinstance(task, InputChannelTask):
+		if hasattr(task, 'rchannel'):
 			ic = task.rchannel()
-			if isinstance(ic, RPoolChannel) and ic._pool_ref() is self:
+			if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
 				self._taskgraph_lock.acquire()
 				try:
 					self._tasks.add_edge(ic._task_ref(), task)
+					
+					# additionally, bypass ourselves when reading from the 
+					# task, if possible
+					if hasattr(ic, '_read'):
+						task.set_read(ic._read)
+					# END handle read bypass
 				finally:
 					self._taskgraph_lock.release()
 			# END handle edge-adding
-- cgit v1.2.3
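Note the double call ic.pool_ref()() above: pool_ref() hands out the weakref object itself, so the second call dereferences it to the pool (or None once the pool has died). Together with the hasattr checks, any object providing these methods can take part - no isinstance test, and no import of the task module. In miniature:

    import weakref

    class Pool(object):
        pass

    class Reader(object):
        def __init__(self, pool):
            self._pool_ref = weakref.ref(pool)
        def pool_ref(self):
            """:return: the weak reference itself, not the pool"""
            return self._pool_ref

    pool = Pool()
    ic = Reader(pool)
    if hasattr(ic, 'pool_ref') and ic.pool_ref()() is pool:
        print "reader belongs to this pool"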
From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 14:39:57 +0200
Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a reader is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken.
The benefits of this are faster writes to the channel; reading didn't improve, and refcounts should be clearer now

---
 lib/git/async/pool.py | 76 +++++++++++++++++++++++++--------------------------
 1 file changed, 37 insertions(+), 39 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1b3c2748..68551ea3 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -18,27 +18,28 @@ from Queue import (
 from graph import Graph
 from channel import (
 		mkchannel,
-		WChannel,
-		SerialWChannel,
-		CallbackRChannel
+		Writer,
+		Channel,
+		SerialChannel,
+		CallbackReader
 	)

 import sys
 import weakref
 from time import sleep
+import new


-class RPoolChannel(CallbackRChannel):
-	""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call 
-	before and after an item is to be read.
-	
+class PoolReader(CallbackReader):
+	"""A reader designed to read from channels which take part in pools
 	It acts like a handle to the underlying task in the pool."""
-	__slots__ = ('_task_ref', '_pool_ref')
+	__slots__ = ('_task_ref', '_pool_ref', '_read')
 	
-	def __init__(self, wchannel, task, pool):
-		CallbackRChannel.__init__(self, wchannel)
+	def __init__(self, channel, task, pool):
+		CallbackReader.__init__(self, channel)
 		self._task_ref = weakref.ref(task)
 		self._pool_ref = weakref.ref(pool)
+		self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)
 		
 	def __del__(self):
 		"""Assures that our task will be deleted if we were the last reader"""
@@ -63,15 +64,9 @@ class RPoolChannel(CallbackRChannel):
 		# okay for now
 		# TODO: Fix this - private/public method
 		if sys.getrefcount(self) < 6:
-			pool.remove_task(task, _from_destructor_=True)
+			pool.remove_task(task)
 		# END handle refcount based removal of task
 		
-	#{ Internal
-	def _read(self, count=0, block=True, timeout=None):
-		"""Direct read, bypassing the pool handling"""
-		return CallbackRChannel.read(self, count, block, timeout)
-	#} END internal
-	
 	#{ Interface 
 	
 	def pool_ref(self):
@@ -118,7 +113,7 @@ class RPoolChannel(CallbackRChannel):
 		####### read data ########
 		##########################
 		# read actual items, tasks were setup to put their output into our channel ( as well )
-		items = CallbackRChannel.read(self, count, block, timeout)
+		items = CallbackReader.read(self, count, block, timeout)
 		##########################
 		
 		
@@ -262,21 +257,21 @@ class Pool(object):
 			# should make things execute faster. Putting the if statements into
 			# the loop would be less code, but ... slower
 			# DEBUG
-			# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+			# print actual_count, numchunks, chunksize, remainder, task._out_writer.size()
 			if self._num_workers:
 				# respect the chunk size, and split the task up if we want 
 				# to process too much. This can be defined per task
-				queue = self._queue
+				qput = self._queue
 				if numchunks > 1:
 					for i in xrange(numchunks):
-						queue.put((task.process, chunksize))
+						qput((task.process, chunksize))
 					# END for each chunk to put
 				else:
-					queue.put((task.process, chunksize))
+					qput((task.process, chunksize))
 				# END try efficient looping
 				
 				if remainder:
-					queue.put((task.process, remainder))
+					qput((task.process, remainder))
 				# END handle chunksize
 			else:
 				# no workers, so we have to do the work ourselves
@@ -295,16 +290,16 @@ class Pool(object):
 		# END for each task to process
 		
 	
-	def _remove_task_if_orphaned(self, task, from_destructor):
+	def _remove_task_if_orphaned(self, task):
 		"""Check the task, and delete it if it is orphaned"""
-		# 1 as its stored on the task, 1 for the getrefcount call
+		# 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
 		# If we are getting here from the destructor of an RPool channel, 
 		# it's totally valid to virtually decrement the refcount by 1 as 
 		# we can expect it to drop once the destructor completes, which is when
 		# we finish all recursive calls
-		max_ref_count = 3 + from_destructor
-		if sys.getrefcount(task.wchannel()) < max_ref_count:
-			self.remove_task(task, from_destructor)
+		max_ref_count = 3
+		if sys.getrefcount(task.writer().channel) < max_ref_count:
+			self.remove_task(task)
 	#} END internal
 	
 	#{ Interface 
@@ -375,7 +370,7 @@ class Pool(object):
 		finally:
 			self._taskgraph_lock.release()
 		
-	def remove_task(self, task, _from_destructor_=False):
+	def remove_task(self, task):
 		"""Delete the task
 		Additionally we will remove orphaned tasks, which can be identified if their 
 		output channel is only held by themselves, so no one will ever consume 
@@ -410,7 +405,7 @@ class Pool(object):
 		# END locked deletion
 		
 		for t in in_tasks:
-			self._remove_task_if_orphaned(t, _from_destructor_)
+			self._remove_task_if_orphaned(t)
 		# END handle orphans recursively
 		
 		return self
@@ -421,7 +416,7 @@ class Pool(object):
 		the task will be considered orphaned and will be deleted on the next 
 		occasion."""
 		# create a write channel for it
-		wctype = WChannel
+		ctype = Channel
 		
 		# adjust the task with our pool ref, if it has the slot and is empty
@@ -442,26 +437,29 @@ class Pool(object):
 		
 		# Use a non-threadsafe queue
 		# This brings about 15% more performance, but sacrifices thread-safety
-		# when reading from multiple threads.
 		if self.size() == 0:
-			wctype = SerialWChannel
+			ctype = SerialChannel
 		# END improve locks
 		
 		# setup the task's channel - respect the task creator's choice though
 		# if it is set.
-		wc = task.wchannel()
+		wc = task.writer()
+		ch = None
 		if wc is None:
-			wc = wctype()
+			ch = ctype()
+			wc = Writer(ch)
+			task.set_writer(wc)
+		else:
+			ch = wc.channel
 		# END create write channel if unset
-		rc = RPoolChannel(wc, task, self)
-		task.set_wchannel(wc)
+		rc = PoolReader(ch, task, self)
 	finally:
 		self._taskgraph_lock.release()
 	# END sync task addition
 	
 	# If the input channel is one of our read channels, we add the relation
-	if hasattr(task, 'rchannel'):
-		ic = task.rchannel()
+	if hasattr(task, 'reader'):
+		ic = task.reader()
 		if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
 			self._taskgraph_lock.acquire()
 			try:
-- cgit v1.2.3
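The qput loop above consumes a precomputed split of the requested item count into numchunks, chunksize and a remainder, so several workers can serve one read. As a plain-function sketch of that arithmetic (the pool derives chunksize from per-task configuration, which is not shown here):

    def split_into_chunks(count, chunk_size):
        """Return the work-unit sizes for processing 'count' items in chunks.
        Mirrors the numchunks/chunksize/remainder logic as a standalone function."""
        numchunks = count / chunk_size            # integer division in py2
        remainder = count - (numchunks * chunk_size)
        entries = [chunk_size] * numchunks
        if remainder:
            entries.append(remainder)
        return entries

    print split_into_chunks(10, 4)    # [4, 4, 2] - two full chunks plus the rest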
From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 15:38:40 +0200
Subject: Now tracking the number of concurrent writers to assure the channel is closed only when there is no one else writing to it.
This assures that all tasks can continue working and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. It's still not perfect though ...

---
 lib/git/async/pool.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 68551ea3..3fd99c7b 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -33,13 +33,12 @@ import new
 class PoolReader(CallbackReader):
 	"""A reader designed to read from channels which take part in pools
 	It acts like a handle to the underlying task in the pool."""
-	__slots__ = ('_task_ref', '_pool_ref', '_read')
+	__slots__ = ('_task_ref', '_pool_ref')
 	
 	def __init__(self, channel, task, pool):
 		CallbackReader.__init__(self, channel)
 		self._task_ref = weakref.ref(task)
 		self._pool_ref = weakref.ref(pool)
-		self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)
 		
 	def __del__(self):
 		"""Assures that our task will be deleted if we were the last reader"""
@@ -62,11 +61,16 @@ class PoolReader(CallbackReader):
 		# it has no way of knowing that the write channel is about to diminish,
 		# which is why we pass the info as a private kwarg - not nice, but 
 		# okay for now
 		if sys.getrefcount(self) < 6:
-			pool.remove_task(task)
+			pool.remove_task(task, _from_destructor_ = True)
 		# END handle refcount based removal of task
 		
+	#{ Internal
+	def _read(self, count=0, block=True, timeout=None):
+		return CallbackReader.read(self, count, block, timeout)
+		
+	#} END internal
+	
 	#{ Interface 
 	
 	def pool_ref(self):
@@ -261,7 +265,7 @@ class Pool(object):
 		if self._num_workers:
 			# respect the chunk size, and split the task up if we want 
 			# to process too much. This can be defined per task
-			qput = self._queue
+			qput = self._queue.put
 			if numchunks > 1:
 				for i in xrange(numchunks):
 					qput((task.process, chunksize))
 				# END for each chunk to put
 			else:
 				qput((task.process, chunksize))
@@ -290,16 +294,16 @@ class Pool(object):
 		# END for each task to process
 		
 	
-	def _remove_task_if_orphaned(self, task):
+	def _remove_task_if_orphaned(self, task, from_destructor):
 		"""Check the task, and delete it if it is orphaned"""
 		# 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
 		# If we are getting here from the destructor of an RPool channel, 
 		# it's totally valid to virtually decrement the refcount by 1 as 
 		# we can expect it to drop once the destructor completes, which is when
 		# we finish all recursive calls
-		max_ref_count = 3
+		max_ref_count = 3 + from_destructor
 		if sys.getrefcount(task.writer().channel) < max_ref_count:
-			self.remove_task(task)
+			self.remove_task(task, from_destructor)
 	#} END internal
 	
 	#{ Interface 
@@ -370,7 +374,7 @@ class Pool(object):
 		finally:
 			self._taskgraph_lock.release()
 		
-	def remove_task(self, task):
+	def remove_task(self, task, _from_destructor_ = False):
 		"""Delete the task
 		Additionally we will remove orphaned tasks, which can be identified if their 
 		output channel is only held by themselves, so no one will ever consume 
@@ -405,7 +409,7 @@ class Pool(object):
 		# END locked deletion
 		
 		for t in in_tasks:
-			self._remove_task_if_orphaned(t)
+			self._remove_task_if_orphaned(t, _from_destructor_)
 		# END handle orphans recursively
-- cgit v1.2.3
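Besides repairing the previous commit's qput = self._queue (which cached the queue object itself, so qput(...) would raise a TypeError since Queue instances are not callable), qput = self._queue.put caches the bound method: the attribute lookup happens once, outside the loop - a classic CPython micro-optimization. For example:

    from Queue import Queue

    queue = Queue()
    qput = queue.put              # bind once; 'qput' is a bound method
    for i in xrange(1000):
        qput(i)                   # no per-iteration attribute lookup
    print queue.qsize()           # 1000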
From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 23:55:50 +0200
Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues.
Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour, making the tests fail. It's good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but it's nearly there

---
 lib/git/async/pool.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 3fd99c7b..0aad90ae 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -402,7 +402,7 @@ class Pool(object):
 		# keep its input nodes as we check whether they were orphaned
 		in_tasks = task.in_nodes
 		
-		self._tasks.del_node(task)
+		self._tasks.remove_node(task)
 		self._taskorder_cache.clear()
 	finally:
 		self._taskgraph_lock.release()
-- cgit v1.2.3


From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Fri, 11 Jun 2010 11:52:01 +0200
Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notified, staying in a beauty sleep. This glitch is probably caused by some detail not treated correctly in the Python thread module, which is something we cannot fix. It works most of the time as expected though - maybe some cleanup is not done correctly, which causes this

---
 lib/git/async/pool.py | 1 -
 1 file changed, 1 deletion(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 0aad90ae..dbc201a9 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -324,7 +324,6 @@ class Pool(object):
 		threadsafe to optimize item throughput.
 		
 		:note: currently NOT threadsafe !"""
-		print "set_size", size
 		assert size > -1, "Size cannot be negative"
 		
 		# either start new threads, or kill existing ones.
-- cgit v1.2.3
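A common defense against the missed-wakeup glitch described above - whatever its root cause - is never to block on a condition variable indefinitely: wait with a timeout in a loop and re-check the predicate. A generic sketch using only the standard threading module, not code from this library:

    import threading

    lock = threading.Lock()
    cond = threading.Condition(lock)
    items = []

    def wait_for_item(poll_interval=0.5):
        cond.acquire()
        try:
            while not items:
                # re-check periodically instead of trusting a single notify();
                # a missed wakeup then costs at most 'poll_interval' seconds
                cond.wait(poll_interval)
            return items.pop(0)
        finally:
            cond.release()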
From e14e3f143e7260de9581aee27e5a9b2645db72de Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Fri, 11 Jun 2010 16:42:09 +0200
Subject: Removed commented-out debug code and additional debug prints. Verified it works on py2.4, 2.5 and 2.6

---
 lib/git/async/pool.py | 6 ------
 1 file changed, 6 deletions(-)

(limited to 'lib/git/async/pool.py')

diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index dbc201a9..7ee3e8eb 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -95,10 +95,6 @@ class PoolReader(CallbackReader):
 		# provided enough - its better to have some possibly empty task runs 
 		# than having and empty queue that blocks.
 		
-		# NOTE: TODO: that case is only possible if one Task could be connected
-		# to multiple input channels in a manner known by the system. Currently
-		# this is not possible, but should be implemented at some point. 
-		
 		# if the user tries to use us to read from a done task, we will never 
 		# compute as all produced items are already in the channel
@@ -260,8 +256,6 @@ class Pool(object):
 		# the following loops are kind of unrolled - code duplication
 		# should make things execute faster. Putting the if statements
 		# into the loop would be less code, but ... slower
-		# DEBUG
-		# print actual_count, numchunks, chunksize, remainder, task._out_writer.size()
 		if self._num_workers:
 			# respect the chunk size, and split the task up if we want 
 			# to process too much. 
This can be defined per task -- cgit v1.2.3 From cac6e06cc9ef2903a15e594186445f3baa989a1a Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:58:44 +0200 Subject: test_task: fixed import error, made all modules from x import * safe --- lib/git/async/pool.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7ee3e8eb..cf14e47b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -30,6 +30,8 @@ from time import sleep import new +__all__ = ('PoolReader', 'Pool', 'ThreadPool') + class PoolReader(CallbackReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" -- cgit v1.2.3 From be8955a0fbb77d673587974b763f17c214904b57 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:19:18 +0200 Subject: Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract. Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel --- lib/git/async/pool.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'lib/git/async/pool.py') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index cf14e47b..8f33a029 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -18,10 +18,10 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - Writer, + ChannelWriter, Channel, SerialChannel, - CallbackReader + CallbackChannelReader ) import sys @@ -32,13 +32,14 @@ import new __all__ = ('PoolReader', 'Pool', 'ThreadPool') -class PoolReader(CallbackReader): + +class PoolReader(CallbackChannelReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task_ref', '_pool_ref') def __init__(self, channel, task, pool): - CallbackReader.__init__(self, channel) + CallbackChannelReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) @@ -69,7 +70,7 @@ class PoolReader(CallbackReader): #{ Internal def _read(self, count=0, block=True, timeout=None): - return CallbackReader.read(self, count, block, timeout) + return CallbackChannelReader.read(self, count, block, timeout) #} END internal @@ -115,7 +116,7 @@ class PoolReader(CallbackReader): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackReader.read(self, count, block, timeout) + items = CallbackChannelReader.read(self, count, block, timeout) ########################## @@ -446,7 +447,7 @@ class Pool(object): ch = None if wc is None: ch = ctype() - wc = Writer(ch) + wc = ChannelWriter(ch) task.set_writer(wc) else: ch = wc.channel -- cgit v1.2.3
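The IteratorReader mentioned in the last commit adapts a plain iterator to the reader interface; per the commit message its actual implementation lives in the channel module. A plausible minimal version, to show the shape of the interface only (blocking and timeout handling omitted):

    class IteratorReader(object):
        """Adapts an iterator to the read(count) interface of channel readers.
        Sketch only - the real class also handles blocking and timeouts."""
        def __init__(self, iterator):
            self._iter = iter(iterator)

        def read(self, count=0, block=True, timeout=None):
            items = list()
            if count == 0:
                items.extend(self._iter)        # 0 means: read everything
            else:
                try:
                    for _ in xrange(count):
                        items.append(self._iter.next())
                except StopIteration:
                    pass
            # END handle exact count
            return items

    print IteratorReader(iter(range(5))).read(3)    # [0, 1, 2]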