aboutsummaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
commit3776f7a766851058f6435b9f606b16766425d7ca (patch)
tree8096d6d84090f4abc5aad374c1fe6f64088572a6 /lib/git/async
parent09c3f39ceb545e1198ad7a3f470d4ec896ce1add (diff)
downloadGitPython-3776f7a766851058f6435b9f606b16766425d7ca.tar.gz
GitPython-3776f7a766851058f6435b9f606b16766425d7ca.zip
The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag.
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/channel.py108
-rw-r--r--lib/git/async/task.py6
-rw-r--r--lib/git/async/util.py32
3 files changed, 79 insertions, 67 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..655024fe 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
Full
)
-from util import AsyncQueue
+from util import (
+ AsyncQueue,
+ DummyLock
+ )
+
from time import time
import sys
@@ -23,12 +27,9 @@ class Channel(object):
def __new__(cls, *args):
if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
+ if len(args) > 0:
+ raise ValueError("Cannot take any arguments when creating a new channel")
+ wc = WChannel()
rc = RChannel(wc)
return wc, rc
# END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
"""The write end of a channel"""
__slots__ = ('_closed', '_queue')
- def __init__(self, max_items=0):
+ def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = AsyncQueue(max_items)
+ self._queue = AsyncQueue()
#{ Interface
@@ -74,7 +75,21 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
+ mutex = self._queue.mutex
+ mutex.acquire()
+ # this is atomic already, due to the GIL - no need to get the queue's mutex
+ print "channel.close()"
self._closed = True
+ # now make sure that the people waiting for an item are released now
+ # As we it could be that some readers are already on their way to initiate
+ # a blocking get, we must make sure that locks never block before that happens
+
+ # now we are the only one accessing the queue, so change it
+ self._queue.mutex = DummyLock()
+ print self._queue.not_empty._waiters
+ self._queue.not_empty.notify_all()
+ print self._queue.not_empty._waiters
+ mutex.release()
@property
def closed(self):
@@ -134,58 +149,47 @@ class RChannel(Channel):
pass
# END handle exceptions
else:
- # if we have really bad timing, the source of the channel
- # marks itself closed, but before setting it, the thread
- # switches to us. We read it, read False, and try to fetch
- # something, and never return. The whole closed channel thing
- # is not atomic ( of course )
- # This is why we never block for long, to get a chance to recheck
- # for closed channels.
- # We blend this into the timeout of the user
- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
- wc = self._wc
- timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
- assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
- if timeout and ourtimeout > timeout:
- ourtimeout = timeout
- # END setup timeout
-
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
# END handle count
+ endtime = sys.maxint # allows timeout for whole operation
+ if timeout is not None:
+ endtime = time() + timeout
+ # could be improved by a separate: no-endtime branch, saving the time calls
for i in xrange(count):
- have_timeout = False
- st = time()
- while True:
+ try:
+ print "about to read", i, count, block, timeout
+ out.append(queue.get(block, timeout))
+ print "got one"
+ except Empty:
+ pass
+ # END ignore empty
+
+ # if we have been unblocked because the closed state changed
+ # in the meanwhile, stop trying
+ # NOTE: must NOT cache _wc
+ if self._wc.closed:
+ # its racing time - all threads waiting for the queue
+ # are awake now, and we actually can't be sure its empty
+ # Hence we pop it empty without blocking, getting as much
+ # as we can. This effectively lets us race ( with mutexes )
+ # of the other threads.
+ print "stopped because it was closed"
try:
- if wc.closed:
- have_timeout = True
- # its about the 'in the meanwhile' :) - get everything
- # we can in non-blocking mode. This will raise
- try:
- while True:
- out.append(queue.get(False))
- # END until it raises Empty
- except Empty:
- break
- # END finally, out of here
- # END don't continue on closed channels
-
- # END abort reading if it was closed ( in the meanwhile )
- out.append(queue.get(block, ourtimeout))
- break # breakout right away
+ while True:
+ out.append(queue.get(False))
+ # END pop it empty
except Empty:
- if timeout - (time() - st) <= 0:
- # hitting timeout
- have_timeout = True
- break
- # END abort if the user wants no more time spent here
- # END handle timeout
- # END endless timer loop
- if have_timeout:
+ pass
+ # END ignore emptyness, we have all
+
break
+ # END handle cloased
+
+ if time() >= endtime:
+ break
# END stop on timeout
# END for each item
# END handle blocking
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index cf486f48..ce701c86 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -140,10 +140,10 @@ class OutputChannelTask(Node):
# If we appear to be the only one left with our output channel, and are
# closed ( this could have been set in another thread as well ), make
# sure to close the output channel.
- # The count is: 1 = wc itself, 2 = first reader channel, and we have only
- # one, 3 is ours + x for every thread having its copy on the stack
+ # The count is: 1 = wc itself, 2 = first reader channel, + x for every
+ # thread having its copy on the stack
# + 1 for the instance we provide to refcount
- if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+ if self.is_done() and sys.getrefcount(self._out_wc) < 4:
self.close()
# END handle channel closure
#{ Configuration
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index fb63ccaa..01073f6d 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -73,21 +73,22 @@ class SyncQueue(deque):
class HSCondition(object):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
- __slots__ = ("acquire", "release", "_lock", '_waiters')
+ # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ __slots__ = ("_lock", '_waiters')
delay = 0.00002 # reduces wait times, but increases overhead
def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
- self.acquire = lock.acquire
- self.release = lock.release
+ #self.acquire = lock.acquire
+ #self.release = lock.release
self._waiters = list()
- def __release(self):
+ def release(self):
return self._lock.release()
- def __acquire(self, block=None):
+ def acquire(self, block=None):
if block is None:
self._lock.acquire()
else:
@@ -156,7 +157,7 @@ class HSCondition(object):
self.notify(len(self._waiters))
-class AsyncQueue(Queue):
+class _AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance"""
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
@@ -166,7 +167,7 @@ class AsyncQueue(Queue):
self.all_tasks_done = HSCondition(self.mutex)
-class _AsyncQueue(Queue):
+class AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance"""
__slots__ = ('mutex', 'not_empty', 'queue')
@@ -194,9 +195,9 @@ class _AsyncQueue(Queue):
self.queue.append(item)
self.mutex.release()
self.not_empty.notify()
-
+
def get(self, block=True, timeout=None):
- self.not_empty.acquire()
+ self.not_empty.acquire() # == self.mutex.acquire in that case
q = self.queue
try:
if not block:
@@ -205,16 +206,23 @@ class _AsyncQueue(Queue):
elif timeout is None:
while not len(q):
self.not_empty.wait()
- elif timeout < 0:
- raise ValueError("'timeout' must be a positive number")
else:
+ print "with timeout", timeout
+ import traceback
+ traceback.print_stack()
endtime = _time() + timeout
while not len(q):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
- return q.popleft()
+ # END handle block
+ # can happen if someone else woke us up
+ try:
+ return q.popleft()
+ except IndexError:
+ raise Empty
+ # END handle unblocking reason
finally:
self.not_empty.release()