From c69b6b979e3d6bd01ec40e75b92b21f7a391f0ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 15:56:14 +0200 Subject: Added basic channel implementation including test restructured odb tests, they are now in an own module to keep the modules small --- test/git/odb/__init__.py | 1 + test/git/odb/lib.py | 60 +++++++++++++++ test/git/odb/test_channel.py | 61 +++++++++++++++ test/git/odb/test_db.py | 90 ++++++++++++++++++++++ test/git/odb/test_stream.py | 172 +++++++++++++++++++++++++++++++++++++++++++ test/git/odb/test_utils.py | 15 ++++ 6 files changed, 399 insertions(+) create mode 100644 test/git/odb/__init__.py create mode 100644 test/git/odb/lib.py create mode 100644 test/git/odb/test_channel.py create mode 100644 test/git/odb/test_db.py create mode 100644 test/git/odb/test_stream.py create mode 100644 test/git/odb/test_utils.py (limited to 'test/git/odb') diff --git a/test/git/odb/__init__.py b/test/git/odb/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/test/git/odb/__init__.py @@ -0,0 +1 @@ + diff --git a/test/git/odb/lib.py b/test/git/odb/lib.py new file mode 100644 index 00000000..d5199748 --- /dev/null +++ b/test/git/odb/lib.py @@ -0,0 +1,60 @@ +"""Utilities used in ODB testing""" +from git.odb import ( + OStream, + ) +from git.odb.stream import Sha1Writer + +import zlib +from cStringIO import StringIO + +#{ Stream Utilities + +class DummyStream(object): + def __init__(self): + self.was_read = False + self.bytes = 0 + self.closed = False + + def read(self, size): + self.was_read = True + self.bytes = size + + def close(self): + self.closed = True + + def _assert(self): + assert self.was_read + + +class DeriveTest(OStream): + def __init__(self, sha, type, size, stream, *args, **kwargs): + self.myarg = kwargs.pop('myarg') + self.args = args + + def _assert(self): + assert self.args + assert self.myarg + + +class ZippedStoreShaWriter(Sha1Writer): + """Remembers everything someone writes to it""" + __slots__ = ('buf', 'zip') + def __init__(self): + Sha1Writer.__init__(self) + self.buf = StringIO() + self.zip = zlib.compressobj(1) # fastest + + def __getattr__(self, attr): + return getattr(self.buf, attr) + + def write(self, data): + alen = Sha1Writer.write(self, data) + self.buf.write(self.zip.compress(data)) + return alen + + def close(self): + self.buf.write(self.zip.flush()) + + +#} END stream utilitiess + diff --git a/test/git/odb/test_channel.py b/test/git/odb/test_channel.py new file mode 100644 index 00000000..89b26582 --- /dev/null +++ b/test/git/odb/test_channel.py @@ -0,0 +1,61 @@ +"""Channel testing""" +from test.testlib import * +from git.odb.channel import * + +import time + +class TestDB(TestBase): + + def test_base(self): + # creating channel yields a write and a read channal + wc, rc = Channel() + assert isinstance(wc, WChannel) + assert isinstance(rc, RChannel) + + # everything else fails + self.failUnlessRaises(ValueError, Channel, 1, "too many args") + + # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO + item = 1 + item2 = 2 + wc.write(item) + wc.write(item2) + assert rc.read() == item + assert rc.read() == item2 + + # next read blocks, then raises - it waits a second + st = time.time() + self.failUnlessRaises(IOError, rc.read, True, 1) + assert time.time() - st >= 1.0 + + # writing to a closed channel raises + assert not wc.closed + wc.close() + assert wc.closed + wc.close() # fine + assert wc.closed + + self.failUnlessRaises(IOError, wc.write, 1) + + # reading from a closed channel never blocks + 
self.failUnlessRaises(IOError, rc.read) + + + + # TEST LIMITED SIZE CHANNEL + # channel with max-items set + wc, rc = Channel(1) + wc.write(item) # fine + + # blocks for a second, its full + st = time.time() + self.failUnlessRaises(IOError, wc.write, item, True, 1) + assert time.time() - st >= 1.0 + + # get one + assert rc.read() == item + + # its empty,can put one again + wc.write(item2) + assert rc.read() == item2 + wc.close() diff --git a/test/git/odb/test_db.py b/test/git/odb/test_db.py new file mode 100644 index 00000000..35ba8680 --- /dev/null +++ b/test/git/odb/test_db.py @@ -0,0 +1,90 @@ +"""Test for object db""" +from test.testlib import * +from lib import ZippedStoreShaWriter + +from git.odb import * +from git.odb.stream import Sha1Writer +from git import Blob +from git.errors import BadObject + + +from cStringIO import StringIO +import os + +class TestDB(TestBase): + """Test the different db class implementations""" + + # data + two_lines = "1234\nhello world" + + all_data = (two_lines, ) + + def _assert_object_writing(self, db): + """General tests to verify object writing, compatible to ObjectDBW + :note: requires write access to the database""" + # start in 'dry-run' mode, using a simple sha1 writer + ostreams = (ZippedStoreShaWriter, None) + for ostreamcls in ostreams: + for data in self.all_data: + dry_run = ostreamcls is not None + ostream = None + if ostreamcls is not None: + ostream = ostreamcls() + assert isinstance(ostream, Sha1Writer) + # END create ostream + + prev_ostream = db.set_ostream(ostream) + assert type(prev_ostream) in ostreams or prev_ostream in ostreams + + istream = IStream(Blob.type, len(data), StringIO(data)) + + # store returns same istream instance, with new sha set + my_istream = db.store(istream) + sha = istream.sha + assert my_istream is istream + assert db.has_object(sha) != dry_run + assert len(sha) == 40 # for now we require 40 byte shas as default + + # verify data - the slow way, we want to run code + if not dry_run: + info = db.info(sha) + assert Blob.type == info.type + assert info.size == len(data) + + ostream = db.stream(sha) + assert ostream.read() == data + assert ostream.type == Blob.type + assert ostream.size == len(data) + else: + self.failUnlessRaises(BadObject, db.info, sha) + self.failUnlessRaises(BadObject, db.stream, sha) + + # DIRECT STREAM COPY + # our data hase been written in object format to the StringIO + # we pasesd as output stream. No physical database representation + # was created. 
+ # Test direct stream copy of object streams, the result must be + # identical to what we fed in + ostream.seek(0) + istream.stream = ostream + assert istream.sha is not None + prev_sha = istream.sha + + db.set_ostream(ZippedStoreShaWriter()) + db.store(istream) + assert istream.sha == prev_sha + new_ostream = db.ostream() + + # note: only works as long our store write uses the same compression + # level, which is zip + assert ostream.getvalue() == new_ostream.getvalue() + # END for each data set + # END for each dry_run mode + + @with_bare_rw_repo + def test_writing(self, rwrepo): + ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects')) + + # write data + self._assert_object_writing(ldb) + diff --git a/test/git/odb/test_stream.py b/test/git/odb/test_stream.py new file mode 100644 index 00000000..020fe6bd --- /dev/null +++ b/test/git/odb/test_stream.py @@ -0,0 +1,172 @@ +"""Test for object db""" +from test.testlib import * +from lib import ( + DummyStream, + DeriveTest, + Sha1Writer + ) + +from git.odb import * +from git import Blob +from cStringIO import StringIO +import tempfile +import os +import zlib + + + + +class TestStream(TestBase): + """Test stream classes""" + + data_sizes = (15, 10000, 1000*1024+512) + + def test_streams(self): + # test info + sha = Blob.NULL_HEX_SHA + s = 20 + info = OInfo(sha, Blob.type, s) + assert info.sha == sha + assert info.type == Blob.type + assert info.size == s + + # test ostream + stream = DummyStream() + ostream = OStream(*(info + (stream, ))) + ostream.read(15) + stream._assert() + assert stream.bytes == 15 + ostream.read(20) + assert stream.bytes == 20 + + # derive with own args + DeriveTest(sha, Blob.type, s, stream, 'mine',myarg = 3)._assert() + + # test istream + istream = IStream(Blob.type, s, stream) + assert istream.sha == None + istream.sha = sha + assert istream.sha == sha + + assert len(istream.binsha) == 20 + assert len(istream.hexsha) == 40 + + assert istream.size == s + istream.size = s * 2 + istream.size == s * 2 + assert istream.type == Blob.type + istream.type = "something" + assert istream.type == "something" + assert istream.stream is stream + istream.stream = None + assert istream.stream is None + + assert istream.error is None + istream.error = Exception() + assert isinstance(istream.error, Exception) + + def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): + """Make stream tests - the orig_stream is seekable, allowing it to be + rewound and reused + :param cdata: the data we expect to read from stream, the contents + :param rewind_stream: function called to rewind the stream to make it ready + for reuse""" + ns = 10 + assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata)) + + # read in small steps + ss = len(cdata) / ns + for i in range(ns): + data = stream.read(ss) + chunk = cdata[i*ss:(i+1)*ss] + assert data == chunk + # END for each step + rest = stream.read() + if rest: + assert rest == cdata[-len(rest):] + # END handle rest + + rewind_stream(stream) + + # read everything + rdata = stream.read() + assert rdata == cdata + + def test_decompress_reader(self): + for close_on_deletion in range(2): + for with_size in range(2): + for ds in self.data_sizes: + cdata = make_bytes(ds, randomize=False) + + # zdata = zipped actual data + # cdata = original content data + + # create reader + if with_size: + # need object data + zdata = zlib.compress(make_object(Blob.type, cdata)) + type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) + assert size == 
len(cdata) + assert type == Blob.type + else: + # here we need content data + zdata = zlib.compress(cdata) + reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) + assert reader._s == len(cdata) + # END get reader + + def rewind(r): + r._zip = zlib.decompressobj() + r._br = r._cws = r._cwe = 0 + if with_size: + r._parse_header_info() + # END skip header + # END make rewind func + + self._assert_stream_reader(reader, cdata, rewind) + + # put in a dummy stream for closing + dummy = DummyStream() + reader._m = dummy + + assert not dummy.closed + del(reader) + assert dummy.closed == close_on_deletion + #zdi# + # END for each datasize + # END whether size should be used + # END whether stream should be closed when deleted + + def test_sha_writer(self): + writer = Sha1Writer() + assert 2 == writer.write("hi") + assert len(writer.sha(as_hex=1)) == 40 + assert len(writer.sha(as_hex=0)) == 20 + + # make sure it does something ;) + prev_sha = writer.sha() + writer.write("hi again") + assert writer.sha() != prev_sha + + def test_compressed_writer(self): + for ds in self.data_sizes: + fd, path = tempfile.mkstemp() + ostream = FDCompressedSha1Writer(fd) + data = make_bytes(ds, randomize=False) + + # for now, just a single write, code doesn't care about chunking + assert len(data) == ostream.write(data) + ostream.close() + # its closed already + self.failUnlessRaises(OSError, os.close, fd) + + # read everything back, compare to data we zip + fd = os.open(path, os.O_RDONLY) + written_data = os.read(fd, os.path.getsize(path)) + os.close(fd) + assert written_data == zlib.compress(data, 1) # best speed + + os.remove(path) + # END for each os + + diff --git a/test/git/odb/test_utils.py b/test/git/odb/test_utils.py new file mode 100644 index 00000000..34572b37 --- /dev/null +++ b/test/git/odb/test_utils.py @@ -0,0 +1,15 @@ +"""Test for object db""" +from test.testlib import * +from git import Blob +from git.odb.utils import ( + to_hex_sha, + to_bin_sha + ) + + +class TestUtils(TestBase): + def test_basics(self): + assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA + assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20 + assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA + -- cgit v1.2.3 From 65c9fe0baa579173afa5a2d463ac198d06ef4993 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 16:07:19 +0200 Subject: A code donation: Donating a worker thread implementation inclduding tests to Git-Python. 
I have the feeling it can do much good here :) --- test/git/odb/test_thread.py | 47 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 test/git/odb/test_thread.py (limited to 'test/git/odb') diff --git a/test/git/odb/test_thread.py b/test/git/odb/test_thread.py new file mode 100644 index 00000000..3b7f749b --- /dev/null +++ b/test/git/odb/test_thread.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +""" Test thead classes and functions""" +from test.testlib import * +from git.odb.thread import * +from Queue import Queue + +class TestWorker(WorkerThread): + def __init__(self, *args, **kwargs): + super(TestWorker, self).__init__(*args, **kwargs) + self.reset() + + def fun(self, *args, **kwargs): + self.called = True + self.args = args + self.kwargs = kwargs + return True + + def make_assertion(self): + assert self.called + assert self.args + assert self.kwargs + self.reset() + + def reset(self): + self.called = False + self.args = None + self.kwargs = None + + +class TestCase( TestCase ): + + @terminate_threads + def test_worker_thread(self): + worker = TestWorker() + assert isinstance(worker.start(), WorkerThread) + + # test different method types + standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) + for function in ("fun", TestWorker.fun, worker.fun, standalone_func): + rval = worker.call(function, 1, this='that') + assert isinstance(rval, Queue) + assert rval.get() is True + worker.make_assertion() + # END for each function type + + worker.call('quit') + -- cgit v1.2.3 From 50e469109eed3a752d9a1b0297f16466ad92f8d2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 18:26:26 +0200 Subject: Initial pool design added, allowing for lazy channel based evaluation of inter-dependent tasks --- test/git/odb/test_channel.py | 2 +- test/git/odb/test_pool.py | 10 ++++++++++ test/git/odb/test_thread.py | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 test/git/odb/test_pool.py (limited to 'test/git/odb') diff --git a/test/git/odb/test_channel.py b/test/git/odb/test_channel.py index 89b26582..d845a6ec 100644 --- a/test/git/odb/test_channel.py +++ b/test/git/odb/test_channel.py @@ -4,7 +4,7 @@ from git.odb.channel import * import time -class TestDB(TestBase): +class TestChannels(TestBase): def test_base(self): # creating channel yields a write and a read channal diff --git a/test/git/odb/test_pool.py b/test/git/odb/test_pool.py new file mode 100644 index 00000000..6656c69d --- /dev/null +++ b/test/git/odb/test_pool.py @@ -0,0 +1,10 @@ +"""Channel testing""" +from test.testlib import * +from git.odb.pool import * + +import time + +class TestThreadPool(TestBase): + + def test_base(self): + pass diff --git a/test/git/odb/test_thread.py b/test/git/odb/test_thread.py index 3b7f749b..674ecc1d 100644 --- a/test/git/odb/test_thread.py +++ b/test/git/odb/test_thread.py @@ -27,7 +27,7 @@ class TestWorker(WorkerThread): self.kwargs = None -class TestCase( TestCase ): +class TestThreads( TestCase ): @terminate_threads def test_worker_thread(self): -- cgit v1.2.3 From 61138f2ece0cb864b933698174315c34a78835d1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 19:59:17 +0200 Subject: Moved multiprocessing modules into own package, as they in fact have nothing to do with the object db. 
If that really works the way I want, it will become an own project, called async --- test/git/odb/test_channel.py | 61 -------------------------------------------- test/git/odb/test_pool.py | 10 -------- test/git/odb/test_thread.py | 47 ---------------------------------- 3 files changed, 118 deletions(-) delete mode 100644 test/git/odb/test_channel.py delete mode 100644 test/git/odb/test_pool.py delete mode 100644 test/git/odb/test_thread.py (limited to 'test/git/odb') diff --git a/test/git/odb/test_channel.py b/test/git/odb/test_channel.py deleted file mode 100644 index d845a6ec..00000000 --- a/test/git/odb/test_channel.py +++ /dev/null @@ -1,61 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.odb.channel import * - -import time - -class TestChannels(TestBase): - - def test_base(self): - # creating channel yields a write and a read channal - wc, rc = Channel() - assert isinstance(wc, WChannel) - assert isinstance(rc, RChannel) - - # everything else fails - self.failUnlessRaises(ValueError, Channel, 1, "too many args") - - # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO - item = 1 - item2 = 2 - wc.write(item) - wc.write(item2) - assert rc.read() == item - assert rc.read() == item2 - - # next read blocks, then raises - it waits a second - st = time.time() - self.failUnlessRaises(IOError, rc.read, True, 1) - assert time.time() - st >= 1.0 - - # writing to a closed channel raises - assert not wc.closed - wc.close() - assert wc.closed - wc.close() # fine - assert wc.closed - - self.failUnlessRaises(IOError, wc.write, 1) - - # reading from a closed channel never blocks - self.failUnlessRaises(IOError, rc.read) - - - - # TEST LIMITED SIZE CHANNEL - # channel with max-items set - wc, rc = Channel(1) - wc.write(item) # fine - - # blocks for a second, its full - st = time.time() - self.failUnlessRaises(IOError, wc.write, item, True, 1) - assert time.time() - st >= 1.0 - - # get one - assert rc.read() == item - - # its empty,can put one again - wc.write(item2) - assert rc.read() == item2 - wc.close() diff --git a/test/git/odb/test_pool.py b/test/git/odb/test_pool.py deleted file mode 100644 index 6656c69d..00000000 --- a/test/git/odb/test_pool.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.odb.pool import * - -import time - -class TestThreadPool(TestBase): - - def test_base(self): - pass diff --git a/test/git/odb/test_thread.py b/test/git/odb/test_thread.py deleted file mode 100644 index 674ecc1d..00000000 --- a/test/git/odb/test_thread.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -""" Test thead classes and functions""" -from test.testlib import * -from git.odb.thread import * -from Queue import Queue - -class TestWorker(WorkerThread): - def __init__(self, *args, **kwargs): - super(TestWorker, self).__init__(*args, **kwargs) - self.reset() - - def fun(self, *args, **kwargs): - self.called = True - self.args = args - self.kwargs = kwargs - return True - - def make_assertion(self): - assert self.called - assert self.args - assert self.kwargs - self.reset() - - def reset(self): - self.called = False - self.args = None - self.kwargs = None - - -class TestThreads( TestCase ): - - @terminate_threads - def test_worker_thread(self): - worker = TestWorker() - assert isinstance(worker.start(), WorkerThread) - - # test different method types - standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) - for function in ("fun", TestWorker.fun, worker.fun, standalone_func): - rval = worker.call(function, 1, 
this='that') - assert isinstance(rval, Queue) - assert rval.get() is True - worker.make_assertion() - # END for each function type - - worker.call('quit') - -- cgit v1.2.3
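
The channel tests in the first patch exercise an API whose implementation (git.odb.channel) is not part of these diffs. A minimal sketch of a channel with the same observable behaviour - Channel() returning a write/read pair, optional blocking with a timeout, IOError on timeout or after close - could be built on the stdlib Queue (Python 2, matching the tests). The class internals below are assumptions for illustration, not the actual git.odb.channel code:

from Queue import Queue, Empty, Full


class WChannel(object):
    """Write end of a channel; writing to a closed channel raises IOError."""
    def __init__(self, queue):
        self._queue = queue
        self.closed = False

    def write(self, item, block=True, timeout=None):
        if self.closed:
            raise IOError("Cannot write to a closed channel")
        try:
            self._queue.put(item, block, timeout)
        except Full:
            raise IOError("Timed out waiting for free space in the channel")

    def close(self):
        # closing more than once is allowed and has no further effect
        self.closed = True


class RChannel(object):
    """Read end of a channel; reads never block once the write end is closed."""
    def __init__(self, queue, wchannel):
        self._queue = queue
        self._wc = wchannel

    def read(self, block=True, timeout=None):
        try:
            # once the write end is closed, fall back to a non-blocking get
            return self._queue.get(block and not self._wc.closed, timeout)
        except Empty:
            raise IOError("No item available to read")


def Channel(max_items=0, *invalid):
    """Return a connected (WChannel, RChannel) pair, optionally bounded in size."""
    if invalid:
        raise ValueError("Too many arguments")
    queue = Queue(max_items)
    wc = WChannel(queue)
    return wc, RChannel(queue, wc)

Under this sketch a reader that is already blocked when the write end closes will still run into its timeout rather than waking immediately; the real implementation may well handle that case differently.
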
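test_db.py drives object writing through the generic _assert_object_writing helper. Condensed into straight-line form, a loose-object round trip using the same calls looks roughly like this; the objects path is a placeholder and must point at the objects directory of an existing repository:

import os
from cStringIO import StringIO

from git.odb import LooseObjectDB, IStream
from git import Blob

# point the database at a real repository's object store (placeholder path)
ldb = LooseObjectDB(os.path.join('/path/to/repo/.git', 'objects'))

data = "1234\nhello world"
istream = IStream(Blob.type, len(data), StringIO(data))

ldb.store(istream)                  # store() fills in istream.sha
sha = istream.sha
assert len(sha) == 40 and ldb.has_object(sha)

info = ldb.info(sha)                # OInfo carries sha, type and size
assert info.type == Blob.type and info.size == len(data)

ostream = ldb.stream(sha)           # OStream adds a readable stream to that
assert ostream.read() == data
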
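test_stream.py constructs DecompressMemMapReader in two ways: via new(), which parses the loose-object header out of the compressed stream itself, or directly, with the uncompressed size supplied by the caller. Boiled down to a usage sketch - the header layout "<type> <size>" followed by a NUL byte is the standard git loose-object format, everything else mirrors the calls made in the test:

import zlib

from git.odb import DecompressMemMapReader
from git import Blob

data = "1234\nhello world"

# variant 1: compressed data includes the object header, new() parses it
zdata = zlib.compress("%s %i\0%s" % (Blob.type, len(data), data))
otype, size, reader = DecompressMemMapReader.new(zdata, False)
assert otype == Blob.type and size == len(data)
assert reader.read() == data

# variant 2: compressed content only, the caller supplies the uncompressed size
zdata = zlib.compress(data)
reader = DecompressMemMapReader(zdata, False, len(data))
assert reader.read() == data
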
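The donated worker thread in test_thread.py is used through a small protocol: start() returns the thread for chaining, call() schedules a callable (or the name of a method on the worker) and hands back a Queue that will carry the result, and call('quit') stops the loop. The git.odb.thread module itself is not in these patches, so the following is only a guess at one way that protocol could be implemented (Python 2):

import inspect
import threading
from Queue import Queue


class WorkerThread(threading.Thread):
    """Runs scheduled callables one at a time, feeding results into per-call queues."""
    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.inq = Queue()
        self.daemon = True

    def start(self):
        threading.Thread.start(self)
        return self                     # allows: worker = WorkerThread().start()

    def call(self, function, *args, **kwargs):
        """Schedule function and return a Queue that will receive its result."""
        outq = Queue()
        self.inq.put((function, args, kwargs, outq))
        return outq

    def run(self):
        while True:
            function, args, kwargs, outq = self.inq.get()
            if function == 'quit':
                break
            if isinstance(function, basestring):
                function = getattr(self, function)          # method given by name
            elif inspect.ismethod(function) and function.im_self is None:
                args = (self,) + args                       # unbound method: pass the worker
            outq.put(function(*args, **kwargs))

Handing back a Queue per call keeps the scheduling thread free to collect results whenever it likes, which is what rval.get() in the test relies on.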