diff options
Diffstat (limited to 'git/test/db')
| -rw-r--r-- | git/test/db/__init__.py | 4 | ||||
| -rw-r--r-- | git/test/db/base.py | 626 | ||||
| -rw-r--r-- | git/test/db/cmd/__init__.py | 4 | ||||
| -rw-r--r-- | git/test/db/cmd/test_base.py | 32 | ||||
| -rw-r--r-- | git/test/db/lib.py | 246 | ||||
| -rw-r--r-- | git/test/db/py/__init__.py | 4 | ||||
| -rw-r--r-- | git/test/db/py/test_base.py | 16 | ||||
| -rw-r--r-- | git/test/db/py/test_git.py | 51 | ||||
| -rw-r--r-- | git/test/db/py/test_loose.py | 36 | ||||
| -rw-r--r-- | git/test/db/py/test_mem.py | 30 | ||||
| -rw-r--r-- | git/test/db/py/test_pack.py | 76 | ||||
| -rw-r--r-- | git/test/db/py/test_ref.py | 62 | ||||
| -rw-r--r-- | git/test/db/test_base.py | 20 |
13 files changed, 1207 insertions, 0 deletions
diff --git a/git/test/db/__init__.py b/git/test/db/__init__.py new file mode 100644 index 00000000..8a681e42 --- /dev/null +++ b/git/test/db/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php diff --git a/git/test/db/base.py b/git/test/db/base.py new file mode 100644 index 00000000..5291ba03 --- /dev/null +++ b/git/test/db/base.py @@ -0,0 +1,626 @@ +# test_repo.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php +from lib import TestDBBase +from git.test.lib import * +from git.cmd import Git +from git.objects import * +from git.exc import * +from git.index import * +from git.refs import * +from git.util import join_path_native +from git.exc import BadObject +from git.util import hex_to_bin, bin_to_hex + +import os, sys +import tempfile +import shutil +from cStringIO import StringIO + +from git.db.compat import RepoCompatibilityInterface + + +class RepoGlobalsItemDeletorMetaCls(GlobalsItemDeletorMetaCls): + ModuleToDelete = 'RepoBase' + + +class RepoBase(TestDBBase): + """Basic test for everything a fully implemented repository should support""" + __metaclass__ = RepoGlobalsItemDeletorMetaCls + + def test_new_should_raise_on_invalid_repo_location(self): + self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir()) + + def test_new_should_raise_on_non_existant_path(self): + self.failUnlessRaises(NoSuchPathError, self.RepoCls, "repos/foobar") + + def test_repo_creation_from_different_paths(self): + r_from_gitdir = self.RepoCls(self.rorepo.git_dir) + assert r_from_gitdir.git_dir == self.rorepo.git_dir + assert r_from_gitdir.git_dir.endswith('.git') + assert not self.rorepo.git.working_dir.endswith('.git') + assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir + + def test_description(self): + txt = "Test repository" + self.rorepo.description = txt + assert_equal(self.rorepo.description, txt) + + def test_heads_should_return_array_of_head_objects(self): + for head in self.rorepo.heads: + assert_equal(Head, head.__class__) + + def test_heads_should_populate_head_data(self): + for head in self.rorepo.heads: + assert head.name + assert isinstance(head.commit,Commit) + # END for each head + + assert isinstance(self.rorepo.heads.master, Head) + assert isinstance(self.rorepo.heads['master'], Head) + + def test_tree_from_revision(self): + tree = self.rorepo.tree('0.1.6') + assert len(tree.hexsha) == 40 + assert tree.type == "tree" + assert self.rorepo.tree(tree) == tree + + # try from invalid revision that does not exist + self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world') + + def test_commit_from_revision(self): + commit = self.rorepo.commit('0.1.4') + assert commit.type == 'commit' + assert self.rorepo.commit(commit) == commit + + def test_commits(self): + mc = 10 + commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) + assert len(commits) == mc + + c = commits[0] + assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) + assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) + assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha) + assert_equal("Michael Trier", c.author.name) + assert_equal("mtrier@gmail.com", c.author.email) + assert_equal(1232829715, c.authored_date) + assert_equal(5*3600, c.author_tz_offset) + assert_equal("Michael Trier", c.committer.name) + assert_equal("mtrier@gmail.com", c.committer.email) + assert_equal(1232829715, c.committed_date) + assert_equal(5*3600, c.committer_tz_offset) + assert_equal("Bumped version 0.1.6\n", c.message) + + c = commits[1] + assert isinstance(c.parents, tuple) + + def test_trees(self): + mc = 30 + num_trees = 0 + for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): + num_trees += 1 + assert isinstance(tree, Tree) + # END for each tree + assert num_trees == mc + + + def _assert_empty_repo(self, repo): + # test all kinds of things with an empty, freshly initialized repo. + # It should throw good errors + + # entries should be empty + assert len(repo.index.entries) == 0 + + # head is accessible + assert repo.head + assert repo.head.ref + assert not repo.head.is_valid() + + # we can change the head to some other ref + head_ref = Head.from_path(repo, Head.to_full_path('some_head')) + assert not head_ref.is_valid() + repo.head.ref = head_ref + + # is_dirty can handle all kwargs + for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): + assert not repo.is_dirty(*args) + # END for each arg + + # we can add a file to the index ( if we are not bare ) + if not repo.bare: + pass + # END test repos with working tree + + + def test_init(self): + prev_cwd = os.getcwd() + os.chdir(tempfile.gettempdir()) + git_dir_rela = "repos/foo/bar.git" + del_dir_abs = os.path.abspath("repos") + git_dir_abs = os.path.abspath(git_dir_rela) + try: + # with specific path + for path in (git_dir_rela, git_dir_abs): + r = self.RepoCls.init(path=path, bare=True) + assert isinstance(r, self.RepoCls) + assert r.bare == True + assert os.path.isdir(r.git_dir) + + self._assert_empty_repo(r) + + # test clone + clone_path = path + "_clone" + rc = r.clone(clone_path) + self._assert_empty_repo(rc) + + + try: + shutil.rmtree(clone_path) + except OSError: + # when relative paths are used, the clone may actually be inside + # of the parent directory + pass + # END exception handling + + # try again, this time with the absolute version + rc = self.RepoCls.clone_from(r.git_dir, clone_path) + self._assert_empty_repo(rc) + + shutil.rmtree(git_dir_abs) + try: + shutil.rmtree(clone_path) + except OSError: + # when relative paths are used, the clone may actually be inside + # of the parent directory + pass + # END exception handling + + # END for each path + + os.makedirs(git_dir_rela) + os.chdir(git_dir_rela) + r = self.RepoCls.init(bare=False) + r.bare == False + + self._assert_empty_repo(r) + finally: + try: + shutil.rmtree(del_dir_abs) + except OSError: + pass + os.chdir(prev_cwd) + # END restore previous state + + def test_bare_property(self): + if isinstance(self.rorepo, RepoCompatibilityInterface): + self.rorepo.bare + #END handle compatability + self.rorepo.is_bare + + def test_daemon_export(self): + orig_val = self.rorepo.daemon_export + self.rorepo.daemon_export = not orig_val + assert self.rorepo.daemon_export == ( not orig_val ) + self.rorepo.daemon_export = orig_val + assert self.rorepo.daemon_export == orig_val + + def test_alternates(self): + cur_alternates = self.rorepo.alternates + # empty alternates + self.rorepo.alternates = [] + assert self.rorepo.alternates == [] + alts = [ "other/location", "this/location" ] + self.rorepo.alternates = alts + assert alts == self.rorepo.alternates + self.rorepo.alternates = cur_alternates + + def test_repr(self): + assert_equal('<git.Repo "%s">' % rorepo_dir(), repr(self.rorepo)) + + def test_is_dirty_with_bare_repository(self): + orig_value = self.rorepo._bare + self.rorepo._bare = True + assert_false(self.rorepo.is_dirty()) + self.rorepo._bare = orig_value + + def test_is_dirty(self): + self.rorepo._bare = False + for index in (0,1): + for working_tree in (0,1): + for untracked_files in (0,1): + assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) + # END untracked files + # END working tree + # END index + orig_val = self.rorepo._bare + self.rorepo._bare = True + assert self.rorepo.is_dirty() == False + self.rorepo._bare = orig_val + + def test_head(self): + assert self.rorepo.head.reference.object == self.rorepo.active_branch.object + + def test_index(self): + index = self.rorepo.index + assert isinstance(index, IndexFile) + + def test_tag(self): + assert self.rorepo.tag('0.1.5').commit + assert self.rorepo.tag('refs/tags/0.1.5').commit + + def test_archive(self): + tmpfile = os.tmpfile() + self.rorepo.archive(tmpfile, '0.1.5') + assert tmpfile.tell() + + @patch_object(Git, '_call_process') + def test_should_display_blame_information(self, git): + git.return_value = fixture('blame') + b = self.rorepo.blame( 'master', 'lib/git.py') + assert_equal(13, len(b)) + assert_equal( 2, len(b[0]) ) + # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) + assert_equal(hash(b[0][0]), hash(b[9][0])) + c = b[0][0] + assert_true(git.called) + assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True})) + + assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha) + assert_equal('Tom Preston-Werner', c.author.name) + assert_equal('tom@mojombo.com', c.author.email) + assert_equal(1191997100, c.authored_date) + assert_equal('Tom Preston-Werner', c.committer.name) + assert_equal('tom@mojombo.com', c.committer.email) + assert_equal(1191997100, c.committed_date) + assert_equal('initial grit setup', c.message) + + # test the 'lines per commit' entries + tlist = b[0][1] + assert_true( tlist ) + assert_true( isinstance( tlist[0], basestring ) ) + assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug + + def test_untracked_files(self): + base = self.rorepo.working_tree_dir + files = ( join_path_native(base, "__test_myfile"), + join_path_native(base, "__test_other_file") ) + num_recently_untracked = 0 + try: + for fpath in files: + fd = open(fpath,"wb") + fd.close() + # END for each filename + untracked_files = self.rorepo.untracked_files + num_recently_untracked = len(untracked_files) + + # assure we have all names - they are relative to the git-dir + num_test_untracked = 0 + for utfile in untracked_files: + num_test_untracked += join_path_native(base, utfile) in files + assert len(files) == num_test_untracked + finally: + for fpath in files: + if os.path.isfile(fpath): + os.remove(fpath) + # END handle files + + assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files)) + + def test_config_reader(self): + reader = self.rorepo.config_reader() # all config files + assert reader.read_only + reader = self.rorepo.config_reader("repository") # single config file + assert reader.read_only + + def test_config_writer(self): + for config_level in self.rorepo.config_level: + try: + writer = self.rorepo.config_writer(config_level) + assert not writer.read_only + except IOError: + # its okay not to get a writer for some configuration files if we + # have no permissions + pass + # END for each config level + + def test_creation_deletion(self): + # just a very quick test to assure it generally works. There are + # specialized cases in the test_refs module + head = self.rorepo.create_head("new_head", "HEAD~1") + self.rorepo.delete_head(head) + + tag = self.rorepo.create_tag("new_tag", "HEAD~2") + self.rorepo.delete_tag(tag) + self.rorepo.config_writer() + remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") + self.rorepo.delete_remote(remote) + + def test_comparison_and_hash(self): + # this is only a preliminary test, more testing done in test_index + assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo) + assert len(set((self.rorepo, self.rorepo))) == 1 + + def test_git_cmd(self): + # test CatFileContentStream, just to be very sure we have no fencepost errors + # last \n is the terminating newline that it expects + l1 = "0123456789\n" + l2 = "abcdefghijklmnopqrstxy\n" + l3 = "z\n" + d = "%s%s%s\n" % (l1, l2, l3) + + l1p = l1[:5] + + # full size + # size is without terminating newline + def mkfull(): + return Git.CatFileContentStream(len(d)-1, StringIO(d)) + + ts = 5 + def mktiny(): + return Git.CatFileContentStream(ts, StringIO(d)) + + # readlines no limit + s = mkfull() + lines = s.readlines() + assert len(lines) == 3 and lines[-1].endswith('\n') + assert s._stream.tell() == len(d) # must have scrubbed to the end + + # realines line limit + s = mkfull() + lines = s.readlines(5) + assert len(lines) == 1 + + # readlines on tiny sections + s = mktiny() + lines = s.readlines() + assert len(lines) == 1 and lines[0] == l1p + assert s._stream.tell() == ts+1 + + # readline no limit + s = mkfull() + assert s.readline() == l1 + assert s.readline() == l2 + assert s.readline() == l3 + assert s.readline() == '' + assert s._stream.tell() == len(d) + + # readline limit + s = mkfull() + assert s.readline(5) == l1p + assert s.readline() == l1[5:] + + # readline on tiny section + s = mktiny() + assert s.readline() == l1p + assert s.readline() == '' + assert s._stream.tell() == ts+1 + + # read no limit + s = mkfull() + assert s.read() == d[:-1] + assert s.read() == '' + assert s._stream.tell() == len(d) + + # read limit + s = mkfull() + assert s.read(5) == l1p + assert s.read(6) == l1[5:] + assert s._stream.tell() == 5 + 6 # its not yet done + + # read tiny + s = mktiny() + assert s.read(2) == l1[:2] + assert s._stream.tell() == 2 + assert s.read() == l1[2:ts] + assert s._stream.tell() == ts+1 + + def _assert_rev_parse_types(self, name, rev_obj): + rev_parse = self.rorepo.rev_parse + + if rev_obj.type == 'tag': + rev_obj = rev_obj.object + + # tree and blob type + obj = rev_parse(name + '^{tree}') + assert obj == rev_obj.tree + + obj = rev_parse(name + ':CHANGES') + assert obj.type == 'blob' and obj.path == 'CHANGES' + assert rev_obj.tree['CHANGES'] == obj + + + def _assert_rev_parse(self, name): + """tries multiple different rev-parse syntaxes with the given name + :return: parsed object""" + rev_parse = self.rorepo.rev_parse + orig_obj = rev_parse(name) + if orig_obj.type == 'tag': + obj = orig_obj.object + else: + obj = orig_obj + # END deref tags by default + + # try history + rev = name + "~" + obj2 = rev_parse(rev) + assert obj2 == obj.parents[0] + self._assert_rev_parse_types(rev, obj2) + + # history with number + ni = 11 + history = [obj.parents[0]] + for pn in range(ni): + history.append(history[-1].parents[0]) + # END get given amount of commits + + for pn in range(11): + rev = name + "~%i" % (pn+1) + obj2 = rev_parse(rev) + assert obj2 == history[pn] + self._assert_rev_parse_types(rev, obj2) + # END history check + + # parent ( default ) + rev = name + "^" + obj2 = rev_parse(rev) + assert obj2 == obj.parents[0] + self._assert_rev_parse_types(rev, obj2) + + # parent with number + for pn, parent in enumerate(obj.parents): + rev = name + "^%i" % (pn+1) + assert rev_parse(rev) == parent + self._assert_rev_parse_types(rev, parent) + # END for each parent + + return orig_obj + + @with_rw_repo('HEAD', bare=False) + def test_rw_rev_parse(self, rwrepo): + # verify it does not confuse branches with hexsha ids + ahead = rwrepo.create_head('aaaaaaaa') + assert(rwrepo.rev_parse(str(ahead)) == ahead.commit) + + def test_rev_parse(self): + rev_parse = self.rorepo.rev_parse + + # try special case: This one failed at some point, make sure its fixed + assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781" + + # start from reference + num_resolved = 0 + + for ref in Reference.iter_items(self.rorepo): + path_tokens = ref.path.split("/") + for pt in range(len(path_tokens)): + path_section = '/'.join(path_tokens[-(pt+1):]) + try: + obj = self._assert_rev_parse(path_section) + assert obj.type == ref.object.type + num_resolved += 1 + except BadObject: + print "failed on %s" % path_section + # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112 + pass + # END exception handling + # END for each token + # END for each reference + assert num_resolved + + # it works with tags ! + tag = self._assert_rev_parse('0.1.4') + assert tag.type == 'tag' + + # try full sha directly ( including type conversion ) + assert tag.object == rev_parse(tag.object.hexsha) + self._assert_rev_parse_types(tag.object.hexsha, tag.object) + + + # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES + rev = '0.1.4^{tree}^{tree}' + assert rev_parse(rev) == tag.object.tree + assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES'] + + + # try to get parents from first revision - it should fail as no such revision + # exists + first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" + commit = rev_parse(first_rev) + assert len(commit.parents) == 0 + assert commit.hexsha == first_rev + self.failUnlessRaises(BadObject, rev_parse, first_rev+"~") + self.failUnlessRaises(BadObject, rev_parse, first_rev+"^") + + # short SHA1 + commit2 = rev_parse(first_rev[:20]) + assert commit2 == commit + commit2 = rev_parse(first_rev[:5]) + assert commit2 == commit + + + # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one + # needs a tag which points to a blob + + + # ref^0 returns commit being pointed to, same with ref~0, and ^{} + tag = rev_parse('0.1.4') + for token in (('~0', '^0', '^{}')): + assert tag.object == rev_parse('0.1.4%s' % token) + # END handle multiple tokens + + # try partial parsing + max_items = 40 + for i, binsha in enumerate(self.rorepo.odb.sha_iter()): + assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha + if i > max_items: + # this is rather slow currently, as rev_parse returns an object + # which requires accessing packs, it has some additional overhead + break + # END for each binsha in repo + + # missing closing brace commit^{tree + self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') + + # missing starting brace + self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') + + # REVLOG + ####### + head = self.rorepo.head + + # need to specify a ref when using the @ syntax + self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha) + + # uses HEAD.ref by default + assert rev_parse('@{0}') == head.commit + if not head.is_detached: + refspec = '%s@{0}' % head.ref.name + assert rev_parse(refspec) == head.ref.commit + # all additional specs work as well + assert rev_parse(refspec+"^{tree}") == head.commit.tree + assert rev_parse(refspec+":CHANGES").type == 'blob' + #END operate on non-detached head + + # the most recent previous position of the currently checked out branch + + try: + assert rev_parse('@{1}') != head.commit + except IndexError: + # on new checkouts, there isn't even a single past branch position + # in the log + pass + #END handle fresh checkouts + + # position doesn't exist + self.failUnlessRaises(IndexError, rev_parse, '@{10000}') + + # currently, nothing more is supported + self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}") + + def test_submodules(self): + assert len(self.rorepo.submodules) == 1 # non-recursive + # in previous configurations, we had recursive repositories so this would compare to 2 + # now there is only one left, as gitdb was merged + assert len(list(self.rorepo.iter_submodules())) == 1 + + assert isinstance(self.rorepo.submodule("git/ext/async"), Submodule) + self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") + + @with_rw_repo('HEAD', bare=False) + def test_submodule_update(self, rwrepo): + # fails in bare mode + rwrepo._bare = True + self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) + rwrepo._bare = False + + # test create submodule + sm = rwrepo.submodules[0] + sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) + assert isinstance(sm, Submodule) + + # note: the rest of this functionality is tested in test_submodule + + diff --git a/git/test/db/cmd/__init__.py b/git/test/db/cmd/__init__.py new file mode 100644 index 00000000..8a681e42 --- /dev/null +++ b/git/test/db/cmd/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php diff --git a/git/test/db/cmd/test_base.py b/git/test/db/cmd/test_base.py new file mode 100644 index 00000000..959be16b --- /dev/null +++ b/git/test/db/cmd/test_base.py @@ -0,0 +1,32 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.lib import rorepo_dir +from git.test.db.base import RepoBase + +from git.util import bin_to_hex +from git.exc import BadObject + +from git.db.complex import CmdCompatibilityGitDB +from git.db.cmd.base import * + +class TestBase(RepoBase): + RepoCls = CmdCompatibilityGitDB + + def test_basics(self): + gdb = self.rorepo + + # partial to complete - works with everything + hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6")) + assert len(hexsha) == 40 + + assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha + + # fails with BadObject + for invalid_rev in ("0000", "bad/ref", "super bad"): + self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev) + + def test_fetch_info(self): + self.failUnlessRaises(ValueError, CmdFetchInfo._from_line, self.rorepo, "nonsense", '') + self.failUnlessRaises(ValueError, CmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') diff --git a/git/test/db/lib.py b/git/test/db/lib.py new file mode 100644 index 00000000..499ca252 --- /dev/null +++ b/git/test/db/lib.py @@ -0,0 +1,246 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +"""Base classes for object db testing""" +from git.test.lib import ( + with_rw_directory, + with_packs_rw, + ZippedStoreShaWriter, + fixture_path, + TestBase, + rorepo_dir, + ) + +from git.stream import Sha1Writer +from git.base import ( + IStream, + OStream, + OInfo + ) + +from git.exc import BadObject +from git.typ import str_blob_type + +from async import IteratorReader +from cStringIO import StringIO +from struct import pack + + +__all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path') + +class TestDBBase(TestBase): + """Base Class providing default functionality to all tests such as: + + - Utility functions provided by the TestCase base of the unittest method such as:: + self.fail("todo") + self.failUnlessRaises(...) + + - Class level repository which is considered read-only as it is shared among + all test cases in your type. + Access it using:: + self.rorepo # 'ro' stands for read-only + + The rorepo is in fact your current project's git repo. If you refer to specific + shas for your objects, be sure you choose some that are part of the immutable portion + of the project history ( to assure tests don't fail for others ). + + Derived types can override the default repository type to create a different + read-only repo, allowing to test their specific type + """ + + # data + two_lines = "1234\nhello world" + all_data = (two_lines, ) + + #{ Configuration + # The repository type to instantiate. It takes at least a path to operate upon + # during instantiation. + RepoCls = None + + # if True, a read-only repo will be provided and RepoCls must be set. + # Otherwise it may remain unset + needs_ro_repo = True + #} END configuration + + @classmethod + def setUpAll(cls): + """ + Dynamically add a read-only repository to our actual type. This way + each test type has its own repository + """ + if cls.needs_ro_repo: + assert cls.RepoCls is not None, "RepoCls class member must be set" + cls.rorepo = cls.RepoCls(rorepo_dir()) + #END handle rorepo + + def _assert_object_writing_simple(self, db): + # write a bunch of objects and query their streams and info + null_objs = db.size() + ni = 250 + for i in xrange(ni): + data = pack(">L", i) + istream = IStream(str_blob_type, len(data), StringIO(data)) + new_istream = db.store(istream) + assert new_istream is istream + assert db.has_object(istream.binsha) + + info = db.info(istream.binsha) + assert isinstance(info, OInfo) + assert info.type == istream.type and info.size == istream.size + + stream = db.stream(istream.binsha) + assert isinstance(stream, OStream) + assert stream.binsha == info.binsha and stream.type == info.type + assert stream.read() == data + # END for each item + + assert db.size() == null_objs + ni + shas = list(db.sha_iter()) + assert len(shas) == db.size() + assert len(shas[0]) == 20 + + + def _assert_object_writing(self, db): + """General tests to verify object writing, compatible to ObjectDBW + :note: requires write access to the database""" + # start in 'dry-run' mode, using a simple sha1 writer + ostreams = (ZippedStoreShaWriter, None) + for ostreamcls in ostreams: + for data in self.all_data: + dry_run = ostreamcls is not None + ostream = None + if ostreamcls is not None: + ostream = ostreamcls() + assert isinstance(ostream, Sha1Writer) + # END create ostream + + prev_ostream = db.set_ostream(ostream) + assert type(prev_ostream) in ostreams or prev_ostream in ostreams + + istream = IStream(str_blob_type, len(data), StringIO(data)) + + # store returns same istream instance, with new sha set + my_istream = db.store(istream) + sha = istream.binsha + assert my_istream is istream + assert db.has_object(sha) != dry_run + assert len(sha) == 20 + + # verify data - the slow way, we want to run code + if not dry_run: + info = db.info(sha) + assert str_blob_type == info.type + assert info.size == len(data) + + ostream = db.stream(sha) + assert ostream.read() == data + assert ostream.type == str_blob_type + assert ostream.size == len(data) + else: + self.failUnlessRaises(BadObject, db.info, sha) + self.failUnlessRaises(BadObject, db.stream, sha) + + # DIRECT STREAM COPY + # our data hase been written in object format to the StringIO + # we pasesd as output stream. No physical database representation + # was created. + # Test direct stream copy of object streams, the result must be + # identical to what we fed in + ostream.seek(0) + istream.stream = ostream + assert istream.binsha is not None + prev_sha = istream.binsha + + db.set_ostream(ZippedStoreShaWriter()) + db.store(istream) + assert istream.binsha == prev_sha + new_ostream = db.ostream() + + # note: only works as long our store write uses the same compression + # level, which is zip_best + assert ostream.getvalue() == new_ostream.getvalue() + # END for each data set + # END for each dry_run mode + + def _assert_object_writing_async(self, db): + """Test generic object writing using asynchronous access""" + ni = 5000 + def istream_generator(offset=0, ni=ni): + for data_src in xrange(ni): + data = str(data_src + offset) + yield IStream(str_blob_type, len(data), StringIO(data)) + # END for each item + # END generator utility + + # for now, we are very trusty here as we expect it to work if it worked + # in the single-stream case + + # write objects + reader = IteratorReader(istream_generator()) + istream_reader = db.store_async(reader) + istreams = istream_reader.read() # read all + assert istream_reader.task().error() is None + assert len(istreams) == ni + + for stream in istreams: + assert stream.error is None + assert len(stream.binsha) == 20 + assert isinstance(stream, IStream) + # END assert each stream + + # test has-object-async - we must have all previously added ones + reader = IteratorReader( istream.binsha for istream in istreams ) + hasobject_reader = db.has_object_async(reader) + count = 0 + for sha, has_object in hasobject_reader: + assert has_object + count += 1 + # END for each sha + assert count == ni + + # read the objects we have just written + reader = IteratorReader( istream.binsha for istream in istreams ) + ostream_reader = db.stream_async(reader) + + # read items individually to prevent hitting possible sys-limits + count = 0 + for ostream in ostream_reader: + assert isinstance(ostream, OStream) + count += 1 + # END for each ostream + assert ostream_reader.task().error() is None + assert count == ni + + # get info about our items + reader = IteratorReader( istream.binsha for istream in istreams ) + info_reader = db.info_async(reader) + + count = 0 + for oinfo in info_reader: + assert isinstance(oinfo, OInfo) + count += 1 + # END for each oinfo instance + assert count == ni + + + # combined read-write using a converter + # add 2500 items, and obtain their output streams + nni = 2500 + reader = IteratorReader(istream_generator(offset=ni, ni=nni)) + istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ] + + istream_reader = db.store_async(reader) + istream_reader.set_post_cb(istream_to_sha) + + ostream_reader = db.stream_async(istream_reader) + + count = 0 + # read it individually, otherwise we might run into the ulimit + for ostream in ostream_reader: + assert isinstance(ostream, OStream) + count += 1 + # END for each ostream + assert count == nni + + diff --git a/git/test/db/py/__init__.py b/git/test/db/py/__init__.py new file mode 100644 index 00000000..8a681e42 --- /dev/null +++ b/git/test/db/py/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php diff --git a/git/test/db/py/test_base.py b/git/test/db/py/test_base.py new file mode 100644 index 00000000..6b06bbe9 --- /dev/null +++ b/git/test/db/py/test_base.py @@ -0,0 +1,16 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.lib import rorepo_dir +from git.test.db.base import RepoBase + +from git.db.complex import PureCompatibilityGitDB + +class TestPyDBBase(RepoBase): + + RepoCls = PureCompatibilityGitDB + + def test_basics(self): + pass + diff --git a/git/test/db/py/test_git.py b/git/test/db/py/test_git.py new file mode 100644 index 00000000..ecaa5c8f --- /dev/null +++ b/git/test/db/py/test_git.py @@ -0,0 +1,51 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.lib import rorepo_dir +from git.test.db.lib import TestDBBase, with_rw_directory +from git.exc import BadObject +from git.db.py.complex import PureGitODB +from git.base import OStream, OInfo +from git.util import hex_to_bin, bin_to_hex + +import os + +class TestGitDB(TestDBBase): + needs_ro_repo = False + + def test_reading(self): + gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects')) + + # we have packs and loose objects, alternates doesn't necessarily exist + assert 1 < len(gdb.databases()) < 4 + + # access should be possible + git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") + assert isinstance(gdb.info(git_sha), OInfo) + assert isinstance(gdb.stream(git_sha), OStream) + assert gdb.size() > 200 + sha_list = list(gdb.sha_iter()) + assert len(sha_list) == gdb.size() + + + # This is actually a test for compound functionality, but it doesn't + # have a separate test module + # test partial shas + # this one as uneven and quite short + assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") + + # mix even/uneven hexshas + for i, binsha in enumerate(sha_list[:50]): + assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha + # END for each sha + + self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000") + + @with_rw_directory + def test_writing(self, path): + gdb = PureGitODB(path) + + # its possible to write objects + self._assert_object_writing(gdb) + self._assert_object_writing_async(gdb) diff --git a/git/test/db/py/test_loose.py b/git/test/db/py/test_loose.py new file mode 100644 index 00000000..0c9b4831 --- /dev/null +++ b/git/test/db/py/test_loose.py @@ -0,0 +1,36 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.db.lib import TestDBBase, with_rw_directory +from git.db.py.loose import PureLooseObjectODB +from git.exc import BadObject +from git.util import bin_to_hex + +class TestLooseDB(TestDBBase): + + needs_ro_repo = False + + @with_rw_directory + def test_basics(self, path): + ldb = PureLooseObjectODB(path) + + # write data + self._assert_object_writing(ldb) + self._assert_object_writing_async(ldb) + + # verify sha iteration and size + shas = list(ldb.sha_iter()) + assert shas and len(shas[0]) == 20 + + assert len(shas) == ldb.size() + + # verify find short object + long_sha = bin_to_hex(shas[-1]) + for short_sha in (long_sha[:20], long_sha[:5]): + assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha + # END for each sha + + self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000') + # raises if no object could be foudn + diff --git a/git/test/db/py/test_mem.py b/git/test/db/py/test_mem.py new file mode 100644 index 00000000..bc98dc56 --- /dev/null +++ b/git/test/db/py/test_mem.py @@ -0,0 +1,30 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.db.lib import TestDBBase, with_rw_directory +from git.db.py.mem import PureMemoryDB +from git.db.py.loose import PureLooseObjectODB + +class TestPureMemoryDB(TestDBBase): + + needs_ro_repo = False + + @with_rw_directory + def test_writing(self, path): + mdb = PureMemoryDB() + + # write data + self._assert_object_writing_simple(mdb) + + # test stream copy + ldb = PureLooseObjectODB(path) + assert ldb.size() == 0 + num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb) + assert num_streams_copied == mdb.size() + + assert ldb.size() == mdb.size() + for sha in mdb.sha_iter(): + assert ldb.has_object(sha) + assert ldb.stream(sha).read() == mdb.stream(sha).read() + # END verify objects where copied and are equal diff --git a/git/test/db/py/test_pack.py b/git/test/db/py/test_pack.py new file mode 100644 index 00000000..5043f446 --- /dev/null +++ b/git/test/db/py/test_pack.py @@ -0,0 +1,76 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.db.lib import TestDBBase, with_packs_rw + +from git.db.py.pack import PurePackedODB +from git.test.lib import fixture_path + +from git.exc import BadObject, AmbiguousObjectName + +import os +import random + +class TestPackDB(TestDBBase): + + needs_ro_repo = False + + @with_packs_rw + def test_writing(self, path): + pdb = PurePackedODB(path) + + # on demand, we init our pack cache + num_packs = len(pdb.entities()) + assert num_packs + assert pdb._st_mtime != 0 + + # test pack directory changed: + # packs removed - rename a file, should affect the glob + pack_path = pdb.entities()[0].pack().path() + new_pack_path = pack_path + "renamed" + os.rename(pack_path, new_pack_path) + + pdb.update_cache(force=True) + assert len(pdb.entities()) == num_packs - 1 + + # packs added + os.rename(new_pack_path, pack_path) + pdb.update_cache(force=True) + assert len(pdb.entities()) == num_packs + + # bang on the cache + # access the Entities directly, as there is no iteration interface + # yet ( or required for now ) + sha_list = list(pdb.sha_iter()) + assert len(sha_list) == pdb.size() + + # hit all packs in random order + random.shuffle(sha_list) + + for sha in sha_list: + info = pdb.info(sha) + stream = pdb.stream(sha) + # END for each sha to query + + + # test short finding - be a bit more brutal here + max_bytes = 19 + min_bytes = 2 + num_ambiguous = 0 + for i, sha in enumerate(sha_list): + short_sha = sha[:max((i % max_bytes), min_bytes)] + try: + assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha + except AmbiguousObjectName: + num_ambiguous += 1 + pass # valid, we can have short objects + # END exception handling + # END for each sha to find + + # we should have at least one ambiguous, considering the small sizes + # but in our pack, there is no ambigious ... + # assert num_ambiguous + + # non-existing + self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4) diff --git a/git/test/db/py/test_ref.py b/git/test/db/py/test_ref.py new file mode 100644 index 00000000..c5374dc9 --- /dev/null +++ b/git/test/db/py/test_ref.py @@ -0,0 +1,62 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.test.db.lib import * +from git.db.py.ref import PureReferenceDB + +from git.util import ( + NULL_BIN_SHA, + hex_to_bin + ) + +import os + +class TestPureReferenceDB(TestDBBase): + + needs_ro_repo = False + + def make_alt_file(self, alt_path, alt_list): + """Create an alternates file which contains the given alternates. + The list can be empty""" + alt_file = open(alt_path, "wb") + for alt in alt_list: + alt_file.write(alt + "\n") + alt_file.close() + + @with_rw_directory + def test_writing(self, path): + NULL_BIN_SHA = '\0' * 20 + + alt_path = os.path.join(path, 'alternates') + rdb = PureReferenceDB(alt_path) + assert len(rdb.databases()) == 0 + assert rdb.size() == 0 + assert len(list(rdb.sha_iter())) == 0 + + # try empty, non-existing + assert not rdb.has_object(NULL_BIN_SHA) + + + # setup alternate file + # add two, one is invalid + own_repo_path = fixture_path('../../../.git/objects') # use own repo + self.make_alt_file(alt_path, [own_repo_path, "invalid/path"]) + rdb.update_cache() + assert len(rdb.databases()) == 1 + + # we should now find a default revision of ours + git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") + assert rdb.has_object(git_sha) + + # remove valid + self.make_alt_file(alt_path, ["just/one/invalid/path"]) + rdb.update_cache() + assert len(rdb.databases()) == 0 + + # add valid + self.make_alt_file(alt_path, [own_repo_path]) + rdb.update_cache() + assert len(rdb.databases()) == 1 + + diff --git a/git/test/db/test_base.py b/git/test/db/test_base.py new file mode 100644 index 00000000..2a882d0a --- /dev/null +++ b/git/test/db/test_base.py @@ -0,0 +1,20 @@ +# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors +# +# This module is part of GitDB and is released under +# the New BSD License: http://www.opensource.org/licenses/bsd-license.php +from lib import * +from git.db import RefSpec + +class TestBase(TestDBBase): + + needs_ro_repo = False + + @with_rw_directory + def test_basics(self, path): + self.failUnlessRaises(ValueError, RefSpec, None, None) + rs = RefSpec(None, "something") + assert rs.force == False + assert rs.delete_destination() + assert rs.source is None + assert rs.destination == "something" + |
