aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2014-02-26 23:34:59 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2014-02-26 23:34:59 +0200
commitab8a8e675d2dcdaaa7ccfbbdf4643246227e5e2a (patch)
treed1980aad9bea37b886a4e87d0252b965ad672f5f
downloaddistributed-test-ab8a8e675d2dcdaaa7ccfbbdf4643246227e5e2a.tar.gz
distributed-test-ab8a8e675d2dcdaaa7ccfbbdf4643246227e5e2a.zip
initial commit
-rw-r--r--.gitignore1
-rw-r--r--app/__init__.py0
-rw-r--r--app/distprocess.py293
-rw-r--r--app/fanoutclient.py85
-rw-r--r--app/terminatorscanner.py32
-rw-r--r--hop.py77
-rwxr-xr-xquitapps.sh3
-rwxr-xr-xrunapps.sh5
-rw-r--r--testapp.py6
9 files changed, 502 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d20b64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.pyc
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/app/__init__.py
diff --git a/app/distprocess.py b/app/distprocess.py
new file mode 100644
index 0000000..f7081c8
--- /dev/null
+++ b/app/distprocess.py
@@ -0,0 +1,293 @@
+import random
+import signal
+import uuid
+
+import pyuv
+
+from fanoutclient import FanoutClient
+from terminatorscanner import TerminatorScannerBasic
+
+
+class PeerFinder(FanoutClient):
+ def __init__(self, process, address, loop=None):
+ super(PeerFinder, self).__init__(address, loop)
+ self.process = process
+
+ def join_handler(self, msg):
+ peer_id, port = msg.split('|')
+
+ if peer_id != self.process.client_id:
+ print "got peer at port %s" % port
+ self.process.peers.append((peer_id, port))
+ self.process.send_hello(peer_id, port)
+
+ def leave_handler(self, msg):
+ print 'peer %s has left' % msg
+ self.process.peers = [p for p in self.process.peers if p[0] != msg]
+
+ def control_handler(self, msg):
+ if msg == 'quit':
+ self.process.disconnect()
+
+
+ def on_connected(self):
+ print "peerfinder connected"
+ self.subscribe("join", self.join_handler)
+ self.subscribe("leave", self.leave_handler)
+ self.subscribe("control", self.control_handler)
+ self.announce("join", "%s|%s" % (self.process.client_id, self.process.port))
+
+ def disconnect(self):
+ def after_announce():
+ print 'LEFT'
+ super(PeerFinder, self).disconnect()
+ print 'LEAVING'
+ self.announce("leave", self.process.client_id, callback=after_announce)
+ # super(PeerFinder, self).disconnect()
+
+
+class DistProcess(object):
+ def __init__(self, loop=None):
+ self.peers = []
+
+ self.epoch = 0
+
+ self.freq = -1
+ self.new_freq = None
+ self.freq_epoch = 0
+
+ self.client_id = str(uuid.uuid4())
+
+ if not loop:
+ loop = pyuv.Loop.default_loop()
+
+ self.loop = loop
+
+ self.signal_watchers = set()
+
+ for sig in (signal.SIGINT, signal.SIGTERM):
+ handle = pyuv.Signal(loop)
+ handle.start(self._signal_cb, sig)
+ self.signal_watchers.add(handle)
+
+ self.peerfinder = PeerFinder(self, ('127.0.0.1', 9898), loop)
+
+ self.port = random.randint(10000, 50000)
+
+ self.clients = []
+
+ self.incoming = TerminatorScannerBasic('\n')
+ self.cmd_handlers = {}
+ self.cmd_handlers['hello'] = self.handle_hello
+ self.cmd_handlers['update'] = self.handle_update
+ self.cmd_handlers['elect_me'] = self.handle_elect_me
+ self.cmd_handlers['you_have_my_vote'] = self.handle_you_have_my_vote
+ self.cmd_handlers['hop'] = self.handle_hop
+
+ self.votes = []
+ self.voting = False
+ self.vote_timer = pyuv.Timer(self.loop)
+
+ def start(self):
+ self.server = pyuv.TCP(self.loop)
+ self.server.bind(('127.0.0.1', self.port))
+ self.server.listen(self._on_connection)
+
+ self.update_timer = pyuv.Timer(self.loop)
+ self.update_timer.start(self._on_timer, random.random() * 5 + 5,
+ random.random() * 5 + 5)
+
+ print "listening on port %s" % self.port
+
+ self.peerfinder.connect()
+
+ def _on_connection(self, server, error):
+ # print 'client connected'
+ client = pyuv.TCP(server.loop)
+ server.accept(client)
+ self.clients.append(client)
+ client.start_read(self._on_read)
+
+ def _on_read(self, client, data, error):
+ if data is None:
+ client.close()
+ self.clients.remove(client)
+ return
+
+ self.incoming.handle_read(data)
+
+ while self.incoming.incoming:
+ msg = self.incoming.incoming.pop(0)
+ self._handle_msg(msg)
+
+ def _handle_msg(self, msg):
+ # print 'received data <%s>' % msg
+
+ cmd, data = msg.split(' ', 1)
+
+ if cmd in self.cmd_handlers:
+ handler = self.cmd_handlers[cmd]
+ handler(data)
+
+ def handle_hello(self, data):
+ peer, port = data.split()
+ print 'got hello from %s' % peer
+ self.peers.append((peer, int(port)))
+
+ def handle_update(self, data):
+ print 'got update %s' % data
+
+ epoch, freq, freq_epoch = [int(x) for x in data.split(':')]
+
+ if freq_epoch > self.freq_epoch:
+ print 'freq updated to %s (freq_epoch %s)' % (freq, freq_epoch)
+ self.freq_epoch = freq_epoch
+ self.freq = freq
+
+ if epoch > self.epoch:
+ print 'updating epoch to %s' % epoch
+ self.epoch = epoch
+
+ def handle_elect_me(self, data):
+ peer, epoch = data.split()
+
+ print '%s wants to be elected (epoch %s)' % (peer, epoch)
+
+ epoch = int(epoch)
+
+ if epoch > self.epoch:
+ self.epoch = epoch
+
+ if epoch == self.epoch:
+ self.send_vote(peer)
+ else:
+ pass
+
+ def handle_you_have_my_vote(self, data):
+ peer, epoch = data.split()
+
+ print '%s votes for me (epoch %s)' % (peer, epoch)
+
+ epoch = int(epoch)
+
+ if epoch == self.epoch:
+ self.votes.append(peer)
+
+ def handle_hop(self, data):
+ self.change_freq()
+
+ def disconnect(self):
+ self.peerfinder.disconnect()
+ [c.close() for c in self.clients]
+ [h.close() for h in self.signal_watchers]
+ self.update_timer.stop()
+ self.vote_timer.stop()
+ self.server.close()
+
+ def _signal_cb(self, handle, signum):
+ self.disconnect()
+
+ def _on_timer(self, timer):
+ # print '_on_timer'
+
+ if (self.freq < 0) and (self.epoch == 0):
+ self.change_freq()
+ return
+
+ if self.peers:
+ self.send_update()
+
+ def change_freq(self):
+ print 'trying to change freq'
+
+ self.epoch += 1
+ self.new_freq = random.randint(0, 1000)
+
+ self.send_elect()
+
+ def on_timer(timer):
+ print 'voting time is up'
+
+ timer.stop()
+
+ vote_count = len(self.votes)
+ peer_count = len(self.peers)
+ peer_count_m = peer_count
+
+ if peer_count % 2 != 0:
+ peer_count_m = peer_count_m + 1
+
+ if ((peer_count > 1) and (vote_count > peer_count/2)) or ((peer_count == 1) and (vote_count == 1)):
+ print "i'm elected"
+
+ self.freq_epoch = self.epoch
+ self.freq = self.new_freq
+
+ self.send_update()
+
+ self.votes = []
+
+ self.voting = False
+
+ self.vote_timer.start(on_timer, 5, 0)
+
+ def send_update(self):
+ if self.voting:
+ return
+
+ peer, port = random.choice(self.peers)
+
+ print 'sending update to port %s' % port
+
+ def on_connected(handle, error):
+ handle.write('update %s:%s:%s\n' % (self.epoch, self.freq, self.freq_epoch),
+ after_write)
+
+ def after_write(handle, error):
+ handle.close()
+
+ handle = pyuv.TCP(self.server.loop)
+ handle.connect(('127.0.0.1', int(port)), on_connected)
+
+ def send_elect(self):
+ print 'sending elects'
+
+ self.voting = True
+
+ def on_connected(handle, error):
+ handle.write('elect_me %s %s\n' % (self.client_id, self.epoch), after_write)
+
+ def after_write(handle, error):
+ handle.close()
+
+ for peer, port in self.peers:
+ handle = pyuv.TCP(self.server.loop)
+ handle.connect(('127.0.0.1', int(port)), on_connected)
+
+ def send_vote(self, peer_id):
+ peer = [p for p in self.peers if p[0] == peer_id]
+
+ if peer:
+ _, port = peer[0]
+
+ def on_connected(handle, error):
+ handle.write('you_have_my_vote %s %s\n' % (self.client_id, self.epoch), after_write)
+
+ def after_write(handle, error):
+ handle.close()
+
+ handle = pyuv.TCP(self.server.loop)
+ handle.connect(('127.0.0.1', int(port)), on_connected)
+
+ def send_hello(self, peer, port):
+ print 'saying hello to port %s' % port
+
+ def on_connected(handle, error):
+ handle.write('hello %s %s\n' % (self.client_id, self.port),
+ on_written)
+
+ def on_written(handle, error):
+ handle.close()
+
+ handle = pyuv.TCP(self.server.loop)
+ handle.connect(('127.0.0.1', int(port)), on_connected)
diff --git a/app/fanoutclient.py b/app/fanoutclient.py
new file mode 100644
index 0000000..0a215dc
--- /dev/null
+++ b/app/fanoutclient.py
@@ -0,0 +1,85 @@
+import pyuv
+import uuid
+import random
+
+class FanoutClient(object):
+ def __init__(self, address, loop=None):
+ self.address = address
+ self.host, self.port = address
+
+ self.ibuf = []
+
+ self.channel_handlers = {}
+
+ if not loop:
+ loop = pyuv.Loop.default_loop()
+
+ self._conn = pyuv.TCP(loop)
+
+ def connect(self):
+ self._conn.connect(self.address, self._on_connected)
+
+ def disconnect(self):
+ self._conn.close(self._on_close)
+
+ def on_connected(self):
+ pass
+
+ def on_close(self):
+ pass
+
+ def _on_connected(self, handle, error):
+ self.on_connected()
+ self._conn.start_read(self._on_read)
+
+ def _on_read(self, handle, data, error):
+ if not data:
+ self.on_close()
+ return
+
+ parts = data.split('\n')
+
+ while parts:
+ if len(parts) > 1:
+ self.ibuf.append(parts.pop(0))
+ self.handle_incoming("".join(self.ibuf))
+ self.ibuf = []
+ elif len(parts) == 1:
+ if not parts[0]:
+ break
+ else:
+ self.ibuf.append(parts.pop(0))
+
+ def _on_close(self, handle):
+ self.on_close()
+
+ def handle_incoming(self, data):
+ channel, msg = data.split("!")
+
+ if channel in self.channel_handlers.keys():
+ self.channel_handlers[channel](msg)
+ # else:
+ # print 'no handler for <%s>' % channel
+
+ def _send_command(self, cmd, callback=None):
+ # print "sending cmd <%s>" % cmd
+ def after_write(handle, error):
+ if callback:
+ callback()
+
+ self._conn.write("%s\n" % cmd, after_write)
+
+ def subscribe(self, channel, handler=None, **kwargs):
+ if handler:
+ self.channel_handlers[channel] = handler
+
+ self._send_command("subscribe %s" % channel, **kwargs)
+
+ def unsubscribe(self, channel, **kwargs):
+ if channel in self.channel_handlers:
+ del self.channel_handlers[channel]
+
+ self._send_command("unsubscribe %s" % channel, **kwargs)
+
+ def announce(self, channel, msg, **kwargs):
+ self._send_command("announce %s %s" % (channel, msg), **kwargs)
diff --git a/app/terminatorscanner.py b/app/terminatorscanner.py
new file mode 100644
index 0000000..6bd99a1
--- /dev/null
+++ b/app/terminatorscanner.py
@@ -0,0 +1,32 @@
+class TerminatorScanner(object):
+ def __init__(self, terminator=None):
+ self.in_buffer = ''
+ self.terminator = terminator
+
+ def handle_read(self, data):
+ self.in_buffer = self.in_buffer + data
+
+ terminator_len = len(self.terminator)
+
+ while True:
+ index = self.in_buffer.find(self.terminator)
+
+ if index < 0:
+ break
+
+ incoming = self.in_buffer[:index]
+ self.in_buffer = self.in_buffer[index+terminator_len:]
+
+ self.handle_incoming(incoming)
+
+ def handle_incoming(self, data):
+ raise NotImplementedError('must be implemented in subclass')
+
+
+class TerminatorScannerBasic(TerminatorScanner):
+ def __init__(self, terminator=None):
+ super(TerminatorScannerBasic, self).__init__(terminator)
+ self.incoming = []
+
+ def handle_incoming(self, data):
+ self.incoming.append(data)
diff --git a/hop.py b/hop.py
new file mode 100644
index 0000000..670a288
--- /dev/null
+++ b/hop.py
@@ -0,0 +1,77 @@
+from app.fanoutclient import FanoutClient
+from app.terminatorscanner import TerminatorScannerBasic
+import pyuv
+import random
+
+class Hopper(FanoutClient):
+ def __init__(self, address, port):
+ super(Hopper, self).__init__(address)
+ self.server_listen_port = port
+
+ def on_connected(self):
+ self.subscribe('join')
+ self.subscribe('leave')
+ self.announce('join', 'hopper|%s' % self.server_listen_port)
+
+ def disconnect(self):
+ def after_announce():
+ super(Hopper, self).disconnect()
+ self.announce("leave", 'hopper', callback=after_announce)
+
+loop = pyuv.Loop.default_loop()
+ihandler = TerminatorScannerBasic('\n')
+
+clients = []
+ports = []
+
+def on_connection(server, error):
+ client = pyuv.TCP(server.loop)
+ server.accept(client)
+ clients.append(client)
+ client.start_read(on_read)
+
+def on_read(client, data, error):
+ if data is None:
+ client.close()
+ clients.remove(client)
+ return
+
+ ihandler.handle_read(data)
+
+ while ihandler.incoming:
+ msg = ihandler.incoming.pop(0)
+ cmd, data = msg.split(' ', 1)
+ if cmd == 'hello':
+ _, port = data.split()
+ ports.append(port)
+ print 'hello from %s' % port
+
+port = random.randint(10000, 50000)
+server = pyuv.TCP(loop)
+server.bind(('127.0.0.1', port))
+server.listen(on_connection)
+
+def on_timer(timer):
+ timer.stop()
+ port = random.choice(ports)
+
+ print 'sending hop command to port %s' % port
+
+ def on_connected(handle, error):
+ handle.write('hop hop\n', on_written)
+
+ def on_written(handle, error):
+ handle.close()
+ fanout_client.disconnect()
+ server.close()
+
+ handle = pyuv.TCP(timer.loop)
+ handle.connect(('127.0.0.1', int(port)), on_connected)
+
+timer = pyuv.Timer(loop)
+timer.start(on_timer, 2, 0)
+
+fanout_client = Hopper(('127.0.0.1', 9898), port)
+fanout_client.connect()
+
+loop.run()
diff --git a/quitapps.sh b/quitapps.sh
new file mode 100755
index 0000000..c65f57c
--- /dev/null
+++ b/quitapps.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+echo -n -e "subscribe control\nannounce control quit\n" | nc localhost 9898
diff --git a/runapps.sh b/runapps.sh
new file mode 100755
index 0000000..49ec7a1
--- /dev/null
+++ b/runapps.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+for i in $(seq 10); do
+ python testapp.py &
+done
diff --git a/testapp.py b/testapp.py
new file mode 100644
index 0000000..914aa9d
--- /dev/null
+++ b/testapp.py
@@ -0,0 +1,6 @@
+from app.distprocess import DistProcess
+
+d = DistProcess()
+d.start()
+
+d.loop.run()