Source code for netdumplings.dumplingchef

import logging
from typing import Optional

import scapy.packet

import netdumplings  # noqa
from ._shared import JSONSerializable


[docs]class DumplingChef: """ Base class for all dumpling chefs. **DumplingChef objects are instantiated for you by nd-sniff. You normally won't need to instantiate DumplingChef objects yourself. Instead you'll normally subclass DumplingChef in a Python module which you'll pass to nd-sniff on the commandline.** When instantiated, a DumplingChef registers itself with the given ``kitchen`` which will then take care of calling the chef's packet and interval handlers as appropriate. This class implements the following methods which will usually be overridden by subclasses: * :meth:`packet_handler` * :meth:`interval_handler` :param kitchen: The dumpling kitchen which is providing the network packets used to create the dumplings. """ # Setting assignable_to_kitchen to False (in a subclass) will ensure the # chef cannot be assigned to any kitchens via nd-sniff. assignable_to_kitchen = True def __init__( self, kitchen: Optional['netdumplings.DumplingKitchen'] = None, ) -> None: """ """ self.kitchen = kitchen self.name = type(self).__name__ self.dumplings_sent_count = 0 self._logger = logging.getLogger(__name__) if self.kitchen: self.kitchen.register_chef(self) def __repr__(self): return '{}(kitchen={})'.format(type(self).__name__, repr(self.kitchen))
[docs] def packet_handler(self, packet: scapy.packet.Raw) -> JSONSerializable: """ Called automatically by the dumpling kitchen (``nd-sniff``) whenever a new packet has been sniffed. This method is expected to be overridden by child classes. This base implementation returns a payload which is the string representation of the packet. The return value is turned into a dumpling by the kitchen. If ``None`` is returned then no dumpling will be created. :param packet: Network packet (from scapy). :rtype: Anything which is JSON-serializable. :return: Dumpling payload. """ payload = "{0}: {1}".format(type(self).__name__, packet.summary()) self._logger.debug("{0}: Received packet: {1}", self.name, packet.summary()) return payload
[docs] def interval_handler( self, interval: Optional[int] = None) -> JSONSerializable: """ Called automatically at regular intervals by the dumpling kitchen (``nd-sniff``). Allows for time-based (rather than purely packet-based) chefs to keep on cheffing even in the absence of fresh packets. This method is expected to be overridden by child classes. This base implementation does nothing but log a debug entry. The return value is turned into a dumpling by the kitchen. If ``None`` is returned then no dumpling will be created. :param interval: Frequency (in secs) of the time interval pokes. :rtype: Anything which is JSON-serializable. :return: Dumpling payload. """ self._logger.debug( "{0}: Received interval_handler poke", self.name) return None