aboutsummaryrefslogtreecommitdiff
path: root/app/fanoutclient.py
blob: 0a215dc552f1aa163119c5d72ba7aefb84ff7bc8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import pyuv
import uuid
import random

class FanoutClient(object):
    def __init__(self, address, loop=None):
        self.address = address
        self.host, self.port = address

        self.ibuf = []

        self.channel_handlers = {}

        if not loop:
            loop = pyuv.Loop.default_loop()

        self._conn = pyuv.TCP(loop)

    def connect(self):
        self._conn.connect(self.address, self._on_connected)

    def disconnect(self):
        self._conn.close(self._on_close)

    def on_connected(self):
        pass

    def on_close(self):
        pass

    def _on_connected(self, handle, error):
        self.on_connected()
        self._conn.start_read(self._on_read)

    def _on_read(self, handle, data, error):
        if not data:
            self.on_close()
            return

        parts = data.split('\n')

        while parts:
            if len(parts) > 1:
                self.ibuf.append(parts.pop(0))
                self.handle_incoming("".join(self.ibuf))
                self.ibuf = []
            elif len(parts) == 1:
                if not parts[0]:
                    break
                else:
                    self.ibuf.append(parts.pop(0))

    def _on_close(self, handle):
        self.on_close()

    def handle_incoming(self, data):
        channel, msg = data.split("!")

        if channel in self.channel_handlers.keys():
            self.channel_handlers[channel](msg)
        # else:
            # print 'no handler for <%s>' % channel

    def _send_command(self, cmd, callback=None):
        # print "sending cmd <%s>" % cmd
        def after_write(handle, error):
            if callback:
                callback()

        self._conn.write("%s\n" % cmd, after_write)

    def subscribe(self, channel, handler=None, **kwargs):
        if handler:
            self.channel_handlers[channel] = handler

        self._send_command("subscribe %s" % channel, **kwargs)

    def unsubscribe(self, channel, **kwargs):
        if channel in self.channel_handlers:
            del self.channel_handlers[channel]

        self._send_command("unsubscribe %s" % channel, **kwargs)

    def announce(self, channel, msg, **kwargs):
        self._send_command("announce %s %s" % (channel, msg), **kwargs)