summaryrefslogtreecommitdiff
path: root/network/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'network/client.py')
-rw-r--r--network/client.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/network/client.py b/network/client.py
new file mode 100644
index 0000000..5f454d7
--- /dev/null
+++ b/network/client.py
@@ -0,0 +1,226 @@
+from threading import Lock as Mutex
+
+import dbus
+from dbus.mainloop.glib import DBusGMainLoop
+from gi.repository import GLib
+import logbook
+import zmq
+
+from network import avahi
+from network.browser import ConsoleServerBrowser
+
+log = logbook.Logger(__name__)
+
+dbus_loop = DBusGMainLoop()
+system_bus = dbus.SystemBus(mainloop=dbus_loop)
+
+avahi_server = dbus.Interface(
+ system_bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),
+ avahi.DBUS_INTERFACE_SERVER,
+)
+
+def acquire_or_die(mutex, error):
+ if not mutex.acquire(False):
+ raise RuntimeError(error)
+
+class Discoverer(object):
+ # there seems to be a bug in my current zmq version
+ # otherwise, just set this to False.
+ IPv4_ONLY = True
+
+ def __init__(self, kind):
+ super(Discoverer, self).__init__()
+ self.kind = '_%s._tcp' % kind
+ self.services = {}
+
+ proto = avahi.PROTO_INET if self.IPv4_ONLY else avahi.PROTO_UNSPEC
+
+ self.avahi_browser = dbus.Interface(
+ system_bus.get_object(
+ avahi.DBUS_NAME,
+ avahi_server.ServiceBrowserNew(
+ avahi.IF_UNSPEC, proto,
+ self.kind, '',
+ dbus.UInt32(0),
+ )
+ ),
+ avahi.DBUS_INTERFACE_SERVICE_BROWSER,
+ )
+
+ self.avahi_browser.connect_to_signal('ItemNew', self.on_service_discovered)
+ self.avahi_browser.connect_to_signal('ItemRemove', self.on_service_removed)
+
+ def on_services_changed(self):
+ pass
+
+ def on_service_resolved(
+ self,
+ interface, protocol,
+ name, kind, domain,
+ host, aprotocol, address, port,
+ text, flags,
+ ):
+ key = (interface, protocol, name, kind, domain)
+
+ target = 'tcp://%s:%d' % (
+ '[' + address + ']'
+ if aprotocol == avahi.PROTO_INET6
+ else address,
+ port,
+ )
+
+ self.services[key] = {
+ 'name': name,
+ 'domain': domain,
+ 'host': host,
+ 'address': address,
+ 'port': port,
+ 'text': text,
+ 'target': target,
+ }
+
+ self.on_services_changed()
+
+ def on_service_error(self, key, error):
+ log.warning('Error resolving {}: {}', key, error)
+ self.services.pop(key, None)
+ self.on_services_changed()
+
+ def on_service_discovered(
+ self,
+ interface, protocol,
+ name, kind, domain,
+ flags,
+ ):
+ key = (interface, protocol, name, kind, domain)
+ log.debug('Discovered service {}', key)
+ if key in self.services:
+ log.warning('Discovered a service twice: {}', key)
+ return
+
+ self.services[key] = None
+
+ avahi_server.ResolveService(
+ interface, protocol, name, kind, domain,
+ avahi.PROTO_UNSPEC, dbus.UInt32(0),
+ reply_handler=self.on_service_resolved,
+ error_handler=lambda error: self.on_service_error(key, error),
+ )
+
+ def on_service_removed(
+ self,
+ interface, protocol,
+ name, kind, domain,
+ flags,
+ ):
+ key = (interface, protocol, name, kind, domain)
+ self.services.pop(key, None)
+ self.on_services_changed()
+
+class Client(Discoverer):
+ """
+ This is the counterpart to the Server. This client is capable of
+ discovering servers on the LAN and connecting to them (or to any other
+ server) using the same request/reply protocol (ZeroMQ).
+
+ :param kind: The machine-readable kind of server you want to connect to.
+ Currently only used by ``find_server()``.
+ """
+
+ def __init__(self, kind):
+ super(Client, self).__init__(kind)
+ self._send_mutex = Mutex()
+
+ self.socket = None
+
+ def connect(self, target, ctx=None):
+ """
+ Connect to a server. If you connect to multiple servers, ZeroMQ will
+ load-balance between them.
+
+ :param target: A ZeroMQ endpoint (e.g., "tcp://192.168.0.1:12345")
+ :param ctx: A ZeroMQ context (optional)
+ """
+ if ctx is None:
+ ctx = zmq.Context.instance()
+
+ self.socket = ctx.socket(zmq.REQ)
+ self.socket.connect(target)
+
+ def send(self, data):
+ """
+ Send a message to the server, wait for the response, and return it.
+ Both the sent and received messages are binary strings, not unicode.
+
+ :param data: The data to be sent to the server.
+ :rtype: string (the data returned from the server)
+ """
+ acquire_or_die(
+ self._send_mutex,
+ "You called send_async(), then send(): "
+ "you need to call recv_async() first.",
+ )
+ try:
+ self.socket.send(data)
+ except Exception:
+ raise
+ else:
+ return self.socket.recv()
+ finally:
+ self._send_mutex.release()
+
+ def send_async(self, data):
+ """
+ Send a message to the server, without waiting for a response.
+
+ Currently this method waits for acknowledgement of the message by a
+ server, but this extra latency will be removed in the future.
+
+ In order to get the result of the message, call ``.read_async``.
+
+ :param data: The data to be sent to the server.
+ """
+ acquire_or_die(
+ self._send_mutex,
+ "You tried to call send_async() twice. "
+ "You need to call recv_async() first.",
+ )
+ self.socket.send(data)
+
+ def recv_async(self):
+ """
+ After sending a message to the server with ``.send_async()``, calling
+ this method will wait for the response from the server and return it.
+
+ :rtype: string (the data returned from the server)
+ """
+ return self.socket.recv()
+
+ def on_services_changed(self):
+ super(Client, self).on_services_changed()
+ services = [
+ service
+ for service in self.services.values()
+ if service is not None
+ ]
+ self._browser.update(services)
+
+ def find_server(self, browser_cls=ConsoleServerBrowser, connect=True):
+ """
+ Find a server on the local network (using avahi) and (optionally)
+ connect to it.
+
+ :param browser: A Browser class to provide the user interface.
+ :param connect: If true, the client will immediately connect to the
+ chosen server.
+ :rtype: (string) The ZeroMQ endpoint of the chosen server.
+ """
+ mainloop = GLib.MainLoop()
+
+ self._browser = browser_cls(self)
+ endpoint = self._browser.run(mainloop.run)
+
+ if connect and endpoint is not None:
+ self.connect(endpoint)
+
+ return endpoint