diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2014-02-26 23:34:59 +0200 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2014-02-26 23:34:59 +0200 |
| commit | ab8a8e675d2dcdaaa7ccfbbdf4643246227e5e2a (patch) | |
| tree | d1980aad9bea37b886a4e87d0252b965ad672f5f | |
| download | distributed-test-ab8a8e675d2dcdaaa7ccfbbdf4643246227e5e2a.tar.gz distributed-test-ab8a8e675d2dcdaaa7ccfbbdf4643246227e5e2a.zip | |
initial commit
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | app/__init__.py | 0 | ||||
| -rw-r--r-- | app/distprocess.py | 293 | ||||
| -rw-r--r-- | app/fanoutclient.py | 85 | ||||
| -rw-r--r-- | app/terminatorscanner.py | 32 | ||||
| -rw-r--r-- | hop.py | 77 | ||||
| -rwxr-xr-x | quitapps.sh | 3 | ||||
| -rwxr-xr-x | runapps.sh | 5 | ||||
| -rw-r--r-- | testapp.py | 6 |
9 files changed, 502 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/app/__init__.py diff --git a/app/distprocess.py b/app/distprocess.py new file mode 100644 index 0000000..f7081c8 --- /dev/null +++ b/app/distprocess.py @@ -0,0 +1,293 @@ +import random +import signal +import uuid + +import pyuv + +from fanoutclient import FanoutClient +from terminatorscanner import TerminatorScannerBasic + + +class PeerFinder(FanoutClient): + def __init__(self, process, address, loop=None): + super(PeerFinder, self).__init__(address, loop) + self.process = process + + def join_handler(self, msg): + peer_id, port = msg.split('|') + + if peer_id != self.process.client_id: + print "got peer at port %s" % port + self.process.peers.append((peer_id, port)) + self.process.send_hello(peer_id, port) + + def leave_handler(self, msg): + print 'peer %s has left' % msg + self.process.peers = [p for p in self.process.peers if p[0] != msg] + + def control_handler(self, msg): + if msg == 'quit': + self.process.disconnect() + + + def on_connected(self): + print "peerfinder connected" + self.subscribe("join", self.join_handler) + self.subscribe("leave", self.leave_handler) + self.subscribe("control", self.control_handler) + self.announce("join", "%s|%s" % (self.process.client_id, self.process.port)) + + def disconnect(self): + def after_announce(): + print 'LEFT' + super(PeerFinder, self).disconnect() + print 'LEAVING' + self.announce("leave", self.process.client_id, callback=after_announce) + # super(PeerFinder, self).disconnect() + + +class DistProcess(object): + def __init__(self, loop=None): + self.peers = [] + + self.epoch = 0 + + self.freq = -1 + self.new_freq = None + self.freq_epoch = 0 + + self.client_id = str(uuid.uuid4()) + + if not loop: + loop = pyuv.Loop.default_loop() + + self.loop = loop + + self.signal_watchers = set() + + for sig in (signal.SIGINT, signal.SIGTERM): + handle = pyuv.Signal(loop) + handle.start(self._signal_cb, sig) + self.signal_watchers.add(handle) + + self.peerfinder = PeerFinder(self, ('127.0.0.1', 9898), loop) + + self.port = random.randint(10000, 50000) + + self.clients = [] + + self.incoming = TerminatorScannerBasic('\n') + self.cmd_handlers = {} + self.cmd_handlers['hello'] = self.handle_hello + self.cmd_handlers['update'] = self.handle_update + self.cmd_handlers['elect_me'] = self.handle_elect_me + self.cmd_handlers['you_have_my_vote'] = self.handle_you_have_my_vote + self.cmd_handlers['hop'] = self.handle_hop + + self.votes = [] + self.voting = False + self.vote_timer = pyuv.Timer(self.loop) + + def start(self): + self.server = pyuv.TCP(self.loop) + self.server.bind(('127.0.0.1', self.port)) + self.server.listen(self._on_connection) + + self.update_timer = pyuv.Timer(self.loop) + self.update_timer.start(self._on_timer, random.random() * 5 + 5, + random.random() * 5 + 5) + + print "listening on port %s" % self.port + + self.peerfinder.connect() + + def _on_connection(self, server, error): + # print 'client connected' + client = pyuv.TCP(server.loop) + server.accept(client) + self.clients.append(client) + client.start_read(self._on_read) + + def _on_read(self, client, data, error): + if data is None: + client.close() + self.clients.remove(client) + return + + self.incoming.handle_read(data) + + while self.incoming.incoming: + msg = self.incoming.incoming.pop(0) + self._handle_msg(msg) + + def _handle_msg(self, msg): + # print 'received data <%s>' % msg + + cmd, data = msg.split(' ', 1) + + if cmd in self.cmd_handlers: + handler = self.cmd_handlers[cmd] + handler(data) + + def handle_hello(self, data): + peer, port = data.split() + print 'got hello from %s' % peer + self.peers.append((peer, int(port))) + + def handle_update(self, data): + print 'got update %s' % data + + epoch, freq, freq_epoch = [int(x) for x in data.split(':')] + + if freq_epoch > self.freq_epoch: + print 'freq updated to %s (freq_epoch %s)' % (freq, freq_epoch) + self.freq_epoch = freq_epoch + self.freq = freq + + if epoch > self.epoch: + print 'updating epoch to %s' % epoch + self.epoch = epoch + + def handle_elect_me(self, data): + peer, epoch = data.split() + + print '%s wants to be elected (epoch %s)' % (peer, epoch) + + epoch = int(epoch) + + if epoch > self.epoch: + self.epoch = epoch + + if epoch == self.epoch: + self.send_vote(peer) + else: + pass + + def handle_you_have_my_vote(self, data): + peer, epoch = data.split() + + print '%s votes for me (epoch %s)' % (peer, epoch) + + epoch = int(epoch) + + if epoch == self.epoch: + self.votes.append(peer) + + def handle_hop(self, data): + self.change_freq() + + def disconnect(self): + self.peerfinder.disconnect() + [c.close() for c in self.clients] + [h.close() for h in self.signal_watchers] + self.update_timer.stop() + self.vote_timer.stop() + self.server.close() + + def _signal_cb(self, handle, signum): + self.disconnect() + + def _on_timer(self, timer): + # print '_on_timer' + + if (self.freq < 0) and (self.epoch == 0): + self.change_freq() + return + + if self.peers: + self.send_update() + + def change_freq(self): + print 'trying to change freq' + + self.epoch += 1 + self.new_freq = random.randint(0, 1000) + + self.send_elect() + + def on_timer(timer): + print 'voting time is up' + + timer.stop() + + vote_count = len(self.votes) + peer_count = len(self.peers) + peer_count_m = peer_count + + if peer_count % 2 != 0: + peer_count_m = peer_count_m + 1 + + if ((peer_count > 1) and (vote_count > peer_count/2)) or ((peer_count == 1) and (vote_count == 1)): + print "i'm elected" + + self.freq_epoch = self.epoch + self.freq = self.new_freq + + self.send_update() + + self.votes = [] + + self.voting = False + + self.vote_timer.start(on_timer, 5, 0) + + def send_update(self): + if self.voting: + return + + peer, port = random.choice(self.peers) + + print 'sending update to port %s' % port + + def on_connected(handle, error): + handle.write('update %s:%s:%s\n' % (self.epoch, self.freq, self.freq_epoch), + after_write) + + def after_write(handle, error): + handle.close() + + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + + def send_elect(self): + print 'sending elects' + + self.voting = True + + def on_connected(handle, error): + handle.write('elect_me %s %s\n' % (self.client_id, self.epoch), after_write) + + def after_write(handle, error): + handle.close() + + for peer, port in self.peers: + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + + def send_vote(self, peer_id): + peer = [p for p in self.peers if p[0] == peer_id] + + if peer: + _, port = peer[0] + + def on_connected(handle, error): + handle.write('you_have_my_vote %s %s\n' % (self.client_id, self.epoch), after_write) + + def after_write(handle, error): + handle.close() + + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + + def send_hello(self, peer, port): + print 'saying hello to port %s' % port + + def on_connected(handle, error): + handle.write('hello %s %s\n' % (self.client_id, self.port), + on_written) + + def on_written(handle, error): + handle.close() + + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) diff --git a/app/fanoutclient.py b/app/fanoutclient.py new file mode 100644 index 0000000..0a215dc --- /dev/null +++ b/app/fanoutclient.py @@ -0,0 +1,85 @@ +import pyuv +import uuid +import random + +class FanoutClient(object): + def __init__(self, address, loop=None): + self.address = address + self.host, self.port = address + + self.ibuf = [] + + self.channel_handlers = {} + + if not loop: + loop = pyuv.Loop.default_loop() + + self._conn = pyuv.TCP(loop) + + def connect(self): + self._conn.connect(self.address, self._on_connected) + + def disconnect(self): + self._conn.close(self._on_close) + + def on_connected(self): + pass + + def on_close(self): + pass + + def _on_connected(self, handle, error): + self.on_connected() + self._conn.start_read(self._on_read) + + def _on_read(self, handle, data, error): + if not data: + self.on_close() + return + + parts = data.split('\n') + + while parts: + if len(parts) > 1: + self.ibuf.append(parts.pop(0)) + self.handle_incoming("".join(self.ibuf)) + self.ibuf = [] + elif len(parts) == 1: + if not parts[0]: + break + else: + self.ibuf.append(parts.pop(0)) + + def _on_close(self, handle): + self.on_close() + + def handle_incoming(self, data): + channel, msg = data.split("!") + + if channel in self.channel_handlers.keys(): + self.channel_handlers[channel](msg) + # else: + # print 'no handler for <%s>' % channel + + def _send_command(self, cmd, callback=None): + # print "sending cmd <%s>" % cmd + def after_write(handle, error): + if callback: + callback() + + self._conn.write("%s\n" % cmd, after_write) + + def subscribe(self, channel, handler=None, **kwargs): + if handler: + self.channel_handlers[channel] = handler + + self._send_command("subscribe %s" % channel, **kwargs) + + def unsubscribe(self, channel, **kwargs): + if channel in self.channel_handlers: + del self.channel_handlers[channel] + + self._send_command("unsubscribe %s" % channel, **kwargs) + + def announce(self, channel, msg, **kwargs): + self._send_command("announce %s %s" % (channel, msg), **kwargs) diff --git a/app/terminatorscanner.py b/app/terminatorscanner.py new file mode 100644 index 0000000..6bd99a1 --- /dev/null +++ b/app/terminatorscanner.py @@ -0,0 +1,32 @@ +class TerminatorScanner(object): + def __init__(self, terminator=None): + self.in_buffer = '' + self.terminator = terminator + + def handle_read(self, data): + self.in_buffer = self.in_buffer + data + + terminator_len = len(self.terminator) + + while True: + index = self.in_buffer.find(self.terminator) + + if index < 0: + break + + incoming = self.in_buffer[:index] + self.in_buffer = self.in_buffer[index+terminator_len:] + + self.handle_incoming(incoming) + + def handle_incoming(self, data): + raise NotImplementedError('must be implemented in subclass') + + +class TerminatorScannerBasic(TerminatorScanner): + def __init__(self, terminator=None): + super(TerminatorScannerBasic, self).__init__(terminator) + self.incoming = [] + + def handle_incoming(self, data): + self.incoming.append(data) @@ -0,0 +1,77 @@ +from app.fanoutclient import FanoutClient +from app.terminatorscanner import TerminatorScannerBasic +import pyuv +import random + +class Hopper(FanoutClient): + def __init__(self, address, port): + super(Hopper, self).__init__(address) + self.server_listen_port = port + + def on_connected(self): + self.subscribe('join') + self.subscribe('leave') + self.announce('join', 'hopper|%s' % self.server_listen_port) + + def disconnect(self): + def after_announce(): + super(Hopper, self).disconnect() + self.announce("leave", 'hopper', callback=after_announce) + +loop = pyuv.Loop.default_loop() +ihandler = TerminatorScannerBasic('\n') + +clients = [] +ports = [] + +def on_connection(server, error): + client = pyuv.TCP(server.loop) + server.accept(client) + clients.append(client) + client.start_read(on_read) + +def on_read(client, data, error): + if data is None: + client.close() + clients.remove(client) + return + + ihandler.handle_read(data) + + while ihandler.incoming: + msg = ihandler.incoming.pop(0) + cmd, data = msg.split(' ', 1) + if cmd == 'hello': + _, port = data.split() + ports.append(port) + print 'hello from %s' % port + +port = random.randint(10000, 50000) +server = pyuv.TCP(loop) +server.bind(('127.0.0.1', port)) +server.listen(on_connection) + +def on_timer(timer): + timer.stop() + port = random.choice(ports) + + print 'sending hop command to port %s' % port + + def on_connected(handle, error): + handle.write('hop hop\n', on_written) + + def on_written(handle, error): + handle.close() + fanout_client.disconnect() + server.close() + + handle = pyuv.TCP(timer.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + +timer = pyuv.Timer(loop) +timer.start(on_timer, 2, 0) + +fanout_client = Hopper(('127.0.0.1', 9898), port) +fanout_client.connect() + +loop.run() diff --git a/quitapps.sh b/quitapps.sh new file mode 100755 index 0000000..c65f57c --- /dev/null +++ b/quitapps.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo -n -e "subscribe control\nannounce control quit\n" | nc localhost 9898 diff --git a/runapps.sh b/runapps.sh new file mode 100755 index 0000000..49ec7a1 --- /dev/null +++ b/runapps.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +for i in $(seq 10); do + python testapp.py & +done diff --git a/testapp.py b/testapp.py new file mode 100644 index 0000000..914aa9d --- /dev/null +++ b/testapp.py @@ -0,0 +1,6 @@ +from app.distprocess import DistProcess + +d = DistProcess() +d.start() + +d.loop.run() |
