diff options
Diffstat (limited to 'network')
-rw-r--r-- | network/__init__.py | 16 | ||||
-rw-r--r-- | network/avahi.py | 112 | ||||
-rw-r--r-- | network/browser.py | 52 | ||||
-rw-r--r-- | network/client.py | 226 | ||||
-rw-r--r-- | network/server.py | 148 | ||||
-rw-r--r-- | network/zmqglib.py | 46 |
6 files changed, 600 insertions, 0 deletions
diff --git a/network/__init__.py b/network/__init__.py new file mode 100644 index 0000000..3812173 --- /dev/null +++ b/network/__init__.py @@ -0,0 +1,16 @@ +import warnings + +from gi.repository import GLib + +# GLib.threads_init() is deprecated, which is awesome, but it prints out an +# error message which will be confusing to beginners. +# I don't want to remove this either, as the library still needs to be usable on +# older systems. +# So we monkey-patch the warning away. +_warn = warnings.warn +warnings.warn = lambda *a, **k: None +GLib.threads_init() +warnings.warn = _warn + +from network.client import Client +from network.server import Server diff --git a/network/avahi.py b/network/avahi.py new file mode 100644 index 0000000..6eb58d5 --- /dev/null +++ b/network/avahi.py @@ -0,0 +1,112 @@ +# This file is part of avahi. +# +# avahi is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# avahi is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +# License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with avahi; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA. + +# Some definitions matching those in avahi-common/defs.h + +import dbus + +SERVER_INVALID, SERVER_REGISTERING, SERVER_RUNNING, SERVER_COLLISION, SERVER_FAILURE = range(0, 5) + +ENTRY_GROUP_UNCOMMITED, ENTRY_GROUP_REGISTERING, ENTRY_GROUP_ESTABLISHED, ENTRY_GROUP_COLLISION, ENTRY_GROUP_FAILURE = range(0, 5) + +DOMAIN_BROWSER_BROWSE, DOMAIN_BROWSER_BROWSE_DEFAULT, DOMAIN_BROWSER_REGISTER, DOMAIN_BROWSER_REGISTER_DEFAULT, DOMAIN_BROWSER_BROWSE_LEGACY = range(0, 5) + +PROTO_UNSPEC, PROTO_INET, PROTO_INET6 = -1, 0, 1 + +IF_UNSPEC = -1 + +PUBLISH_UNIQUE = 1 +PUBLISH_NO_PROBE = 2 +PUBLISH_NO_ANNOUNCE = 4 +PUBLISH_ALLOW_MULTIPLE = 8 +PUBLISH_NO_REVERSE = 16 +PUBLISH_NO_COOKIE = 32 +PUBLISH_UPDATE = 64 +PUBLISH_USE_WIDE_AREA = 128 +PUBLISH_USE_MULTICAST = 256 + +LOOKUP_USE_WIDE_AREA = 1 +LOOKUP_USE_MULTICAST = 2 +LOOKUP_NO_TXT = 4 +LOOKUP_NO_ADDRESS = 8 + +LOOKUP_RESULT_CACHED = 1 +LOOKUP_RESULT_WIDE_AREA = 2 +LOOKUP_RESULT_MULTICAST = 4 +LOOKUP_RESULT_LOCAL = 8 +LOOKUP_RESULT_OUR_OWN = 16 +LOOKUP_RESULT_STATIC = 32 + +SERVICE_COOKIE = "org.freedesktop.Avahi.cookie" +SERVICE_COOKIE_INVALID = 0 + +DBUS_NAME = "org.freedesktop.Avahi" +DBUS_INTERFACE_SERVER = DBUS_NAME + ".Server" +DBUS_PATH_SERVER = "/" +DBUS_INTERFACE_ENTRY_GROUP = DBUS_NAME + ".EntryGroup" +DBUS_INTERFACE_DOMAIN_BROWSER = DBUS_NAME + ".DomainBrowser" +DBUS_INTERFACE_SERVICE_TYPE_BROWSER = DBUS_NAME + ".ServiceTypeBrowser" +DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + ".ServiceBrowser" +DBUS_INTERFACE_ADDRESS_RESOLVER = DBUS_NAME + ".AddressResolver" +DBUS_INTERFACE_HOST_NAME_RESOLVER = DBUS_NAME + ".HostNameResolver" +DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + ".ServiceResolver" +DBUS_INTERFACE_RECORD_BROWSER = DBUS_NAME + ".RecordBrowser" + +def byte_array_to_string(s): + r = "" + + for c in s: + + if c >= 32 and c < 127: + r += "%c" % c + else: + r += "." + + return r + +def txt_array_to_string_array(t): + l = [] + + for s in t: + l.append(byte_array_to_string(s)) + + return l + + +def string_to_byte_array(s): + r = [] + + for c in s: + r.append(dbus.Byte(ord(c))) + + return r + +def string_array_to_txt_array(t): + l = [] + + for s in t: + l.append(string_to_byte_array(s)) + + return l + +def dict_to_txt_array(txt_dict): + l = [] + + for k,v in txt_dict.items(): + l.append(string_to_byte_array("%s=%s" % (k,v))) + + return l diff --git a/network/browser.py b/network/browser.py new file mode 100644 index 0000000..faed1da --- /dev/null +++ b/network/browser.py @@ -0,0 +1,52 @@ +import curses +import os +from six.moves import input +import sys +from threading import Thread + +curses.setupterm(os.environ['TERM']) +def clear_terminal(): + f = sys.stdout + try: + f = f.buffer + except AttributeError: + pass + f.write(curses.tigetstr('clear')) + f.flush() + +class ConsoleServerBrowser(object): + def __init__(self, client): + self.client = client + self.result = None + self.services = [] + + def read_input(self): + while self.result is None: + line = input() + try: + self.result = int(line) + except ValueError: + pass + self.client.mainloop.quit() + + def update(self, services): + self.services = services + + # print the new output + output = '\n'.join( + '{0}: {name} @ {host} ({address}:{port})'.format(i, **service) + for i, service in enumerate(services, 1) + ) + clear_terminal() + print(output) + + def run(self, client_run): + t = Thread(target=self.read_input) + t.setDaemon(True) + t.start() + + client_run() + + service = self.services[self.result - 1] + return service['target'] + diff --git a/network/client.py b/network/client.py new file mode 100644 index 0000000..5f454d7 --- /dev/null +++ b/network/client.py @@ -0,0 +1,226 @@ +from threading import Lock as Mutex + +import dbus +from dbus.mainloop.glib import DBusGMainLoop +from gi.repository import GLib +import logbook +import zmq + +from network import avahi +from network.browser import ConsoleServerBrowser + +log = logbook.Logger(__name__) + +dbus_loop = DBusGMainLoop() +system_bus = dbus.SystemBus(mainloop=dbus_loop) + +avahi_server = dbus.Interface( + system_bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), + avahi.DBUS_INTERFACE_SERVER, +) + +def acquire_or_die(mutex, error): + if not mutex.acquire(False): + raise RuntimeError(error) + +class Discoverer(object): + # there seems to be a bug in my current zmq version + # otherwise, just set this to False. + IPv4_ONLY = True + + def __init__(self, kind): + super(Discoverer, self).__init__() + self.kind = '_%s._tcp' % kind + self.services = {} + + proto = avahi.PROTO_INET if self.IPv4_ONLY else avahi.PROTO_UNSPEC + + self.avahi_browser = dbus.Interface( + system_bus.get_object( + avahi.DBUS_NAME, + avahi_server.ServiceBrowserNew( + avahi.IF_UNSPEC, proto, + self.kind, '', + dbus.UInt32(0), + ) + ), + avahi.DBUS_INTERFACE_SERVICE_BROWSER, + ) + + self.avahi_browser.connect_to_signal('ItemNew', self.on_service_discovered) + self.avahi_browser.connect_to_signal('ItemRemove', self.on_service_removed) + + def on_services_changed(self): + pass + + def on_service_resolved( + self, + interface, protocol, + name, kind, domain, + host, aprotocol, address, port, + text, flags, + ): + key = (interface, protocol, name, kind, domain) + + target = 'tcp://%s:%d' % ( + '[' + address + ']' + if aprotocol == avahi.PROTO_INET6 + else address, + port, + ) + + self.services[key] = { + 'name': name, + 'domain': domain, + 'host': host, + 'address': address, + 'port': port, + 'text': text, + 'target': target, + } + + self.on_services_changed() + + def on_service_error(self, key, error): + log.warning('Error resolving {}: {}', key, error) + self.services.pop(key, None) + self.on_services_changed() + + def on_service_discovered( + self, + interface, protocol, + name, kind, domain, + flags, + ): + key = (interface, protocol, name, kind, domain) + log.debug('Discovered service {}', key) + if key in self.services: + log.warning('Discovered a service twice: {}', key) + return + + self.services[key] = None + + avahi_server.ResolveService( + interface, protocol, name, kind, domain, + avahi.PROTO_UNSPEC, dbus.UInt32(0), + reply_handler=self.on_service_resolved, + error_handler=lambda error: self.on_service_error(key, error), + ) + + def on_service_removed( + self, + interface, protocol, + name, kind, domain, + flags, + ): + key = (interface, protocol, name, kind, domain) + self.services.pop(key, None) + self.on_services_changed() + +class Client(Discoverer): + """ + This is the counterpart to the Server. This client is capable of + discovering servers on the LAN and connecting to them (or to any other + server) using the same request/reply protocol (ZeroMQ). + + :param kind: The machine-readable kind of server you want to connect to. + Currently only used by ``find_server()``. + """ + + def __init__(self, kind): + super(Client, self).__init__(kind) + self._send_mutex = Mutex() + + self.socket = None + + def connect(self, target, ctx=None): + """ + Connect to a server. If you connect to multiple servers, ZeroMQ will + load-balance between them. + + :param target: A ZeroMQ endpoint (e.g., "tcp://192.168.0.1:12345") + :param ctx: A ZeroMQ context (optional) + """ + if ctx is None: + ctx = zmq.Context.instance() + + self.socket = ctx.socket(zmq.REQ) + self.socket.connect(target) + + def send(self, data): + """ + Send a message to the server, wait for the response, and return it. + Both the sent and received messages are binary strings, not unicode. + + :param data: The data to be sent to the server. + :rtype: string (the data returned from the server) + """ + acquire_or_die( + self._send_mutex, + "You called send_async(), then send(): " + "you need to call recv_async() first.", + ) + try: + self.socket.send(data) + except Exception: + raise + else: + return self.socket.recv() + finally: + self._send_mutex.release() + + def send_async(self, data): + """ + Send a message to the server, without waiting for a response. + + Currently this method waits for acknowledgement of the message by a + server, but this extra latency will be removed in the future. + + In order to get the result of the message, call ``.read_async``. + + :param data: The data to be sent to the server. + """ + acquire_or_die( + self._send_mutex, + "You tried to call send_async() twice. " + "You need to call recv_async() first.", + ) + self.socket.send(data) + + def recv_async(self): + """ + After sending a message to the server with ``.send_async()``, calling + this method will wait for the response from the server and return it. + + :rtype: string (the data returned from the server) + """ + return self.socket.recv() + + def on_services_changed(self): + super(Client, self).on_services_changed() + services = [ + service + for service in self.services.values() + if service is not None + ] + self._browser.update(services) + + def find_server(self, browser_cls=ConsoleServerBrowser, connect=True): + """ + Find a server on the local network (using avahi) and (optionally) + connect to it. + + :param browser: A Browser class to provide the user interface. + :param connect: If true, the client will immediately connect to the + chosen server. + :rtype: (string) The ZeroMQ endpoint of the chosen server. + """ + mainloop = GLib.MainLoop() + + self._browser = browser_cls(self) + endpoint = self._browser.run(mainloop.run) + + if connect and endpoint is not None: + self.connect(endpoint) + + return endpoint diff --git a/network/server.py b/network/server.py new file mode 100644 index 0000000..9c63ea5 --- /dev/null +++ b/network/server.py @@ -0,0 +1,148 @@ +from random import randint + +import dbus +from dbus.mainloop.glib import DBusGMainLoop +from gi.repository import GLib +import logbook +import zmq + +from network import avahi +from network.zmqglib import ZMQSource + +log = logbook.Logger(__name__) + +dbus_loop = DBusGMainLoop() +system_bus = dbus.SystemBus(mainloop=dbus_loop) + +avahi_server = dbus.Interface( + system_bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), + avahi.DBUS_INTERFACE_SERVER, +) + +def raise_not_implemented(message): + raise NotImplementedError() + +class Server(object): + """ + This is a network server capable of handling request/reply style + communication (using ZeroMQ). When running, it maintains a zeroconf + entry (using avahi) so that it can be discovered automatically over + LANs. + + :param name: The human-readable name of this server + (e.g., "Bob's Dungeon Game"). + :param kind: The machine-readable kind of server this is. + (e.g., "dungeon") + :param port: The TCP port to run the server on. + If unspecified, it picks a random port (in the future, this will + be better, and pick an *unused* port). + :param handler: The function to run when on recieving a client request. + :param text: A list of strings to put in the avahi record. + + The handler can also be specified using the server.handler decorator: + + >>> server = Server("Bob's Dungeon Game", "dungeon") + >>> @server.handler + ... def on_request(msg): + ... if msg == 'attack': + ... return 'You attack a monster!' + ... return 'Nothing happens.' + ... + >>> server.run() + """ + def __init__( + self, name, kind, + port=None, + handler=raise_not_implemented, + text=(), + ): + if port is None: + # TODO: fix this! + port = randint(49152, 65535) + + self.name = name + self.kind = '_%s._tcp' % kind + self.interface = '' + self.port = port + self.text = text + + self.handle = handler + + self.mainloop = GLib.MainLoop() + self.avahi_group = dbus.Interface( + system_bus.get_object( + avahi.DBUS_NAME, + avahi_server.EntryGroupNew() + ), + avahi.DBUS_INTERFACE_ENTRY_GROUP, + ) + + # Communication + + def handler(self, func): + """ + Decorator function which provides a more readable way of defining the + handler for a server. Be aware that this will override any previously + defined handler. + """ + self.handle = func + + def on_message(self, socket, data, *user_data): + response = self.handle(data) + socket.send(response) + + def run(self, ctx=None): + """ + Start up the server, and serve requests until ``.stop()`` is called. + + The server runs using a GLib mainloop so that avahi integration works, + which may be helpful for integrating other things (you could put a Gtk + application in the same thread). + """ + if ctx is None: + ctx = zmq.Context.instance() + + socket = ctx.socket(zmq.REP) + socket.bind('tcp://*:%d' % self.port) + log.debug('Running on port %d' % self.port) + + self.publish() + + mainctx = self.mainloop.get_context() + + source = ZMQSource(socket) + source.attach(mainctx) + source.set_callback(self.on_message) + + self.mainloop.run() + + self.unpublish() + + def stop(self): + """ + Stop serving requests. This function is thread-safe, so you can call it + from other threads, or you can call it from within the GLib mainloop + (using GLib.timeout_add or GLib.idle_add or some other signal) + """ + self.mainloop.quit() + + # Avahi + + def publish(self): + if self.interface != '': + raise NotImplementedError('Serving on a specific interface.') + host = '' + domain = '' + + help(self.avahi_group) + self.avahi_group.AddService( + avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, + dbus.UInt32(0), + self.name, self.kind, domain, + host, dbus.UInt16(self.port), + avahi.string_array_to_txt_array(self.text), + ) + self.avahi_group.Commit() + + def unpublish(self): + self.avahi_group.Reset() diff --git a/network/zmqglib.py b/network/zmqglib.py new file mode 100644 index 0000000..8334235 --- /dev/null +++ b/network/zmqglib.py @@ -0,0 +1,46 @@ +import zmq + +from gi.repository import GLib + +READ_EVENTS = ( + GLib.IOCondition.IN | + GLib.IOCondition.HUP | + GLib.IOCondition.ERR +) + +class ZMQSource(GLib.Source): + def set_socket(self, sock): + self.sock = sock + + fd = sock.getsockopt(zmq.FD) + self.poll_fd = GLib.PollFD( + fd=fd, + events=READ_EVENTS, + ) + self.add_poll(self.poll_fd) + + def prepare(self): + return (False, -1) + + def check(self): + if not self.poll_fd.revents: + return False + + events = self.sock.getsockopt(zmq.EVENTS) + return bool(events & zmq.POLLIN) + + def dispatch(self, callback, user_data): + try: + while True: + data = self.sock.recv(zmq.NOBLOCK) + callback(self.sock, data, *user_data) + except zmq.ZMQError as e: + if e.errno != zmq.EAGAIN: + raise + return True +_ZMQSource = ZMQSource + +def ZMQSource(sock): + s = _ZMQSource() + s.set_socket(sock) + return s |