From 491440543571b07c849c0ef9c4ebf5c27f263bc0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 11:18:07 +0100 Subject: Implemented non-blocking operations using poll() Next up is using threads --- doc/source/changes.rst | 5 +++ git/cmd.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++ git/index/base.py | 19 +++++----- git/remote.py | 66 +++++++++++++--------------------- git/repo/base.py | 13 ++++--- git/util.py | 8 +++++ 6 files changed, 149 insertions(+), 58 deletions(-) diff --git a/doc/source/changes.rst b/doc/source/changes.rst index b7479e4f..4983b3d0 100644 --- a/doc/source/changes.rst +++ b/doc/source/changes.rst @@ -2,6 +2,11 @@ Changelog ========= +0.3.5 - Bugfixes +================ +* push/pull/fetch operations will not block anymore +* A list of all fixed issues can be found here: https://github.com/gitpython-developers/GitPython/issues?q=milestone%3A%22v0.3.5+-+bugfixes%22+ + 0.3.4 - Python 3 Support ======================== * Internally, hexadecimal SHA1 are treated as ascii encoded strings. Binary SHA1 are treated as bytes. diff --git a/git/cmd.py b/git/cmd.py index ef370fe2..3cb334b6 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -6,6 +6,7 @@ import os import sys +import select import logging from subprocess import ( call, @@ -36,9 +37,104 @@ log = logging.getLogger('git.cmd') __all__ = ('Git', ) +# ============================================================================== +## @name Utilities +# ------------------------------------------------------------------------------ +# Documentation +## @{ + +def handle_process_output(process, stdout_handler, stderr_handler, finalizer): + """Registers for notifications to lean that process output is ready to read, and dispatches lines to + the respective line handlers. We are able to handle carriage returns in case progress is sent by that + mean. For performance reasons, we only apply this to stderr. + This function returns once the finalizer returns + :return: result of finalizer + :param process: subprocess.Popen instance + :param stdout_handler: f(stdout_line_string), or None + :param stderr_hanlder: f(stderr_line_string), or None + :param finalizer: f(proc) - wait for proc to finish""" + def read_line_fast(stream): + return stream.readline() + + def read_line_slow(stream): + line = b'' + while True: + char = stream.read(1) # reads individual single byte strings + if not char: + break + + if char in (b'\r', b'\n') and line: + break + else: + line += char + # END process parsed line + # END while file is not done reading + return line + # end + + fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } + + if hasattr(select, 'poll'): + def dispatch_line(fd): + stream, handler, readline = fdmap[fd] + # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, + # we are good ... + line = readline(stream).decode(defenc) + if line and handler: + handler(line) + return line + # end dispatch helper + + # poll is preferred, as select is limited to file handles up to 1024 ... . Not an issue for us though, + # as we deal with relatively blank processes + poll = select.poll() + READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR + CLOSED = select.POLLHUP | select.POLLERR + + poll.register(process.stdout, READ_ONLY) + poll.register(process.stderr, READ_ONLY) + + closed_streams = set() + while True: + # no timeout + poll_result = poll.poll() + for fd, result in poll_result: + if result & CLOSED: + closed_streams.add(fd) + else: + dispatch_line(fd) + # end handle closed stream + # end for each poll-result tuple + + if len(closed_streams) == len(fdmap): + break + # end its all done + # end endless loop + + # Depelete all remaining buffers + for fno, _ in fdmap.items(): + while True: + line = dispatch_line(fno) + if not line: + break + # end deplete buffer + # end for each file handle + else: + # Oh ... probably we are on windows. select.select() can only handle sockets, we have files + # The only reliable way to do this now is to use threads and wait for both to finish + # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive + raise NotImplementedError() + # end + + return finalizer(process) + + def dashify(string): return string.replace('_', '-') +## -- End Utilities -- @} + class Git(LazyMixin): diff --git a/git/index/base.py b/git/index/base.py index cc883469..66fd5b1f 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -287,11 +287,11 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): changes according to the amount of trees. If 1 Tree is given, it will just be read into a new index If 2 Trees are given, they will be merged into a new index using a - two way merge algorithm. Tree 1 is the 'current' tree, tree 2 is the 'other' - one. It behaves like a fast-forward. - If 3 Trees are given, a 3-way merge will be performed with the first tree - being the common ancestor of tree 2 and tree 3. Tree 2 is the 'current' tree, - tree 3 is the 'other' one + two way merge algorithm. Tree 1 is the 'current' tree, tree 2 is the 'other' + one. It behaves like a fast-forward. + If 3 Trees are given, a 3-way merge will be performed with the first tree + being the common ancestor of tree 2 and tree 3. Tree 2 is the 'current' tree, + tree 3 is the 'other' one :param kwargs: Additional arguments passed to git-read-tree @@ -882,14 +882,11 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): def commit(self, message, parent_commits=None, head=True, author=None, committer=None): """Commit the current default index file, creating a commit object. - For more information on the arguments, see tree.commit. - :note: - If you have manually altered the .entries member of this instance, - don't forget to write() your changes to disk beforehand. - :return: - Commit object representing the new commit""" + :note: If you have manually altered the .entries member of this instance, + don't forget to write() your changes to disk beforehand. + :return: Commit object representing the new commit""" tree = self.write_tree() return Commit.create_from_tree(self.repo, tree, message, parent_commits, head, author=author, committer=committer) diff --git a/git/remote.py b/git/remote.py index 484bc031..85f4ebf2 100644 --- a/git/remote.py +++ b/git/remote.py @@ -31,6 +31,7 @@ from git.util import ( join_path, finalize_process ) +from git.cmd import handle_process_output from gitdb.util import join from git.compat import defenc @@ -39,31 +40,6 @@ __all__ = ('RemoteProgress', 'PushInfo', 'FetchInfo', 'Remote') #{ Utilities - -def digest_process_messages(fh, progress): - """Read progress messages from file-like object fh, supplying the respective - progress messages to the progress instance. - - :param fh: File handle to read from - :return: list(line, ...) list of lines without linebreaks that did - not contain progress information""" - line_so_far = b'' - dropped_lines = list() - while True: - char = fh.read(1) # reads individual single byte strings - if not char: - break - - if char in (b'\r', b'\n') and line_so_far: - dropped_lines.extend(progress._parse_progress_line(line_so_far.decode(defenc))) - line_so_far = b'' - else: - line_so_far += char - # END process parsed line - # END while file is not done reading - return dropped_lines - - def add_progress(kwargs, git, progress): """Add the --progress flag to the given kwargs dict if supported by the git command. If the actual progress in the given progress instance is not @@ -532,17 +508,24 @@ class Remote(LazyMixin, Iterable): # Basically we want all fetch info lines which appear to be in regular form, and thus have a # command character. Everything else we ignore, cmds = set(PushInfo._flag_map.keys()) & set(FetchInfo._flag_map.keys()) - for line in digest_process_messages(proc.stderr, progress): - if line.startswith('fatal:'): - raise GitCommandError(("Error when fetching: %s" % line,), 2) - # END handle special messages - for cmd in cmds: - if line[1] == cmd: - fetch_info_lines.append(line) - continue - # end find command code - # end for each comand code we know - # END for each line + + progress_handler = progress.new_message_handler() + def my_progress_handler(line): + for pline in progress_handler(line): + if line.startswith('fatal:'): + raise GitCommandError(("Error when fetching: %s" % line,), 2) + # END handle special messages + for cmd in cmds: + if line[1] == cmd: + fetch_info_lines.append(line) + continue + # end find command code + # end for each comand code we know + # end for each line progress didn't handle + # end + + # We are only interested in stderr here ... + handle_process_output(proc, None, my_progress_handler, finalize_process) # read head information fp = open(join(self.repo.git_dir, 'FETCH_HEAD'), 'rb') @@ -555,7 +538,6 @@ class Remote(LazyMixin, Iterable): output.extend(FetchInfo._from_line(self.repo, err_line, fetch_line) for err_line, fetch_line in zip(fetch_info_lines, fetch_head_info)) - finalize_process(proc) return output def _get_push_info(self, proc, progress): @@ -564,11 +546,10 @@ class Remote(LazyMixin, Iterable): # read the lines manually as it will use carriage returns between the messages # to override the previous one. This is why we read the bytes manually # TODO: poll() on file descriptors to know what to read next, process streams concurrently - digest_process_messages(proc.stderr, progress) - + progress_handler = progress.new_message_handler() output = IterableList('name') - for line in proc.stdout.readlines(): - line = line.decode(defenc) + + def stdout_handler(line): try: output.append(PushInfo._from_line(self, line)) except ValueError: @@ -576,7 +557,8 @@ class Remote(LazyMixin, Iterable): pass # END exception handling # END for each line - finalize_process(proc) + + handle_process_output(proc, stdout_handler, progress_handler, finalize_process) return output def fetch(self, refspec=None, progress=None, **kwargs): diff --git a/git/repo/base.py b/git/repo/base.py index 2a63492b..e8db3540 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -5,7 +5,10 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php from git.exc import InvalidGitRepositoryError, NoSuchPathError -from git.cmd import Git +from git.cmd import ( + Git, + handle_process_output +) from git.refs import ( HEAD, Head, @@ -25,7 +28,6 @@ from git.index import IndexFile from git.config import GitConfigParser from git.remote import ( Remote, - digest_process_messages, add_progress ) @@ -711,9 +713,10 @@ class Repo(object): proc = git.clone(url, path, with_extended_output=True, as_process=True, v=True, **add_progress(kwargs, git, progress)) if progress: - digest_process_messages(proc.stderr, progress) - # END handle progress - finalize_process(proc) + handle_process_output(proc, None, progress.new_message_handler(), finalize_process) + else: + finalize_process(proc) + # end handle progress finally: if prev_cwd is not None: os.chdir(prev_cwd) diff --git a/git/util.py b/git/util.py index 4de736d3..34b09d32 100644 --- a/git/util.py +++ b/git/util.py @@ -249,6 +249,14 @@ class RemoteProgress(object): # END for each sub line return failed_lines + def new_message_handler(self): + """:return: a progress handler suitable for handle_process_output(), passing lines on to this Progress + handler in a suitable format""" + def handler(line): + return self._parse_progress_line(line.rstrip()) + # end + return handler + def line_dropped(self, line): """Called whenever a line could not be understood and was therefore dropped.""" pass -- cgit v1.2.3 From c86bea60dde4016dd850916aa2e0db5260e1ff61 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 11:41:15 +0100 Subject: Implemented threaded version of pipe-draining --- git/cmd.py | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/git/cmd.py b/git/cmd.py index 3cb334b6..5ba5edb4 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -8,6 +8,7 @@ import os import sys import select import logging +import threading from subprocess import ( call, Popen, @@ -72,12 +73,8 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): return line # end - fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), - process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } - - if hasattr(select, 'poll'): - def dispatch_line(fd): - stream, handler, readline = fdmap[fd] + def dispatch_line(fno): + stream, handler, readline = fdmap[fno] # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, # we are good ... line = readline(stream).decode(defenc) @@ -85,9 +82,22 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): handler(line) return line # end dispatch helper + # end + + def deplete_buffer(fno): + while True: + line = dispatch_line(fno) + if not line: + break + # end deplete buffer + # end + + fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } - # poll is preferred, as select is limited to file handles up to 1024 ... . Not an issue for us though, - # as we deal with relatively blank processes + if hasattr(select, 'poll'): + # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be + # an issue for us, as it matters how many handles or own process has poll = select.poll() READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR CLOSED = select.POLLHUP | select.POLLERR @@ -113,18 +123,23 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end endless loop # Depelete all remaining buffers - for fno, _ in fdmap.items(): - while True: - line = dispatch_line(fno) - if not line: - break - # end deplete buffer + for fno in fdmap.keys(): + deplete_buffer(fno) # end for each file handle else: # Oh ... probably we are on windows. select.select() can only handle sockets, we have files # The only reliable way to do this now is to use threads and wait for both to finish # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive - raise NotImplementedError() + # NO: It's not enough unfortunately, and we will have to sync the threads + threads = list() + for fno in fdmap.keys(): + t = threading.Thread(target = lambda: deplete_buffer(fno)) + threads.append(t) + t.start() + # end + for t in threads: + t.join() + # end # end return finalizer(process) -- cgit v1.2.3 From 763ef75d12f0ad6e4b79a7df304c7b5f1b5a11f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 12:32:45 +0100 Subject: Using a wait-group seems to properly sync the threads for buffer depletion --- git/cmd.py | 20 ++++++++++++-------- git/util.py | 32 +++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/git/cmd.py b/git/cmd.py index 5ba5edb4..e03d0cdc 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -18,7 +18,8 @@ from subprocess import ( from .util import ( LazyMixin, - stream_copy + stream_copy, + WaitGroup ) from .exc import GitCommandError from git.compat import ( @@ -84,12 +85,14 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end dispatch helper # end - def deplete_buffer(fno): + def deplete_buffer(fno, wg=None): while True: line = dispatch_line(fno) if not line: break # end deplete buffer + if wg: + wg.done() # end fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), @@ -131,15 +134,16 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # The only reliable way to do this now is to use threads and wait for both to finish # Since the finalizer is expected to wait, we don't have to introduce our own wait primitive # NO: It's not enough unfortunately, and we will have to sync the threads - threads = list() + wg = WaitGroup() for fno in fdmap.keys(): - t = threading.Thread(target = lambda: deplete_buffer(fno)) - threads.append(t) + wg.add(1) + t = threading.Thread(target = lambda: deplete_buffer(fno, wg)) t.start() # end - for t in threads: - t.join() - # end + # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's + # actually started, which could make the wait() call to just return because the thread is not yet + # active + wg.wait() # end return finalizer(process) diff --git a/git/util.py b/git/util.py index 34b09d32..e211ca41 100644 --- a/git/util.py +++ b/git/util.py @@ -12,6 +12,7 @@ import stat import shutil import platform import getpass +import threading # NOTE: Some of the unused imports might be used/imported by others. # Handle once test-cases are back up and running. @@ -32,7 +33,7 @@ from gitdb.util import ( # NOQA __all__ = ("stream_copy", "join_path", "to_native_path_windows", "to_native_path_linux", "join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList", "BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists', - 'RemoteProgress', 'rmtree') + 'RemoteProgress', 'rmtree', 'WaitGroup') #{ Utility Methods @@ -699,3 +700,32 @@ class Iterable(object): raise NotImplementedError("To be implemented by Subclass") #} END classes + + +class WaitGroup(object): + """WaitGroup is like Go sync.WaitGroup. + + Without all the useful corner cases. + By Peter Teichman, taken from https://gist.github.com/pteichman/84b92ae7cef0ab98f5a8 + """ + def __init__(self): + self.count = 0 + self.cv = threading.Condition() + + def add(self, n): + self.cv.acquire() + self.count += n + self.cv.release() + + def done(self): + self.cv.acquire() + self.count -= 1 + if self.count == 0: + self.cv.notify_all() + self.cv.release() + + def wait(self): + self.cv.acquire() + while self.count > 0: + self.cv.wait() + self.cv.release() -- cgit v1.2.3 From 87a6ffa13ae2951a168cde5908c7a94b16562b96 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 7 Jan 2015 12:37:49 +0100 Subject: Fix flake8 --- git/cmd.py | 20 ++++++++++---------- git/remote.py | 6 ++++-- git/util.py | 10 +++++----- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/git/cmd.py b/git/cmd.py index e03d0cdc..f847166c 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -46,8 +46,8 @@ __all__ = ('Git', ) ## @{ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): - """Registers for notifications to lean that process output is ready to read, and dispatches lines to - the respective line handlers. We are able to handle carriage returns in case progress is sent by that + """Registers for notifications to lean that process output is ready to read, and dispatches lines to + the respective line handlers. We are able to handle carriage returns in case progress is sent by that mean. For performance reasons, we only apply this to stderr. This function returns once the finalizer returns :return: result of finalizer @@ -77,7 +77,7 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): def dispatch_line(fno): stream, handler, readline = fdmap[fno] # this can possibly block for a while, but since we wake-up with at least one or more lines to handle, - # we are good ... + # we are good ... line = readline(stream).decode(defenc) if line and handler: handler(line) @@ -93,13 +93,13 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): # end deplete buffer if wg: wg.done() - # end + # end - fdmap = { process.stdout.fileno() : (process.stdout, stdout_handler, read_line_fast), - process.stderr.fileno() : (process.stderr, stderr_handler, read_line_slow) } + fdmap = {process.stdout.fileno(): (process.stdout, stdout_handler, read_line_fast), + process.stderr.fileno(): (process.stderr, stderr_handler, read_line_slow)} if hasattr(select, 'poll'): - # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be + # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be # an issue for us, as it matters how many handles or own process has poll = select.poll() READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR @@ -137,10 +137,10 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): wg = WaitGroup() for fno in fdmap.keys(): wg.add(1) - t = threading.Thread(target = lambda: deplete_buffer(fno, wg)) + t = threading.Thread(target=lambda: deplete_buffer(fno, wg)) t.start() # end - # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's + # NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's # actually started, which could make the wait() call to just return because the thread is not yet # active wg.wait() @@ -148,7 +148,7 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer): return finalizer(process) - + def dashify(string): return string.replace('_', '-') diff --git a/git/remote.py b/git/remote.py index 85f4ebf2..87db5dd4 100644 --- a/git/remote.py +++ b/git/remote.py @@ -40,6 +40,7 @@ __all__ = ('RemoteProgress', 'PushInfo', 'FetchInfo', 'Remote') #{ Utilities + def add_progress(kwargs, git, progress): """Add the --progress flag to the given kwargs dict if supported by the git command. If the actual progress in the given progress instance is not @@ -510,6 +511,7 @@ class Remote(LazyMixin, Iterable): cmds = set(PushInfo._flag_map.keys()) & set(FetchInfo._flag_map.keys()) progress_handler = progress.new_message_handler() + def my_progress_handler(line): for pline in progress_handler(line): if line.startswith('fatal:'): @@ -520,11 +522,11 @@ class Remote(LazyMixin, Iterable): fetch_info_lines.append(line) continue # end find command code - # end for each comand code we know + # end for each comand code we know # end for each line progress didn't handle # end - # We are only interested in stderr here ... + # We are only interested in stderr here ... handle_process_output(proc, None, my_progress_handler, finalize_process) # read head information diff --git a/git/util.py b/git/util.py index e211ca41..4d1ea8d6 100644 --- a/git/util.py +++ b/git/util.py @@ -251,7 +251,7 @@ class RemoteProgress(object): return failed_lines def new_message_handler(self): - """:return: a progress handler suitable for handle_process_output(), passing lines on to this Progress + """:return: a progress handler suitable for handle_process_output(), passing lines on to this Progress handler in a suitable format""" def handler(line): return self._parse_progress_line(line.rstrip()) @@ -704,26 +704,26 @@ class Iterable(object): class WaitGroup(object): """WaitGroup is like Go sync.WaitGroup. - + Without all the useful corner cases. By Peter Teichman, taken from https://gist.github.com/pteichman/84b92ae7cef0ab98f5a8 """ def __init__(self): self.count = 0 self.cv = threading.Condition() - + def add(self, n): self.cv.acquire() self.count += n self.cv.release() - + def done(self): self.cv.acquire() self.count -= 1 if self.count == 0: self.cv.notify_all() self.cv.release() - + def wait(self): self.cv.acquire() while self.count > 0: -- cgit v1.2.3