From ab59f78341f1dd188aaf4c30526f6295c63438b1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 20:03:09 +0200 Subject: Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock --- test/git/async/test_channel.py | 61 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 test/git/async/test_channel.py (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py new file mode 100644 index 00000000..ad68a8d5 --- /dev/null +++ b/test/git/async/test_channel.py @@ -0,0 +1,61 @@ +"""Channel testing""" +from test.testlib import * +from git.async.channel import * + +import time + +class TestChannels(TestBase): + + def test_base(self): + # creating channel yields a write and a read channal + wc, rc = Channel() + assert isinstance(wc, WChannel) + assert isinstance(rc, RChannel) + + # everything else fails + self.failUnlessRaises(ValueError, Channel, 1, "too many args") + + # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO + item = 1 + item2 = 2 + wc.write(item) + wc.write(item2) + assert rc.read() == item + assert rc.read() == item2 + + # next read blocks, then raises - it waits a second + st = time.time() + self.failUnlessRaises(IOError, rc.read, True, 1) + assert time.time() - st >= 1.0 + + # writing to a closed channel raises + assert not wc.closed + wc.close() + assert wc.closed + wc.close() # fine + assert wc.closed + + self.failUnlessRaises(IOError, wc.write, 1) + + # reading from a closed channel never blocks + self.failUnlessRaises(IOError, rc.read) + + + + # TEST LIMITED SIZE CHANNEL + # channel with max-items set + wc, rc = Channel(1) + wc.write(item) # fine + + # blocks for a second, its full + st = time.time() + self.failUnlessRaises(IOError, wc.write, item, True, 1) + assert time.time() - st >= 1.0 + + # get one + assert rc.read() == item + + # its empty,can put one again + wc.write(item2) + assert rc.read() == item2 + wc.close() -- cgit v1.2.3 From b72e2704022d889f116e49abf3e1e5d3e3192d3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 01:00:12 +0200 Subject: Improved pool design and started rough implementation, top down to learn while going. Tests will be written soon for verification, its still quite theoretical --- test/git/async/test_channel.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index ad68a8d5..2a3c1585 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -20,12 +20,15 @@ class TestChannels(TestBase): item2 = 2 wc.write(item) wc.write(item2) - assert rc.read() == item - assert rc.read() == item2 + + # read all - it blocks as its still open for writing + st = time.time() + assert rc.read(timeout=1) == [item, item2] + assert time.time() - st >= 1.0 # next read blocks, then raises - it waits a second st = time.time() - self.failUnlessRaises(IOError, rc.read, True, 1) + assert len(rc.read(1, True, 1)) == 0 assert time.time() - st >= 1.0 # writing to a closed channel raises @@ -38,7 +41,7 @@ class TestChannels(TestBase): self.failUnlessRaises(IOError, wc.write, 1) # reading from a closed channel never blocks - self.failUnlessRaises(IOError, rc.read) + assert len(rc.read()) == 0 @@ -49,13 +52,16 @@ class TestChannels(TestBase): # blocks for a second, its full st = time.time() - self.failUnlessRaises(IOError, wc.write, item, True, 1) + self.failUnlessRaises(EOFError, wc.write, item, True, 1) assert time.time() - st >= 1.0 - # get one - assert rc.read() == item + # get our only one + assert rc.read(1)[0] == item # its empty,can put one again wc.write(item2) - assert rc.read() == item2 wc.close() + + # reading 10 will only yield one, it will not block as its closed + assert rc.read(10, timeout=1)[0] == item2 + -- cgit v1.2.3 From 867129e2950458ab75523b920a5e227e3efa8bbc Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:08:06 +0200 Subject: channel.read: enhanced to be sure we don't run into non-atomicity issues related to our channel closed flag, which is the only way not to block forever on read(0) channels which were closed by a thread 'in the meanwhile' --- test/git/async/test_channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 2a3c1585..6472b5b5 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -26,7 +26,7 @@ class TestChannels(TestBase): assert rc.read(timeout=1) == [item, item2] assert time.time() - st >= 1.0 - # next read blocks, then raises - it waits a second + # next read blocks. it waits a second st = time.time() assert len(rc.read(1, True, 1)) == 0 assert time.time() - st >= 1.0 -- cgit v1.2.3 From a8a448b7864e21db46184eab0f0a21d7725d074f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 10:38:22 +0200 Subject: pool.consumed_tasks: is now a queue to be thread safe, in preparation for multiple connected pools Reduced waiting time in tests to make them complete faster --- test/git/async/test_channel.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 6472b5b5..acfbd15e 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -22,14 +22,15 @@ class TestChannels(TestBase): wc.write(item2) # read all - it blocks as its still open for writing + to = 0.2 st = time.time() - assert rc.read(timeout=1) == [item, item2] - assert time.time() - st >= 1.0 + assert rc.read(timeout=to) == [item, item2] + assert time.time() - st >= to # next read blocks. it waits a second st = time.time() - assert len(rc.read(1, True, 1)) == 0 - assert time.time() - st >= 1.0 + assert len(rc.read(1, True, to)) == 0 + assert time.time() - st >= to # writing to a closed channel raises assert not wc.closed @@ -50,10 +51,10 @@ class TestChannels(TestBase): wc, rc = Channel(1) wc.write(item) # fine - # blocks for a second, its full + # blocks for a a moment, its full st = time.time() - self.failUnlessRaises(EOFError, wc.write, item, True, 1) - assert time.time() - st >= 1.0 + self.failUnlessRaises(EOFError, wc.write, item, True, to) + assert time.time() - st >= to # get our only one assert rc.read(1)[0] == item -- cgit v1.2.3 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- test/git/async/test_channel.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index acfbd15e..25eb974c 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -43,26 +43,4 @@ class TestChannels(TestBase): # reading from a closed channel never blocks assert len(rc.read()) == 0 - - - - # TEST LIMITED SIZE CHANNEL - # channel with max-items set - wc, rc = Channel(1) - wc.write(item) # fine - - # blocks for a a moment, its full - st = time.time() - self.failUnlessRaises(EOFError, wc.write, item, True, to) - assert time.time() - st >= to - - # get our only one - assert rc.read(1)[0] == item - - # its empty,can put one again - wc.write(item2) - wc.close() - - # reading 10 will only yield one, it will not block as its closed - assert rc.read(10, timeout=1)[0] == item2 - + -- cgit v1.2.3 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: Its getting better already - intermediate commit before further chaning the task class --- test/git/async/test_channel.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 25eb974c..ab4ae015 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -42,5 +42,9 @@ class TestChannels(TestBase): self.failUnlessRaises(IOError, wc.write, 1) # reading from a closed channel never blocks + print "preblock" assert len(rc.read()) == 0 - + print "got read(0)" + assert len(rc.read(5)) == 0 + assert len(rc.read(1)) == 0 + -- cgit v1.2.3 From 0974f8737a3c56a7c076f9d0b757c6cb106324fb Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:47:41 +0200 Subject: Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode --- test/git/async/test_channel.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index ab4ae015..32458f31 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -33,18 +33,16 @@ class TestChannels(TestBase): assert time.time() - st >= to # writing to a closed channel raises - assert not wc.closed + assert not wc.closed() wc.close() - assert wc.closed + assert wc.closed() wc.close() # fine - assert wc.closed + assert wc.closed() - self.failUnlessRaises(IOError, wc.write, 1) + self.failUnlessRaises(ReadOnly, wc.write, 1) # reading from a closed channel never blocks - print "preblock" assert len(rc.read()) == 0 - print "got read(0)" assert len(rc.read(5)) == 0 assert len(rc.read(1)) == 0 -- cgit v1.2.3 From 57a4e09294230a36cc874a6272c71757c48139f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 15:29:47 +0200 Subject: Channel: removed pseudoconstructor, which clearly improves the design and makes it easier to constomize pool: in serial mode, created channels will be serial-only, which brings 15% of performance --- test/git/async/test_channel.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 32458f31..444a076a 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -8,12 +8,10 @@ class TestChannels(TestBase): def test_base(self): # creating channel yields a write and a read channal - wc, rc = Channel() - assert isinstance(wc, WChannel) + wc, rc = mkchannel() + assert isinstance(wc, WChannel) # default args assert isinstance(rc, RChannel) - # everything else fails - self.failUnlessRaises(ValueError, Channel, 1, "too many args") # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO item = 1 -- cgit v1.2.3 From ea81f14dafbfb24d70373c74b5f8dabf3f2225d9 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:38:21 +0200 Subject: Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. The write channel with callback subclass allows the transformation of the item to be written --- test/git/async/test_channel.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 444a076a..215081cd 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -44,3 +44,30 @@ class TestChannels(TestBase): assert len(rc.read(5)) == 0 assert len(rc.read(1)) == 0 + + # test callback channels + wc, rc = mkchannel(wctype = CallbackWChannel, rctype = CallbackRChannel) + + cb = [0, 0] # set slots to one if called + def pre_write(item): + cb[0] = 1 + return item + 1 + def pre_read(count): + cb[1] = 1 + + # set, verify it returns previous one + assert wc.set_pre_cb(pre_write) is None + assert rc.set_pre_cb(pre_read) is None + assert wc.set_pre_cb(pre_write) is pre_write + assert rc.set_pre_cb(pre_read) is pre_read + + # writer transforms input + val = 5 + wc.write(val) + assert cb[0] == 1 and cb[1] == 0 + + rval = rc.read(1)[0] # read one item, must not block + assert cb[0] == 1 and cb[1] == 1 + assert rval == val + 1 + + -- cgit v1.2.3 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- test/git/async/test_channel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 215081cd..a24c7c91 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -9,8 +9,8 @@ class TestChannels(TestBase): def test_base(self): # creating channel yields a write and a read channal wc, rc = mkchannel() - assert isinstance(wc, WChannel) # default args - assert isinstance(rc, RChannel) + assert isinstance(wc, Writer) # default args + assert isinstance(rc, Reader) # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO @@ -46,7 +46,7 @@ class TestChannels(TestBase): # test callback channels - wc, rc = mkchannel(wctype = CallbackWChannel, rctype = CallbackRChannel) + wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader) cb = [0, 0] # set slots to one if called def pre_write(item): -- cgit v1.2.3 From be8955a0fbb77d673587974b763f17c214904b57 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:19:18 +0200 Subject: Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract. Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel --- test/git/async/test_channel.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'test/git/async/test_channel.py') diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index a24c7c91..e9e1b64c 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -9,8 +9,8 @@ class TestChannels(TestBase): def test_base(self): # creating channel yields a write and a read channal wc, rc = mkchannel() - assert isinstance(wc, Writer) # default args - assert isinstance(rc, Reader) + assert isinstance(wc, ChannelWriter) # default args + assert isinstance(rc, ChannelReader) # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO @@ -46,7 +46,7 @@ class TestChannels(TestBase): # test callback channels - wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader) + wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader) cb = [0, 0] # set slots to one if called def pre_write(item): @@ -71,3 +71,17 @@ class TestChannels(TestBase): assert rval == val + 1 + + # ITERATOR READER + reader = IteratorReader(iter(range(10))) + assert len(reader.read(2)) == 2 + assert len(reader.read(0)) == 8 + # its empty now + assert len(reader.read(0)) == 0 + assert len(reader.read(5)) == 0 + + # doesn't work if item is not an iterator + self.failUnlessRaises(ValueError, IteratorReader, list()) + + # NOTE: its thread-safety is tested by the pool + -- cgit v1.2.3