import zmq from gi.repository import GLib READ_EVENTS = ( GLib.IOCondition.IN | GLib.IOCondition.HUP | GLib.IOCondition.ERR ) class ZMQSource(GLib.Source): def set_socket(self, sock): self.sock = sock fd = sock.getsockopt(zmq.FD) self.poll_fd = GLib.PollFD( fd=fd, events=READ_EVENTS, ) self.add_poll(self.poll_fd) def prepare(self): return (False, -1) def check(self): if not self.poll_fd.revents: return False events = self.sock.getsockopt(zmq.EVENTS) return bool(events & zmq.POLLIN) def dispatch(self, callback, user_data): try: while True: data = self.sock.recv(zmq.NOBLOCK) callback(self.sock, data, *user_data) except zmq.ZMQError as e: if e.errno != zmq.EAGAIN: raise return True _ZMQSource = ZMQSource def ZMQSource(sock): s = _ZMQSource() s.set_socket(sock) return s