Source code for netdumplings.dumplingeater
import asyncio
import json
import logging
import signal
from typing import Callable, List, Optional
import websockets
from .dumpling import Dumpling
from .exceptions import InvalidDumpling
from ._shared import ND_CLOSE_MSGS, HUB_HOST, HUB_OUT_PORT
[docs]class DumplingEater:
"""
Base helper class for Python-based dumpling eaters.
Connects to ``nd-hub`` and listens for any dumplings made by the provided
``chef_filter`` (or all chefs if ``chef_filter`` is ``None``). Can be
given ``async`` callables for any of the following events:
``on_connect(websocket_uri, websocket_obj)``
invoked when the connection to ``nd-hub`` is made
``on_dumpling(dumpling)``
invoked whenever a dumpling is emitted from ``nd-hub``
``on_connection_lost(e)``
invoked when the connection to ``nd-hub`` is closed
**The above callables must be** ``async def`` **methods**.
:param name: Name of the dumpling eater. Is ideally unique per eater.
:param hub: Address where ``nd-hub`` is sending dumplings from.
:param chef_filter: List of chef names whose dumplings this eater wants to
receive. ``None`` means get all chefs' dumplings.
:param on_connect: Called when connection to ``nd-hub`` is made. Is passed
two parameters: the ``nd-hub`` websocket URI (string) and websocket
object (:class:`websockets.client.WebSocketClientProtocol`).
:param on_dumpling: Called whenever a dumpling is received. Is passed the
dumpling as a Python dict.
:param on_connection_lost: Called when connection to ``nd-hub`` is lost. Is
passed the associated exception object.
"""
def __init__(
self,
name: str = 'nameless_eater',
hub: str ='{}:{}'.format(HUB_HOST, HUB_OUT_PORT),
*,
chef_filter: Optional[List[str]] = None,
on_connect: Optional[Callable] = None,
on_dumpling: Optional[Callable] = None,
on_connection_lost: Optional[Callable] = None) -> None:
self.name = name
self.chef_filter = chef_filter
self.hub = hub
self.hub_ws = "ws://{0}".format(hub)
# Configure handlers. If we're not provided with handlers then we
# fall back on the default handlers or the handlers provided by a
# subclass.
self.on_connect = (
on_connect if on_connect is not None else self.on_connect
)
self.on_dumpling = (
on_dumpling if on_dumpling is not None else self.on_dumpling
)
self.on_connection_lost = (
on_connection_lost if on_connection_lost is not None
else self.on_connection_lost
)
self._was_connected = False
self._logger_name = "{}.{}".format(__name__, self.name)
self.logger = logging.getLogger(self._logger_name)
def __repr__(self):
def handler_string(attr):
# We can't use 'repr(self.handler)' for callables because it causes
# an infinite loop as the repr of the handler includes the repr of
# the handler (etc). So we replace handler reprs with
# '<callable: name>'.
return (
'<callable: {}>'.format(attr.__name__) if callable(attr)
else repr(attr)
)
return (
'{}('
'name={}, '
'hub={}, '
'chef_filter={}, '
'on_connect={}, '
'on_dumpling={}, '
'on_connection_lost={})'.format(
type(self).__name__,
repr(self.name),
repr(self.hub),
repr(self.chef_filter),
handler_string(self.on_connect),
handler_string(self.on_dumpling),
handler_string(self.on_connection_lost),
)
)
async def _grab_dumplings(self, dumpling_count=None):
"""
Receives all dumplings from the hub and looks for any dumplings which
were created by the chef(s) we're interested in. All those dumplings
are then passed to the on_dumpling handler (after being converted from
their JSON form back into a Dumpling instance).
:param dumpling_count: Number of dumplings to eat. ``None`` means eat
forever.
"""
dumplings_eaten = 0
websocket = await websockets.client.connect(self.hub_ws)
self._was_connected = True
self.logger.info("{0}: Connected to dumpling hub at {1}".format(
self.name, self.hub_ws))
try:
# Announce ourselves to the dumpling hub.
await websocket.send(json.dumps({'eater_name': self.name}))
if self.on_connect:
await self.on_connect(self.hub_ws, websocket)
while True:
# Eat a single dumpling.
dumpling_json = await websocket.recv()
# Create a Dumpling from the JSON received over the websocket.
# Note that invalid dumplings will probably be stripped out by
# the hub already.
try:
dumpling = Dumpling.from_json(dumpling_json)
except InvalidDumpling as e:
self.logger.error("{0}: Invalid dumpling: {1}".format(
self.name, e))
continue
self.logger.debug("{0}: Received dumpling from {1}".format(
self.name, dumpling.chef_name))
# Call the on_dumpling handler if this dumpling is from a
# chef that we've registered interest in.
if (self.chef_filter is None or
dumpling.chef_name in self.chef_filter):
self.logger.debug(
"{0}: Calling dumpling handler {1}".format(
self.name, self.on_dumpling))
dumplings_eaten += 1
await self.on_dumpling(dumpling)
# Stop eating dumplings if we've reached our threshold.
if dumpling_count is not None and \
dumplings_eaten >= dumpling_count:
await websocket.close(*ND_CLOSE_MSGS['eater_full'])
break
except asyncio.CancelledError:
self.logger.warning(
f"\n{self.name}: Connection to dumpling hub cancelled; "
f"closing..."
)
try:
await websocket.close(*ND_CLOSE_MSGS['conn_cancelled'])
except websockets.exceptions.InvalidState:
pass
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(
"{}: Lost connection to dumpling hub: {}".format(self.name, e)
)
if self.on_connection_lost:
await self.on_connection_lost(e)
@staticmethod
def _interrupt_handler():
"""
Signal handler. Cancels all running async tasks.
"""
tasks = asyncio.Task.all_tasks()
for task in tasks:
task.cancel()
[docs] def run(self, dumpling_count=None):
"""
Run the dumpling eater.
This will block until the desired ``dumpling_count`` is met.
:param dumpling_count: Number of dumplings to eat. ``None`` means eat
forever.
"""
self.logger.info("{0}: Running dumpling eater".format(self.name))
if not callable(self.on_dumpling):
self.logger.error(
"{0}: on_dumpling handler is not callable".format(self.name))
return
self.logger.debug("{0}: Looking for dumpling hub at {1}".format(
self.name, self.hub_ws))
self.logger.debug("{0}: Chefs: {1}".format(
self.name,
", ".join(self.chef_filter) if self.chef_filter else 'all')
)
try:
asyncio.run(self._grab_dumplings(dumpling_count))
except OSError as e:
self.logger.warning(
"{0}: There was a problem with the dumpling hub connection. "
"Is nd-hub available?".format(self.name))
self.logger.warning("{0}: {1}".format(self.name, e))
finally:
if self._was_connected:
self.logger.warning(
"{0}: Done eating dumplings.".format(self.name))
[docs] async def on_connect(self, websocket_uri, websocket_obj):
"""
Default on_connect handler.
This will be used if an ``on_connect`` handler is not provided during
instantiation, and if a handler is not provided by a DumplingEater
subclass.
Only logs an warning-level log entry.
"""
self.logger.warning(
'{}: No on_connect handler specified; ignoring '
'connection.'.format(self.name)
)
[docs] async def on_dumpling(self, dumpling):
"""
Default on_dumpling handler.
This will be used if an ``on_dumpling`` handler is not provided during
instantiation, and if a handler is not provided by a DumplingEater
subclass.
Only logs an warning-level log entry.
"""
self.logger.warning(
'{}: No on_dumpling handler specified; ignoring '
'dumpling.'.format(self.name)
)
[docs] async def on_connection_lost(self, e):
"""
Default on_connection_lost handler.
This will be used if an ``on_connection_lost`` handler is not provided
during instantiation, and if a handler is not provided by a
DumplingEater subclass.
Only logs an warning-level log entry.
"""
self.logger.warning(
'{}: No on_connection_lost handler specified; ignoring '
'connection loss.'.format(self.name)
)