aboutsummaryrefslogtreecommitdiff
path: root/app/fanoutclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'app/fanoutclient.py')
-rw-r--r--app/fanoutclient.py85
1 files changed, 85 insertions, 0 deletions
diff --git a/app/fanoutclient.py b/app/fanoutclient.py
new file mode 100644
index 0000000..0a215dc
--- /dev/null
+++ b/app/fanoutclient.py
@@ -0,0 +1,85 @@
+import pyuv
+import uuid
+import random
+
+class FanoutClient(object):
+ def __init__(self, address, loop=None):
+ self.address = address
+ self.host, self.port = address
+
+ self.ibuf = []
+
+ self.channel_handlers = {}
+
+ if not loop:
+ loop = pyuv.Loop.default_loop()
+
+ self._conn = pyuv.TCP(loop)
+
+ def connect(self):
+ self._conn.connect(self.address, self._on_connected)
+
+ def disconnect(self):
+ self._conn.close(self._on_close)
+
+ def on_connected(self):
+ pass
+
+ def on_close(self):
+ pass
+
+ def _on_connected(self, handle, error):
+ self.on_connected()
+ self._conn.start_read(self._on_read)
+
+ def _on_read(self, handle, data, error):
+ if not data:
+ self.on_close()
+ return
+
+ parts = data.split('\n')
+
+ while parts:
+ if len(parts) > 1:
+ self.ibuf.append(parts.pop(0))
+ self.handle_incoming("".join(self.ibuf))
+ self.ibuf = []
+ elif len(parts) == 1:
+ if not parts[0]:
+ break
+ else:
+ self.ibuf.append(parts.pop(0))
+
+ def _on_close(self, handle):
+ self.on_close()
+
+ def handle_incoming(self, data):
+ channel, msg = data.split("!")
+
+ if channel in self.channel_handlers.keys():
+ self.channel_handlers[channel](msg)
+ # else:
+ # print 'no handler for <%s>' % channel
+
+ def _send_command(self, cmd, callback=None):
+ # print "sending cmd <%s>" % cmd
+ def after_write(handle, error):
+ if callback:
+ callback()
+
+ self._conn.write("%s\n" % cmd, after_write)
+
+ def subscribe(self, channel, handler=None, **kwargs):
+ if handler:
+ self.channel_handlers[channel] = handler
+
+ self._send_command("subscribe %s" % channel, **kwargs)
+
+ def unsubscribe(self, channel, **kwargs):
+ if channel in self.channel_handlers:
+ del self.channel_handlers[channel]
+
+ self._send_command("unsubscribe %s" % channel, **kwargs)
+
+ def announce(self, channel, msg, **kwargs):
+ self._send_command("announce %s %s" % (channel, msg), **kwargs)