# Copyright (c) OpenMMLab. All rights reserved.
from typing import Dict, List, Optional, Tuple, Union

import cv2
import numpy as np
from mmcv import color_val

from mmpose.core import (apply_bugeye_effect, apply_sunglasses_effect,
                         imshow_bboxes, imshow_keypoints)
from mmpose.datasets import DatasetInfo

from ..utils import (FrameMessage, copy_and_paste, expand_and_clamp,
                     get_cached_file_path, get_eye_keypoint_ids,
                     get_face_keypoint_ids, get_wrist_keypoint_ids,
                     load_image_from_disk_or_url, screen_matting)
from .builder import NODES
from .frame_drawing_node import FrameDrawingNode

try:
    import psutil
    psutil_proc = psutil.Process()
except (ImportError, ModuleNotFoundError):
    psutil_proc = None


@NODES.register_module()
class PoseVisualizerNode(FrameDrawingNode):
    """Draw the bbox and keypoint detection results.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. If an int value is given, it will
            be treated as the ASCII code of a key. Please note:

            1. If enable_key is set, the bypass method needs to be
               overridden to define the node behavior when disabled.
            2. Some hot-keys are reserved for particular uses. For
               example: 'q', 'Q' and 27 are used for exiting.

            Default: None
        enable (bool): Default enable/disable status. Default: True.
        kpt_thr (float): The threshold of keypoint score. Default: 0.3.
        radius (int): The radius of keypoints. Default: 4.
        thickness (int): The thickness of skeleton links. Default: 2.
        bbox_color (str|tuple|dict): If a single color is given (a str
            like 'green' or a tuple like (0, 255, 0)), it will be used to
            draw all bboxes. Optionally, a dict can be given as a map from
            class labels to colors.
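
    Example::

        >>> # A minimal, illustrative node config. The buffer names are
        >>> # hypothetical and must match the buffers declared in the
        >>> # webcam runner config.
        >>> cfg = dict(
        ...     type='PoseVisualizerNode',
        ...     name='visualizer',
        ...     frame_buffer='frame',
        ...     output_buffer='display')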
    """

    # Default bbox colors in BGR order, following the OpenCV convention
    default_bbox_color = {
        'person': (148, 139, 255),
        'cat': (255, 255, 0),
        'dog': (255, 255, 0),
    }

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 enable: bool = True,
                 kpt_thr: float = 0.3,
                 radius: int = 4,
                 thickness: int = 2,
                 bbox_color: Optional[Union[str, Tuple, Dict]] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        self.kpt_thr = kpt_thr
        self.radius = radius
        self.thickness = thickness
        if bbox_color is None:
            self.bbox_color = self.default_bbox_color
        elif isinstance(bbox_color, dict):
            self.bbox_color = {k: color_val(v) for k, v in bbox_color.items()}
        else:
            self.bbox_color = color_val(bbox_color)

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()

        if not pose_results:
            return canvas

        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            dataset_info = DatasetInfo(model_cfg.dataset_info)

            # Extract bboxes and poses
            bbox_preds = []
            bbox_labels = []
            pose_preds = []
            for pred in pose_result['preds']:
                if 'bbox' in pred:
                    bbox_preds.append(pred['bbox'])
                    bbox_labels.append(pred.get('label', None))
                pose_preds.append(pred['keypoints'])

            # Get bbox colors
            if isinstance(self.bbox_color, dict):
                bbox_colors = [
                    self.bbox_color.get(label, (0, 255, 0))
                    for label in bbox_labels
                ]
            else:
                bbox_colors = self.bbox_color

            # Draw bboxes
            if bbox_preds:
                bboxes = np.vstack(bbox_preds)

                imshow_bboxes(
                    canvas,
                    bboxes,
                    labels=bbox_labels,
                    colors=bbox_colors,
                    text_color='white',
                    font_scale=0.5,
                    show=False)

            # Draw poses
            if pose_preds:
                imshow_keypoints(
                    canvas,
                    pose_preds,
                    skeleton=dataset_info.skeleton,
                    kpt_score_thr=self.kpt_thr,
                    pose_kpt_color=dataset_info.pose_kpt_color,
                    pose_link_color=dataset_info.pose_link_color,
                    radius=self.radius,
                    thickness=self.thickness)

        return canvas


@NODES.register_module()
class SunglassesNode(FrameDrawingNode):
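    """Apply a sunglasses effect to the detected faces.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
        src_img_path (str, optional): Path or URL of the sunglasses image.
            If None, a default image is used. Default: None.
    """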

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 enable: bool = True,
                 src_img_path: Optional[str] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        if src_img_path is None:
            # The default image is attributed to:
            # https://www.vecteezy.com/free-vector/glass
            # Glass Vectors by Vecteezy
            src_img_path = 'demo/resources/sunglasses.jpg'
        self.src_img = load_image_from_disk_or_url(src_img_path)

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas
        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            left_eye_idx, right_eye_idx = get_eye_keypoint_ids(model_cfg)

            canvas = apply_sunglasses_effect(canvas, preds, self.src_img,
                                             left_eye_idx, right_eye_idx)
        return canvas


@NODES.register_module()
class SpriteNode(FrameDrawingNode):
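    """Show a sprite character that walks towards the person's nearest
    hand.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
        src_img_path (str, optional): Path or URL of the sprite sheet
            image. If None, a default sprite sheet is used. Default: None.
    """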

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 enable: bool = True,
                 src_img_path: Optional[str] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        if src_img_path is None:
            # Sprites of Touhou characters :)
            # From https://www.deviantart.com/shadowbendy/art/Touhou-rpg-maker-vx-Sprite-1-812746920  # noqa: E501
            src_img_path = (
                'https://user-images.githubusercontent.com/'
                '26739999/151532276-33f968d9-917f-45e3-8a99-ebde60be83bb.png')
        self.src_img = load_image_from_disk_or_url(
            src_img_path, cv2.IMREAD_UNCHANGED)[:144, :108]
        # Split the 144x108 sprite sheet into a grid of 36x36 tiles,
        # indexed as self.sprites[anime_frame][direction]
        tmp = np.array(np.split(self.src_img, range(36, 144, 36), axis=0))
        tmp = np.array(np.split(tmp, range(36, 108, 36), axis=2))
        self.sprites = tmp
        self.pos = None
        self.anime_frame = 0

    def apply_sprite_effect(self,
                            img,
                            pose_results,
                            left_hand_index,
                            right_hand_index,
                            kpt_thr=0.5):
        """Apply sprite effect.

        Args:
            img (np.ndarray): Image data.
            pose_results (list[dict]): The pose estimation results containing:

                - "keypoints" ([K,3]): detection result in [x, y, score]
            left_hand_index (int): Keypoint index of the left hand
            right_hand_index (int): Keypoint index of the right hand
            kpt_thr (float): The score threshold of required keypoints.
        """

        hm, wm = self.sprites.shape[2:4]
        # Initialize the sprite position at the image center
        if self.pos is None:
            self.pos = [img.shape[0] // 2, img.shape[1] // 2]

        if len(pose_results) == 0:
            return img

        kpts = pose_results[0]['keypoints']

        if kpts[left_hand_index, 2] < kpt_thr and \
                kpts[right_hand_index, 2] < kpt_thr:
            aim = self.pos
        else:
            # Keypoint coordinates in (row, col) order
            kpt_lhand = kpts[left_hand_index, :2][::-1]
            kpt_rhand = kpts[right_hand_index, :2][::-1]

            def distance(a, b):
                return (a[0] - b[0])**2 + (a[1] - b[1])**2

            # Go to the nearest hand
            if distance(kpt_lhand, self.pos) < distance(kpt_rhand, self.pos):
                aim = kpt_lhand
            else:
                aim = kpt_rhand

        pos_thr = 15
        if aim[0] < self.pos[0] - pos_thr:
            # Move up
            sprite = self.sprites[self.anime_frame][3]
            self.pos[0] -= 1
        elif aim[0] > self.pos[0] + pos_thr:
            # Move down
            sprite = self.sprites[self.anime_frame][0]
            self.pos[0] += 1
        elif aim[1] < self.pos[1] - pos_thr:
            # Move left
            sprite = self.sprites[self.anime_frame][1]
            self.pos[1] -= 1
        elif aim[1] > self.pos[1] + pos_thr:
            # Move right
            sprite = self.sprites[self.anime_frame][2]
            self.pos[1] += 1
        else:
            # Stay
            self.anime_frame = 0
            sprite = self.sprites[self.anime_frame][0]

        if self.anime_frame < 2:
            self.anime_frame += 1
        else:
            self.anime_frame = 0

        x = self.pos[0] - hm // 2
        y = self.pos[1] - wm // 2
        x = max(0, min(x, img.shape[0] - hm))
        y = max(0, min(y, img.shape[1] - wm))

        # Alpha-blend the sprite onto the image using its alpha channel
        alpha = sprite[:, :, 3:] / 255
        img[x:x + hm, y:y + wm] = (
            img[x:x + hm, y:y + wm] * (1 - alpha) +
            sprite[:, :, :3] * alpha).astype('uint8')

        return img

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas
        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            left_hand_idx, right_hand_idx = get_wrist_keypoint_ids(model_cfg)

            canvas = self.apply_sprite_effect(canvas, preds, left_hand_idx,
                                              right_hand_idx)
        return canvas


@NODES.register_module()
class BackgroundNode(FrameDrawingNode):
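    """Replace the background of detected instances with a given image.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
        src_img_path (str, optional): Path or URL of the background image.
            If None, a default image is used. Default: None.
        cls_ids (list, optional): Only apply the effect to instances with
            these class IDs. Default: None.
        cls_names (list, optional): Only apply the effect to instances
            with these class names. Default: None.
    """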

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 enable: bool = True,
                 src_img_path: Optional[str] = None,
                 cls_ids: Optional[List] = None,
                 cls_names: Optional[List] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        self.cls_ids = cls_ids
        self.cls_names = cls_names

        if src_img_path is None:
            src_img_path = 'https://user-images.githubusercontent.com/'\
                           '11788150/149731957-abd5c908-9c7f-45b2-b7bf-'\
                           '821ab30c6a3e.jpg'
        self.src_img = load_image_from_disk_or_url(src_img_path)

    def apply_background_effect(self,
                                img,
                                det_results,
                                background_img,
                                effect_region=(0.2, 0.2, 0.8, 0.8)):
        """Change the background.

        Args:
            img (np.ndarray): Image data.
            det_results (list[dict]): The detection results containing:

                - "cls_id" (int): Class index.
                - "label" (str): Class label (e.g. 'person').
                - "bbox" (ndarray:(5, )): bounding box result
                    [x, y, w, h, score].
                - "mask" (ndarray:(H, W)): instance segmentation result.
            background_img (np.ndarray): Background image.
            effect_region (tuple(4, )): The region to apply the mask, in
                normalized coordinates (x1, y1, x2, y2).
        """
        if len(det_results) > 0:
            # Use the first detection result, which is assumed to have
            # the highest score.
            det_result = det_results[0]
            bbox = det_result['bbox']
            mask = det_result['mask'].astype(np.uint8)
            img = copy_and_paste(img, background_img, mask, bbox,
                                 effect_region)
            return img
        else:
            return background_img

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        if canvas.shape != self.src_img.shape:
            # cv2.resize expects the target size as (width, height)
            self.src_img = cv2.resize(self.src_img,
                                      (canvas.shape[1], canvas.shape[0]))
        det_results = frame_msg.get_detection_results()
        if not det_results:
            return canvas

        full_preds = []
        for det_result in det_results:
            preds = det_result['preds']
            if self.cls_ids:
                # Filter results by class ID
                filtered_preds = [
                    p for p in preds if p['cls_id'] in self.cls_ids
                ]
            elif self.cls_names:
                # Filter results by class name
                filtered_preds = [
                    p for p in preds if p['label'] in self.cls_names
                ]
            else:
                filtered_preds = preds
            full_preds.extend(filtered_preds)

        canvas = self.apply_background_effect(canvas, full_preds, self.src_img)

        return canvas


@NODES.register_module()
class SaiyanNode(FrameDrawingNode):
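    """Apply Saiyan hair and lightning effects to detected persons.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
        hair_img_path (str, optional): Path or URL of the Saiyan hair
            image. If None, a default image is used. Default: None.
        light_video_path (str, optional): Path or URL of the green-screen
            lightning video. If None, a default video is used.
            Default: None.
        cls_ids (list, optional): Only apply the effect to instances with
            these class IDs. Default: None.
        cls_names (list, optional): Only apply the effect to instances
            with these class names. Default: None.
    """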

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 enable: bool = True,
                 hair_img_path: Optional[str] = None,
                 light_video_path: Optional[str] = None,
                 cls_ids: Optional[List] = None,
                 cls_names: Optional[List] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        self.cls_ids = cls_ids
        self.cls_names = cls_names

        if hair_img_path is None:
            hair_img_path = 'https://user-images.githubusercontent.com/'\
                            '11788150/149732117-fcd2d804-dc2c-426c-bee7-'\
                            '94be6146e05c.png'
        self.hair_img = load_image_from_disk_or_url(hair_img_path)

        if light_video_path is None:
            light_video_path = get_cached_file_path(
                'https://'
                'user-images.githubusercontent.com/11788150/149732080'
                '-ea6cfeda-0dc5-4bbb-892a-3831e5580520.mp4')
        self.light_video_path = light_video_path
        self.light_video = cv2.VideoCapture(self.light_video_path)

    def apply_saiyan_effect(self,
                            img,
                            pose_results,
                            saiyan_img,
                            light_frame,
                            face_indices,
                            bbox_thr=0.3,
                            kpt_thr=0.5):
        """Apply Saiyan hair effect.

        Args:
            img (np.ndarray): Image data.
            pose_results (list[dict]): The pose estimation results containing:

                - "keypoints" ([K,3]): keypoint detection result
                    in [x, y, score]
            saiyan_img (np.ndarray): Saiyan image with transparent background.
            light_frame (np.ndarray): Lightning image with green screen.
            face_indices (list[int]): Keypoint indices of the face.
            bbox_thr (float): The score threshold of required bboxes.
            kpt_thr (float): The score threshold of required keypoints.
        """
        img = img.copy()
        im_shape = img.shape
        # Apply lightning effects.
        light_mask = screen_matting(light_frame, color='green')

        # anchor points in the Saiyan hair image
        pts_src = np.array(
            [
                [84, 398],  # face kpt 0
                [331, 393],  # face kpt 16
                [84, 145],
                [331, 140]
            ],
            dtype=np.float32)

        for pose in pose_results:
            bbox = pose['bbox']

            if bbox[-1] < bbox_thr:
                continue

            mask_inst = pose['mask']
            # Cache the instance foreground
            fg = img[np.where(mask_inst)]

            bbox = expand_and_clamp(bbox[:4], im_shape, s=3.0)
            # Apply light effects between fg and bg
            img = copy_and_paste(
                light_frame,
                img,
                light_mask,
                effect_region=(bbox[0] / im_shape[1], bbox[1] / im_shape[0],
                               bbox[2] / im_shape[1], bbox[3] / im_shape[0]))
            # Restore the instance foreground
            img[np.where(mask_inst)] = fg

            # Apply Saiyan hair effects
            kpts = pose['keypoints']
            if kpts[face_indices[0], 2] < kpt_thr or \
                    kpts[face_indices[16], 2] < kpt_thr:
                continue

            kpt_0 = kpts[face_indices[0], :2]
            kpt_16 = kpts[face_indices[16], :2]
            # orthogonal vector to the face contour line
            vo = (kpt_0 - kpt_16)[::-1] * [-1, 1]

            # anchor points in the image by face contour keypoints
            pts_tar = np.vstack([kpt_0, kpt_16, kpt_0 + vo, kpt_16 + vo])

            h_mat, _ = cv2.findHomography(pts_src, pts_tar)
            patch = cv2.warpPerspective(
                saiyan_img,
                h_mat,
                dsize=(img.shape[1], img.shape[0]),
                borderValue=(0, 0, 0))
            # Mask out the black background area in the patch
            mask_patch = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
            mask_patch = (mask_patch > 1).astype(np.uint8)
            img = cv2.copyTo(patch, mask_patch, img)

        return img

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()

        det_results = frame_msg.get_detection_results()
        if not det_results:
            return canvas

        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas

        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            face_indices = get_face_keypoint_ids(model_cfg)

            # Read the next lightning frame; loop the video when it ends
            ret, frame = self.light_video.read()
            if not ret:
                self.light_video = cv2.VideoCapture(self.light_video_path)
                ret, frame = self.light_video.read()

            canvas = self.apply_saiyan_effect(canvas, preds, self.hair_img,
                                              frame, face_indices)

        return canvas


@NODES.register_module()
class MoustacheNode(FrameDrawingNode):
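    """Apply a moustache effect to the detected faces.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
        src_img_path (str, optional): Path or URL of the moustache image.
            If None, a default image is used. Default: None.
    """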

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 enable: bool = True,
                 src_img_path: Optional[str] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        if src_img_path is None:
            src_img_path = 'https://user-images.githubusercontent.com/'\
                           '11788150/149732141-3afbab55-252a-428c-b6d8'\
                           '-0e352f432651.jpeg'
        self.src_img = load_image_from_disk_or_url(src_img_path)

    def apply_moustache_effect(self,
                               img,
                               pose_results,
                               moustache_img,
                               face_indices,
                               kpt_thr=0.5):
        """Apply moustache effect.

        Args:
            img (np.ndarray): Image data.
            pose_results (list[dict]): The pose estimation results containing:

                - "keypoints" ([K,3]): keypoint detection result
                    in [x, y, score]
            moustache_img (np.ndarray): Moustache image with white background.
            face_indices (list[int]): Keypoint indices of the face.
            kpt_thr (float): The score threshold of required keypoints.
        """

        # anchor points in the moustache image
        pts_src = np.array([[1164, 741], [1729, 741], [1164, 1244],
                            [1729, 1244]],
                           dtype=np.float32)

        for pose in pose_results:
            kpts = pose['keypoints']
            if kpts[face_indices[32], 2] < kpt_thr \
                    or kpts[face_indices[34], 2] < kpt_thr \
                    or kpts[face_indices[61], 2] < kpt_thr \
                    or kpts[face_indices[63], 2] < kpt_thr:
                continue

            kpt_32 = kpts[face_indices[32], :2]
            kpt_34 = kpts[face_indices[34], :2]
            kpt_61 = kpts[face_indices[61], :2]
            kpt_63 = kpts[face_indices[63], :2]
            # anchor points in the image by face keypoints
            pts_tar = np.vstack([kpt_32, kpt_34, kpt_61, kpt_63])

            h_mat, _ = cv2.findHomography(pts_src, pts_tar)
            patch = cv2.warpPerspective(
                moustache_img,
                h_mat,
                dsize=(img.shape[1], img.shape[0]),
                borderValue=(255, 255, 255))
            # mask the white background area in the patch with a threshold 200
            mask = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
            mask = (mask < 200).astype(np.uint8)
            img = cv2.copyTo(patch, mask, img)

        return img

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas
        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            face_indices = get_face_keypoint_ids(model_cfg)
            canvas = self.apply_moustache_effect(canvas, preds, self.src_img,
                                                 face_indices)
        return canvas


@NODES.register_module()
class BugEyeNode(FrameDrawingNode):
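    """Apply a bug-eye effect to the detected faces.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
    """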

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas
        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            left_eye_idx, right_eye_idx = get_eye_keypoint_ids(model_cfg)

            canvas = apply_bugeye_effect(canvas, preds, left_eye_idx,
                                         right_eye_idx)
        return canvas


@NODES.register_module()
class NoticeBoardNode(FrameDrawingNode):
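    """Show a notice board with given content lines.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        enable (bool): Default enable/disable status. Default: True.
        content_lines (list[str], optional): The lines of text to show.
            If None, a default notice is shown. Default: None.
        x_offset (int): The x offset of the text. Default: 20.
        y_offset (int): The y offset of the first text line. Default: 20.
        y_delta (int): The vertical step between text lines. Default: 15.
        text_color (str|tuple): The text color. Default: 'black'.
        background_color (str|tuple): The background color.
            Default: (255, 183, 0).
        text_scale (float): The font scale of the text. Default: 0.4.
    """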

    default_content_lines = ['This is a notice board!']

    def __init__(
        self,
        name: str,
        frame_buffer: str,
        output_buffer: Union[str, List[str]],
        enable_key: Optional[Union[str, int]] = None,
        enable: bool = True,
        content_lines: Optional[List[str]] = None,
        x_offset: int = 20,
        y_offset: int = 20,
        y_delta: int = 15,
        text_color: Union[str, Tuple[int, int, int]] = 'black',
        background_color: Union[str, Tuple[int, int, int]] = (255, 183, 0),
        text_scale: float = 0.4,
    ):
        super().__init__(name, frame_buffer, output_buffer, enable_key, enable)

        self.x_offset = x_offset
        self.y_offset = y_offset
        self.y_delta = y_delta
        self.text_color = color_val(text_color)
        self.background_color = color_val(background_color)
        self.text_scale = text_scale

        if content_lines:
            self.content_lines = content_lines
        else:
            self.content_lines = self.default_content_lines

    def draw(self, frame_msg: FrameMessage) -> np.ndarray:
        img = frame_msg.get_image()
        canvas = np.full(img.shape, self.background_color, dtype=img.dtype)

        x = self.x_offset
        y = self.y_offset

        max_len = max(len(line) for line in self.content_lines)

        def _put_line(line=''):
            nonlocal y
            cv2.putText(canvas, line, (x, y), cv2.FONT_HERSHEY_DUPLEX,
                        self.text_scale, self.text_color, 1)
            y += self.y_delta

        for line in self.content_lines:
            _put_line(line)

        # Estimate the board region; the factor 20 approximates the
        # character width in pixels at text_scale 1.0
        x1 = max(0, self.x_offset)
        x2 = min(img.shape[1], int(x + max_len * self.text_scale * 20))
        y1 = max(0, self.y_offset - self.y_delta)
        y2 = min(img.shape[0], y)

        # Blend the notice board with the image at 50% opacity
        src1 = canvas[y1:y2, x1:x2]
        src2 = img[y1:y2, x1:x2]
        img[y1:y2, x1:x2] = cv2.addWeighted(src1, 0.5, src2, 0.5, 0)

        return img


@NODES.register_module()
class HatNode(FrameDrawingNode):
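    """Apply a hat effect to the detected faces.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        src_img_path (str, optional): Path or URL of the hat image with an
            alpha channel. If None, a default image is used. Default: None.
    """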

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 src_img_path: Optional[str] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key)

        if src_img_path is None:
            # The default image is attributed to:
            # http://616pic.com/sucai/1m9i70p52.html
            src_img_path = 'https://user-images.githubusercontent.' \
                           'com/28900607/149766271-2f591c19-9b67-4' \
                           'd92-8f94-c272396ca141.png'
        self.src_img = load_image_from_disk_or_url(src_img_path,
                                                   cv2.IMREAD_UNCHANGED)

    @staticmethod
    def apply_hat_effect(img,
                         pose_results,
                         hat_img,
                         left_eye_index,
                         right_eye_index,
                         kpt_thr=0.5):
        """Apply hat effect.

        Args:
            img (np.ndarray): Image data.
            pose_results (list[dict]): The pose estimation results containing:

                - "keypoints" ([K,3]): keypoint detection result in
                    [x, y, score]
            hat_img (np.ndarray): Hat image with an alpha channel.
            left_eye_index (int): Keypoint index of the left eye
            right_eye_index (int): Keypoint index of the right eye
            kpt_thr (float): The score threshold of required keypoints.
        """
        img = img.copy()
        hm, wm = hat_img.shape[:2]
        # anchor points in the hat image
        a = 0.3
        b = 0.7
        pts_src = np.array([[a * wm, a * hm], [a * wm, b * hm],
                            [b * wm, a * hm], [b * wm, b * hm]],
                           dtype=np.float32)

        for pose in pose_results:
            kpts = pose['keypoints']

            if kpts[left_eye_index, 2] < kpt_thr or \
                    kpts[right_eye_index, 2] < kpt_thr:
                continue

            kpt_leye = kpts[left_eye_index, :2]
            kpt_reye = kpts[right_eye_index, :2]
            # orthogonal vector to the left-to-right eye line
            vo = 0.5 * (kpt_reye - kpt_leye)[::-1] * [-1, 1]
            veye = 0.5 * (kpt_reye - kpt_leye)

            # anchor points in the image by eye positions
            pts_tar = np.vstack([
                kpt_reye + 1 * veye + 5 * vo, kpt_reye + 1 * veye + 1 * vo,
                kpt_leye - 1 * veye + 5 * vo, kpt_leye - 1 * veye + 1 * vo
            ])

            h_mat, _ = cv2.findHomography(pts_src, pts_tar)
            patch = cv2.warpPerspective(
                hat_img,
                h_mat,
                dsize=(img.shape[1], img.shape[0]),
                borderValue=(255, 255, 255))
            # Build the mask from the alpha channel (threshold 128) and
            # drop near-black pixels (threshold 30)
            mask = (patch[:, :, -1] > 128)
            patch = patch[:, :, :-1]
            mask = mask * (cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY) > 30)
            mask = mask.astype(np.uint8)

            img = cv2.copyTo(patch, mask, img)
        return img

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas
        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            left_eye_idx, right_eye_idx = get_eye_keypoint_ids(model_cfg)

            canvas = self.apply_hat_effect(canvas, preds, self.src_img,
                                           left_eye_idx, right_eye_idx)
        return canvas


@NODES.register_module()
class FirecrackerNode(FrameDrawingNode):
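    """Show a firecracker animation on the detected wrists.

    Args:
        name (str, optional): The node name (also thread name).
        frame_buffer (str): The name of the input buffer.
        output_buffer (str|list): The name(s) of the output buffer(s).
        enable_key (str|int, optional): Set a hot-key to toggle
            enable/disable of the node. Default: None.
        src_img_path (str, optional): Path or URL of the firecracker
            animation (gif). If None, a default animation is used.
            Default: None.
    """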

    def __init__(self,
                 name: str,
                 frame_buffer: str,
                 output_buffer: Union[str, List[str]],
                 enable_key: Optional[Union[str, int]] = None,
                 src_img_path: Optional[str] = None):

        super().__init__(name, frame_buffer, output_buffer, enable_key)

        if src_img_path is None:
            src_img_path = 'https://user-images.githubusercontent' \
                           '.com/28900607/149766281-6376055c-ed8b' \
                           '-472b-991f-60e6ae6ee1da.gif'
        self.src_img_path = src_img_path
        src_img = cv2.VideoCapture(self.src_img_path)

        # Decode all frames of the gif in advance
        self.frame_list = []
        ret, frame = src_img.read()
        while frame is not None:
            self.frame_list.append(frame)
            ret, frame = src_img.read()
        self.num_frames = len(self.frame_list)
        self.frame_idx = 0
        self.frame_period = 4  # each gif frame lasts for 4 video frames

    @staticmethod
    def apply_firecracker_effect(img,
                                 pose_results,
                                 firecracker_img,
                                 left_wrist_idx,
                                 right_wrist_idx,
                                 kpt_thr=0.5):
        """Apply firecracker effect.

        Args:
            img (np.ndarray): Image data.
            pose_results (list[dict]): The pose estimation results containing:

                - "keypoints" ([K,3]): keypoint detection result in
                    [x, y, score]
            firecracker_img (np.ndarray): Firecracker image with white
                background.
            left_wrist_idx (int): Keypoint index of the left wrist
            right_wrist_idx (int): Keypoint index of the right wrist
            kpt_thr (float): The score threshold of required keypoints.
        """

        hm, wm = firecracker_img.shape[:2]
        # anchor points in the firecracker image
        pts_src = np.array([[0. * wm, 0. * hm], [0. * wm, 1. * hm],
                            [1. * wm, 0. * hm], [1. * wm, 1. * hm]],
                           dtype=np.float32)

        h, w = img.shape[:2]
        # Scale the firecracker to one third of the image height
        h_tar = h / 3
        w_tar = h_tar / hm * wm
        for pose in pose_results:
            kpts = pose['keypoints']

            if kpts[left_wrist_idx, 2] > kpt_thr:
                kpt_lwrist = kpts[left_wrist_idx, :2]
                # anchor points in the image by the left wrist position
                pts_tar = np.vstack([
                    kpt_lwrist - [w_tar / 2, 0],
                    kpt_lwrist - [w_tar / 2, -h_tar],
                    kpt_lwrist + [w_tar / 2, 0],
                    kpt_lwrist + [w_tar / 2, h_tar]
                ])

                h_mat, _ = cv2.findHomography(pts_src, pts_tar)
                patch = cv2.warpPerspective(
                    firecracker_img,
                    h_mat,
                    dsize=(img.shape[1], img.shape[0]),
                    borderValue=(255, 255, 255))
                # mask the white background area in the patch with
                # a threshold 240
                mask = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
                mask = (mask < 240).astype(np.uint8)
                img = cv2.copyTo(patch, mask, img)

            if kpts[right_wrist_idx, 2] > kpt_thr:
                kpt_rwrist = kpts[right_wrist_idx, :2]

                # anchor points in the image by the right wrist position
                pts_tar = np.vstack([
                    kpt_rwrist - [w_tar / 2, 0],
                    kpt_rwrist - [w_tar / 2, -h_tar],
                    kpt_rwrist + [w_tar / 2, 0],
                    kpt_rwrist + [w_tar / 2, h_tar]
                ])

                h_mat, _ = cv2.findHomography(pts_src, pts_tar)
                patch = cv2.warpPerspective(
                    firecracker_img,
                    h_mat,
                    dsize=(img.shape[1], img.shape[0]),
                    borderValue=(255, 255, 255))
                # mask the white background area in the patch with
                # a threshold 240
                mask = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
                mask = (mask < 240).astype(np.uint8)
                img = cv2.copyTo(patch, mask, img)

        return img

    def draw(self, frame_msg):
        canvas = frame_msg.get_image()
        pose_results = frame_msg.get_pose_results()
        if not pose_results:
            return canvas

        frame = self.frame_list[self.frame_idx // self.frame_period]
        for pose_result in pose_results:
            model_cfg = pose_result['model_cfg']
            preds = pose_result['preds']
            left_wrist_idx, right_wrist_idx = get_wrist_keypoint_ids(model_cfg)

            canvas = self.apply_firecracker_effect(canvas, preds, frame,
                                                   left_wrist_idx,
                                                   right_wrist_idx)
        self.frame_idx = (self.frame_idx + 1) % (
            self.num_frames * self.frame_period)

        return canvas