diff options
Diffstat (limited to 'app/distprocess.py')
| -rw-r--r-- | app/distprocess.py | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/app/distprocess.py b/app/distprocess.py new file mode 100644 index 0000000..f7081c8 --- /dev/null +++ b/app/distprocess.py @@ -0,0 +1,293 @@ +import random +import signal +import uuid + +import pyuv + +from fanoutclient import FanoutClient +from terminatorscanner import TerminatorScannerBasic + + +class PeerFinder(FanoutClient): + def __init__(self, process, address, loop=None): + super(PeerFinder, self).__init__(address, loop) + self.process = process + + def join_handler(self, msg): + peer_id, port = msg.split('|') + + if peer_id != self.process.client_id: + print "got peer at port %s" % port + self.process.peers.append((peer_id, port)) + self.process.send_hello(peer_id, port) + + def leave_handler(self, msg): + print 'peer %s has left' % msg + self.process.peers = [p for p in self.process.peers if p[0] != msg] + + def control_handler(self, msg): + if msg == 'quit': + self.process.disconnect() + + + def on_connected(self): + print "peerfinder connected" + self.subscribe("join", self.join_handler) + self.subscribe("leave", self.leave_handler) + self.subscribe("control", self.control_handler) + self.announce("join", "%s|%s" % (self.process.client_id, self.process.port)) + + def disconnect(self): + def after_announce(): + print 'LEFT' + super(PeerFinder, self).disconnect() + print 'LEAVING' + self.announce("leave", self.process.client_id, callback=after_announce) + # super(PeerFinder, self).disconnect() + + +class DistProcess(object): + def __init__(self, loop=None): + self.peers = [] + + self.epoch = 0 + + self.freq = -1 + self.new_freq = None + self.freq_epoch = 0 + + self.client_id = str(uuid.uuid4()) + + if not loop: + loop = pyuv.Loop.default_loop() + + self.loop = loop + + self.signal_watchers = set() + + for sig in (signal.SIGINT, signal.SIGTERM): + handle = pyuv.Signal(loop) + handle.start(self._signal_cb, sig) + self.signal_watchers.add(handle) + + self.peerfinder = PeerFinder(self, ('127.0.0.1', 9898), loop) + + self.port = random.randint(10000, 50000) + + self.clients = [] + + self.incoming = TerminatorScannerBasic('\n') + self.cmd_handlers = {} + self.cmd_handlers['hello'] = self.handle_hello + self.cmd_handlers['update'] = self.handle_update + self.cmd_handlers['elect_me'] = self.handle_elect_me + self.cmd_handlers['you_have_my_vote'] = self.handle_you_have_my_vote + self.cmd_handlers['hop'] = self.handle_hop + + self.votes = [] + self.voting = False + self.vote_timer = pyuv.Timer(self.loop) + + def start(self): + self.server = pyuv.TCP(self.loop) + self.server.bind(('127.0.0.1', self.port)) + self.server.listen(self._on_connection) + + self.update_timer = pyuv.Timer(self.loop) + self.update_timer.start(self._on_timer, random.random() * 5 + 5, + random.random() * 5 + 5) + + print "listening on port %s" % self.port + + self.peerfinder.connect() + + def _on_connection(self, server, error): + # print 'client connected' + client = pyuv.TCP(server.loop) + server.accept(client) + self.clients.append(client) + client.start_read(self._on_read) + + def _on_read(self, client, data, error): + if data is None: + client.close() + self.clients.remove(client) + return + + self.incoming.handle_read(data) + + while self.incoming.incoming: + msg = self.incoming.incoming.pop(0) + self._handle_msg(msg) + + def _handle_msg(self, msg): + # print 'received data <%s>' % msg + + cmd, data = msg.split(' ', 1) + + if cmd in self.cmd_handlers: + handler = self.cmd_handlers[cmd] + handler(data) + + def handle_hello(self, data): + peer, port = data.split() + print 'got hello from %s' % peer + self.peers.append((peer, int(port))) + + def handle_update(self, data): + print 'got update %s' % data + + epoch, freq, freq_epoch = [int(x) for x in data.split(':')] + + if freq_epoch > self.freq_epoch: + print 'freq updated to %s (freq_epoch %s)' % (freq, freq_epoch) + self.freq_epoch = freq_epoch + self.freq = freq + + if epoch > self.epoch: + print 'updating epoch to %s' % epoch + self.epoch = epoch + + def handle_elect_me(self, data): + peer, epoch = data.split() + + print '%s wants to be elected (epoch %s)' % (peer, epoch) + + epoch = int(epoch) + + if epoch > self.epoch: + self.epoch = epoch + + if epoch == self.epoch: + self.send_vote(peer) + else: + pass + + def handle_you_have_my_vote(self, data): + peer, epoch = data.split() + + print '%s votes for me (epoch %s)' % (peer, epoch) + + epoch = int(epoch) + + if epoch == self.epoch: + self.votes.append(peer) + + def handle_hop(self, data): + self.change_freq() + + def disconnect(self): + self.peerfinder.disconnect() + [c.close() for c in self.clients] + [h.close() for h in self.signal_watchers] + self.update_timer.stop() + self.vote_timer.stop() + self.server.close() + + def _signal_cb(self, handle, signum): + self.disconnect() + + def _on_timer(self, timer): + # print '_on_timer' + + if (self.freq < 0) and (self.epoch == 0): + self.change_freq() + return + + if self.peers: + self.send_update() + + def change_freq(self): + print 'trying to change freq' + + self.epoch += 1 + self.new_freq = random.randint(0, 1000) + + self.send_elect() + + def on_timer(timer): + print 'voting time is up' + + timer.stop() + + vote_count = len(self.votes) + peer_count = len(self.peers) + peer_count_m = peer_count + + if peer_count % 2 != 0: + peer_count_m = peer_count_m + 1 + + if ((peer_count > 1) and (vote_count > peer_count/2)) or ((peer_count == 1) and (vote_count == 1)): + print "i'm elected" + + self.freq_epoch = self.epoch + self.freq = self.new_freq + + self.send_update() + + self.votes = [] + + self.voting = False + + self.vote_timer.start(on_timer, 5, 0) + + def send_update(self): + if self.voting: + return + + peer, port = random.choice(self.peers) + + print 'sending update to port %s' % port + + def on_connected(handle, error): + handle.write('update %s:%s:%s\n' % (self.epoch, self.freq, self.freq_epoch), + after_write) + + def after_write(handle, error): + handle.close() + + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + + def send_elect(self): + print 'sending elects' + + self.voting = True + + def on_connected(handle, error): + handle.write('elect_me %s %s\n' % (self.client_id, self.epoch), after_write) + + def after_write(handle, error): + handle.close() + + for peer, port in self.peers: + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + + def send_vote(self, peer_id): + peer = [p for p in self.peers if p[0] == peer_id] + + if peer: + _, port = peer[0] + + def on_connected(handle, error): + handle.write('you_have_my_vote %s %s\n' % (self.client_id, self.epoch), after_write) + + def after_write(handle, error): + handle.close() + + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) + + def send_hello(self, peer, port): + print 'saying hello to port %s' % port + + def on_connected(handle, error): + handle.write('hello %s %s\n' % (self.client_id, self.port), + on_written) + + def on_written(handle, error): + handle.close() + + handle = pyuv.TCP(self.server.loop) + handle.connect(('127.0.0.1', int(port)), on_connected) |
