# Copyright (c) OpenMMLab. All rights reserved. import logging import time from abc import ABCMeta, abstractmethod from dataclasses import dataclass from queue import Empty from threading import Thread from typing import Callable, Dict, List, Optional, Tuple, Union from mmcv.utils.misc import is_method_overridden from mmpose.utils import StopWatch from ..utils import Message, VideoEndingMessage, limit_max_fps @dataclass class BufferInfo(): """Dataclass for buffer information.""" buffer_name: str input_name: Optional[str] = None essential: bool = False @dataclass class EventInfo(): """Dataclass for event handler information.""" event_name: str is_keyboard: bool = False handler_func: Optional[Callable] = None class Node(Thread, metaclass=ABCMeta): """Base interface of functional module. Parameters: name (str, optional): The node name (also thread name). enable_key (str|int, optional): Set a hot-key to toggle enable/disable of the node. If an int value is given, it will be treated as an ascii code of a key. Please note: 1. If enable_key is set, the bypass method need to be overridden to define the node behavior when disabled 2. Some hot-key has been use for particular use. For example: 'q', 'Q' and 27 are used for quit Default: None max_fps (int): Maximum FPS of the node. This is to avoid the node running unrestrictedly and causing large resource consuming. Default: 30 input_check_interval (float): Minimum interval (in millisecond) between checking if input is ready. Default: 0.001 enable (bool): Default enable/disable status. Default: True. daemon (bool): Whether node is a daemon. Default: True. """ def __init__(self, name: Optional[str] = None, enable_key: Optional[Union[str, int]] = None, max_fps: int = 30, input_check_interval: float = 0.01, enable: bool = True, daemon=False): super().__init__(name=name, daemon=daemon) self._runner = None self._enabled = enable self.enable_key = enable_key self.max_fps = max_fps self.input_check_interval = input_check_interval # A partitioned buffer manager the runner's buffer manager that # only accesses the buffers related to the node self._buffer_manager = None # Input/output buffers are a list of registered buffers' information self._input_buffers = [] self._output_buffers = [] # Event manager is a copy of assigned runner's event manager self._event_manager = None # A list of registered event information # See register_event() for more information # Note that we recommend to handle events in nodes by registering # handlers, but one can still access the raw event by _event_manager self._registered_events = [] # A list of (listener_threads, event_info) # See set_runner() for more information self._event_listener_threads = [] # A timer to calculate node FPS self._timer = StopWatch(window=10) # Register enable toggle key if self.enable_key: # If the node allows toggling enable, it should override the # `bypass` method to define the node behavior when disabled. if not is_method_overridden('bypass', Node, self.__class__): raise NotImplementedError( f'The node {self.__class__} does not support toggling' 'enable but got argument `enable_key`. To support toggling' 'enable, please override the `bypass` method of the node.') self.register_event( event_name=self.enable_key, is_keyboard=True, handler_func=self._toggle_enable, ) @property def registered_buffers(self): return self._input_buffers + self._output_buffers @property def registered_events(self): return self._registered_events.copy() def _toggle_enable(self): self._enabled = not self._enabled def register_input_buffer(self, buffer_name: str, input_name: str, essential: bool = False): """Register an input buffer, so that Node can automatically check if data is ready, fetch data from the buffers and format the inputs to feed into `process` method. This method can be invoked multiple times to register multiple input buffers. The subclass of Node should invoke `register_input_buffer` in its `__init__` method. Args: buffer_name (str): The name of the buffer input_name (str): The name of the fetched message from the corresponding buffer essential (bool): An essential input means the node will wait until the input is ready before processing. Otherwise, an inessential input will not block the processing, instead a None will be fetched if the buffer is not ready. """ buffer_info = BufferInfo(buffer_name, input_name, essential) self._input_buffers.append(buffer_info) def register_output_buffer(self, buffer_name: Union[str, List[str]]): """Register one or multiple output buffers, so that the Node can automatically send the output of the `process` method to these buffers. The subclass of Node should invoke `register_output_buffer` in its `__init__` method. Args: buffer_name (str|list): The name(s) of the output buffer(s). """ if not isinstance(buffer_name, list): buffer_name = [buffer_name] for name in buffer_name: buffer_info = BufferInfo(name) self._output_buffers.append(buffer_info) def register_event(self, event_name: str, is_keyboard: bool = False, handler_func: Optional[Callable] = None): """Register an event. All events used in the node need to be registered in __init__(). If a callable handler is given, a thread will be create to listen and handle the event when the node starts. Args: Args: event_name (str|int): The event name. If is_keyboard==True, event_name should be a str (as char) or an int (as ascii) is_keyboard (bool): Indicate whether it is an keyboard event. If True, the argument event_name will be regarded as a key indicator. handler_func (callable, optional): The event handler function, which should be a collable object with no arguments or return values. Default: None. """ event_info = EventInfo(event_name, is_keyboard, handler_func) self._registered_events.append(event_info) def set_runner(self, runner): # Get partitioned buffer manager buffer_names = [ buffer.buffer_name for buffer in self._input_buffers + self._output_buffers ] self._buffer_manager = runner.buffer_manager.get_sub_manager( buffer_names) # Get event manager self._event_manager = runner.event_manager def _get_input_from_buffer(self) -> Tuple[bool, Optional[Dict]]: """Get and pack input data if it's ready. The function returns a tuple of a status flag and a packed data dictionary. If input_buffer is ready, the status flag will be True, and the packed data is a dict whose items are buffer names and corresponding messages (unready additional buffers will give a `None`). Otherwise, the status flag is False and the packed data is None. Returns: bool: status flag dict[str, Message]: the packed inputs where the key is the buffer name and the value is the Message got from the corresponding buffer. """ buffer_manager = self._buffer_manager if buffer_manager is None: raise ValueError(f'{self.name}: Runner not set!') # Check that essential buffers are ready for buffer_info in self._input_buffers: if buffer_info.essential and buffer_manager.is_empty( buffer_info.buffer_name): return False, None # Default input result = { buffer_info.input_name: None for buffer_info in self._input_buffers } for buffer_info in self._input_buffers: try: result[buffer_info.input_name] = buffer_manager.get( buffer_info.buffer_name, block=False) except Empty: if buffer_info.essential: # Return unsuccessful flag if any # essential input is unready return False, None return True, result def _send_output_to_buffers(self, output_msg): """Send output of the process method to registered output buffers. Args: output_msg (Message): output message force (bool, optional): If True, block until the output message has been put into all output buffers. Default: False """ for buffer_info in self._output_buffers: buffer_name = buffer_info.buffer_name self._buffer_manager.put_force(buffer_name, output_msg) @abstractmethod def process(self, input_msgs: Dict[str, Message]) -> Union[Message, None]: """The core method that implement the function of the node. This method will be invoked when the node is enabled and the input data is ready. All subclasses of Node should override this method. Args: input_msgs (dict): The input data collected from the buffers. For each item, the key is the `input_name` of the registered input buffer, while the value is a Message instance fetched from the buffer (or None if the buffer is unessential and not ready). Returns: Message: The output message of the node. It will be send to all registered output buffers. """ def bypass(self, input_msgs: Dict[str, Message]) -> Union[Message, None]: """The method that defines the node behavior when disabled. Note that if the node has an `enable_key`, this method should be override. The method input/output is same as it of `process` method. Args: input_msgs (dict): The input data collected from the buffers. For each item, the key is the `input_name` of the registered input buffer, while the value is a Message instance fetched from the buffer (or None if the buffer is unessential and not ready). Returns: Message: The output message of the node. It will be send to all registered output buffers. """ raise NotImplementedError def _get_node_info(self): """Get route information of the node.""" info = {'fps': self._timer.report('_FPS_'), 'timestamp': time.time()} return info def on_exit(self): """This method will be invoked on event `_exit_`. Subclasses should override this method to specifying the exiting behavior. """ def run(self): """Method representing the Node's activity. This method override the standard run() method of Thread. Users should not override this method in subclasses. """ logging.info(f'Node {self.name} starts') # Create event listener threads for event_info in self._registered_events: if event_info.handler_func is None: continue def event_listener(): while True: with self._event_manager.wait_and_handle( event_info.event_name, event_info.is_keyboard): event_info.handler_func() t_listener = Thread(target=event_listener, args=(), daemon=True) t_listener.start() self._event_listener_threads.append(t_listener) # Loop while True: # Exit if self._event_manager.is_set('_exit_'): self.on_exit() break # Check if input is ready input_status, input_msgs = self._get_input_from_buffer() # Input is not ready if not input_status: time.sleep(self.input_check_interval) continue # If a VideoEndingMessage is received, broadcast the signal # without invoking process() or bypass() video_ending = False for _, msg in input_msgs.items(): if isinstance(msg, VideoEndingMessage): self._send_output_to_buffers(msg) video_ending = True break if video_ending: self.on_exit() break # Check if enabled if not self._enabled: # Override bypass method to define node behavior when disabled output_msg = self.bypass(input_msgs) else: with self._timer.timeit(): with limit_max_fps(self.max_fps): # Process output_msg = self.process(input_msgs) if output_msg: # Update route information node_info = self._get_node_info() output_msg.update_route_info(node=self, info=node_info) # Send output message if output_msg is not None: self._send_output_to_buffers(output_msg) logging.info(f'{self.name}: process ending.')