aboutsummaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
commit3776f7a766851058f6435b9f606b16766425d7ca (patch)
tree8096d6d84090f4abc5aad374c1fe6f64088572a6 /lib/git/async/channel.py
parent09c3f39ceb545e1198ad7a3f470d4ec896ce1add (diff)
downloadGitPython-3776f7a766851058f6435b9f606b16766425d7ca.tar.gz
GitPython-3776f7a766851058f6435b9f606b16766425d7ca.zip
The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag.
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py108
1 files changed, 56 insertions, 52 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..655024fe 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
Full
)
-from util import AsyncQueue
+from util import (
+ AsyncQueue,
+ DummyLock
+ )
+
from time import time
import sys
@@ -23,12 +27,9 @@ class Channel(object):
def __new__(cls, *args):
if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
+ if len(args) > 0:
+ raise ValueError("Cannot take any arguments when creating a new channel")
+ wc = WChannel()
rc = RChannel(wc)
return wc, rc
# END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
"""The write end of a channel"""
__slots__ = ('_closed', '_queue')
- def __init__(self, max_items=0):
+ def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = AsyncQueue(max_items)
+ self._queue = AsyncQueue()
#{ Interface
@@ -74,7 +75,21 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
+ mutex = self._queue.mutex
+ mutex.acquire()
+ # this is atomic already, due to the GIL - no need to get the queue's mutex
+ print "channel.close()"
self._closed = True
+ # now make sure that the people waiting for an item are released now
+ # As we it could be that some readers are already on their way to initiate
+ # a blocking get, we must make sure that locks never block before that happens
+
+ # now we are the only one accessing the queue, so change it
+ self._queue.mutex = DummyLock()
+ print self._queue.not_empty._waiters
+ self._queue.not_empty.notify_all()
+ print self._queue.not_empty._waiters
+ mutex.release()
@property
def closed(self):
@@ -134,58 +149,47 @@ class RChannel(Channel):
pass
# END handle exceptions
else:
- # if we have really bad timing, the source of the channel
- # marks itself closed, but before setting it, the thread
- # switches to us. We read it, read False, and try to fetch
- # something, and never return. The whole closed channel thing
- # is not atomic ( of course )
- # This is why we never block for long, to get a chance to recheck
- # for closed channels.
- # We blend this into the timeout of the user
- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
- wc = self._wc
- timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
- assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
- if timeout and ourtimeout > timeout:
- ourtimeout = timeout
- # END setup timeout
-
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
# END handle count
+ endtime = sys.maxint # allows timeout for whole operation
+ if timeout is not None:
+ endtime = time() + timeout
+ # could be improved by a separate: no-endtime branch, saving the time calls
for i in xrange(count):
- have_timeout = False
- st = time()
- while True:
+ try:
+ print "about to read", i, count, block, timeout
+ out.append(queue.get(block, timeout))
+ print "got one"
+ except Empty:
+ pass
+ # END ignore empty
+
+ # if we have been unblocked because the closed state changed
+ # in the meanwhile, stop trying
+ # NOTE: must NOT cache _wc
+ if self._wc.closed:
+ # its racing time - all threads waiting for the queue
+ # are awake now, and we actually can't be sure its empty
+ # Hence we pop it empty without blocking, getting as much
+ # as we can. This effectively lets us race ( with mutexes )
+ # of the other threads.
+ print "stopped because it was closed"
try:
- if wc.closed:
- have_timeout = True
- # its about the 'in the meanwhile' :) - get everything
- # we can in non-blocking mode. This will raise
- try:
- while True:
- out.append(queue.get(False))
- # END until it raises Empty
- except Empty:
- break
- # END finally, out of here
- # END don't continue on closed channels
-
- # END abort reading if it was closed ( in the meanwhile )
- out.append(queue.get(block, ourtimeout))
- break # breakout right away
+ while True:
+ out.append(queue.get(False))
+ # END pop it empty
except Empty:
- if timeout - (time() - st) <= 0:
- # hitting timeout
- have_timeout = True
- break
- # END abort if the user wants no more time spent here
- # END handle timeout
- # END endless timer loop
- if have_timeout:
+ pass
+ # END ignore emptyness, we have all
+
break
+ # END handle cloased
+
+ if time() >= endtime:
+ break
# END stop on timeout
# END for each item
# END handle blocking