You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
272 lines
9.7 KiB
272 lines
9.7 KiB
# Copyright (c) OpenMMLab. All rights reserved.
|
|
import logging
|
|
import sys
|
|
import time
|
|
import warnings
|
|
from contextlib import nullcontext
|
|
from threading import Thread
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
|
|
import cv2
|
|
|
|
from .nodes import NODES
|
|
from .utils import (BufferManager, EventManager, FrameMessage, ImageCapture,
|
|
VideoEndingMessage, is_image_file, limit_max_fps)
|
|
|
|
DEFAULT_FRAME_BUFFER_SIZE = 1
|
|
DEFAULT_INPUT_BUFFER_SIZE = 1
|
|
DEFAULT_DISPLAY_BUFFER_SIZE = 0
|
|
DEFAULT_USER_BUFFER_SIZE = 1
|
|
|
|
|
|
class WebcamRunner():
|
|
"""An interface for building webcam application from config.
|
|
|
|
Parameters:
|
|
name (str): Runner name.
|
|
camera_id (int | str): The camera ID (usually the ID of the default
|
|
camera is 0). Alternatively a file path or a URL can be given
|
|
to load from a video or image file.
|
|
camera_frame_shape (tuple, optional): Set the frame shape of the
|
|
camera in (width, height). If not given, the default frame shape
|
|
will be used. This argument is only valid when using a camera
|
|
as the input source. Default: None
|
|
camera_fps (int): Video reading maximum FPS. Default: 30
|
|
buffer_sizes (dict, optional): A dict to specify buffer sizes. The
|
|
key is the buffer name and the value is the buffer size.
|
|
Default: None
|
|
nodes (list): Node configs.
|
|
"""
|
|
|
|
def __init__(self,
|
|
name: str = 'Default Webcam Runner',
|
|
camera_id: Union[int, str] = 0,
|
|
camera_fps: int = 30,
|
|
camera_frame_shape: Optional[Tuple[int, int]] = None,
|
|
synchronous: bool = False,
|
|
buffer_sizes: Optional[Dict[str, int]] = None,
|
|
nodes: Optional[List[Dict]] = None):
|
|
|
|
# Basic parameters
|
|
self.name = name
|
|
self.camera_id = camera_id
|
|
self.camera_fps = camera_fps
|
|
self.camera_frame_shape = camera_frame_shape
|
|
self.synchronous = synchronous
|
|
|
|
# self.buffer_manager manages data flow between runner and nodes
|
|
self.buffer_manager = BufferManager()
|
|
# self.event_manager manages event-based asynchronous communication
|
|
self.event_manager = EventManager()
|
|
# self.node_list holds all node instance
|
|
self.node_list = []
|
|
# self.vcap is used to read camera frames. It will be built when the
|
|
# runner starts running
|
|
self.vcap = None
|
|
|
|
# Register runner events
|
|
self.event_manager.register_event('_exit_', is_keyboard=False)
|
|
if self.synchronous:
|
|
self.event_manager.register_event('_idle_', is_keyboard=False)
|
|
|
|
# Register nodes
|
|
if not nodes:
|
|
raise ValueError('No node is registered to the runner.')
|
|
|
|
# Register default buffers
|
|
if buffer_sizes is None:
|
|
buffer_sizes = {}
|
|
# _frame_ buffer
|
|
frame_buffer_size = buffer_sizes.get('_frame_',
|
|
DEFAULT_FRAME_BUFFER_SIZE)
|
|
self.buffer_manager.register_buffer('_frame_', frame_buffer_size)
|
|
# _input_ buffer
|
|
input_buffer_size = buffer_sizes.get('_input_',
|
|
DEFAULT_INPUT_BUFFER_SIZE)
|
|
self.buffer_manager.register_buffer('_input_', input_buffer_size)
|
|
# _display_ buffer
|
|
display_buffer_size = buffer_sizes.get('_display_',
|
|
DEFAULT_DISPLAY_BUFFER_SIZE)
|
|
self.buffer_manager.register_buffer('_display_', display_buffer_size)
|
|
|
|
# Build all nodes:
|
|
for node_cfg in nodes:
|
|
logging.info(f'Create node: {node_cfg.name}({node_cfg.type})')
|
|
node = NODES.build(node_cfg)
|
|
|
|
# Register node
|
|
self.node_list.append(node)
|
|
|
|
# Register buffers
|
|
for buffer_info in node.registered_buffers:
|
|
buffer_name = buffer_info.buffer_name
|
|
if buffer_name in self.buffer_manager:
|
|
continue
|
|
buffer_size = buffer_sizes.get(buffer_name,
|
|
DEFAULT_USER_BUFFER_SIZE)
|
|
self.buffer_manager.register_buffer(buffer_name, buffer_size)
|
|
logging.info(
|
|
f'Register user buffer: {buffer_name}({buffer_size})')
|
|
|
|
# Register events
|
|
for event_info in node.registered_events:
|
|
self.event_manager.register_event(
|
|
event_name=event_info.event_name,
|
|
is_keyboard=event_info.is_keyboard)
|
|
logging.info(f'Register event: {event_info.event_name}')
|
|
|
|
# Set runner for nodes
|
|
# This step is performed after node building when the runner has
|
|
# create full buffer/event managers and can
|
|
for node in self.node_list:
|
|
logging.info(f'Set runner for node: {node.name})')
|
|
node.set_runner(self)
|
|
|
|
def _read_camera(self):
|
|
"""Continually read video frames and put them into buffers."""
|
|
|
|
camera_id = self.camera_id
|
|
fps = self.camera_fps
|
|
|
|
# Build video capture
|
|
if is_image_file(camera_id):
|
|
self.vcap = ImageCapture(camera_id)
|
|
else:
|
|
self.vcap = cv2.VideoCapture(camera_id)
|
|
if self.camera_frame_shape is not None:
|
|
width, height = self.camera_frame_shape
|
|
self.vcap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
|
|
self.vcap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
|
|
|
|
if not self.vcap.isOpened():
|
|
warnings.warn(f'Cannot open camera (ID={camera_id})')
|
|
sys.exit()
|
|
|
|
# Read video frames in a loop
|
|
first_frame = True
|
|
while not self.event_manager.is_set('_exit_'):
|
|
if self.synchronous:
|
|
if first_frame:
|
|
cm = nullcontext()
|
|
else:
|
|
# Read a new frame until the last frame has been processed
|
|
cm = self.event_manager.wait_and_handle('_idle_')
|
|
else:
|
|
# Read frames with a maximum FPS
|
|
cm = limit_max_fps(fps)
|
|
|
|
first_frame = False
|
|
|
|
with cm:
|
|
# Read a frame
|
|
ret_val, frame = self.vcap.read()
|
|
if ret_val:
|
|
# Put frame message (for display) into buffer `_frame_`
|
|
frame_msg = FrameMessage(frame)
|
|
self.buffer_manager.put('_frame_', frame_msg)
|
|
|
|
# Put input message (for model inference or other use)
|
|
# into buffer `_input_`
|
|
input_msg = FrameMessage(frame.copy())
|
|
input_msg.update_route_info(
|
|
node_name='Camera Info',
|
|
node_type='dummy',
|
|
info=self._get_camera_info())
|
|
self.buffer_manager.put_force('_input_', input_msg)
|
|
|
|
else:
|
|
# Put a video ending signal
|
|
self.buffer_manager.put('_frame_', VideoEndingMessage())
|
|
|
|
self.vcap.release()
|
|
|
|
def _display(self):
|
|
"""Continually obtain and display output frames."""
|
|
|
|
output_msg = None
|
|
|
|
while not self.event_manager.is_set('_exit_'):
|
|
while self.buffer_manager.is_empty('_display_'):
|
|
time.sleep(0.001)
|
|
|
|
# Set _idle_ to allow reading next frame
|
|
if self.synchronous:
|
|
self.event_manager.set('_idle_')
|
|
|
|
# acquire output from buffer
|
|
output_msg = self.buffer_manager.get('_display_')
|
|
|
|
# None indicates input stream ends
|
|
if isinstance(output_msg, VideoEndingMessage):
|
|
self.event_manager.set('_exit_')
|
|
break
|
|
|
|
img = output_msg.get_image()
|
|
|
|
# show in a window
|
|
cv2.imshow(self.name, img)
|
|
|
|
# handle keyboard input
|
|
key = cv2.waitKey(1)
|
|
if key != -1:
|
|
self._on_keyboard_input(key)
|
|
|
|
cv2.destroyAllWindows()
|
|
|
|
def _on_keyboard_input(self, key):
|
|
"""Handle the keyboard input."""
|
|
|
|
if key in (27, ord('q'), ord('Q')):
|
|
logging.info(f'Exit event captured: {key}')
|
|
self.event_manager.set('_exit_')
|
|
else:
|
|
logging.info(f'Keyboard event captured: {key}')
|
|
self.event_manager.set(key, is_keyboard=True)
|
|
|
|
def _get_camera_info(self):
|
|
"""Return the camera information in a dict."""
|
|
|
|
frame_width = self.vcap.get(cv2.CAP_PROP_FRAME_WIDTH)
|
|
frame_height = self.vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)
|
|
frame_rate = self.vcap.get(cv2.CAP_PROP_FPS)
|
|
|
|
cam_info = {
|
|
'Camera ID': self.camera_id,
|
|
'Source resolution': f'{frame_width}x{frame_height}',
|
|
'Source FPS': frame_rate,
|
|
}
|
|
|
|
return cam_info
|
|
|
|
def run(self):
|
|
"""Program entry.
|
|
|
|
This method starts all nodes as well as video I/O in separate threads.
|
|
"""
|
|
|
|
try:
|
|
# Start node threads
|
|
non_daemon_nodes = []
|
|
for node in self.node_list:
|
|
node.start()
|
|
if not node.daemon:
|
|
non_daemon_nodes.append(node)
|
|
|
|
# Create a thread to read video frames
|
|
t_read = Thread(target=self._read_camera, args=())
|
|
t_read.start()
|
|
|
|
# Run display in the main thread
|
|
self._display()
|
|
logging.info('Display shut down')
|
|
|
|
# joint non-daemon nodes and runner threads
|
|
logging.info('Camera reading about to join')
|
|
t_read.join()
|
|
|
|
for node in non_daemon_nodes:
|
|
logging.info(f'Node {node.name} about to join')
|
|
node.join()
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|