summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--network/__init__.py16
-rw-r--r--network/avahi.py112
-rw-r--r--network/browser.py52
-rw-r--r--network/client.py226
-rw-r--r--network/server.py148
-rw-r--r--network/zmqglib.py46
6 files changed, 600 insertions, 0 deletions
diff --git a/network/__init__.py b/network/__init__.py
new file mode 100644
index 0000000..3812173
--- /dev/null
+++ b/network/__init__.py
@@ -0,0 +1,16 @@
+import warnings
+
+from gi.repository import GLib
+
+# GLib.threads_init() is deprecated, which is awesome, but it prints out an
+# error message which will be confusing to beginners.
+# I don't want to remove this either, as the library still needs to be usable on
+# older systems.
+# So we monkey-patch the warning away.
+_warn = warnings.warn
+warnings.warn = lambda *a, **k: None
+GLib.threads_init()
+warnings.warn = _warn
+
+from network.client import Client
+from network.server import Server
diff --git a/network/avahi.py b/network/avahi.py
new file mode 100644
index 0000000..6eb58d5
--- /dev/null
+++ b/network/avahi.py
@@ -0,0 +1,112 @@
+# This file is part of avahi.
+#
+# avahi is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# avahi is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
+# License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with avahi; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA.
+
+# Some definitions matching those in avahi-common/defs.h
+
+import dbus
+
+SERVER_INVALID, SERVER_REGISTERING, SERVER_RUNNING, SERVER_COLLISION, SERVER_FAILURE = range(0, 5)
+
+ENTRY_GROUP_UNCOMMITED, ENTRY_GROUP_REGISTERING, ENTRY_GROUP_ESTABLISHED, ENTRY_GROUP_COLLISION, ENTRY_GROUP_FAILURE = range(0, 5)
+
+DOMAIN_BROWSER_BROWSE, DOMAIN_BROWSER_BROWSE_DEFAULT, DOMAIN_BROWSER_REGISTER, DOMAIN_BROWSER_REGISTER_DEFAULT, DOMAIN_BROWSER_BROWSE_LEGACY = range(0, 5)
+
+PROTO_UNSPEC, PROTO_INET, PROTO_INET6 = -1, 0, 1
+
+IF_UNSPEC = -1
+
+PUBLISH_UNIQUE = 1
+PUBLISH_NO_PROBE = 2
+PUBLISH_NO_ANNOUNCE = 4
+PUBLISH_ALLOW_MULTIPLE = 8
+PUBLISH_NO_REVERSE = 16
+PUBLISH_NO_COOKIE = 32
+PUBLISH_UPDATE = 64
+PUBLISH_USE_WIDE_AREA = 128
+PUBLISH_USE_MULTICAST = 256
+
+LOOKUP_USE_WIDE_AREA = 1
+LOOKUP_USE_MULTICAST = 2
+LOOKUP_NO_TXT = 4
+LOOKUP_NO_ADDRESS = 8
+
+LOOKUP_RESULT_CACHED = 1
+LOOKUP_RESULT_WIDE_AREA = 2
+LOOKUP_RESULT_MULTICAST = 4
+LOOKUP_RESULT_LOCAL = 8
+LOOKUP_RESULT_OUR_OWN = 16
+LOOKUP_RESULT_STATIC = 32
+
+SERVICE_COOKIE = "org.freedesktop.Avahi.cookie"
+SERVICE_COOKIE_INVALID = 0
+
+DBUS_NAME = "org.freedesktop.Avahi"
+DBUS_INTERFACE_SERVER = DBUS_NAME + ".Server"
+DBUS_PATH_SERVER = "/"
+DBUS_INTERFACE_ENTRY_GROUP = DBUS_NAME + ".EntryGroup"
+DBUS_INTERFACE_DOMAIN_BROWSER = DBUS_NAME + ".DomainBrowser"
+DBUS_INTERFACE_SERVICE_TYPE_BROWSER = DBUS_NAME + ".ServiceTypeBrowser"
+DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + ".ServiceBrowser"
+DBUS_INTERFACE_ADDRESS_RESOLVER = DBUS_NAME + ".AddressResolver"
+DBUS_INTERFACE_HOST_NAME_RESOLVER = DBUS_NAME + ".HostNameResolver"
+DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + ".ServiceResolver"
+DBUS_INTERFACE_RECORD_BROWSER = DBUS_NAME + ".RecordBrowser"
+
+def byte_array_to_string(s):
+ r = ""
+
+ for c in s:
+
+ if c >= 32 and c < 127:
+ r += "%c" % c
+ else:
+ r += "."
+
+ return r
+
+def txt_array_to_string_array(t):
+ l = []
+
+ for s in t:
+ l.append(byte_array_to_string(s))
+
+ return l
+
+
+def string_to_byte_array(s):
+ r = []
+
+ for c in s:
+ r.append(dbus.Byte(ord(c)))
+
+ return r
+
+def string_array_to_txt_array(t):
+ l = []
+
+ for s in t:
+ l.append(string_to_byte_array(s))
+
+ return l
+
+def dict_to_txt_array(txt_dict):
+ l = []
+
+ for k,v in txt_dict.items():
+ l.append(string_to_byte_array("%s=%s" % (k,v)))
+
+ return l
diff --git a/network/browser.py b/network/browser.py
new file mode 100644
index 0000000..faed1da
--- /dev/null
+++ b/network/browser.py
@@ -0,0 +1,52 @@
+import curses
+import os
+from six.moves import input
+import sys
+from threading import Thread
+
+curses.setupterm(os.environ['TERM'])
+def clear_terminal():
+ f = sys.stdout
+ try:
+ f = f.buffer
+ except AttributeError:
+ pass
+ f.write(curses.tigetstr('clear'))
+ f.flush()
+
+class ConsoleServerBrowser(object):
+ def __init__(self, client):
+ self.client = client
+ self.result = None
+ self.services = []
+
+ def read_input(self):
+ while self.result is None:
+ line = input()
+ try:
+ self.result = int(line)
+ except ValueError:
+ pass
+ self.client.mainloop.quit()
+
+ def update(self, services):
+ self.services = services
+
+ # print the new output
+ output = '\n'.join(
+ '{0}: {name} @ {host} ({address}:{port})'.format(i, **service)
+ for i, service in enumerate(services, 1)
+ )
+ clear_terminal()
+ print(output)
+
+ def run(self, client_run):
+ t = Thread(target=self.read_input)
+ t.setDaemon(True)
+ t.start()
+
+ client_run()
+
+ service = self.services[self.result - 1]
+ return service['target']
+
diff --git a/network/client.py b/network/client.py
new file mode 100644
index 0000000..5f454d7
--- /dev/null
+++ b/network/client.py
@@ -0,0 +1,226 @@
+from threading import Lock as Mutex
+
+import dbus
+from dbus.mainloop.glib import DBusGMainLoop
+from gi.repository import GLib
+import logbook
+import zmq
+
+from network import avahi
+from network.browser import ConsoleServerBrowser
+
+log = logbook.Logger(__name__)
+
+dbus_loop = DBusGMainLoop()
+system_bus = dbus.SystemBus(mainloop=dbus_loop)
+
+avahi_server = dbus.Interface(
+ system_bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),
+ avahi.DBUS_INTERFACE_SERVER,
+)
+
+def acquire_or_die(mutex, error):
+ if not mutex.acquire(False):
+ raise RuntimeError(error)
+
+class Discoverer(object):
+ # there seems to be a bug in my current zmq version
+ # otherwise, just set this to False.
+ IPv4_ONLY = True
+
+ def __init__(self, kind):
+ super(Discoverer, self).__init__()
+ self.kind = '_%s._tcp' % kind
+ self.services = {}
+
+ proto = avahi.PROTO_INET if self.IPv4_ONLY else avahi.PROTO_UNSPEC
+
+ self.avahi_browser = dbus.Interface(
+ system_bus.get_object(
+ avahi.DBUS_NAME,
+ avahi_server.ServiceBrowserNew(
+ avahi.IF_UNSPEC, proto,
+ self.kind, '',
+ dbus.UInt32(0),
+ )
+ ),
+ avahi.DBUS_INTERFACE_SERVICE_BROWSER,
+ )
+
+ self.avahi_browser.connect_to_signal('ItemNew', self.on_service_discovered)
+ self.avahi_browser.connect_to_signal('ItemRemove', self.on_service_removed)
+
+ def on_services_changed(self):
+ pass
+
+ def on_service_resolved(
+ self,
+ interface, protocol,
+ name, kind, domain,
+ host, aprotocol, address, port,
+ text, flags,
+ ):
+ key = (interface, protocol, name, kind, domain)
+
+ target = 'tcp://%s:%d' % (
+ '[' + address + ']'
+ if aprotocol == avahi.PROTO_INET6
+ else address,
+ port,
+ )
+
+ self.services[key] = {
+ 'name': name,
+ 'domain': domain,
+ 'host': host,
+ 'address': address,
+ 'port': port,
+ 'text': text,
+ 'target': target,
+ }
+
+ self.on_services_changed()
+
+ def on_service_error(self, key, error):
+ log.warning('Error resolving {}: {}', key, error)
+ self.services.pop(key, None)
+ self.on_services_changed()
+
+ def on_service_discovered(
+ self,
+ interface, protocol,
+ name, kind, domain,
+ flags,
+ ):
+ key = (interface, protocol, name, kind, domain)
+ log.debug('Discovered service {}', key)
+ if key in self.services:
+ log.warning('Discovered a service twice: {}', key)
+ return
+
+ self.services[key] = None
+
+ avahi_server.ResolveService(
+ interface, protocol, name, kind, domain,
+ avahi.PROTO_UNSPEC, dbus.UInt32(0),
+ reply_handler=self.on_service_resolved,
+ error_handler=lambda error: self.on_service_error(key, error),
+ )
+
+ def on_service_removed(
+ self,
+ interface, protocol,
+ name, kind, domain,
+ flags,
+ ):
+ key = (interface, protocol, name, kind, domain)
+ self.services.pop(key, None)
+ self.on_services_changed()
+
+class Client(Discoverer):
+ """
+ This is the counterpart to the Server. This client is capable of
+ discovering servers on the LAN and connecting to them (or to any other
+ server) using the same request/reply protocol (ZeroMQ).
+
+ :param kind: The machine-readable kind of server you want to connect to.
+ Currently only used by ``find_server()``.
+ """
+
+ def __init__(self, kind):
+ super(Client, self).__init__(kind)
+ self._send_mutex = Mutex()
+
+ self.socket = None
+
+ def connect(self, target, ctx=None):
+ """
+ Connect to a server. If you connect to multiple servers, ZeroMQ will
+ load-balance between them.
+
+ :param target: A ZeroMQ endpoint (e.g., "tcp://192.168.0.1:12345")
+ :param ctx: A ZeroMQ context (optional)
+ """
+ if ctx is None:
+ ctx = zmq.Context.instance()
+
+ self.socket = ctx.socket(zmq.REQ)
+ self.socket.connect(target)
+
+ def send(self, data):
+ """
+ Send a message to the server, wait for the response, and return it.
+ Both the sent and received messages are binary strings, not unicode.
+
+ :param data: The data to be sent to the server.
+ :rtype: string (the data returned from the server)
+ """
+ acquire_or_die(
+ self._send_mutex,
+ "You called send_async(), then send(): "
+ "you need to call recv_async() first.",
+ )
+ try:
+ self.socket.send(data)
+ except Exception:
+ raise
+ else:
+ return self.socket.recv()
+ finally:
+ self._send_mutex.release()
+
+ def send_async(self, data):
+ """
+ Send a message to the server, without waiting for a response.
+
+ Currently this method waits for acknowledgement of the message by a
+ server, but this extra latency will be removed in the future.
+
+ In order to get the result of the message, call ``.read_async``.
+
+ :param data: The data to be sent to the server.
+ """
+ acquire_or_die(
+ self._send_mutex,
+ "You tried to call send_async() twice. "
+ "You need to call recv_async() first.",
+ )
+ self.socket.send(data)
+
+ def recv_async(self):
+ """
+ After sending a message to the server with ``.send_async()``, calling
+ this method will wait for the response from the server and return it.
+
+ :rtype: string (the data returned from the server)
+ """
+ return self.socket.recv()
+
+ def on_services_changed(self):
+ super(Client, self).on_services_changed()
+ services = [
+ service
+ for service in self.services.values()
+ if service is not None
+ ]
+ self._browser.update(services)
+
+ def find_server(self, browser_cls=ConsoleServerBrowser, connect=True):
+ """
+ Find a server on the local network (using avahi) and (optionally)
+ connect to it.
+
+ :param browser: A Browser class to provide the user interface.
+ :param connect: If true, the client will immediately connect to the
+ chosen server.
+ :rtype: (string) The ZeroMQ endpoint of the chosen server.
+ """
+ mainloop = GLib.MainLoop()
+
+ self._browser = browser_cls(self)
+ endpoint = self._browser.run(mainloop.run)
+
+ if connect and endpoint is not None:
+ self.connect(endpoint)
+
+ return endpoint
diff --git a/network/server.py b/network/server.py
new file mode 100644
index 0000000..9c63ea5
--- /dev/null
+++ b/network/server.py
@@ -0,0 +1,148 @@
+from random import randint
+
+import dbus
+from dbus.mainloop.glib import DBusGMainLoop
+from gi.repository import GLib
+import logbook
+import zmq
+
+from network import avahi
+from network.zmqglib import ZMQSource
+
+log = logbook.Logger(__name__)
+
+dbus_loop = DBusGMainLoop()
+system_bus = dbus.SystemBus(mainloop=dbus_loop)
+
+avahi_server = dbus.Interface(
+ system_bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),
+ avahi.DBUS_INTERFACE_SERVER,
+)
+
+def raise_not_implemented(message):
+ raise NotImplementedError()
+
+class Server(object):
+ """
+ This is a network server capable of handling request/reply style
+ communication (using ZeroMQ). When running, it maintains a zeroconf
+ entry (using avahi) so that it can be discovered automatically over
+ LANs.
+
+ :param name: The human-readable name of this server
+ (e.g., "Bob's Dungeon Game").
+ :param kind: The machine-readable kind of server this is.
+ (e.g., "dungeon")
+ :param port: The TCP port to run the server on.
+ If unspecified, it picks a random port (in the future, this will
+ be better, and pick an *unused* port).
+ :param handler: The function to run when on recieving a client request.
+ :param text: A list of strings to put in the avahi record.
+
+ The handler can also be specified using the server.handler decorator:
+
+ >>> server = Server("Bob's Dungeon Game", "dungeon")
+ >>> @server.handler
+ ... def on_request(msg):
+ ... if msg == 'attack':
+ ... return 'You attack a monster!'
+ ... return 'Nothing happens.'
+ ...
+ >>> server.run()
+ """
+ def __init__(
+ self, name, kind,
+ port=None,
+ handler=raise_not_implemented,
+ text=(),
+ ):
+ if port is None:
+ # TODO: fix this!
+ port = randint(49152, 65535)
+
+ self.name = name
+ self.kind = '_%s._tcp' % kind
+ self.interface = ''
+ self.port = port
+ self.text = text
+
+ self.handle = handler
+
+ self.mainloop = GLib.MainLoop()
+ self.avahi_group = dbus.Interface(
+ system_bus.get_object(
+ avahi.DBUS_NAME,
+ avahi_server.EntryGroupNew()
+ ),
+ avahi.DBUS_INTERFACE_ENTRY_GROUP,
+ )
+
+ # Communication
+
+ def handler(self, func):
+ """
+ Decorator function which provides a more readable way of defining the
+ handler for a server. Be aware that this will override any previously
+ defined handler.
+ """
+ self.handle = func
+
+ def on_message(self, socket, data, *user_data):
+ response = self.handle(data)
+ socket.send(response)
+
+ def run(self, ctx=None):
+ """
+ Start up the server, and serve requests until ``.stop()`` is called.
+
+ The server runs using a GLib mainloop so that avahi integration works,
+ which may be helpful for integrating other things (you could put a Gtk
+ application in the same thread).
+ """
+ if ctx is None:
+ ctx = zmq.Context.instance()
+
+ socket = ctx.socket(zmq.REP)
+ socket.bind('tcp://*:%d' % self.port)
+ log.debug('Running on port %d' % self.port)
+
+ self.publish()
+
+ mainctx = self.mainloop.get_context()
+
+ source = ZMQSource(socket)
+ source.attach(mainctx)
+ source.set_callback(self.on_message)
+
+ self.mainloop.run()
+
+ self.unpublish()
+
+ def stop(self):
+ """
+ Stop serving requests. This function is thread-safe, so you can call it
+ from other threads, or you can call it from within the GLib mainloop
+ (using GLib.timeout_add or GLib.idle_add or some other signal)
+ """
+ self.mainloop.quit()
+
+ # Avahi
+
+ def publish(self):
+ if self.interface != '':
+ raise NotImplementedError('Serving on a specific interface.')
+ host = ''
+ domain = ''
+
+ help(self.avahi_group)
+ self.avahi_group.AddService(
+ avahi.IF_UNSPEC, avahi.PROTO_UNSPEC,
+ dbus.UInt32(0),
+ self.name, self.kind, domain,
+ host, dbus.UInt16(self.port),
+ avahi.string_array_to_txt_array(self.text),
+ )
+ self.avahi_group.Commit()
+
+ def unpublish(self):
+ self.avahi_group.Reset()
diff --git a/network/zmqglib.py b/network/zmqglib.py
new file mode 100644
index 0000000..8334235
--- /dev/null
+++ b/network/zmqglib.py
@@ -0,0 +1,46 @@
+import zmq
+
+from gi.repository import GLib
+
+READ_EVENTS = (
+ GLib.IOCondition.IN |
+ GLib.IOCondition.HUP |
+ GLib.IOCondition.ERR
+)
+
+class ZMQSource(GLib.Source):
+ def set_socket(self, sock):
+ self.sock = sock
+
+ fd = sock.getsockopt(zmq.FD)
+ self.poll_fd = GLib.PollFD(
+ fd=fd,
+ events=READ_EVENTS,
+ )
+ self.add_poll(self.poll_fd)
+
+ def prepare(self):
+ return (False, -1)
+
+ def check(self):
+ if not self.poll_fd.revents:
+ return False
+
+ events = self.sock.getsockopt(zmq.EVENTS)
+ return bool(events & zmq.POLLIN)
+
+ def dispatch(self, callback, user_data):
+ try:
+ while True:
+ data = self.sock.recv(zmq.NOBLOCK)
+ callback(self.sock, data, *user_data)
+ except zmq.ZMQError as e:
+ if e.errno != zmq.EAGAIN:
+ raise
+ return True
+_ZMQSource = ZMQSource
+
+def ZMQSource(sock):
+ s = _ZMQSource()
+ s.set_socket(sock)
+ return s