diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-11-21 21:47:18 +0100 |
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-11-21 22:00:45 +0100 |
| commit | 48a17c87c15b2fa7ce2e84afa09484f354d57a39 (patch) | |
| tree | 8664414605c3b8f5176c144c18e5f4b9d0715852 /test/git | |
| parent | 0b813371f5a8af95152cae109d28c7c97bfaf79f (diff) | |
| parent | 6befb28efd86556e45bb0b213bcfbfa866cac379 (diff) | |
| download | GitPython-48a17c87c15b2fa7ce2e84afa09484f354d57a39.tar.gz GitPython-48a17c87c15b2fa7ce2e84afa09484f354d57a39.zip | |
-#######->WARNING<-####### Directory structure changed, see commit message
If you use git-python as a submodule of your own project, which alters the sys.path to import it,
you will have to adjust your code to take the changed directory structure into consideration.
Previously, you would put the path
./git-python/lib
into your syspath. All modules moved two levels up, which means that the 'git-python' directory
now is a package itself. This implies that the submodule's path must change so that the root
directory is called 'git'.
Your code must now put the directory containing the submodule into the sys.path.
For example, if you previously would have the following configuration:
./ext/git-python/lib/git/__init__.py
you would now change your submodule path to the following:
./ext/git
On the latets revision, the directory structure is changed so that
the git/__init__.py file is at the following path:
./ext/git/__init__.py
To be able to import git, you need to put ./ext into your sys.path.
Diffstat (limited to 'test/git')
| -rw-r--r-- | test/git/__init__.py | 5 | ||||
| -rw-r--r-- | test/git/performance/lib.py | 78 | ||||
| -rw-r--r-- | test/git/performance/test_commit.py | 99 | ||||
| -rw-r--r-- | test/git/performance/test_odb.py | 70 | ||||
| -rw-r--r-- | test/git/performance/test_streams.py | 131 | ||||
| -rw-r--r-- | test/git/performance/test_utils.py | 174 | ||||
| -rw-r--r-- | test/git/test_actor.py | 36 | ||||
| -rw-r--r-- | test/git/test_base.py | 100 | ||||
| -rw-r--r-- | test/git/test_blob.py | 20 | ||||
| -rw-r--r-- | test/git/test_commit.py | 273 | ||||
| -rw-r--r-- | test/git/test_config.py | 102 | ||||
| -rw-r--r-- | test/git/test_db.py | 25 | ||||
| -rw-r--r-- | test/git/test_diff.py | 108 | ||||
| -rw-r--r-- | test/git/test_fun.py | 251 | ||||
| -rw-r--r-- | test/git/test_git.py | 84 | ||||
| -rw-r--r-- | test/git/test_index.py | 670 | ||||
| -rw-r--r-- | test/git/test_refs.py | 488 | ||||
| -rw-r--r-- | test/git/test_remote.py | 445 | ||||
| -rw-r--r-- | test/git/test_repo.py | 580 | ||||
| -rw-r--r-- | test/git/test_stats.py | 25 | ||||
| -rw-r--r-- | test/git/test_submodule.py | 496 | ||||
| -rw-r--r-- | test/git/test_tree.py | 141 | ||||
| -rw-r--r-- | test/git/test_util.py | 105 |
23 files changed, 0 insertions, 4506 deletions
diff --git a/test/git/__init__.py b/test/git/__init__.py deleted file mode 100644 index 757cbad1..00000000 --- a/test/git/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# __init__.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php diff --git a/test/git/performance/lib.py b/test/git/performance/lib.py deleted file mode 100644 index 4ac1f1da..00000000 --- a/test/git/performance/lib.py +++ /dev/null @@ -1,78 +0,0 @@ -"""Contains library functions""" -import os -from test.testlib import * -import shutil -import tempfile - -from git.db import ( - GitCmdObjectDB, - GitDB - ) - -from git import ( - Repo - ) - -#{ Invvariants -k_env_git_repo = "GIT_PYTHON_TEST_GIT_REPO_BASE" -#} END invariants - - -#{ Utilities -def resolve_or_fail(env_var): - """:return: resolved environment variable or raise EnvironmentError""" - try: - return os.environ[env_var] - except KeyError: - raise EnvironmentError("Please set the %r envrionment variable and retry" % env_var) - # END exception handling - -#} END utilities - - -#{ Base Classes - -class TestBigRepoR(TestBase): - """TestCase providing access to readonly 'big' repositories using the following - member variables: - - * gitrorepo - - * Read-Only git repository - actually the repo of git itself - - * puregitrorepo - - * As gitrepo, but uses pure python implementation - """ - - #{ Invariants - head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca' - head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5' - #} END invariants - - @classmethod - def setUpAll(cls): - super(TestBigRepoR, cls).setUpAll() - repo_path = resolve_or_fail(k_env_git_repo) - cls.gitrorepo = Repo(repo_path, odbt=GitCmdObjectDB) - cls.puregitrorepo = Repo(repo_path, odbt=GitDB) - - -class TestBigRepoRW(TestBigRepoR): - """As above, but provides a big repository that we can write to. - - Provides ``self.gitrwrepo`` and ``self.puregitrwrepo``""" - - @classmethod - def setUpAll(cls): - super(TestBigRepoRW, cls).setUpAll() - dirname = tempfile.mktemp() - os.mkdir(dirname) - cls.gitrwrepo = cls.gitrorepo.clone(dirname, shared=True, bare=True, odbt=GitCmdObjectDB) - cls.puregitrwrepo = Repo(dirname, odbt=GitDB) - - @classmethod - def tearDownAll(cls): - shutil.rmtree(cls.gitrwrepo.working_dir) - -#} END base classes diff --git a/test/git/performance/test_commit.py b/test/git/performance/test_commit.py deleted file mode 100644 index 62d409fc..00000000 --- a/test/git/performance/test_commit.py +++ /dev/null @@ -1,99 +0,0 @@ -# test_performance.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from lib import * -from git import * -from gitdb import IStream -from test.git.test_commit import assert_commit_serialization -from cStringIO import StringIO -from time import time -import sys - -class TestPerformance(TestBigRepoRW): - - # ref with about 100 commits in its history - ref_100 = '0.1.6' - - def _query_commit_info(self, c): - c.author - c.authored_date - c.author_tz_offset - c.committer - c.committed_date - c.committer_tz_offset - c.message - c.parents - - def test_iteration(self): - no = 0 - nc = 0 - - # find the first commit containing the given path - always do a full - # iteration ( restricted to the path in question ), but in fact it should - # return quite a lot of commits, we just take one and hence abort the operation - - st = time() - for c in self.rorepo.iter_commits(self.ref_100): - nc += 1 - self._query_commit_info(c) - for obj in c.tree.traverse(): - obj.size - no += 1 - # END for each object - # END for each commit - elapsed_time = time() - st - print >> sys.stderr, "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (nc, no, elapsed_time, no/elapsed_time) - - def test_commit_traversal(self): - # bound to cat-file parsing performance - nc = 0 - st = time() - for c in self.gitrorepo.commit(self.head_sha_2k).traverse(branch_first=False): - nc += 1 - self._query_commit_info(c) - # END for each traversed commit - elapsed_time = time() - st - print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time) - - def test_commit_iteration(self): - # bound to stream parsing performance - nc = 0 - st = time() - for c in Commit.iter_items(self.gitrorepo, self.head_sha_2k): - nc += 1 - self._query_commit_info(c) - # END for each traversed commit - elapsed_time = time() - st - print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time) - - def test_commit_serialization(self): - assert_commit_serialization(self.gitrwrepo, self.head_sha_2k, True) - - rwrepo = self.gitrwrepo - make_object = rwrepo.odb.store - # direct serialization - deserialization can be tested afterwards - # serialization is probably limited on IO - hc = rwrepo.commit(self.head_sha_2k) - - commits = list() - nc = 5000 - st = time() - for i in xrange(nc): - cm = Commit( rwrepo, Commit.NULL_BIN_SHA, hc.tree, - hc.author, hc.authored_date, hc.author_tz_offset, - hc.committer, hc.committed_date, hc.committer_tz_offset, - str(i), parents=hc.parents, encoding=hc.encoding) - - stream = StringIO() - cm._serialize(stream) - slen = stream.tell() - stream.seek(0) - - cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha - # END commit creation - elapsed = time() - st - - print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed) diff --git a/test/git/performance/test_odb.py b/test/git/performance/test_odb.py deleted file mode 100644 index 32b70f69..00000000 --- a/test/git/performance/test_odb.py +++ /dev/null @@ -1,70 +0,0 @@ -"""Performance tests for object store""" - -from time import time -import sys -import stat - -from lib import ( - TestBigRepoR - ) - - -class TestObjDBPerformance(TestBigRepoR): - - def test_random_access(self): - results = [ ["Iterate Commits"], ["Iterate Blobs"], ["Retrieve Blob Data"] ] - for repo in (self.gitrorepo, self.puregitrorepo): - # GET COMMITS - st = time() - root_commit = repo.commit(self.head_sha_2k) - commits = list(root_commit.traverse()) - nc = len(commits) - elapsed = time() - st - - print >> sys.stderr, "%s: Retrieved %i commits from ObjectStore in %g s ( %f commits / s )" % (type(repo.odb), nc, elapsed, nc / elapsed) - results[0].append(elapsed) - - # GET TREES - # walk all trees of all commits - st = time() - blobs_per_commit = list() - nt = 0 - for commit in commits: - tree = commit.tree - blobs = list() - for item in tree.traverse(): - nt += 1 - if item.type == 'blob': - blobs.append(item) - # direct access for speed - # END while trees are there for walking - blobs_per_commit.append(blobs) - # END for each commit - elapsed = time() - st - - print >> sys.stderr, "%s: Retrieved %i objects from %i commits in %g s ( %f objects / s )" % (type(repo.odb), nt, len(commits), elapsed, nt / elapsed) - results[1].append(elapsed) - - # GET BLOBS - st = time() - nb = 0 - too_many = 15000 - data_bytes = 0 - for blob_list in blobs_per_commit: - for blob in blob_list: - data_bytes += len(blob.data_stream.read()) - # END for each blobsha - nb += len(blob_list) - if nb > too_many: - break - # END for each bloblist - elapsed = time() - st - - print >> sys.stderr, "%s: Retrieved %i blob (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (type(repo.odb), nb, data_bytes/1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed) - results[2].append(elapsed) - # END for each repo type - - # final results - for test_name, a, b in results: - print >> sys.stderr, "%s: %f s vs %f s, pure is %f times slower" % (test_name, a, b, b / a) - # END for each result diff --git a/test/git/performance/test_streams.py b/test/git/performance/test_streams.py deleted file mode 100644 index a5811262..00000000 --- a/test/git/performance/test_streams.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Performance data streaming performance""" - -from test.testlib import * -from gitdb import * -from gitdb.util import bin_to_hex - -from time import time -import os -import sys -import stat -import subprocess - -from gitdb.test.lib import make_memory_file - -from lib import ( - TestBigRepoR - ) - - -class TestObjDBPerformance(TestBigRepoR): - - large_data_size_bytes = 1000*1000*10 # some MiB should do it - moderate_data_size_bytes = 1000*1000*1 # just 1 MiB - - @with_rw_repo('HEAD', bare=True) - def test_large_data_streaming(self, rwrepo): - # TODO: This part overlaps with the same file in gitdb.test.performance.test_stream - # It should be shared if possible - ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects')) - - for randomize in range(2): - desc = (randomize and 'random ') or '' - print >> sys.stderr, "Creating %s data ..." % desc - st = time() - size, stream = make_memory_file(self.large_data_size_bytes, randomize) - elapsed = time() - st - print >> sys.stderr, "Done (in %f s)" % elapsed - - # writing - due to the compression it will seem faster than it is - st = time() - binsha = ldb.store(IStream('blob', size, stream)).binsha - elapsed_add = time() - st - assert ldb.has_object(binsha) - db_file = ldb.readable_db_object_path(bin_to_hex(binsha)) - fsize_kib = os.path.getsize(db_file) / 1000 - - - size_kib = size / 1000 - print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add) - - # reading all at once - st = time() - ostream = ldb.stream(binsha) - shadata = ostream.read() - elapsed_readall = time() - st - - stream.seek(0) - assert shadata == stream.getvalue() - print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall) - - - # reading in chunks of 1 MiB - cs = 512*1000 - chunks = list() - st = time() - ostream = ldb.stream(binsha) - while True: - data = ostream.read(cs) - chunks.append(data) - if len(data) < cs: - break - # END read in chunks - elapsed_readchunks = time() - st - - stream.seek(0) - assert ''.join(chunks) == stream.getvalue() - - cs_kib = cs / 1000 - print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks) - - # del db file so git has something to do - os.remove(db_file) - - # VS. CGIT - ########## - # CGIT ! Can using the cgit programs be faster ? - proc = rwrepo.git.hash_object('-w', '--stdin', as_process=True, istream=subprocess.PIPE) - - # write file - pump everything in at once to be a fast as possible - data = stream.getvalue() # cache it - st = time() - proc.stdin.write(data) - proc.stdin.close() - gitsha = proc.stdout.read().strip() - proc.wait() - gelapsed_add = time() - st - del(data) - assert gitsha == bin_to_hex(binsha) # we do it the same way, right ? - - # as its the same sha, we reuse our path - fsize_kib = os.path.getsize(db_file) / 1000 - print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to using git-hash-object in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, gelapsed_add, size_kib / gelapsed_add) - - # compare ... - print >> sys.stderr, "Git-Python is %f %% faster than git when adding big %s files" % (100.0 - (elapsed_add / gelapsed_add) * 100, desc) - - - # read all - st = time() - s, t, size, data = rwrepo.git.get_object_data(gitsha) - gelapsed_readall = time() - st - print >> sys.stderr, "Read %i KiB of %s data at once using git-cat-file in %f s ( %f Read KiB / s)" % (size_kib, desc, gelapsed_readall, size_kib / gelapsed_readall) - - # compare - print >> sys.stderr, "Git-Python is %f %% faster than git when reading big %sfiles" % (100.0 - (elapsed_readall / gelapsed_readall) * 100, desc) - - - # read chunks - st = time() - s, t, size, stream = rwrepo.git.stream_object_data(gitsha) - while True: - data = stream.read(cs) - if len(data) < cs: - break - # END read stream - gelapsed_readchunks = time() - st - print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from git-cat-file in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, gelapsed_readchunks, size_kib / gelapsed_readchunks) - - # compare - print >> sys.stderr, "Git-Python is %f %% faster than git when reading big %s files in chunks" % (100.0 - (elapsed_readchunks / gelapsed_readchunks) * 100, desc) - # END for each randomization factor diff --git a/test/git/performance/test_utils.py b/test/git/performance/test_utils.py deleted file mode 100644 index 19c1e84a..00000000 --- a/test/git/performance/test_utils.py +++ /dev/null @@ -1,174 +0,0 @@ -"""Performance of utilities""" -from time import time -import sys -import stat - -from lib import ( - TestBigRepoR - ) - - -class TestUtilPerformance(TestBigRepoR): - - def test_access(self): - # compare dict vs. slot access - class Slotty(object): - __slots__ = "attr" - def __init__(self): - self.attr = 1 - - class Dicty(object): - def __init__(self): - self.attr = 1 - - class BigSlotty(object): - __slots__ = ('attr', ) + tuple('abcdefghijk') - def __init__(self): - for attr in self.__slots__: - setattr(self, attr, 1) - - class BigDicty(object): - def __init__(self): - for attr in BigSlotty.__slots__: - setattr(self, attr, 1) - - ni = 1000000 - for cls in (Slotty, Dicty, BigSlotty, BigDicty): - cli = cls() - st = time() - for i in xrange(ni): - cli.attr - # END for each access - elapsed = time() - st - print >> sys.stderr, "Accessed %s.attr %i times in %s s ( %f acc / s)" % (cls.__name__, ni, elapsed, ni / elapsed) - # END for each class type - - # check num of sequence-acceses - for cls in (list, tuple): - x = 10 - st = time() - s = cls(range(x)) - for i in xrange(ni): - s[0] - s[1] - s[2] - # END for - elapsed = time() - st - na = ni * 3 - print >> sys.stderr, "Accessed %s[x] %i times in %s s ( %f acc / s)" % (cls.__name__, na, elapsed, na / elapsed) - # END for each sequence - - def test_instantiation(self): - ni = 100000 - max_num_items = 4 - for mni in range(max_num_items+1): - for cls in (tuple, list): - st = time() - for i in xrange(ni): - if mni == 0: - cls() - elif mni == 1: - cls((1,)) - elif mni == 2: - cls((1,2)) - elif mni == 3: - cls((1,2,3)) - elif mni == 4: - cls((1,2,3,4)) - else: - cls(x for x in xrange(mni)) - # END handle empty cls - # END for each item - elapsed = time() - st - print >> sys.stderr, "Created %i %ss of size %i in %f s ( %f inst / s)" % (ni, cls.__name__, mni, elapsed, ni / elapsed) - # END for each type - # END for each item count - - # tuple and tuple direct - st = time() - for i in xrange(ni): - t = (1,2,3,4) - # END for each item - elapsed = time() - st - print >> sys.stderr, "Created %i tuples (1,2,3,4) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed) - - st = time() - for i in xrange(ni): - t = tuple((1,2,3,4)) - # END for each item - elapsed = time() - st - print >> sys.stderr, "Created %i tuples tuple((1,2,3,4)) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed) - - def test_unpacking_vs_indexing(self): - ni = 1000000 - list_items = [1,2,3,4] - tuple_items = (1,2,3,4) - - for sequence in (list_items, tuple_items): - st = time() - for i in xrange(ni): - one, two, three, four = sequence - # END for eac iteration - elapsed = time() - st - print >> sys.stderr, "Unpacked %i %ss of size %i in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed) - - st = time() - for i in xrange(ni): - one, two, three, four = sequence[0], sequence[1], sequence[2], sequence[3] - # END for eac iteration - elapsed = time() - st - print >> sys.stderr, "Unpacked %i %ss of size %i individually in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed) - - st = time() - for i in xrange(ni): - one, two = sequence[0], sequence[1] - # END for eac iteration - elapsed = time() - st - print >> sys.stderr, "Unpacked %i %ss of size %i individually (2 of 4) in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed) - # END for each sequence - - def test_large_list_vs_iteration(self): - # what costs more: alloc/realloc of lists, or the cpu strain of iterators ? - def slow_iter(ni): - for i in xrange(ni): - yield i - # END slow iter - be closer to the real world - - # alloc doesn't play a role here it seems - for ni in (500, 1000, 10000, 20000, 40000): - st = time() - for i in list(xrange(ni)): - i - # END for each item - elapsed = time() - st - print >> sys.stderr, "Iterated %i items from list in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed) - - st = time() - for i in slow_iter(ni): - i - # END for each item - elapsed = time() - st - print >> sys.stderr, "Iterated %i items from iterator in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed) - # END for each number of iterations - - def test_type_vs_inst_class(self): - class NewType(object): - pass - - # lets see which way is faster - inst = NewType() - - ni = 1000000 - st = time() - for i in xrange(ni): - inst.__class__() - # END for each item - elapsed = time() - st - print >> sys.stderr, "Created %i items using inst.__class__ in %f s ( %f items / s)" % (ni, elapsed, ni / elapsed) - - st = time() - for i in xrange(ni): - type(inst)() - # END for each item - elapsed = time() - st - print >> sys.stderr, "Created %i items using type(inst)() in %f s ( %f items / s)" % (ni, elapsed, ni / elapsed) diff --git a/test/git/test_actor.py b/test/git/test_actor.py deleted file mode 100644 index 8fda57e5..00000000 --- a/test/git/test_actor.py +++ /dev/null @@ -1,36 +0,0 @@ -# test_actor.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -import os -from test.testlib import * -from git import * - -class TestActor(object): - def test_from_string_should_separate_name_and_email(self): - a = Actor._from_string("Michael Trier <mtrier@example.com>") - assert_equal("Michael Trier", a.name) - assert_equal("mtrier@example.com", a.email) - - # base type capabilities - assert a == a - assert not ( a != a ) - m = set() - m.add(a) - m.add(a) - assert len(m) == 1 - - def test_from_string_should_handle_just_name(self): - a = Actor._from_string("Michael Trier") - assert_equal("Michael Trier", a.name) - assert_equal(None, a.email) - - def test_should_display_representation(self): - a = Actor._from_string("Michael Trier <mtrier@example.com>") - assert_equal('<git.Actor "Michael Trier <mtrier@example.com>">', repr(a)) - - def test_str_should_alias_name(self): - a = Actor._from_string("Michael Trier <mtrier@example.com>") - assert_equal(a.name, str(a))
\ No newline at end of file diff --git a/test/git/test_base.py b/test/git/test_base.py deleted file mode 100644 index 25d1e4e9..00000000 --- a/test/git/test_base.py +++ /dev/null @@ -1,100 +0,0 @@ -# test_base.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -import git.objects.base as base -import git.refs as refs -import os - -from test.testlib import * -from git import * -from itertools import chain -from git.objects.util import get_object_type_by_name -from gitdb.util import hex_to_bin -import tempfile - -class TestBase(TestBase): - - type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"), - ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"), - ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None), - ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None) ) - - def test_base_object(self): - # test interface of base object classes - types = (Blob, Tree, Commit, TagObject) - assert len(types) == len(self.type_tuples) - - s = set() - num_objs = 0 - num_index_objs = 0 - for obj_type, (typename, hexsha, path) in zip(types, self.type_tuples): - binsha = hex_to_bin(hexsha) - item = None - if path is None: - item = obj_type(self.rorepo,binsha) - else: - item = obj_type(self.rorepo,binsha, 0, path) - # END handle index objects - num_objs += 1 - assert item.hexsha == hexsha - assert item.type == typename - assert item.size - assert item == item - assert not item != item - assert str(item) == item.hexsha - assert repr(item) - s.add(item) - - if isinstance(item, base.IndexObject): - num_index_objs += 1 - if hasattr(item,'path'): # never runs here - assert not item.path.startswith("/") # must be relative - assert isinstance(item.mode, int) - # END index object check - - # read from stream - data_stream = item.data_stream - data = data_stream.read() - assert data - - tmpfile = os.tmpfile() - assert item == item.stream_data(tmpfile) - tmpfile.seek(0) - assert tmpfile.read() == data - # END stream to file directly - # END for each object type to create - - # each has a unique sha - assert len(s) == num_objs - assert len(s|s) == num_objs - assert num_index_objs == 2 - - def test_get_object_type_by_name(self): - for tname in base.Object.TYPES: - assert base.Object in get_object_type_by_name(tname).mro() - # END for each known type - - assert_raises( ValueError, get_object_type_by_name, "doesntexist" ) - - def test_object_resolution(self): - # objects must be resolved to shas so they compare equal - assert self.rorepo.head.reference.object == self.rorepo.active_branch.object - - @with_rw_repo('HEAD', bare=True) - def test_with_bare_rw_repo(self, bare_rw_repo): - assert bare_rw_repo.config_reader("repository").getboolean("core", "bare") - assert os.path.isfile(os.path.join(bare_rw_repo.git_dir,'HEAD')) - - @with_rw_repo('0.1.6') - def test_with_rw_repo(self, rw_repo): - assert not rw_repo.config_reader("repository").getboolean("core", "bare") - assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib')) - - @with_rw_and_rw_remote_repo('0.1.6') - def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo): - assert not rw_repo.config_reader("repository").getboolean("core", "bare") - assert rw_remote_repo.config_reader("repository").getboolean("core", "bare") - assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib')) diff --git a/test/git/test_blob.py b/test/git/test_blob.py deleted file mode 100644 index 623ed179..00000000 --- a/test/git/test_blob.py +++ /dev/null @@ -1,20 +0,0 @@ -# test_blob.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * -from gitdb.util import hex_to_bin - -class TestBlob(TestBase): - - def test_mime_type_should_return_mime_type_for_known_types(self): - blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'}) - assert_equal("image/png", blob.mime_type) - - def test_mime_type_should_return_text_plain_for_unknown_types(self): - blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'}) - assert_equal("text/plain", blob.mime_type) - diff --git a/test/git/test_commit.py b/test/git/test_commit.py deleted file mode 100644 index c3ce5c92..00000000 --- a/test/git/test_commit.py +++ /dev/null @@ -1,273 +0,0 @@ -# -*- coding: utf-8 -*- -# test_commit.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * -from gitdb import IStream -from gitdb.util import hex_to_bin - -from cStringIO import StringIO -import time -import sys - - -def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False): - """traverse all commits in the history of commit identified by commit_id and check - if the serialization works. - :param print_performance_info: if True, we will show how fast we are""" - ns = 0 # num serializations - nds = 0 # num deserializations - - st = time.time() - for cm in rwrepo.commit(commit_id).traverse(): - nds += 1 - - # assert that we deserialize commits correctly, hence we get the same - # sha on serialization - stream = StringIO() - cm._serialize(stream) - ns += 1 - streamlen = stream.tell() - stream.seek(0) - - istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream)) - assert istream.hexsha == cm.hexsha - - nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree, - cm.author, cm.authored_date, cm.author_tz_offset, - cm.committer, cm.committed_date, cm.committer_tz_offset, - cm.message, cm.parents, cm.encoding) - - assert nc.parents == cm.parents - stream = StringIO() - nc._serialize(stream) - ns += 1 - streamlen = stream.tell() - stream.seek(0) - - # reuse istream - istream.size = streamlen - istream.stream = stream - istream.binsha = None - nc.binsha = rwrepo.odb.store(istream).binsha - - # if it worked, we have exactly the same contents ! - assert nc.hexsha == cm.hexsha - # END check commits - elapsed = time.time() - st - - if print_performance_info: - print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns/elapsed, nds/elapsed) - # END handle performance info - - -class TestCommit(TestBase): - - def test_bake(self): - - commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae') - commit.author # bake - - assert_equal("Sebastian Thiel", commit.author.name) - assert_equal("byronimo@gmail.com", commit.author.email) - assert commit.author == commit.committer - assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int) - assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int) - assert commit.message == "Added missing information to docstrings of commit and stats module\n" - - - def test_stats(self): - commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781') - stats = commit.stats - - def check_entries(d): - assert isinstance(d, dict) - for key in ("insertions", "deletions", "lines"): - assert key in d - # END assertion helper - assert stats.files - assert stats.total - - check_entries(stats.total) - assert "files" in stats.total - - for filepath, d in stats.files.items(): - check_entries(d) - # END for each stated file - - # assure data is parsed properly - michael = Actor._from_string("Michael Trier <mtrier@gmail.com>") - assert commit.author == michael - assert commit.committer == michael - assert commit.authored_date == 1210193388 - assert commit.committed_date == 1210193388 - assert commit.author_tz_offset == 14400, commit.author_tz_offset - assert commit.committer_tz_offset == 14400, commit.committer_tz_offset - assert commit.message == "initial project\n" - - def test_unicode_actor(self): - # assure we can parse unicode actors correctly - name = "Üäöß ÄußÉ".decode("utf-8") - assert len(name) == 9 - special = Actor._from_string(u"%s <something@this.com>" % name) - assert special.name == name - assert isinstance(special.name, unicode) - - def test_traversal(self): - start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff") - first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781") - p0 = start.parents[0] - p1 = start.parents[1] - p00 = p0.parents[0] - p10 = p1.parents[0] - - # basic branch first, depth first - dfirst = start.traverse(branch_first=False) - bfirst = start.traverse(branch_first=True) - assert dfirst.next() == p0 - assert dfirst.next() == p00 - - assert bfirst.next() == p0 - assert bfirst.next() == p1 - assert bfirst.next() == p00 - assert bfirst.next() == p10 - - # at some point, both iterations should stop - assert list(bfirst)[-1] == first - stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True) - l = list(stoptraverse) - assert len(l[0]) == 2 - - # ignore self - assert start.traverse(ignore_self=False).next() == start - - # depth - assert len(list(start.traverse(ignore_self=False, depth=0))) == 1 - - # prune - assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1 - - # predicate - assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1 - - # traversal should stop when the beginning is reached - self.failUnlessRaises(StopIteration, first.traverse().next) - - # parents of the first commit should be empty ( as the only parent has a null - # sha ) - assert len(first.parents) == 0 - - def test_iteration(self): - # we can iterate commits - all_commits = Commit.list_items(self.rorepo, self.rorepo.head) - assert all_commits - assert all_commits == list(self.rorepo.iter_commits()) - - # this includes merge commits - mcomit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d') - assert mcomit in all_commits - - # we can limit the result to paths - ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES')) - assert ltd_commits and len(ltd_commits) < len(all_commits) - - # show commits of multiple paths, resulting in a union of commits - less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS'))) - assert len(ltd_commits) < len(less_ltd_commits) - - def test_iter_items(self): - # pretty not allowed - self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw") - - def test_rev_list_bisect_all(self): - """ - 'git rev-list --bisect-all' returns additional information - in the commit header. This test ensures that we properly parse it. - """ - revs = self.rorepo.git.rev_list('933d23bf95a5bd1624fbcdf328d904e1fa173474', - first_parent=True, - bisect_all=True) - - commits = Commit._iter_from_process_or_stream(self.rorepo, StringProcessAdapter(revs)) - expected_ids = ( - '7156cece3c49544abb6bf7a0c218eb36646fad6d', - '1f66cfbbce58b4b552b041707a12d437cc5f400a', - '33ebe7acec14b25c5f84f35a664803fcab2f7781', - '933d23bf95a5bd1624fbcdf328d904e1fa173474' - ) - for sha1, commit in zip(expected_ids, commits): - assert_equal(sha1, commit.hexsha) - - def test_count(self): - assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143 - - def test_list(self): - assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit) - - def test_str(self): - commit = Commit(self.rorepo, Commit.NULL_BIN_SHA) - assert_equal(Commit.NULL_HEX_SHA, str(commit)) - - def test_repr(self): - commit = Commit(self.rorepo, Commit.NULL_BIN_SHA) - assert_equal('<git.Commit "%s">' % Commit.NULL_HEX_SHA, repr(commit)) - - def test_equality(self): - commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA) - commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA) - commit3 = Commit(self.rorepo, "\1"*20) - assert_equal(commit1, commit2) - assert_not_equal(commit2, commit3) - - def test_iter_parents(self): - # should return all but ourselves, even if skip is defined - c = self.rorepo.commit('0.1.5') - for skip in (0, 1): - piter = c.iter_parents(skip=skip) - first_parent = piter.next() - assert first_parent != c - assert first_parent == c.parents[0] - # END for each - - def test_base(self): - name_rev = self.rorepo.head.commit.name_rev - assert isinstance(name_rev, basestring) - - @with_rw_repo('HEAD', bare=True) - def test_serialization(self, rwrepo): - # create all commits of our repo - assert_commit_serialization(rwrepo, '0.1.6') - - def test_serialization_unicode_support(self): - assert Commit.default_encoding.lower() == 'utf-8' - - # create a commit with unicode in the message, and the author's name - # Verify its serialization and deserialization - cmt = self.rorepo.commit('0.1.6') - assert isinstance(cmt.message, unicode) # it automatically decodes it as such - assert isinstance(cmt.author.name, unicode) # same here - - cmt.message = "üäêèß".decode("utf-8") - assert len(cmt.message) == 5 - - cmt.author.name = "äüß".decode("utf-8") - assert len(cmt.author.name) == 3 - - cstream = StringIO() - cmt._serialize(cstream) - cstream.seek(0) - assert len(cstream.getvalue()) - - ncmt = Commit(self.rorepo, cmt.binsha) - ncmt._deserialize(cstream) - - assert cmt.author.name == ncmt.author.name - assert cmt.message == ncmt.message - # actually, it can't be printed in a shell as repr wants to have ascii only - # it appears - cmt.author.__repr__() - diff --git a/test/git/test_config.py b/test/git/test_config.py deleted file mode 100644 index 8c846b99..00000000 --- a/test/git/test_config.py +++ /dev/null @@ -1,102 +0,0 @@ -# test_config.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * -import StringIO -from copy import copy -from ConfigParser import NoSectionError - -class TestBase(TestCase): - - def _to_memcache(self, file_path): - fp = open(file_path, "r") - sio = StringIO.StringIO(fp.read()) - sio.name = file_path - return sio - - def _parsers_equal_or_raise(self, lhs, rhs): - pass - - def test_read_write(self): - # writer must create the exact same file as the one read before - for filename in ("git_config", "git_config_global"): - file_obj = self._to_memcache(fixture_path(filename)) - file_obj_orig = copy(file_obj) - w_config = GitConfigParser(file_obj, read_only = False) - w_config.read() # enforce reading - assert w_config._sections - w_config.write() # enforce writing - assert file_obj.getvalue() == file_obj_orig.getvalue() - - # creating an additional config writer must fail due to exclusive access - self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False) - - # should still have a lock and be able to make changes - assert w_config._lock._has_lock() - - # changes should be written right away - sname = "my_section" - oname = "mykey" - val = "myvalue" - w_config.add_section(sname) - assert w_config.has_section(sname) - w_config.set(sname, oname, val) - assert w_config.has_option(sname,oname) - assert w_config.get(sname, oname) == val - - sname_new = "new_section" - oname_new = "new_key" - ival = 10 - w_config.set_value(sname_new, oname_new, ival) - assert w_config.get_value(sname_new, oname_new) == ival - - file_obj.seek(0) - r_config = GitConfigParser(file_obj, read_only=True) - assert r_config.has_section(sname) - assert r_config.has_option(sname, oname) - assert r_config.get(sname, oname) == val - - # END for each filename - - def test_base(self): - path_repo = fixture_path("git_config") - path_global = fixture_path("git_config_global") - r_config = GitConfigParser([path_repo, path_global], read_only=True) - assert r_config.read_only - num_sections = 0 - num_options = 0 - - # test reader methods - assert r_config._is_initialized == False - for section in r_config.sections(): - num_sections += 1 - for option in r_config.options(section): - num_options += 1 - val = r_config.get(section, option) - val_typed = r_config.get_value(section, option) - assert isinstance(val_typed, (bool, long, float, basestring)) - assert val - assert "\n" not in option - assert "\n" not in val - - # writing must fail - self.failUnlessRaises(IOError, r_config.set, section, option, None) - self.failUnlessRaises(IOError, r_config.remove_option, section, option ) - # END for each option - self.failUnlessRaises(IOError, r_config.remove_section, section) - # END for each section - assert num_sections and num_options - assert r_config._is_initialized == True - - # get value which doesnt exist, with default - default = "my default value" - assert r_config.get_value("doesnt", "exist", default) == default - - # it raises if there is no default though - self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist") - - diff --git a/test/git/test_db.py b/test/git/test_db.py deleted file mode 100644 index 9da13bd8..00000000 --- a/test/git/test_db.py +++ /dev/null @@ -1,25 +0,0 @@ -# test_repo.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -from test.testlib import * -from git.db import * -from gitdb.util import bin_to_hex -from git.exc import BadObject -import os - -class TestDB(TestBase): - - def test_base(self): - gdb = GitCmdObjectDB(os.path.join(self.rorepo.git_dir, 'objects'), self.rorepo.git) - - # partial to complete - works with everything - hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6")) - assert len(hexsha) == 40 - - assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha - - # fails with BadObject - for invalid_rev in ("0000", "bad/ref", "super bad"): - self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev) diff --git a/test/git/test_diff.py b/test/git/test_diff.py deleted file mode 100644 index ade21c1b..00000000 --- a/test/git/test_diff.py +++ /dev/null @@ -1,108 +0,0 @@ -# test_diff.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * - -class TestDiff(TestBase): - - def _assert_diff_format(self, diffs): - # verify that the format of the diff is sane - for diff in diffs: - if diff.a_mode: - assert isinstance(diff.a_mode, int) - if diff.b_mode: - assert isinstance(diff.b_mode, int) - - if diff.a_blob: - assert not diff.a_blob.path.endswith('\n') - if diff.b_blob: - assert not diff.b_blob.path.endswith('\n') - # END for each diff - return diffs - - def test_list_from_string_new_mode(self): - output = StringProcessAdapter(fixture('diff_new_mode')) - diffs = Diff._index_from_patch_format(self.rorepo, output.stdout) - self._assert_diff_format(diffs) - - assert_equal(1, len(diffs)) - assert_equal(10, len(diffs[0].diff.splitlines())) - - def test_diff_with_rename(self): - output = StringProcessAdapter(fixture('diff_rename')) - diffs = Diff._index_from_patch_format(self.rorepo, output.stdout) - self._assert_diff_format(diffs) - - assert_equal(1, len(diffs)) - - diff = diffs[0] - assert_true(diff.renamed) - assert_equal(diff.rename_from, 'AUTHORS') - assert_equal(diff.rename_to, 'CONTRIBUTORS') - - def test_diff_patch_format(self): - # test all of the 'old' format diffs for completness - it should at least - # be able to deal with it - fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only", - "diff_new_mode", "diff_numstat", "diff_p", "diff_rename", - "diff_tree_numstat_root" ) - - for fixture_name in fixtures: - diff_proc = StringProcessAdapter(fixture(fixture_name)) - diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout) - # END for each fixture - - def test_diff_interface(self): - # test a few variations of the main diff routine - assertion_map = dict() - for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)): - diff_item = commit - if i%2 == 0: - diff_item = commit.tree - # END use tree every second item - - for other in (None, commit.Index, commit.parents[0]): - for paths in (None, "CHANGES", ("CHANGES", "lib")): - for create_patch in range(2): - diff_index = diff_item.diff(other, paths, create_patch) - assert isinstance(diff_index, DiffIndex) - - if diff_index: - self._assert_diff_format(diff_index) - for ct in DiffIndex.change_type: - key = 'ct_%s'%ct - assertion_map.setdefault(key, 0) - assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct))) - # END for each changetype - - # check entries - diff_set = set() - diff_set.add(diff_index[0]) - diff_set.add(diff_index[0]) - assert len(diff_set) == 1 - assert diff_index[0] == diff_index[0] - assert not (diff_index[0] != diff_index[0]) - # END diff index checking - # END for each patch option - # END for each path option - # END for each other side - # END for each commit - - # assert we could always find at least one instance of the members we - # can iterate in the diff index - if not this indicates its not working correctly - # or our test does not span the whole range of possibilities - for key,value in assertion_map.items(): - assert value, "Did not find diff for %s" % key - # END for each iteration type - - # test path not existing in the index - should be ignored - c = self.rorepo.head.commit - cp = c.parents[0] - diff_index = c.diff(cp, ["does/not/exist"]) - assert len(diff_index) == 0 - - diff --git a/test/git/test_fun.py b/test/git/test_fun.py deleted file mode 100644 index 3fdc13fd..00000000 --- a/test/git/test_fun.py +++ /dev/null @@ -1,251 +0,0 @@ -from test.testlib import * -from git.objects.fun import ( - traverse_tree_recursive, - traverse_trees_recursive, - tree_to_stream - ) - -from git.index.fun import ( - aggressive_tree_merge - ) - -from gitdb.util import bin_to_hex -from gitdb.base import IStream -from gitdb.typ import str_tree_type - -from stat import ( - S_IFDIR, - S_IFREG, - S_IFLNK - ) - -from git.index import IndexFile -from cStringIO import StringIO - -class TestFun(TestBase): - - def _assert_index_entries(self, entries, trees): - index = IndexFile.from_tree(self.rorepo, *[self.rorepo.tree(bin_to_hex(t)) for t in trees]) - assert entries - assert len(index.entries) == len(entries) - for entry in entries: - assert (entry.path, entry.stage) in index.entries - # END assert entry matches fully - - def test_aggressive_tree_merge(self): - # head tree with additions, removals and modification compared to its predecessor - odb = self.rorepo.odb - HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c") - H = HC.tree - B = HC.parents[0].tree - - # entries from single tree - trees = [H.binsha] - self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - - # from multiple trees - trees = [B.binsha, H.binsha] - self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - - # three way, no conflict - tree = self.rorepo.tree - B = tree("35a09c0534e89b2d43ec4101a5fb54576b577905") - H = tree("4fe5cfa0e063a8d51a1eb6f014e2aaa994e5e7d4") - M = tree("1f2b19de3301e76ab3a6187a49c9c93ff78bafbd") - trees = [B.binsha, H.binsha, M.binsha] - self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - - # three-way, conflict in at least one file, both modified - B = tree("a7a4388eeaa4b6b94192dce67257a34c4a6cbd26") - H = tree("f9cec00938d9059882bb8eabdaf2f775943e00e5") - M = tree("44a601a068f4f543f73fd9c49e264c931b1e1652") - trees = [B.binsha, H.binsha, M.binsha] - self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - - # too many trees - self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees*2) - - def mktree(self, odb, entries): - """create a tree from the given tree entries and safe it to the database""" - sio = StringIO() - tree_to_stream(entries, sio.write) - sio.seek(0) - istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) - return istream.binsha - - @with_rw_repo('0.1.6') - def test_three_way_merge(self, rwrepo): - def mkfile(name, sha, executable=0): - return (sha, S_IFREG | 0644 | executable*0111, name) - def mkcommit(name, sha): - return (sha, S_IFDIR | S_IFLNK, name) - def assert_entries(entries, num_entries, has_conflict=False): - assert len(entries) == num_entries - assert has_conflict == (len([e for e in entries if e.stage != 0]) > 0) - mktree = self.mktree - - shaa = "\1"*20 - shab = "\2"*20 - shac = "\3"*20 - - odb = rwrepo.odb - - # base tree - bfn = 'basefile' - fbase = mkfile(bfn, shaa) - tb = mktree(odb, [fbase]) - - # non-conflicting new files, same data - fa = mkfile('1', shab) - th = mktree(odb, [fbase, fa]) - fb = mkfile('2', shac) - tm = mktree(odb, [fbase, fb]) - - # two new files, same base file - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 3) - - # both delete same file, add own one - fa = mkfile('1', shab) - th = mktree(odb, [fa]) - fb = mkfile('2', shac) - tm = mktree(odb, [fb]) - - # two new files - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 2) - - # same file added in both, differently - fa = mkfile('1', shab) - th = mktree(odb, [fa]) - fb = mkfile('1', shac) - tm = mktree(odb, [fb]) - - # expect conflict - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 2, True) - - # same file added, different mode - fa = mkfile('1', shab) - th = mktree(odb, [fa]) - fb = mkcommit('1', shab) - tm = mktree(odb, [fb]) - - # expect conflict - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 2, True) - - # same file added in both - fa = mkfile('1', shab) - th = mktree(odb, [fa]) - fb = mkfile('1', shab) - tm = mktree(odb, [fb]) - - # expect conflict - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 1) - - # modify same base file, differently - fa = mkfile(bfn, shab) - th = mktree(odb, [fa]) - fb = mkfile(bfn, shac) - tm = mktree(odb, [fb]) - - # conflict, 3 versions on 3 stages - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 3, True) - - - # change mode on same base file, by making one a commit, the other executable - # no content change ( this is totally unlikely to happen in the real world ) - fa = mkcommit(bfn, shaa) - th = mktree(odb, [fa]) - fb = mkfile(bfn, shaa, executable=1) - tm = mktree(odb, [fb]) - - # conflict, 3 versions on 3 stages, because of different mode - trees = [tb, th, tm] - assert_entries(aggressive_tree_merge(odb, trees), 3, True) - - for is_them in range(2): - # only we/they change contents - fa = mkfile(bfn, shab) - th = mktree(odb, [fa]) - - trees = [tb, th, tb] - if is_them: - trees = [tb, tb, th] - entries = aggressive_tree_merge(odb, trees) - assert len(entries) == 1 and entries[0].binsha == shab - - # only we/they change the mode - fa = mkcommit(bfn, shaa) - th = mktree(odb, [fa]) - - trees = [tb, th, tb] - if is_them: - trees = [tb, tb, th] - entries = aggressive_tree_merge(odb, trees) - assert len(entries) == 1 and entries[0].binsha == shaa and entries[0].mode == fa[1] - - # one side deletes, the other changes = conflict - fa = mkfile(bfn, shab) - th = mktree(odb, [fa]) - tm = mktree(odb, []) - trees = [tb, th, tm] - if is_them: - trees = [tb, tm, th] - # as one is deleted, there are only 2 entries - assert_entries(aggressive_tree_merge(odb, trees), 2, True) - # END handle ours, theirs - - def _assert_tree_entries(self, entries, num_trees): - for entry in entries: - assert len(entry) == num_trees - paths = set(e[2] for e in entry if e) - - # only one path per set of entries - assert len(paths) == 1 - # END verify entry - - def test_tree_traversal(self): - # low level tree tarversal - odb = self.rorepo.odb - H = self.rorepo.tree('29eb123beb1c55e5db4aa652d843adccbd09ae18') # head tree - M = self.rorepo.tree('e14e3f143e7260de9581aee27e5a9b2645db72de') # merge tree - B = self.rorepo.tree('f606937a7a21237c866efafcad33675e6539c103') # base tree - B_old = self.rorepo.tree('1f66cfbbce58b4b552b041707a12d437cc5f400a') # old base tree - - # two very different trees - entries = traverse_trees_recursive(odb, [B_old.binsha, H.binsha], '') - self._assert_tree_entries(entries, 2) - - oentries = traverse_trees_recursive(odb, [H.binsha, B_old.binsha], '') - assert len(oentries) == len(entries) - self._assert_tree_entries(oentries, 2) - - # single tree - is_no_tree = lambda i, d: i.type != 'tree' - entries = traverse_trees_recursive(odb, [B.binsha], '') - assert len(entries) == len(list(B.traverse(predicate=is_no_tree))) - self._assert_tree_entries(entries, 1) - - # two trees - entries = traverse_trees_recursive(odb, [B.binsha, H.binsha], '') - self._assert_tree_entries(entries, 2) - - # tree trees - entries = traverse_trees_recursive(odb, [B.binsha, H.binsha, M.binsha], '') - self._assert_tree_entries(entries, 3) - - def test_tree_traversal_single(self): - max_count = 50 - count = 0 - odb = self.rorepo.odb - for commit in self.rorepo.commit("29eb123beb1c55e5db4aa652d843adccbd09ae18").traverse(): - if count >= max_count: - break - count += 1 - entries = traverse_tree_recursive(odb, commit.tree.binsha, '') - assert entries - # END for each commit diff --git a/test/git/test_git.py b/test/git/test_git.py deleted file mode 100644 index 518c464a..00000000 --- a/test/git/test_git.py +++ /dev/null @@ -1,84 +0,0 @@ -# test_git.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -import os, sys -from test.testlib import * -from git import Git, GitCommandError - -class TestGit(TestCase): - - @classmethod - def setUpAll(cls): - cls.git = Git(GIT_REPO) - - @patch_object(Git, 'execute') - def test_call_process_calls_execute(self, git): - git.return_value = '' - self.git.version() - assert_true(git.called) - assert_equal(git.call_args, ((['git', 'version'],), {})) - - @raises(GitCommandError) - def test_it_raises_errors(self): - self.git.this_does_not_exist() - - - def test_it_transforms_kwargs_into_git_command_arguments(self): - assert_equal(["-s"], self.git.transform_kwargs(**{'s': True})) - assert_equal(["-s5"], self.git.transform_kwargs(**{'s': 5})) - - assert_equal(["--max-count"], self.git.transform_kwargs(**{'max_count': True})) - assert_equal(["--max-count=5"], self.git.transform_kwargs(**{'max_count': 5})) - - assert_equal(["-s", "-t"], self.git.transform_kwargs(**{'s': True, 't': True})) - - def test_it_executes_git_to_shell_and_returns_result(self): - assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git","version"])) - - def test_it_accepts_stdin(self): - filename = fixture_path("cat_file_blob") - fh = open(filename, 'r') - assert_equal("70c379b63ffa0795fdbfbc128e5a2818397b7ef8", - self.git.hash_object(istream=fh, stdin=True)) - fh.close() - - @patch_object(Git, 'execute') - def test_it_ignores_false_kwargs(self, git): - # this_should_not_be_ignored=False implies it *should* be ignored - output = self.git.version(pass_this_kwarg=False) - assert_true("pass_this_kwarg" not in git.call_args[1]) - - def test_persistent_cat_file_command(self): - # read header only - import subprocess as sp - hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167" - g = self.git.cat_file(batch_check=True, istream=sp.PIPE,as_process=True) - g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n") - g.stdin.flush() - obj_info = g.stdout.readline() - - # read header + data - g = self.git.cat_file(batch=True, istream=sp.PIPE,as_process=True) - g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n") - g.stdin.flush() - obj_info_two = g.stdout.readline() - assert obj_info == obj_info_two - - # read data - have to read it in one large chunk - size = int(obj_info.split()[2]) - data = g.stdout.read(size) - terminating_newline = g.stdout.read(1) - - # now we should be able to read a new object - g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n") - g.stdin.flush() - assert g.stdout.readline() == obj_info - - - # same can be achived using the respective command functions - hexsha, typename, size = self.git.get_object_header(hexsha) - hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha) - assert typename == typename_two and size == size_two diff --git a/test/git/test_index.py b/test/git/test_index.py deleted file mode 100644 index 29a7404d..00000000 --- a/test/git/test_index.py +++ /dev/null @@ -1,670 +0,0 @@ -# test_index.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * -import inspect -import os -import sys -import tempfile -import glob -import shutil -from stat import * - -class TestIndex(TestBase): - - def __init__(self, *args): - super(TestIndex, self).__init__(*args) - self._reset_progress() - - def _assert_fprogress(self, entries): - assert len(entries) == len(self._fprogress_map) - for path, call_count in self._fprogress_map.iteritems(): - assert call_count == 2 - # END for each item in progress map - self._reset_progress() - - def _fprogress(self, path, done, item): - self._fprogress_map.setdefault(path, 0) - curval = self._fprogress_map[path] - if curval == 0: - assert not done - if curval == 1: - assert done - self._fprogress_map[path] = curval + 1 - - def _fprogress_add(self, path, done, item): - """Called as progress func - we keep track of the proper - call order""" - assert item is not None - self._fprogress(path, done, item) - - def _reset_progress(self): - # maps paths to the count of calls - self._fprogress_map = dict() - - def _assert_entries(self, entries): - for entry in entries: - assert isinstance(entry, BaseIndexEntry) - assert not os.path.isabs(entry.path) - assert not "\\" in entry.path - # END for each entry - - def test_index_file_base(self): - # read from file - index = IndexFile(self.rorepo, fixture_path("index")) - assert index.entries - assert index.version > 0 - - # test entry - last_val = None - entry = index.entries.itervalues().next() - for attr in ("path","ctime","mtime","dev","inode","mode","uid", - "gid","size","binsha", "hexsha", "stage"): - val = getattr(entry, attr) - # END for each method - - # test update - entries = index.entries - assert isinstance(index.update(), IndexFile) - assert entries is not index.entries - - # test stage - index_merge = IndexFile(self.rorepo, fixture_path("index_merge")) - assert len(index_merge.entries) == 106 - assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 )) - - # write the data - it must match the original - tmpfile = tempfile.mktemp() - index_merge.write(tmpfile) - fp = open(tmpfile, 'rb') - assert fp.read() == fixture("index_merge") - fp.close() - os.remove(tmpfile) - - def _cmp_tree_index(self, tree, index): - # fail unless both objects contain the same paths and blobs - if isinstance(tree, str): - tree = self.rorepo.commit(tree).tree - - num_blobs = 0 - blist = list() - for blob in tree.traverse(predicate = lambda e,d: e.type == "blob", branch_first=False): - assert (blob.path,0) in index.entries - blist.append(blob) - # END for each blob in tree - if len(blist) != len(index.entries): - iset = set(k[0] for k in index.entries.keys()) - bset = set(b.path for b in blist) - raise AssertionError( "CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset-iset, iset-bset) ) - # END assertion message - - @with_rw_repo('0.1.6') - def test_index_file_from_tree(self, rw_repo): - common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541" - cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573" - other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9" - - # simple index from tree - base_index = IndexFile.from_tree(rw_repo, common_ancestor_sha) - assert base_index.entries - self._cmp_tree_index(common_ancestor_sha, base_index) - - # merge two trees - its like a fast-forward - two_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha) - assert two_way_index.entries - self._cmp_tree_index(cur_sha, two_way_index) - - # merge three trees - here we have a merge conflict - three_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha, other_sha) - assert len(list(e for e in three_way_index.entries.values() if e.stage != 0)) - - - # ITERATE BLOBS - merge_required = lambda t: t[0] != 0 - merge_blobs = list(three_way_index.iter_blobs(merge_required)) - assert merge_blobs - assert merge_blobs[0][0] in (1,2,3) - assert isinstance(merge_blobs[0][1], Blob) - - # test BlobFilter - prefix = 'lib/git' - for stage, blob in base_index.iter_blobs(BlobFilter([prefix])): - assert blob.path.startswith(prefix) - - - # writing a tree should fail with an unmerged index - self.failUnlessRaises(UnmergedEntriesError, three_way_index.write_tree) - - # removed unmerged entries - unmerged_blob_map = three_way_index.unmerged_blobs() - assert unmerged_blob_map - - # pick the first blob at the first stage we find and use it as resolved version - three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() ) - tree = three_way_index.write_tree() - assert isinstance(tree, Tree) - num_blobs = 0 - for blob in tree.traverse(predicate=lambda item,d: item.type == "blob"): - assert (blob.path,0) in three_way_index.entries - num_blobs += 1 - # END for each blob - assert num_blobs == len(three_way_index.entries) - - @with_rw_repo('0.1.6') - def test_index_merge_tree(self, rw_repo): - # A bit out of place, but we need a different repo for this: - assert self.rorepo != rw_repo and not (self.rorepo == rw_repo) - assert len(set((self.rorepo, self.rorepo, rw_repo, rw_repo))) == 2 - - # SINGLE TREE MERGE - # current index is at the (virtual) cur_commit - next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c" - parent_commit = rw_repo.head.commit.parents[0] - manifest_key = IndexFile.entry_key('MANIFEST.in', 0) - manifest_entry = rw_repo.index.entries[manifest_key] - rw_repo.index.merge_tree(next_commit) - # only one change should be recorded - assert manifest_entry.binsha != rw_repo.index.entries[manifest_key].binsha - - rw_repo.index.reset(rw_repo.head) - assert rw_repo.index.entries[manifest_key].binsha == manifest_entry.binsha - - # FAKE MERGE - ############# - # Add a change with a NULL sha that should conflict with next_commit. We - # pretend there was a change, but we do not even bother adding a proper - # sha for it ( which makes things faster of course ) - manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0"*20, 0, manifest_entry[3])) - # try write flag - self._assert_entries(rw_repo.index.add([manifest_fake_entry], write=False)) - # add actually resolves the null-hex-sha for us as a feature, but we can - # edit the index manually - assert rw_repo.index.entries[manifest_key].binsha != Object.NULL_BIN_SHA - # must operate on the same index for this ! Its a bit problematic as - # it might confuse people - index = rw_repo.index - index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry) - index.write() - assert rw_repo.index.entries[manifest_key].hexsha == Diff.NULL_HEX_SHA - - # write an unchanged index ( just for the fun of it ) - rw_repo.index.write() - - # a three way merge would result in a conflict and fails as the command will - # not overwrite any entries in our index and hence leave them unmerged. This is - # mainly a protection feature as the current index is not yet in a tree - self.failUnlessRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit) - - # the only way to get the merged entries is to safe the current index away into a tree, - # which is like a temporary commit for us. This fails as well as the NULL sha deos not - # have a corresponding object - # NOTE: missing_ok is not a kwarg anymore, missing_ok is always true - # self.failUnlessRaises(GitCommandError, index.write_tree) - - # if missing objects are okay, this would work though ( they are always okay now ) - tree = index.write_tree() - - # now make a proper three way merge with unmerged entries - unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit) - unmerged_blobs = unmerged_tree.unmerged_blobs() - assert len(unmerged_blobs) == 1 and unmerged_blobs.keys()[0] == manifest_key[0] - - - @with_rw_repo('0.1.6') - def test_index_file_diffing(self, rw_repo): - # default Index instance points to our index - index = IndexFile(rw_repo) - assert index.path is not None - assert len(index.entries) - - # write the file back - index.write() - - # could sha it, or check stats - - # test diff - # resetting the head will leave the index in a different state, and the - # diff will yield a few changes - cur_head_commit = rw_repo.head.reference.commit - ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False) - - # diff against same index is 0 - diff = index.diff() - assert len(diff) == 0 - - # against HEAD as string, must be the same as it matches index - diff = index.diff('HEAD') - assert len(diff) == 0 - - # against previous head, there must be a difference - diff = index.diff(cur_head_commit) - assert len(diff) - - # we reverse the result - adiff = index.diff(str(cur_head_commit), R=True) - odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore - assert adiff != odiff - assert odiff == diff # both unreversed diffs against HEAD - - # against working copy - its still at cur_commit - wdiff = index.diff(None) - assert wdiff != adiff - assert wdiff != odiff - - # against something unusual - self.failUnlessRaises(ValueError, index.diff, int) - - # adjust the index to match an old revision - cur_branch = rw_repo.active_branch - cur_commit = cur_branch.commit - rev_head_parent = 'HEAD~1' - assert index.reset(rev_head_parent) is index - - assert cur_branch == rw_repo.active_branch - assert cur_commit == rw_repo.head.commit - - # there must be differences towards the working tree which is in the 'future' - assert index.diff(None) - - # reset the working copy as well to current head,to pull 'back' as well - new_data = "will be reverted" - file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES") - fp = open(file_path, "wb") - fp.write(new_data) - fp.close() - index.reset(rev_head_parent, working_tree=True) - assert not index.diff(None) - assert cur_branch == rw_repo.active_branch - assert cur_commit == rw_repo.head.commit - fp = open(file_path,'rb') - try: - assert fp.read() != new_data - finally: - fp.close() - - # test full checkout - test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES") - open(test_file, 'ab').write("some data") - rval = index.checkout(None, force=True, fprogress=self._fprogress) - assert 'CHANGES' in list(rval) - self._assert_fprogress([None]) - assert os.path.isfile(test_file) - - os.remove(test_file) - rval = index.checkout(None, force=False, fprogress=self._fprogress) - assert 'CHANGES' in list(rval) - self._assert_fprogress([None]) - assert os.path.isfile(test_file) - - # individual file - os.remove(test_file) - rval = index.checkout(test_file, fprogress=self._fprogress) - assert list(rval)[0] == 'CHANGES' - self._assert_fprogress([test_file]) - assert os.path.exists(test_file) - - # checking out non-existing file throws - self.failUnlessRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that") - self.failUnlessRaises(CheckoutError, index.checkout, paths=["doesnt/exist"]) - - # checkout file with modifications - append_data = "hello" - fp = open(test_file, "ab") - fp.write(append_data) - fp.close() - try: - index.checkout(test_file) - except CheckoutError, e: - assert len(e.failed_files) == 1 and e.failed_files[0] == os.path.basename(test_file) - assert (len(e.failed_files) == len(e.failed_reasons)) and isinstance(e.failed_reasons[0], basestring) - assert len(e.valid_files) == 0 - assert open(test_file).read().endswith(append_data) - else: - raise AssertionError("Exception CheckoutError not thrown") - - # if we force it it should work - index.checkout(test_file, force=True) - assert not open(test_file).read().endswith(append_data) - - # checkout directory - shutil.rmtree(os.path.join(rw_repo.working_tree_dir, "lib")) - rval = index.checkout('lib') - assert len(list(rval)) > 1 - - def _count_existing(self, repo, files): - """ - Returns count of files that actually exist in the repository directory. - """ - existing = 0 - basedir = repo.working_tree_dir - for f in files: - existing += os.path.isfile(os.path.join(basedir, f)) - # END for each deleted file - return existing - # END num existing helper - - @with_rw_repo('0.1.6') - def test_index_mutation(self, rw_repo): - index = rw_repo.index - num_entries = len(index.entries) - cur_head = rw_repo.head - - uname = "Some Developer" - umail = "sd@company.com" - rw_repo.config_writer().set_value("user", "name", uname) - rw_repo.config_writer().set_value("user", "email", umail) - - # remove all of the files, provide a wild mix of paths, BaseIndexEntries, - # IndexEntries - def mixed_iterator(): - count = 0 - for entry in index.entries.itervalues(): - type_id = count % 4 - if type_id == 0: # path - yield entry.path - elif type_id == 1: # blob - yield Blob(rw_repo, entry.binsha, entry.mode, entry.path) - elif type_id == 2: # BaseIndexEntry - yield BaseIndexEntry(entry[:4]) - elif type_id == 3: # IndexEntry - yield entry - else: - raise AssertionError("Invalid Type") - count += 1 - # END for each entry - # END mixed iterator - deleted_files = index.remove(mixed_iterator(), working_tree=False) - assert deleted_files - assert self._count_existing(rw_repo, deleted_files) == len(deleted_files) - assert len(index.entries) == 0 - - # reset the index to undo our changes - index.reset() - assert len(index.entries) == num_entries - - # remove with working copy - deleted_files = index.remove(mixed_iterator(), working_tree=True) - assert deleted_files - assert self._count_existing(rw_repo, deleted_files) == 0 - - # reset everything - index.reset(working_tree=True) - assert self._count_existing(rw_repo, deleted_files) == len(deleted_files) - - # invalid type - self.failUnlessRaises(TypeError, index.remove, [1]) - - # absolute path - deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir,"lib")], r=True) - assert len(deleted_files) > 1 - self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"]) - - # TEST COMMITTING - # commit changed index - cur_commit = cur_head.commit - commit_message = "commit default head" - - new_commit = index.commit(commit_message, head=False) - assert cur_commit != new_commit - assert new_commit.author.name == uname - assert new_commit.author.email == umail - assert new_commit.committer.name == uname - assert new_commit.committer.email == umail - assert new_commit.message == commit_message - assert new_commit.parents[0] == cur_commit - assert len(new_commit.parents) == 1 - assert cur_head.commit == cur_commit - - # same index, no parents - commit_message = "index without parents" - commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True) - assert SymbolicReference(rw_repo, 'ORIG_HEAD').commit == cur_commit - assert commit_no_parents.message == commit_message - assert len(commit_no_parents.parents) == 0 - assert cur_head.commit == commit_no_parents - - # same index, multiple parents - commit_message = "Index with multiple parents\n commit with another line" - commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit)) - assert commit_multi_parent.message == commit_message - assert len(commit_multi_parent.parents) == 2 - assert commit_multi_parent.parents[0] == commit_no_parents - assert commit_multi_parent.parents[1] == new_commit - assert cur_head.commit == commit_multi_parent - - # re-add all files in lib - # get the lib folder back on disk, but get an index without it - index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False) - lib_file_path = os.path.join("lib", "git", "__init__.py") - assert (lib_file_path, 0) not in index.entries - assert os.path.isfile(os.path.join(rw_repo.working_tree_dir, lib_file_path)) - - # directory - entries = index.add(['lib'], fprogress=self._fprogress_add) - self._assert_entries(entries) - self._assert_fprogress(entries) - assert len(entries)>1 - - # glob - entries = index.reset(new_commit).add([os.path.join('lib', 'git', '*.py')], fprogress=self._fprogress_add) - self._assert_entries(entries) - self._assert_fprogress(entries) - assert len(entries) == 14 - - # same file - entries = index.reset(new_commit).add([os.path.abspath(os.path.join('lib', 'git', 'head.py'))]*2, fprogress=self._fprogress_add) - self._assert_entries(entries) - assert entries[0].mode & 0644 == 0644 - # would fail, test is too primitive to handle this case - # self._assert_fprogress(entries) - self._reset_progress() - assert len(entries) == 2 - - # missing path - self.failUnlessRaises(OSError, index.reset(new_commit).add, ['doesnt/exist/must/raise']) - - # blob from older revision overrides current index revision - old_blob = new_commit.parents[0].tree.blobs[0] - entries = index.reset(new_commit).add([old_blob], fprogress=self._fprogress_add) - self._assert_entries(entries) - self._assert_fprogress(entries) - assert index.entries[(old_blob.path,0)].hexsha == old_blob.hexsha and len(entries) == 1 - - # mode 0 not allowed - null_hex_sha = Diff.NULL_HEX_SHA - null_bin_sha = "\0" * 20 - self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_bin_sha,0,"doesntmatter"))]) - - # add new file - new_file_relapath = "my_new_file" - new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo) - entries = index.reset(new_commit).add([BaseIndexEntry((010644, null_bin_sha, 0, new_file_relapath))], fprogress=self._fprogress_add) - self._assert_entries(entries) - self._assert_fprogress(entries) - assert len(entries) == 1 and entries[0].hexsha != null_hex_sha - - # add symlink - if sys.platform != "win32": - basename = "my_real_symlink" - target = "/etc/that" - link_file = os.path.join(rw_repo.working_tree_dir, basename) - os.symlink(target, link_file) - entries = index.reset(new_commit).add([link_file], fprogress=self._fprogress_add) - self._assert_entries(entries) - self._assert_fprogress(entries) - assert len(entries) == 1 and S_ISLNK(entries[0].mode) - assert S_ISLNK(index.entries[index.entry_key("my_real_symlink", 0)].mode) - - # we expect only the target to be written - assert index.repo.odb.stream(entries[0].binsha).read() == target - # END real symlink test - - # add fake symlink and assure it checks-our as symlink - fake_symlink_relapath = "my_fake_symlink" - link_target = "/etc/that" - fake_symlink_path = self._make_file(fake_symlink_relapath, link_target, rw_repo) - fake_entry = BaseIndexEntry((0120000, null_bin_sha, 0, fake_symlink_relapath)) - entries = index.reset(new_commit).add([fake_entry], fprogress=self._fprogress_add) - self._assert_entries(entries) - self._assert_fprogress(entries) - assert entries[0].hexsha != null_hex_sha - assert len(entries) == 1 and S_ISLNK(entries[0].mode) - - # assure this also works with an alternate method - full_index_entry = IndexEntry.from_base(BaseIndexEntry((0120000, entries[0].binsha, 0, entries[0].path))) - entry_key = index.entry_key(full_index_entry) - index.reset(new_commit) - - assert entry_key not in index.entries - index.entries[entry_key] = full_index_entry - index.write() - index.update() # force reread of entries - new_entry = index.entries[entry_key] - assert S_ISLNK(new_entry.mode) - - # a tree created from this should contain the symlink - tree = index.write_tree() - assert fake_symlink_relapath in tree - index.write() # flush our changes for the checkout - - # checkout the fakelink, should be a link then - assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE]) - os.remove(fake_symlink_path) - index.checkout(fake_symlink_path) - - # on windows we will never get symlinks - if os.name == 'nt': - # simlinks should contain the link as text ( which is what a - # symlink actually is ) - open(fake_symlink_path,'rb').read() == link_target - else: - assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE]) - - # TEST RENAMING - def assert_mv_rval(rval): - for source, dest in rval: - assert not os.path.exists(source) and os.path.exists(dest) - # END for each renamed item - # END move assertion utility - - self.failUnlessRaises(ValueError, index.move, ['just_one_path']) - # file onto existing file - files = ['AUTHORS', 'LICENSE'] - self.failUnlessRaises(GitCommandError, index.move, files) - - # again, with force - assert_mv_rval(index.move(files, f=True)) - - # files into directory - dry run - paths = ['LICENSE', 'VERSION', 'doc'] - rval = index.move(paths, dry_run=True) - assert len(rval) == 2 - assert os.path.exists(paths[0]) - - # again, no dry run - rval = index.move(paths) - assert_mv_rval(rval) - - # dir into dir - rval = index.move(['doc', 'test']) - assert_mv_rval(rval) - - - # TEST PATH REWRITING - ###################### - count = [0] - def rewriter(entry): - rval = str(count[0]) - count[0] += 1 - return rval - # END rewriter - - def make_paths(): - # two existing ones, one new one - yield 'CHANGES' - yield 'ez_setup.py' - yield index.entries[index.entry_key('README', 0)] - yield index.entries[index.entry_key('.gitignore', 0)] - - for fid in range(3): - fname = 'newfile%i' % fid - open(fname, 'wb').write("abcd") - yield Blob(rw_repo, Blob.NULL_BIN_SHA, 0100644, fname) - # END for each new file - # END path producer - paths = list(make_paths()) - self._assert_entries(index.add(paths, path_rewriter=rewriter)) - - for filenum in range(len(paths)): - assert index.entry_key(str(filenum), 0) in index.entries - - - # TEST RESET ON PATHS - ###################### - arela = "aa" - brela = "bb" - afile = self._make_file(arela, "adata", rw_repo) - bfile = self._make_file(brela, "bdata", rw_repo) - akey = index.entry_key(arela, 0) - bkey = index.entry_key(brela, 0) - keys = (akey, bkey) - absfiles = (afile, bfile) - files = (arela, brela) - - for fkey in keys: - assert not fkey in index.entries - - index.add(files, write=True) - nc = index.commit("2 files committed", head=False) - - for fkey in keys: - assert fkey in index.entries - - # just the index - index.reset(paths=(arela, afile)) - assert not akey in index.entries - assert bkey in index.entries - - # now with working tree - files on disk as well as entries must be recreated - rw_repo.head.commit = nc - for absfile in absfiles: - os.remove(absfile) - - index.reset(working_tree=True, paths=files) - - for fkey in keys: - assert fkey in index.entries - for absfile in absfiles: - assert os.path.isfile(absfile) - - - @with_rw_repo('HEAD') - def test_compare_write_tree(self, rw_repo): - # write all trees and compare them - # its important to have a few submodules in there too - max_count = 25 - count = 0 - for commit in rw_repo.head.commit.traverse(): - if count >= max_count: - break - count += 1 - index = rw_repo.index.reset(commit) - orig_tree = commit.tree - assert index.write_tree() == orig_tree - # END for each commit - - def test_index_new(self): - B = self.rorepo.tree("6d9b1f4f9fa8c9f030e3207e7deacc5d5f8bba4e") - H = self.rorepo.tree("25dca42bac17d511b7e2ebdd9d1d679e7626db5f") - M = self.rorepo.tree("e746f96bcc29238b79118123028ca170adc4ff0f") - - for args in ((B,), (B,H), (B,H,M)): - index = IndexFile.new(self.rorepo, *args) - assert isinstance(index, IndexFile) - # END for each arg tuple - - diff --git a/test/git/test_refs.py b/test/git/test_refs.py deleted file mode 100644 index fa26bae9..00000000 --- a/test/git/test_refs.py +++ /dev/null @@ -1,488 +0,0 @@ -# test_refs.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from mock import * -from test.testlib import * -from git import * -import git.refs as refs -from git.objects.tag import TagObject -from itertools import chain -import os - -class TestRefs(TestBase): - - def test_from_path(self): - # should be able to create any reference directly - for ref_type in ( Reference, Head, TagReference, RemoteReference ): - for name in ('rela_name', 'path/rela_name'): - full_path = ref_type.to_full_path(name) - instance = ref_type.from_path(self.rorepo, full_path) - assert isinstance(instance, ref_type) - # END for each name - # END for each type - - def test_tag_base(self): - tag_object_refs = list() - for tag in self.rorepo.tags: - assert "refs/tags" in tag.path - assert tag.name - assert isinstance( tag.commit, Commit ) - if tag.tag is not None: - tag_object_refs.append( tag ) - tagobj = tag.tag - assert isinstance( tagobj, TagObject ) - assert tagobj.tag == tag.name - assert isinstance( tagobj.tagger, Actor ) - assert isinstance( tagobj.tagged_date, int ) - assert isinstance( tagobj.tagger_tz_offset, int ) - assert tagobj.message - assert tag.object == tagobj - # can't assign the object - self.failUnlessRaises(AttributeError, setattr, tag, 'object', tagobj) - # END if we have a tag object - # END for tag in repo-tags - assert tag_object_refs - assert isinstance(self.rorepo.tags['0.1.5'], TagReference) - - def test_tags(self): - # tag refs can point to tag objects or to commits - s = set() - ref_count = 0 - for ref in chain(self.rorepo.tags, self.rorepo.heads): - ref_count += 1 - assert isinstance(ref, refs.Reference) - assert str(ref) == ref.name - assert repr(ref) - assert ref == ref - assert not ref != ref - s.add(ref) - # END for each ref - assert len(s) == ref_count - assert len(s|s) == ref_count - - @with_rw_repo('HEAD', bare=False) - def test_heads(self, rwrepo): - for head in rwrepo.heads: - assert head.name - assert head.path - assert "refs/heads" in head.path - prev_object = head.object - cur_object = head.object - assert prev_object == cur_object # represent the same git object - assert prev_object is not cur_object # but are different instances - - writer = head.config_writer() - tv = "testopt" - writer.set_value(tv, 1) - assert writer.get_value(tv) == 1 - del(writer) - assert head.config_reader().get_value(tv) == 1 - head.config_writer().remove_option(tv) - - # after the clone, we might still have a tracking branch setup - head.set_tracking_branch(None) - assert head.tracking_branch() is None - remote_ref = rwrepo.remotes[0].refs[0] - assert head.set_tracking_branch(remote_ref) is head - assert head.tracking_branch() == remote_ref - head.set_tracking_branch(None) - assert head.tracking_branch() is None - # END for each head - - # verify ORIG_HEAD gets set for detached heads - head = rwrepo.head - orig_head = head.orig_head() - cur_head = head.ref - cur_commit = cur_head.commit - pcommit = cur_head.commit.parents[0].parents[0] - head.ref = pcommit # detach head - assert orig_head.commit == cur_commit - - # even if we set it through its reference - chaning the ref - # will adjust the orig_head, which still points to cur_commit - head.ref = cur_head - assert orig_head.commit == pcommit - assert head.commit == cur_commit == cur_head.commit - - cur_head.commit = pcommit - assert head.commit == pcommit - assert orig_head.commit == cur_commit - - # with automatic dereferencing - head.commit = cur_commit - assert orig_head.commit == pcommit - - # changing branches which are not checked out doesn't affect the ORIG_HEAD - other_head = Head.create(rwrepo, 'mynewhead', pcommit) - assert other_head.commit == pcommit - assert orig_head.commit == pcommit - other_head.commit = pcommit.parents[0] - assert orig_head.commit == pcommit - - - def test_refs(self): - types_found = set() - for ref in self.rorepo.refs: - types_found.add(type(ref)) - assert len(types_found) >= 3 - - def test_is_valid(self): - assert Reference(self.rorepo, 'refs/doesnt/exist').is_valid() == False - assert self.rorepo.head.is_valid() - assert self.rorepo.head.reference.is_valid() - assert SymbolicReference(self.rorepo, 'hellothere').is_valid() == False - - @with_rw_repo('0.1.6') - def test_head_reset(self, rw_repo): - cur_head = rw_repo.head - old_head_commit = cur_head.commit - new_head_commit = cur_head.ref.commit.parents[0] - cur_head.reset(new_head_commit, index=True) # index only - assert cur_head.reference.commit == new_head_commit - - self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True) - new_head_commit = new_head_commit.parents[0] - cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt - assert cur_head.reference.commit == new_head_commit - - # paths - make sure we have something to do - rw_repo.index.reset(old_head_commit.parents[0]) - cur_head.reset(cur_head, paths = "test") - cur_head.reset(new_head_commit, paths = "lib") - # hard resets with paths don't work, its all or nothing - self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib") - - # we can do a mixed reset, and then checkout from the index though - cur_head.reset(new_head_commit) - rw_repo.index.checkout(["lib"], force=True)# - - - # now that we have a write write repo, change the HEAD reference - its - # like git-reset --soft - heads = rw_repo.heads - assert heads - for head in heads: - cur_head.reference = head - assert cur_head.reference == head - assert isinstance(cur_head.reference, Head) - assert cur_head.commit == head.commit - assert not cur_head.is_detached - # END for each head - - # detach - active_head = heads[0] - curhead_commit = active_head.commit - cur_head.reference = curhead_commit - assert cur_head.commit == curhead_commit - assert cur_head.is_detached - self.failUnlessRaises(TypeError, getattr, cur_head, "reference") - - # tags are references, hence we can point to them - some_tag = rw_repo.tags[0] - cur_head.reference = some_tag - assert not cur_head.is_detached - assert cur_head.commit == some_tag.commit - assert isinstance(cur_head.reference, TagReference) - - # put HEAD back to a real head, otherwise everything else fails - cur_head.reference = active_head - - # type check - self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that") - - # head handling - commit = 'HEAD' - prev_head_commit = cur_head.commit - for count, new_name in enumerate(("my_new_head", "feature/feature1")): - actual_commit = commit+"^"*count - new_head = Head.create(rw_repo, new_name, actual_commit) - assert cur_head.commit == prev_head_commit - assert isinstance(new_head, Head) - # already exists - self.failUnlessRaises(GitCommandError, Head.create, rw_repo, new_name) - - # force it - new_head = Head.create(rw_repo, new_name, actual_commit, force=True) - old_path = new_head.path - old_name = new_head.name - - assert new_head.rename("hello").name == "hello" - assert new_head.rename("hello/world").name == "hello/world" - assert new_head.rename(old_name).name == old_name and new_head.path == old_path - - # rename with force - tmp_head = Head.create(rw_repo, "tmphead") - self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head) - tmp_head.rename(new_head, force=True) - assert tmp_head == new_head and tmp_head.object == new_head.object - - Head.delete(rw_repo, tmp_head) - heads = rw_repo.heads - assert tmp_head not in heads and new_head not in heads - # force on deletion testing would be missing here, code looks okay though ;) - # END for each new head name - self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name") - - # tag ref - tag_name = "1.0.2" - light_tag = TagReference.create(rw_repo, tag_name) - self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name) - light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True) - assert isinstance(light_tag, TagReference) - assert light_tag.name == tag_name - assert light_tag.commit == cur_head.commit.parents[0] - assert light_tag.tag is None - - # tag with tag object - other_tag_name = "releases/1.0.2RC" - msg = "my mighty tag\nsecond line" - obj_tag = TagReference.create(rw_repo, other_tag_name, message=msg) - assert isinstance(obj_tag, TagReference) - assert obj_tag.name == other_tag_name - assert obj_tag.commit == cur_head.commit - assert obj_tag.tag is not None - - TagReference.delete(rw_repo, light_tag, obj_tag) - tags = rw_repo.tags - assert light_tag not in tags and obj_tag not in tags - - # remote deletion - remote_refs_so_far = 0 - remotes = rw_repo.remotes - assert remotes - for remote in remotes: - refs = remote.refs - RemoteReference.delete(rw_repo, *refs) - remote_refs_so_far += len(refs) - for ref in refs: - assert ref.remote_name == remote.name - # END for each ref to delete - assert remote_refs_so_far - - for remote in remotes: - # remotes without references throw - self.failUnlessRaises(AssertionError, getattr, remote, 'refs') - # END for each remote - - # change where the active head points to - if cur_head.is_detached: - cur_head.reference = rw_repo.heads[0] - - head = cur_head.reference - old_commit = head.commit - head.commit = old_commit.parents[0] - assert head.commit == old_commit.parents[0] - assert head.commit == cur_head.commit - head.commit = old_commit - - # setting a non-commit as commit fails, but succeeds as object - head_tree = head.commit.tree - self.failUnlessRaises(ValueError, setattr, head, 'commit', head_tree) - assert head.commit == old_commit # and the ref did not change - self.failUnlessRaises(GitCommandError, setattr, head, 'object', head_tree) - - # set the commit directly using the head. This would never detach the head - assert not cur_head.is_detached - head.object = old_commit - cur_head.reference = head.commit - assert cur_head.is_detached - parent_commit = head.commit.parents[0] - assert cur_head.is_detached - cur_head.commit = parent_commit - assert cur_head.is_detached and cur_head.commit == parent_commit - - cur_head.reference = head - assert not cur_head.is_detached - cur_head.commit = parent_commit - assert not cur_head.is_detached - assert head.commit == parent_commit - - # test checkout - active_branch = rw_repo.active_branch - for head in rw_repo.heads: - checked_out_head = head.checkout() - assert checked_out_head == head - # END for each head to checkout - - # checkout with branch creation - new_head = active_branch.checkout(b="new_head") - assert active_branch != rw_repo.active_branch - assert new_head == rw_repo.active_branch - - # checkout with force as we have a changed a file - # clear file - open(new_head.commit.tree.blobs[-1].abspath,'w').close() - assert len(new_head.commit.diff(None)) - - # create a new branch that is likely to touch the file we changed - far_away_head = rw_repo.create_head("far_head",'HEAD~100') - self.failUnlessRaises(GitCommandError, far_away_head.checkout) - assert active_branch == active_branch.checkout(force=True) - assert rw_repo.head.reference != far_away_head - - # test reference creation - partial_ref = 'sub/ref' - full_ref = 'refs/%s' % partial_ref - ref = Reference.create(rw_repo, partial_ref) - assert ref.path == full_ref - assert ref.object == rw_repo.head.commit - - self.failUnlessRaises(OSError, Reference.create, rw_repo, full_ref, 'HEAD~20') - # it works if it is at the same spot though and points to the same reference - assert Reference.create(rw_repo, full_ref, 'HEAD').path == full_ref - Reference.delete(rw_repo, full_ref) - - # recreate the reference using a full_ref - ref = Reference.create(rw_repo, full_ref) - assert ref.path == full_ref - assert ref.object == rw_repo.head.commit - - # recreate using force - ref = Reference.create(rw_repo, partial_ref, 'HEAD~1', force=True) - assert ref.path == full_ref - assert ref.object == rw_repo.head.commit.parents[0] - - # rename it - orig_obj = ref.object - for name in ('refs/absname', 'rela_name', 'feature/rela_name'): - ref_new_name = ref.rename(name) - assert isinstance(ref_new_name, Reference) - assert name in ref_new_name.path - assert ref_new_name.object == orig_obj - assert ref_new_name == ref - # END for each name type - - # References that don't exist trigger an error if we want to access them - self.failUnlessRaises(ValueError, getattr, Reference(rw_repo, "refs/doesntexist"), 'commit') - - # exists, fail unless we force - ex_ref_path = far_away_head.path - self.failUnlessRaises(OSError, ref.rename, ex_ref_path) - # if it points to the same commit it works - far_away_head.commit = ref.commit - ref.rename(ex_ref_path) - assert ref.path == ex_ref_path and ref.object == orig_obj - assert ref.rename(ref.path).path == ex_ref_path # rename to same name - - # create symbolic refs - symref_path = "symrefs/sym" - symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference) - assert symref.path == symref_path - assert symref.reference == cur_head.reference - - self.failUnlessRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit) - # it works if the new ref points to the same reference - SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path - SymbolicReference.delete(rw_repo, symref) - # would raise if the symref wouldn't have been deletedpbl - symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference) - - # test symbolic references which are not at default locations like HEAD - # or FETCH_HEAD - they may also be at spots in refs of course - symbol_ref_path = "refs/symbol_ref" - symref = SymbolicReference(rw_repo, symbol_ref_path) - assert symref.path == symbol_ref_path - symbol_ref_abspath = os.path.join(rw_repo.git_dir, symref.path) - - # set it - symref.reference = new_head - assert symref.reference == new_head - assert os.path.isfile(symbol_ref_abspath) - assert symref.commit == new_head.commit - - for name in ('absname','folder/rela_name'): - symref_new_name = symref.rename(name) - assert isinstance(symref_new_name, SymbolicReference) - assert name in symref_new_name.path - assert symref_new_name.reference == new_head - assert symref_new_name == symref - assert not symref.is_detached - # END for each ref - - # create a new non-head ref just to be sure we handle it even if packed - Reference.create(rw_repo, full_ref) - - # test ref listing - assure we have packed refs - rw_repo.git.pack_refs(all=True, prune=True) - heads = rw_repo.heads - assert heads - assert new_head in heads - assert active_branch in heads - assert rw_repo.tags - - # we should be able to iterate all symbolic refs as well - in that case - # we should expect only symbolic references to be returned - for symref in SymbolicReference.iter_items(rw_repo): - assert not symref.is_detached - - # when iterating references, we can get references and symrefs - # when deleting all refs, I'd expect them to be gone ! Even from - # the packed ones - # For this to work, we must not be on any branch - rw_repo.head.reference = rw_repo.head.commit - deleted_refs = set() - for ref in Reference.iter_items(rw_repo): - if ref.is_detached: - ref.delete(rw_repo, ref) - deleted_refs.add(ref) - # END delete ref - # END for each ref to iterate and to delete - assert deleted_refs - - for ref in Reference.iter_items(rw_repo): - if ref.is_detached: - assert ref not in deleted_refs - # END for each ref - - # reattach head - head will not be returned if it is not a symbolic - # ref - rw_repo.head.reference = Head.create(rw_repo, "master") - - # At least the head should still exist - assert os.path.isfile(os.path.join(rw_repo.git_dir, 'HEAD')) - refs = list(SymbolicReference.iter_items(rw_repo)) - assert len(refs) == 1 - - - # test creation of new refs from scratch - for path in ("basename", "dir/somename", "dir2/subdir/basename"): - # REFERENCES - ############ - fpath = Reference.to_full_path(path) - ref_fp = Reference.from_path(rw_repo, fpath) - assert not ref_fp.is_valid() - ref = Reference(rw_repo, fpath) - assert ref == ref_fp - - # can be created by assigning a commit - ref.commit = rw_repo.head.commit - assert ref.is_valid() - - # if the assignment raises, the ref doesn't exist - Reference.delete(ref.repo, ref.path) - assert not ref.is_valid() - self.failUnlessRaises(ValueError, setattr, ref, 'commit', "nonsense") - assert not ref.is_valid() - - # I am sure I had my reason to make it a class method at first, but - # now it doesn't make so much sense anymore, want an instance method as well - # See http://byronimo.lighthouseapp.com/projects/51787-gitpython/tickets/27 - Reference.delete(ref.repo, ref.path) - assert not ref.is_valid() - - ref.object = rw_repo.head.commit - assert ref.is_valid() - - Reference.delete(ref.repo, ref.path) - assert not ref.is_valid() - self.failUnlessRaises(GitCommandError, setattr, ref, 'object', "nonsense") - assert not ref.is_valid() - - # END for each path - - def test_dereference_recursive(self): - # for now, just test the HEAD - assert SymbolicReference.dereference_recursive(self.rorepo, 'HEAD') diff --git a/test/git/test_remote.py b/test/git/test_remote.py deleted file mode 100644 index c52f907e..00000000 --- a/test/git/test_remote.py +++ /dev/null @@ -1,445 +0,0 @@ -# test_remote.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * -from git.util import IterableList -import tempfile -import shutil -import os -import random - -# assure we have repeatable results -random.seed(0) - -class TestRemoteProgress(RemoteProgress): - __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' ) - def __init__(self): - super(TestRemoteProgress, self).__init__() - self._seen_lines = list() - self._stages_per_op = dict() - self._num_progress_messages = 0 - - def _parse_progress_line(self, line): - # we may remove the line later if it is dropped - # Keep it for debugging - self._seen_lines.append(line) - rval = super(TestRemoteProgress, self)._parse_progress_line(line) - assert len(line) > 1, "line %r too short" % line - return rval - - def line_dropped(self, line): - try: - self._seen_lines.remove(line) - except ValueError: - pass - - def update(self, op_code, cur_count, max_count=None, message=''): - # check each stage only comes once - op_id = op_code & self.OP_MASK - assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) - - self._stages_per_op.setdefault(op_id, 0) - self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) - - if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): - assert message - # END check we get message - - self._num_progress_messages += 1 - - - def make_assertion(self): - # we don't always receive messages - if not self._seen_lines: - return - - # sometimes objects are not compressed which is okay - assert len(self._seen_ops) in (2,3) - assert self._stages_per_op - - # must have seen all stages - for op, stages in self._stages_per_op.items(): - assert stages & self.STAGE_MASK == self.STAGE_MASK - # END for each op/stage - - def assert_received_message(self): - assert self._num_progress_messages - - -class TestRemote(TestBase): - - def _print_fetchhead(self, repo): - fp = open(os.path.join(repo.git_dir, "FETCH_HEAD")) - fp.close() - - - def _do_test_fetch_result(self, results, remote): - # self._print_fetchhead(remote.repo) - assert len(results) > 0 and isinstance(results[0], FetchInfo) - for info in results: - assert isinstance(info.note, basestring) - if isinstance(info.ref, Reference): - assert info.flags != 0 - # END reference type flags handling - assert isinstance(info.ref, (SymbolicReference, Reference)) - if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD): - assert isinstance(info.old_commit, Commit) - else: - assert info.old_commit is None - # END forced update checking - # END for each info - - def _do_test_push_result(self, results, remote): - assert len(results) > 0 and isinstance(results[0], PushInfo) - for info in results: - assert info.flags - assert isinstance(info.summary, basestring) - if info.old_commit is not None: - assert isinstance(info.old_commit, Commit) - if info.flags & info.ERROR: - has_one = False - for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE): - has_one |= bool(info.flags & bitflag) - # END for each bitflag - assert has_one - else: - # there must be a remote commit - if info.flags & info.DELETED == 0: - assert isinstance(info.local_ref, Reference) - else: - assert info.local_ref is None - assert type(info.remote_ref) in (TagReference, RemoteReference) - # END error checking - # END for each info - - - def _do_test_fetch_info(self, repo): - self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') - self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') - - def _commit_random_file(self, repo): - #Create a file with a random name and random data and commit it to repo. - # Return the commited absolute file path - index = repo.index - new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo) - index.add([new_file]) - index.commit("Committing %s" % new_file) - return new_file - - def _do_test_fetch(self,remote, rw_repo, remote_repo): - # specialized fetch testing to de-clutter the main test - self._do_test_fetch_info(rw_repo) - - def fetch_and_test(remote, **kwargs): - progress = TestRemoteProgress() - kwargs['progress'] = progress - res = remote.fetch(**kwargs) - progress.make_assertion() - self._do_test_fetch_result(res, remote) - return res - # END fetch and check - - def get_info(res, remote, name): - return res["%s/%s"%(remote,name)] - - # put remote head to master as it is garantueed to exist - remote_repo.head.reference = remote_repo.heads.master - - res = fetch_and_test(remote) - # all uptodate - for info in res: - assert info.flags & info.HEAD_UPTODATE - - # rewind remote head to trigger rejection - # index must be false as remote is a bare repo - rhead = remote_repo.head - remote_commit = rhead.commit - rhead.reset("HEAD~2", index=False) - res = fetch_and_test(remote) - mkey = "%s/%s"%(remote,'master') - master_info = res[mkey] - assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None - - # normal fast forward - set head back to previous one - rhead.commit = remote_commit - res = fetch_and_test(remote) - assert res[mkey].flags & FetchInfo.FAST_FORWARD - - # new remote branch - new_remote_branch = Head.create(remote_repo, "new_branch") - res = fetch_and_test(remote) - new_branch_info = get_info(res, remote, new_remote_branch) - assert new_branch_info.flags & FetchInfo.NEW_HEAD - - # remote branch rename ( causes creation of a new one locally ) - new_remote_branch.rename("other_branch_name") - res = fetch_and_test(remote) - other_branch_info = get_info(res, remote, new_remote_branch) - assert other_branch_info.ref.commit == new_branch_info.ref.commit - - # remove new branch - Head.delete(new_remote_branch.repo, new_remote_branch) - res = fetch_and_test(remote) - # deleted remote will not be fetched - self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) - - # prune stale tracking branches - stale_refs = remote.stale_refs - assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) - RemoteReference.delete(rw_repo, *stale_refs) - - # test single branch fetch with refspec including target remote - res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) - assert len(res) == 1 and get_info(res, remote, 'master') - - # ... with respec and no target - res = fetch_and_test(remote, refspec='master') - assert len(res) == 1 - - # add new tag reference - rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") - res = fetch_and_test(remote, tags=True) - tinfo = res[str(rtag)] - assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit - assert tinfo.flags & tinfo.NEW_TAG - - # adjust tag commit - Reference._set_object(rtag, rhead.commit.parents[0].parents[0]) - res = fetch_and_test(remote, tags=True) - tinfo = res[str(rtag)] - assert tinfo.commit == rtag.commit - assert tinfo.flags & tinfo.TAG_UPDATE - - # delete remote tag - local one will stay - TagReference.delete(remote_repo, rtag) - res = fetch_and_test(remote, tags=True) - self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) - - # provoke to receive actual objects to see what kind of output we have to - # expect. For that we need a remote transport protocol - # Create a new UN-shared repo and fetch into it after we pushed a change - # to the shared repo - other_repo_dir = tempfile.mktemp("other_repo") - # must clone with a local path for the repo implementation not to freak out - # as it wants local paths only ( which I can understand ) - other_repo = remote_repo.clone(other_repo_dir, shared=False) - remote_repo_url = "git://localhost%s"%remote_repo.git_dir - - # put origin to git-url - other_origin = other_repo.remotes.origin - other_origin.config_writer.set("url", remote_repo_url) - # it automatically creates alternates as remote_repo is shared as well. - # It will use the transport though and ignore alternates when fetching - # assert not other_repo.alternates # this would fail - - # assure we are in the right state - rw_repo.head.reset(remote.refs.master, working_tree=True) - try: - self._commit_random_file(rw_repo) - remote.push(rw_repo.head.reference) - - # here I would expect to see remote-information about packing - # objects and so on. Unfortunately, this does not happen - # if we are redirecting the output - git explicitly checks for this - # and only provides progress information to ttys - res = fetch_and_test(other_origin) - finally: - shutil.rmtree(other_repo_dir) - # END test and cleanup - - def _test_push_and_pull(self,remote, rw_repo, remote_repo): - # push our changes - lhead = rw_repo.head - lindex = rw_repo.index - # assure we are on master and it is checked out where the remote is - try: - lhead.reference = rw_repo.heads.master - except AttributeError: - # if the author is on a non-master branch, the clones might not have - # a local master yet. We simply create it - lhead.reference = rw_repo.create_head('master') - # END master handling - lhead.reset(remote.refs.master, working_tree=True) - - # push without spec should fail ( without further configuration ) - # well, works nicely - # self.failUnlessRaises(GitCommandError, remote.push) - - # simple file push - self._commit_random_file(rw_repo) - progress = TestRemoteProgress() - res = remote.push(lhead.reference, progress) - assert isinstance(res, IterableList) - self._do_test_push_result(res, remote) - progress.make_assertion() - - # rejected - undo last commit - lhead.reset("HEAD~1") - res = remote.push(lhead.reference) - assert res[0].flags & PushInfo.ERROR - assert res[0].flags & PushInfo.REJECTED - self._do_test_push_result(res, remote) - - # force rejected pull - res = remote.push('+%s' % lhead.reference) - assert res[0].flags & PushInfo.ERROR == 0 - assert res[0].flags & PushInfo.FORCED_UPDATE - self._do_test_push_result(res, remote) - - # invalid refspec - res = remote.push("hellothere") - assert len(res) == 0 - - # push new tags - progress = TestRemoteProgress() - to_be_updated = "my_tag.1.0RV" - new_tag = TagReference.create(rw_repo, to_be_updated) - other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message") - res = remote.push(progress=progress, tags=True) - assert res[-1].flags & PushInfo.NEW_TAG - progress.make_assertion() - self._do_test_push_result(res, remote) - - # update push new tags - # Rejection is default - new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) - res = remote.push(tags=True) - self._do_test_push_result(res, remote) - assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR - - # push force this tag - res = remote.push("+%s" % new_tag.path) - assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE - - # delete tag - have to do it using refspec - res = remote.push(":%s" % new_tag.path) - self._do_test_push_result(res, remote) - assert res[0].flags & PushInfo.DELETED - # Currently progress is not properly transferred, especially not using - # the git daemon - # progress.assert_received_message() - - # push new branch - new_head = Head.create(rw_repo, "my_new_branch") - progress = TestRemoteProgress() - res = remote.push(new_head, progress) - assert res[0].flags & PushInfo.NEW_HEAD - progress.make_assertion() - self._do_test_push_result(res, remote) - - # delete new branch on the remote end and locally - res = remote.push(":%s" % new_head.path) - self._do_test_push_result(res, remote) - Head.delete(rw_repo, new_head) - assert res[-1].flags & PushInfo.DELETED - - # --all - res = remote.push(all=True) - self._do_test_push_result(res, remote) - - remote.pull('master') - - # cleanup - delete created tags and branches as we are in an innerloop on - # the same repository - TagReference.delete(rw_repo, new_tag, other_tag) - remote.push(":%s" % other_tag.path) - - @with_rw_and_rw_remote_repo('0.1.6') - def test_base(self, rw_repo, remote_repo): - num_remotes = 0 - remote_set = set() - ran_fetch_test = False - - for remote in rw_repo.remotes: - num_remotes += 1 - assert remote == remote - assert str(remote) != repr(remote) - remote_set.add(remote) - remote_set.add(remote) # should already exist - - # REFS - refs = remote.refs - assert refs - for ref in refs: - assert ref.remote_name == remote.name - assert ref.remote_head - # END for each ref - - # OPTIONS - # cannot use 'fetch' key anymore as it is now a method - for opt in ("url", ): - val = getattr(remote, opt) - reader = remote.config_reader - assert reader.get(opt) == val - assert reader.get_value(opt, None) == val - - # unable to write with a reader - self.failUnlessRaises(IOError, reader.set, opt, "test") - - # change value - writer = remote.config_writer - new_val = "myval" - writer.set(opt, new_val) - assert writer.get(opt) == new_val - writer.set(opt, val) - assert writer.get(opt) == val - del(writer) - assert getattr(remote, opt) == val - # END for each default option key - - # RENAME - other_name = "totally_other_name" - prev_name = remote.name - assert remote.rename(other_name) == remote - assert prev_name != remote.name - # multiple times - for time in range(2): - assert remote.rename(prev_name).name == prev_name - # END for each rename ( back to prev_name ) - - # PUSH/PULL TESTING - self._test_push_and_pull(remote, rw_repo, remote_repo) - - # FETCH TESTING - # Only for remotes - local cases are the same or less complicated - # as additional progress information will never be emitted - if remote.name == "daemon_origin": - self._do_test_fetch(remote, rw_repo, remote_repo) - ran_fetch_test = True - # END fetch test - - remote.update() - # END for each remote - - assert ran_fetch_test - assert num_remotes - assert num_remotes == len(remote_set) - - origin = rw_repo.remote('origin') - assert origin == rw_repo.remotes.origin - - @with_rw_repo('HEAD', bare=True) - def test_creation_and_removal(self, bare_rw_repo): - new_name = "test_new_one" - arg_list = (new_name, "git@server:hello.git") - remote = Remote.create(bare_rw_repo, *arg_list ) - assert remote.name == "test_new_one" - assert remote in bare_rw_repo.remotes - - # create same one again - self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) - - Remote.remove(bare_rw_repo, new_name) - - for remote in bare_rw_repo.remotes: - if remote.name == new_name: - raise AssertionError("Remote removal failed") - # END if deleted remote matches existing remote's name - # END for each remote - - - diff --git a/test/git/test_repo.py b/test/git/test_repo.py deleted file mode 100644 index 62b4c476..00000000 --- a/test/git/test_repo.py +++ /dev/null @@ -1,580 +0,0 @@ -# test_repo.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -from test.testlib import * -from git import * -from git.util import join_path_native -from git.exc import BadObject -from gitdb.util import hex_to_bin, bin_to_hex - -import os, sys -import tempfile -import shutil -from cStringIO import StringIO - - -class TestRepo(TestBase): - - @raises(InvalidGitRepositoryError) - def test_new_should_raise_on_invalid_repo_location(self): - Repo(tempfile.gettempdir()) - - @raises(NoSuchPathError) - def test_new_should_raise_on_non_existant_path(self): - Repo("repos/foobar") - - def test_repo_creation_from_different_paths(self): - r_from_gitdir = Repo(self.rorepo.git_dir) - assert r_from_gitdir.git_dir == self.rorepo.git_dir - assert r_from_gitdir.git_dir.endswith('.git') - assert not self.rorepo.git.working_dir.endswith('.git') - assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir - - def test_description(self): - txt = "Test repository" - self.rorepo.description = txt - assert_equal(self.rorepo.description, txt) - - def test_heads_should_return_array_of_head_objects(self): - for head in self.rorepo.heads: - assert_equal(Head, head.__class__) - - def test_heads_should_populate_head_data(self): - for head in self.rorepo.heads: - assert head.name - assert isinstance(head.commit,Commit) - # END for each head - - assert isinstance(self.rorepo.heads.master, Head) - assert isinstance(self.rorepo.heads['master'], Head) - - def test_tree_from_revision(self): - tree = self.rorepo.tree('0.1.6') - assert len(tree.hexsha) == 40 - assert tree.type == "tree" - assert self.rorepo.tree(tree) == tree - - # try from invalid revision that does not exist - self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world') - - def test_commit_from_revision(self): - commit = self.rorepo.commit('0.1.4') - assert commit.type == 'commit' - assert self.rorepo.commit(commit) == commit - - def test_commits(self): - mc = 10 - commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) - assert len(commits) == mc - - c = commits[0] - assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) - assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) - assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha) - assert_equal("Michael Trier", c.author.name) - assert_equal("mtrier@gmail.com", c.author.email) - assert_equal(1232829715, c.authored_date) - assert_equal(5*3600, c.author_tz_offset) - assert_equal("Michael Trier", c.committer.name) - assert_equal("mtrier@gmail.com", c.committer.email) - assert_equal(1232829715, c.committed_date) - assert_equal(5*3600, c.committer_tz_offset) - assert_equal("Bumped version 0.1.6\n", c.message) - - c = commits[1] - assert isinstance(c.parents, tuple) - - def test_trees(self): - mc = 30 - num_trees = 0 - for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): - num_trees += 1 - assert isinstance(tree, Tree) - # END for each tree - assert num_trees == mc - - - def _assert_empty_repo(self, repo): - # test all kinds of things with an empty, freshly initialized repo. - # It should throw good errors - - # entries should be empty - assert len(repo.index.entries) == 0 - - # head is accessible - assert repo.head - assert repo.head.ref - assert not repo.head.is_valid() - - # we can change the head to some other ref - head_ref = Head.from_path(repo, Head.to_full_path('some_head')) - assert not head_ref.is_valid() - repo.head.ref = head_ref - - # is_dirty can handle all kwargs - for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): - assert not repo.is_dirty(*args) - # END for each arg - - # we can add a file to the index ( if we are not bare ) - if not repo.bare: - pass - # END test repos with working tree - - - def test_init(self): - prev_cwd = os.getcwd() - os.chdir(tempfile.gettempdir()) - git_dir_rela = "repos/foo/bar.git" - del_dir_abs = os.path.abspath("repos") - git_dir_abs = os.path.abspath(git_dir_rela) - try: - # with specific path - for path in (git_dir_rela, git_dir_abs): - r = Repo.init(path=path, bare=True) - assert isinstance(r, Repo) - assert r.bare == True - assert os.path.isdir(r.git_dir) - - self._assert_empty_repo(r) - - # test clone - clone_path = path + "_clone" - rc = r.clone(clone_path) - self._assert_empty_repo(rc) - - - try: - shutil.rmtree(clone_path) - except OSError: - # when relative paths are used, the clone may actually be inside - # of the parent directory - pass - # END exception handling - - # try again, this time with the absolute version - rc = Repo.clone_from(r.git_dir, clone_path) - self._assert_empty_repo(rc) - - shutil.rmtree(git_dir_abs) - try: - shutil.rmtree(clone_path) - except OSError: - # when relative paths are used, the clone may actually be inside - # of the parent directory - pass - # END exception handling - - # END for each path - - os.makedirs(git_dir_rela) - os.chdir(git_dir_rela) - r = Repo.init(bare=False) - r.bare == False - - self._assert_empty_repo(r) - finally: - try: - shutil.rmtree(del_dir_abs) - except OSError: - pass - os.chdir(prev_cwd) - # END restore previous state - - def test_bare_property(self): - self.rorepo.bare - - def test_daemon_export(self): - orig_val = self.rorepo.daemon_export - self.rorepo.daemon_export = not orig_val - assert self.rorepo.daemon_export == ( not orig_val ) - self.rorepo.daemon_export = orig_val - assert self.rorepo.daemon_export == orig_val - - def test_alternates(self): - cur_alternates = self.rorepo.alternates - # empty alternates - self.rorepo.alternates = [] - assert self.rorepo.alternates == [] - alts = [ "other/location", "this/location" ] - self.rorepo.alternates = alts - assert alts == self.rorepo.alternates - self.rorepo.alternates = cur_alternates - - def test_repr(self): - path = os.path.join(os.path.abspath(GIT_REPO), '.git') - assert_equal('<git.Repo "%s">' % path, repr(self.rorepo)) - - def test_is_dirty_with_bare_repository(self): - orig_value = self.rorepo._bare - self.rorepo._bare = True - assert_false(self.rorepo.is_dirty()) - self.rorepo._bare = orig_value - - def test_is_dirty(self): - self.rorepo._bare = False - for index in (0,1): - for working_tree in (0,1): - for untracked_files in (0,1): - assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) - # END untracked files - # END working tree - # END index - orig_val = self.rorepo._bare - self.rorepo._bare = True - assert self.rorepo.is_dirty() == False - self.rorepo._bare = orig_val - - def test_head(self): - assert self.rorepo.head.reference.object == self.rorepo.active_branch.object - - def test_index(self): - index = self.rorepo.index - assert isinstance(index, IndexFile) - - def test_tag(self): - assert self.rorepo.tag('refs/tags/0.1.5').commit - - def test_archive(self): - tmpfile = os.tmpfile() - self.rorepo.archive(tmpfile, '0.1.5') - assert tmpfile.tell() - - @patch_object(Git, '_call_process') - def test_should_display_blame_information(self, git): - git.return_value = fixture('blame') - b = self.rorepo.blame( 'master', 'lib/git.py') - assert_equal(13, len(b)) - assert_equal( 2, len(b[0]) ) - # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) - assert_equal(hash(b[0][0]), hash(b[9][0])) - c = b[0][0] - assert_true(git.called) - assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True})) - - assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha) - assert_equal('Tom Preston-Werner', c.author.name) - assert_equal('tom@mojombo.com', c.author.email) - assert_equal(1191997100, c.authored_date) - assert_equal('Tom Preston-Werner', c.committer.name) - assert_equal('tom@mojombo.com', c.committer.email) - assert_equal(1191997100, c.committed_date) - assert_equal('initial grit setup', c.message) - - # test the 'lines per commit' entries - tlist = b[0][1] - assert_true( tlist ) - assert_true( isinstance( tlist[0], basestring ) ) - assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug - - def test_untracked_files(self): - base = self.rorepo.working_tree_dir - files = ( join_path_native(base, "__test_myfile"), - join_path_native(base, "__test_other_file") ) - num_recently_untracked = 0 - try: - for fpath in files: - fd = open(fpath,"wb") - fd.close() - # END for each filename - untracked_files = self.rorepo.untracked_files - num_recently_untracked = len(untracked_files) - - # assure we have all names - they are relative to the git-dir - num_test_untracked = 0 - for utfile in untracked_files: - num_test_untracked += join_path_native(base, utfile) in files - assert len(files) == num_test_untracked - finally: - for fpath in files: - if os.path.isfile(fpath): - os.remove(fpath) - # END handle files - - assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files)) - - def test_config_reader(self): - reader = self.rorepo.config_reader() # all config files - assert reader.read_only - reader = self.rorepo.config_reader("repository") # single config file - assert reader.read_only - - def test_config_writer(self): - for config_level in self.rorepo.config_level: - try: - writer = self.rorepo.config_writer(config_level) - assert not writer.read_only - except IOError: - # its okay not to get a writer for some configuration files if we - # have no permissions - pass - # END for each config level - - def test_creation_deletion(self): - # just a very quick test to assure it generally works. There are - # specialized cases in the test_refs module - head = self.rorepo.create_head("new_head", "HEAD~1") - self.rorepo.delete_head(head) - - tag = self.rorepo.create_tag("new_tag", "HEAD~2") - self.rorepo.delete_tag(tag) - self.rorepo.config_writer() - remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") - self.rorepo.delete_remote(remote) - - def test_comparison_and_hash(self): - # this is only a preliminary test, more testing done in test_index - assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo) - assert len(set((self.rorepo, self.rorepo))) == 1 - - def test_git_cmd(self): - # test CatFileContentStream, just to be very sure we have no fencepost errors - # last \n is the terminating newline that it expects - l1 = "0123456789\n" - l2 = "abcdefghijklmnopqrstxy\n" - l3 = "z\n" - d = "%s%s%s\n" % (l1, l2, l3) - - l1p = l1[:5] - - # full size - # size is without terminating newline - def mkfull(): - return Git.CatFileContentStream(len(d)-1, StringIO(d)) - - ts = 5 - def mktiny(): - return Git.CatFileContentStream(ts, StringIO(d)) - - # readlines no limit - s = mkfull() - lines = s.readlines() - assert len(lines) == 3 and lines[-1].endswith('\n') - assert s._stream.tell() == len(d) # must have scrubbed to the end - - # realines line limit - s = mkfull() - lines = s.readlines(5) - assert len(lines) == 1 - - # readlines on tiny sections - s = mktiny() - lines = s.readlines() - assert len(lines) == 1 and lines[0] == l1p - assert s._stream.tell() == ts+1 - - # readline no limit - s = mkfull() - assert s.readline() == l1 - assert s.readline() == l2 - assert s.readline() == l3 - assert s.readline() == '' - assert s._stream.tell() == len(d) - - # readline limit - s = mkfull() - assert s.readline(5) == l1p - assert s.readline() == l1[5:] - - # readline on tiny section - s = mktiny() - assert s.readline() == l1p - assert s.readline() == '' - assert s._stream.tell() == ts+1 - - # read no limit - s = mkfull() - assert s.read() == d[:-1] - assert s.read() == '' - assert s._stream.tell() == len(d) - - # read limit - s = mkfull() - assert s.read(5) == l1p - assert s.read(6) == l1[5:] - assert s._stream.tell() == 5 + 6 # its not yet done - - # read tiny - s = mktiny() - assert s.read(2) == l1[:2] - assert s._stream.tell() == 2 - assert s.read() == l1[2:ts] - assert s._stream.tell() == ts+1 - - def _assert_rev_parse_types(self, name, rev_obj): - rev_parse = self.rorepo.rev_parse - - if rev_obj.type == 'tag': - rev_obj = rev_obj.object - - # tree and blob type - obj = rev_parse(name + '^{tree}') - assert obj == rev_obj.tree - - obj = rev_parse(name + ':CHANGES') - assert obj.type == 'blob' and obj.path == 'CHANGES' - assert rev_obj.tree['CHANGES'] == obj - - - def _assert_rev_parse(self, name): - """tries multiple different rev-parse syntaxes with the given name - :return: parsed object""" - rev_parse = self.rorepo.rev_parse - orig_obj = rev_parse(name) - if orig_obj.type == 'tag': - obj = orig_obj.object - else: - obj = orig_obj - # END deref tags by default - - # try history - rev = name + "~" - obj2 = rev_parse(rev) - assert obj2 == obj.parents[0] - self._assert_rev_parse_types(rev, obj2) - - # history with number - ni = 11 - history = [obj.parents[0]] - for pn in range(ni): - history.append(history[-1].parents[0]) - # END get given amount of commits - - for pn in range(11): - rev = name + "~%i" % (pn+1) - obj2 = rev_parse(rev) - assert obj2 == history[pn] - self._assert_rev_parse_types(rev, obj2) - # END history check - - # parent ( default ) - rev = name + "^" - obj2 = rev_parse(rev) - assert obj2 == obj.parents[0] - self._assert_rev_parse_types(rev, obj2) - - # parent with number - for pn, parent in enumerate(obj.parents): - rev = name + "^%i" % (pn+1) - assert rev_parse(rev) == parent - self._assert_rev_parse_types(rev, parent) - # END for each parent - - return orig_obj - - def test_rev_parse(self): - rev_parse = self.rorepo.rev_parse - - # try special case: This one failed beforehand - assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781" - - # start from reference - num_resolved = 0 - for ref in Reference.iter_items(self.rorepo): - path_tokens = ref.path.split("/") - for pt in range(len(path_tokens)): - path_section = '/'.join(path_tokens[-(pt+1):]) - try: - obj = self._assert_rev_parse(path_section) - assert obj.type == ref.object.type - num_resolved += 1 - except BadObject: - print "failed on %s" % path_section - # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112 - pass - # END exception handling - # END for each token - # END for each reference - assert num_resolved - - # it works with tags ! - tag = self._assert_rev_parse('0.1.4') - assert tag.type == 'tag' - - # try full sha directly ( including type conversion ) - assert tag.object == rev_parse(tag.object.hexsha) - self._assert_rev_parse_types(tag.object.hexsha, tag.object) - - - # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES - rev = '0.1.4^{tree}^{tree}' - assert rev_parse(rev) == tag.object.tree - assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES'] - - - # try to get parents from first revision - it should fail as no such revision - # exists - first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" - commit = rev_parse(first_rev) - assert len(commit.parents) == 0 - assert commit.hexsha == first_rev - self.failUnlessRaises(BadObject, rev_parse, first_rev+"~") - self.failUnlessRaises(BadObject, rev_parse, first_rev+"^") - - # short SHA1 - commit2 = rev_parse(first_rev[:20]) - assert commit2 == commit - commit2 = rev_parse(first_rev[:5]) - assert commit2 == commit - - - # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one - # needs a tag which points to a blob - - - # ref^0 returns commit being pointed to, same with ref~0, and ^{} - tag = rev_parse('0.1.4') - for token in (('~0', '^0', '^{}')): - assert tag.object == rev_parse('0.1.4%s' % token) - # END handle multiple tokens - - # try partial parsing - max_items = 40 - for i, binsha in enumerate(self.rorepo.odb.sha_iter()): - assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha - if i > max_items: - # this is rather slow currently, as rev_parse returns an object - # which requires accessing packs, it has some additional overhead - break - # END for each binsha in repo - - # missing closing brace commit^{tree - self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') - - # missing starting brace - self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') - - - # cannot handle rev-log for now - self.failUnlessRaises(ValueError, rev_parse, "hi@there") - - def test_repo_odbtype(self): - target_type = GitDB - if sys.version_info[1] < 5: - target_type = GitCmdObjectDB - assert isinstance(self.rorepo.odb, target_type) - - def test_submodules(self): - assert len(self.rorepo.submodules) == 1 # non-recursive - assert len(list(self.rorepo.iter_submodules())) == 2 - - assert isinstance(self.rorepo.submodule("gitdb"), Submodule) - self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") - - @with_rw_repo('HEAD', bare=False) - def test_submodule_update(self, rwrepo): - # fails in bare mode - rwrepo._bare = True - self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) - rwrepo._bare = False - - # test create submodule - sm = rwrepo.submodules[0] - sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) - assert isinstance(sm, Submodule) - - # note: the rest of this functionality is tested in test_submodule - - diff --git a/test/git/test_stats.py b/test/git/test_stats.py deleted file mode 100644 index cc30bf92..00000000 --- a/test/git/test_stats.py +++ /dev/null @@ -1,25 +0,0 @@ -# test_stats.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * - -class TestStats(TestBase): - - def test__list_from_string(self): - output = fixture('diff_numstat') - stats = Stats._list_from_string(self.rorepo, output) - - assert_equal(2, stats.total['files']) - assert_equal(52, stats.total['lines']) - assert_equal(29, stats.total['insertions']) - assert_equal(23, stats.total['deletions']) - - assert_equal(29, stats.files["a.txt"]['insertions']) - assert_equal(18, stats.files["a.txt"]['deletions']) - - assert_equal(0, stats.files["b.txt"]['insertions']) - assert_equal(5, stats.files["b.txt"]['deletions']) diff --git a/test/git/test_submodule.py b/test/git/test_submodule.py deleted file mode 100644 index d922f32a..00000000 --- a/test/git/test_submodule.py +++ /dev/null @@ -1,496 +0,0 @@ -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git.exc import * -from git.objects.submodule.base import Submodule -from git.objects.submodule.root import RootModule -from git.util import to_native_path_linux, join_path_native -import shutil -import git -import os - -class TestSubmodule(TestBase): - - k_subm_current = "45c0f285a6d9d9214f8167742d12af2855f527fb" - k_subm_changed = "394ed7006ee5dc8bddfd132b64001d5dfc0ffdd3" - k_no_subm_tag = "0.1.6" - - - def _do_base_tests(self, rwrepo): - """Perform all tests in the given repository, it may be bare or nonbare""" - # manual instantiation - smm = Submodule(rwrepo, "\0"*20) - # name needs to be set in advance - self.failUnlessRaises(AttributeError, getattr, smm, 'name') - - # iterate - 1 submodule - sms = Submodule.list_items(rwrepo, self.k_subm_current) - assert len(sms) == 1 - sm = sms[0] - - # at a different time, there is None - assert len(Submodule.list_items(rwrepo, self.k_no_subm_tag)) == 0 - - assert sm.path == 'lib/git/ext/gitdb' - assert sm.path != sm.name # in our case, we have ids there, which don't equal the path - assert sm.url == 'git://gitorious.org/git-python/gitdb.git' - assert sm.branch_path == 'refs/heads/master' # the default ... - assert sm.branch_name == 'master' - assert sm.parent_commit == rwrepo.head.commit - # size is always 0 - assert sm.size == 0 - # the module is not checked-out yet - self.failUnlessRaises(InvalidGitRepositoryError, sm.module) - - # which is why we can't get the branch either - it points into the module() repository - self.failUnlessRaises(InvalidGitRepositoryError, getattr, sm, 'branch') - - # branch_path works, as its just a string - assert isinstance(sm.branch_path, basestring) - - # some commits earlier we still have a submodule, but its at a different commit - smold = Submodule.iter_items(rwrepo, self.k_subm_changed).next() - assert smold.binsha != sm.binsha - assert smold != sm # the name changed - - # force it to reread its information - del(smold._url) - smold.url == sm.url - - # test config_reader/writer methods - sm.config_reader() - new_smclone_path = None # keep custom paths for later - new_csmclone_path = None # - if rwrepo.bare: - self.failUnlessRaises(InvalidGitRepositoryError, sm.config_writer) - else: - writer = sm.config_writer() - # for faster checkout, set the url to the local path - new_smclone_path = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path)) - writer.set_value('url', new_smclone_path) - del(writer) - assert sm.config_reader().get_value('url') == new_smclone_path - assert sm.url == new_smclone_path - # END handle bare repo - smold.config_reader() - - # cannot get a writer on historical submodules - if not rwrepo.bare: - self.failUnlessRaises(ValueError, smold.config_writer) - # END handle bare repo - - # make the old into a new - this doesn't work as the name changed - prev_parent_commit = smold.parent_commit - self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_subm_current) - # the sha is properly updated - smold.set_parent_commit(self.k_subm_changed+"~1") - assert smold.binsha != sm.binsha - - # raises if the sm didn't exist in new parent - it keeps its - # parent_commit unchanged - self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_no_subm_tag) - - # TEST TODO: if a path in the gitmodules file, but not in the index, it raises - - # TEST UPDATE - ############## - # module retrieval is not always possible - if rwrepo.bare: - self.failUnlessRaises(InvalidGitRepositoryError, sm.module) - self.failUnlessRaises(InvalidGitRepositoryError, sm.remove) - self.failUnlessRaises(InvalidGitRepositoryError, sm.add, rwrepo, 'here', 'there') - else: - # its not checked out in our case - self.failUnlessRaises(InvalidGitRepositoryError, sm.module) - assert not sm.module_exists() - - # currently there is only one submodule - assert len(list(rwrepo.iter_submodules())) == 1 - - # TEST ADD - ########### - # preliminary tests - # adding existing returns exactly the existing - sma = Submodule.add(rwrepo, sm.name, sm.path) - assert sma.path == sm.path - - # no url and no module at path fails - self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", "pathtorepo", url=None) - - # CONTINUE UPDATE - ################# - - # lets update it - its a recursive one too - newdir = os.path.join(sm.abspath, 'dir') - os.makedirs(newdir) - - # update fails if the path already exists non-empty - self.failUnlessRaises(OSError, sm.update) - os.rmdir(newdir) - - assert sm.update() is sm - sm_repopath = sm.path # cache for later - assert sm.module_exists() - assert isinstance(sm.module(), git.Repo) - assert sm.module().working_tree_dir == sm.abspath - - # INTERLEAVE ADD TEST - ##################### - # url must match the one in the existing repository ( if submodule name suggests a new one ) - # or we raise - self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", sm.path, "git://someurl/repo.git") - - - # CONTINUE UPDATE - ################# - # we should have setup a tracking branch, which is also active - assert sm.module().head.ref.tracking_branch() is not None - - # delete the whole directory and re-initialize - shutil.rmtree(sm.abspath) - sm.update(recursive=False) - assert len(list(rwrepo.iter_submodules())) == 2 - assert len(sm.children()) == 1 # its not checked out yet - csm = sm.children()[0] - assert not csm.module_exists() - csm_repopath = csm.path - - # adjust the path of the submodules module to point to the local destination - new_csmclone_path = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path, csm.path)) - csm.config_writer().set_value('url', new_csmclone_path) - assert csm.url == new_csmclone_path - - # update recuesively again - sm.update(recursive=True) - - # tracking branch once again - csm.module().head.ref.tracking_branch() is not None - - # this flushed in a sub-submodule - assert len(list(rwrepo.iter_submodules())) == 2 - - - # reset both heads to the previous version, verify that to_latest_revision works - smods = (sm.module(), csm.module()) - for repo in smods: - repo.head.reset('HEAD~1', working_tree=1) - # END for each repo to reset - - sm.update(recursive=True, to_latest_revision=True) - for repo in smods: - assert repo.head.commit == repo.head.ref.tracking_branch().commit - # END for each repo to check - del(smods) - - # if the head is detached, it still works ( but warns ) - smref = sm.module().head.ref - sm.module().head.ref = 'HEAD~1' - # if there is no tracking branch, we get a warning as well - csm_tracking_branch = csm.module().head.ref.tracking_branch() - csm.module().head.ref.set_tracking_branch(None) - sm.update(recursive=True, to_latest_revision=True) - - # to_latest_revision changes the child submodule's commit, it needs an - # update now - csm.set_parent_commit(csm.repo.head.commit) - - # undo the changes - sm.module().head.ref = smref - csm.module().head.ref.set_tracking_branch(csm_tracking_branch) - - # REMOVAL OF REPOSITOTRY - ######################## - # must delete something - self.failUnlessRaises(ValueError, csm.remove, module=False, configuration=False) - # We have modified the configuration, hence the index is dirty, and the - # deletion will fail - # NOTE: As we did a few updates in the meanwhile, the indices were reset - # Hence we create some changes - sm.config_writer().set_value("somekey", "somevalue") - csm.config_writer().set_value("okey", "ovalue") - self.failUnlessRaises(InvalidGitRepositoryError, sm.remove) - # if we remove the dirty index, it would work - sm.module().index.reset() - # still, we have the file modified - self.failUnlessRaises(InvalidGitRepositoryError, sm.remove, dry_run=True) - sm.module().index.reset(working_tree=True) - - # this would work - assert sm.remove(dry_run=True) is sm - assert sm.module_exists() - sm.remove(force=True, dry_run=True) - assert sm.module_exists() - - # but ... we have untracked files in the child submodule - fn = join_path_native(csm.module().working_tree_dir, "newfile") - open(fn, 'w').write("hi") - self.failUnlessRaises(InvalidGitRepositoryError, sm.remove) - - # forcibly delete the child repository - assert csm.remove(force=True) is csm - assert not csm.exists() - assert not csm.module_exists() - assert len(sm.children()) == 0 - # now we have a changed index, as configuration was altered. - # fix this - sm.module().index.reset(working_tree=True) - - # now delete only the module of the main submodule - assert sm.module_exists() - sm.remove(configuration=False) - assert sm.exists() - assert not sm.module_exists() - assert sm.config_reader().get_value('url') - - # delete the rest - sm.remove() - assert not sm.exists() - assert not sm.module_exists() - - assert len(rwrepo.submodules) == 0 - - # ADD NEW SUBMODULE - ################### - # add a simple remote repo - trailing slashes are no problem - smid = "newsub" - osmid = "othersub" - nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path+"/", None, no_checkout=True) - assert nsm.name == smid - assert nsm.module_exists() - assert nsm.exists() - # its not checked out - assert not os.path.isfile(join_path_native(nsm.module().working_tree_dir, Submodule.k_modules_file)) - assert len(rwrepo.submodules) == 1 - - # add another submodule, but into the root, not as submodule - osm = Submodule.add(rwrepo, osmid, csm_repopath, new_csmclone_path, Submodule.k_head_default) - assert osm != nsm - assert osm.module_exists() - assert osm.exists() - assert os.path.isfile(join_path_native(osm.module().working_tree_dir, 'setup.py')) - - assert len(rwrepo.submodules) == 2 - - # commit the changes, just to finalize the operation - rwrepo.index.commit("my submod commit") - assert len(rwrepo.submodules) == 2 - - # needs update as the head changed, it thinks its in the history - # of the repo otherwise - nsm.set_parent_commit(rwrepo.head.commit) - osm.set_parent_commit(rwrepo.head.commit) - - # MOVE MODULE - ############# - # invalid inptu - self.failUnlessRaises(ValueError, nsm.move, 'doesntmatter', module=False, configuration=False) - - # renaming to the same path does nothing - assert nsm.move(sm.path) is nsm - - # rename a module - nmp = join_path_native("new", "module", "dir") + "/" # new module path - pmp = nsm.path - abspmp = nsm.abspath - assert nsm.move(nmp) is nsm - nmp = nmp[:-1] # cut last / - assert nsm.path == nmp - assert rwrepo.submodules[0].path == nmp - - # move it back - but there is a file now - this doesn't work - # as the empty directories where removed. - self.failUnlessRaises(IOError, open, abspmp, 'w') - - mpath = 'newsubmodule' - absmpath = join_path_native(rwrepo.working_tree_dir, mpath) - open(absmpath, 'w').write('') - self.failUnlessRaises(ValueError, nsm.move, mpath) - os.remove(absmpath) - - # now it works, as we just move it back - nsm.move(pmp) - assert nsm.path == pmp - assert rwrepo.submodules[0].path == pmp - - # TODO lowprio: test remaining exceptions ... for now its okay, the code looks right - - # REMOVE 'EM ALL - ################ - # if a submodule's repo has no remotes, it can't be added without an explicit url - osmod = osm.module() - - osm.remove(module=False) - for remote in osmod.remotes: - remote.remove(osmod, remote.name) - assert not osm.exists() - self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None) - # END handle bare mode - - # Error if there is no submodule file here - self.failUnlessRaises(IOError, Submodule._config_parser, rwrepo, rwrepo.commit(self.k_no_subm_tag), True) - - @with_rw_repo(k_subm_current) - def test_base_rw(self, rwrepo): - self._do_base_tests(rwrepo) - - @with_rw_repo(k_subm_current, bare=True) - def test_base_bare(self, rwrepo): - self._do_base_tests(rwrepo) - - @with_rw_repo(k_subm_current, bare=False) - def test_root_module(self, rwrepo): - # Can query everything without problems - rm = RootModule(self.rorepo) - assert rm.module() is self.rorepo - - # try attributes - rm.binsha - rm.mode - rm.path - assert rm.name == rm.k_root_name - assert rm.parent_commit == self.rorepo.head.commit - rm.url - rm.branch - - assert len(rm.list_items(rm.module())) == 1 - rm.config_reader() - rm.config_writer() - - # deep traversal gitdb / async - rsmsp = [sm.path for sm in rm.traverse()] - assert len(rsmsp) == 2 # gitdb and async, async being a child of gitdb - - # cannot set the parent commit as root module's path didn't exist - self.failUnlessRaises(ValueError, rm.set_parent_commit, 'HEAD') - - # TEST UPDATE - ############# - # setup commit which remove existing, add new and modify existing submodules - rm = RootModule(rwrepo) - assert len(rm.children()) == 1 - - # modify path without modifying the index entry - # ( which is what the move method would do properly ) - #================================================== - sm = rm.children()[0] - pp = "path/prefix" - fp = join_path_native(pp, sm.path) - prep = sm.path - assert not sm.module_exists() # was never updated after rwrepo's clone - - # assure we clone from a local source - sm.config_writer().set_value('url', to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path))) - sm.update(recursive=False) - assert sm.module_exists() - sm.config_writer().set_value('path', fp) # change path to something with prefix AFTER url change - - # update fails as list_items in such a situations cannot work, as it cannot - # find the entry at the changed path - self.failUnlessRaises(InvalidGitRepositoryError, rm.update, recursive=False) - - # move it properly - doesn't work as it its path currently points to an indexentry - # which doesn't exist ( move it to some path, it doesn't matter here ) - self.failUnlessRaises(InvalidGitRepositoryError, sm.move, pp) - # reset the path(cache) to where it was, now it works - sm.path = prep - sm.move(fp, module=False) # leave it at the old location - - assert not sm.module_exists() - cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit - - # update puts the module into place - rm.update(recursive=False) - sm.set_parent_commit(cpathchange) - assert sm.module_exists() - - # add submodule - #================ - nsmn = "newsubmodule" - nsmp = "submrepo" - async_url = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, rsmsp[0], rsmsp[1])) - nsm = Submodule.add(rwrepo, nsmn, nsmp, url=async_url) - csmadded = rwrepo.index.commit("Added submodule").hexsha # make sure we don't keep the repo reference - nsm.set_parent_commit(csmadded) - assert nsm.module_exists() - # in our case, the module should not exist, which happens if we update a parent - # repo and a new submodule comes into life - nsm.remove(configuration=False, module=True) - assert not nsm.module_exists() and nsm.exists() - - rm.update(recursive=False) - assert nsm.module_exists() - - - - # remove submodule - the previous one - #==================================== - sm.set_parent_commit(csmadded) - smp = sm.abspath - assert not sm.remove(module=False).exists() - assert os.path.isdir(smp) # module still exists - csmremoved = rwrepo.index.commit("Removed submodule") - - # an update will remove the module - rm.update(recursive=False) - assert not os.path.isdir(smp) - - - # change url - #============= - # to the first repository, this way we have a fast checkout, and a completely different - # repository at the different url - nsm.set_parent_commit(csmremoved) - nsmurl = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, rsmsp[0])) - nsm.config_writer().set_value('url', nsmurl) - csmpathchange = rwrepo.index.commit("changed url") - nsm.set_parent_commit(csmpathchange) - - prev_commit = nsm.module().head.commit - rm.update(recursive=False) - assert nsm.module().remotes.origin.url == nsmurl - # head changed, as the remote url and its commit changed - assert prev_commit != nsm.module().head.commit - - # add the submodule's changed commit to the index, which is what the - # user would do - # beforehand, update our instance's binsha with the new one - nsm.binsha = nsm.module().head.commit.binsha - rwrepo.index.add([nsm]) - - # change branch - #================= - # we only have one branch, so we switch to a virtual one, and back - # to the current one to trigger the difference - cur_branch = nsm.branch - nsmm = nsm.module() - prev_commit = nsmm.head.commit - for branch in ("some_virtual_branch", cur_branch.name): - nsm.config_writer().set_value(Submodule.k_head_option, git.Head.to_full_path(branch)) - csmbranchchange = rwrepo.index.commit("changed branch to %s" % branch) - nsm.set_parent_commit(csmbranchchange) - # END for each branch to change - - # Lets remove our tracking branch to simulate some changes - nsmmh = nsmm.head - assert nsmmh.ref.tracking_branch() is None # never set it up until now - assert not nsmmh.is_detached - - rm.update(recursive=False) - - assert nsmmh.ref.tracking_branch() is not None - assert not nsmmh.is_detached - - # recursive update - # ================= - # finally we recursively update a module, just to run the code at least once - # remove the module so that it has more work - assert len(nsm.children()) == 1 - assert nsm.exists() and nsm.module_exists() and len(nsm.children()) == 1 - # assure we pull locally only - nsmc = nsm.children()[0] - nsmc.config_writer().set_value('url', async_url) - rm.update(recursive=True) - - assert len(nsm.children()) == 1 and nsmc.module_exists() - diff --git a/test/git/test_tree.py b/test/git/test_tree.py deleted file mode 100644 index 18688424..00000000 --- a/test/git/test_tree.py +++ /dev/null @@ -1,141 +0,0 @@ -# test_tree.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -import os -from test.testlib import * -from git import * -from git.objects.fun import ( - traverse_tree_recursive, - traverse_trees_recursive - ) -from cStringIO import StringIO - -class TestTree(TestBase): - - def test_serializable(self): - # tree at the given commit contains a submodule as well - roottree = self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c') - for item in roottree.traverse(ignore_self=False): - if item.type != Tree.type: - continue - # END skip non-trees - tree = item - orig_data = tree.data_stream.read() - orig_cache = tree._cache - - stream = StringIO() - tree._serialize(stream) - assert stream.getvalue() == orig_data - - stream.seek(0) - testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '') - testtree._deserialize(stream) - assert testtree._cache == orig_cache - - - # TEST CACHE MUTATOR - mod = testtree.cache - self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name") - self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode") - self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name") - - # add new item - name = "fake_dir" - mod.add(testtree.NULL_HEX_SHA, tree.mode, name) - assert name in testtree - - # its available in the tree immediately - assert isinstance(testtree[name], Tree) - - # adding it again will not cause multiple of them to be presents - cur_count = len(testtree) - mod.add(testtree.NULL_HEX_SHA, tree.mode, name) - assert len(testtree) == cur_count - - # fails with a different sha - name exists - hexsha = "1"*40 - self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name) - - # force it - replace existing one - mod.add(hexsha, tree.mode, name, force=True) - assert testtree[name].hexsha == hexsha - assert len(testtree) == cur_count - - # unchecked addition always works, even with invalid items - invalid_name = "hi/there" - mod.add_unchecked(hexsha, 0, invalid_name) - assert len(testtree) == cur_count + 1 - - del(mod[invalid_name]) - assert len(testtree) == cur_count - # del again, its fine - del(mod[invalid_name]) - - # have added one item, we are done - mod.set_done() - mod.set_done() # multiple times are okay - - # serialize, its different now - stream = StringIO() - testtree._serialize(stream) - stream.seek(0) - assert stream.getvalue() != orig_data - - # replaces cache, but we make sure of it - del(testtree._cache) - testtree._deserialize(stream) - assert name in testtree - assert invalid_name not in testtree - # END for each item in tree - - def test_traverse(self): - root = self.rorepo.tree('0.1.6') - num_recursive = 0 - all_items = list() - for obj in root.traverse(): - if "/" in obj.path: - num_recursive += 1 - - assert isinstance(obj, (Blob, Tree)) - all_items.append(obj) - # END for each object - assert all_items == root.list_traverse() - - # limit recursion level to 0 - should be same as default iteration - assert all_items - assert 'CHANGES' in root - assert len(list(root)) == len(list(root.traverse(depth=1))) - - # only choose trees - trees_only = lambda i,d: i.type == "tree" - trees = list(root.traverse(predicate = trees_only)) - assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) )) - - # test prune - lib_folder = lambda t,d: t.path == "lib" - pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder)) - assert len(pruned_trees) < len(trees) - - # trees and blobs - assert len(set(trees)|set(root.trees)) == len(trees) - assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs ) - subitem = trees[0][0] - assert "/" in subitem.path - assert subitem.name == os.path.basename(subitem.path) - - # assure that at some point the traversed paths have a slash in them - found_slash = False - for item in root.traverse(): - assert os.path.isabs(item.abspath) - if '/' in item.path: - found_slash = True - # END check for slash - - # slashes in paths are supported as well - assert root[item.path] == item == root/item.path - # END for each item - assert found_slash - diff --git a/test/git/test_util.py b/test/git/test_util.py deleted file mode 100644 index 6453bc19..00000000 --- a/test/git/test_util.py +++ /dev/null @@ -1,105 +0,0 @@ -# test_utils.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -import os -import tempfile - -from test.testlib import * -from git.util import * -from git.objects.util import * -from git import * -from git.cmd import dashify - -import time - - -class TestUtils(TestCase): - def setup(self): - self.testdict = { - "string": "42", - "int": 42, - "array": [ 42 ], - } - - def test_it_should_dashify(self): - assert_equal('this-is-my-argument', dashify('this_is_my_argument')) - assert_equal('foo', dashify('foo')) - - - def test_lock_file(self): - my_file = tempfile.mktemp() - lock_file = LockFile(my_file) - assert not lock_file._has_lock() - # release lock we don't have - fine - lock_file._release_lock() - - # get lock - lock_file._obtain_lock_or_raise() - assert lock_file._has_lock() - - # concurrent access - other_lock_file = LockFile(my_file) - assert not other_lock_file._has_lock() - self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) - - lock_file._release_lock() - assert not lock_file._has_lock() - - other_lock_file._obtain_lock_or_raise() - self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) - - # auto-release on destruction - del(other_lock_file) - lock_file._obtain_lock_or_raise() - lock_file._release_lock() - - def test_blocking_lock_file(self): - my_file = tempfile.mktemp() - lock_file = BlockingLockFile(my_file) - lock_file._obtain_lock() - - # next one waits for the lock - start = time.time() - wait_time = 0.1 - wait_lock = BlockingLockFile(my_file, 0.05, wait_time) - self.failUnlessRaises(IOError, wait_lock._obtain_lock) - elapsed = time.time() - start - assert elapsed <= wait_time + 0.02 # some extra time it may cost - - def test_user_id(self): - assert '@' in get_user_id() - - def test_parse_date(self): - # test all supported formats - def assert_rval(rval, veri_time, offset=0): - assert len(rval) == 2 - assert isinstance(rval[0], int) and isinstance(rval[1], int) - assert rval[0] == veri_time - assert rval[1] == offset - - # now that we are here, test our conversion functions as well - utctz = altz_to_utctz_str(offset) - assert isinstance(utctz, basestring) - assert utctz_to_altz(verify_utctz(utctz)) == offset - # END assert rval utility - - rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0) - iso = ("2005-04-07T22:13:11 -0200", 7200) - iso2 = ("2005-04-07 22:13:11 +0400", -14400) - iso3 = ("2005.04.07 22:13:11 -0000", 0) - alt = ("04/07/2005 22:13:11", 0) - alt2 = ("07.04.2005 22:13:11", 0) - veri_time = 1112904791 # the time this represents - for date, offset in (rfc, iso, iso2, iso3, alt, alt2): - assert_rval(parse_date(date), veri_time, offset) - # END for each date type - - # and failure - self.failUnlessRaises(ValueError, parse_date, 'invalid format') - self.failUnlessRaises(ValueError, parse_date, '123456789 -02000') - self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200') - - |
