import time
import numpy as np
import cv2
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
# Latest detection result delivered asynchronously by the live-stream callback below.
landmark_result = None
# The user-defined result callback for processing live stream data.
# The result callback should only be specified when the running mode is set to the live stream mode.
# The result_callback provides:
# The pose landmarker detection results.
# The input image that the pose landmarker runs on.
# The input timestamp in milliseconds.
def print_result(result: vision.PoseLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    global landmark_result
    landmark_result = result
    #print(output_image.numpy_view())
    # Accessing output_image here works, but displaying it with cv2.imshow() from inside this
    # callback does not seem to work; I tried several approaches and none behaved correctly.
    # Structure of PoseLandmarkerResult:
    # mp.tasks.vision.PoseLandmarkerResult(
    #     pose_landmarks: List[List[landmark_module.NormalizedLandmark]],
    #     pose_world_landmarks: List[List[landmark_module.Landmark]],
    #     segmentation_masks: Optional[List[image_module.Image]] = None
    # )
    #print('pose landmarker result: {}'.format(result))
    #print("pose landmark: ", result.pose_landmarks[0][0].visibility)
    #print("pose world landmark: ", result.pose_world_landmarks[0][0].visibility)
    # pose_landmarks_list = result.pose_landmarks
    # for idx in range(len(pose_landmarks_list)):
    #     pose_landmarks = pose_landmarks_list[idx]
    #     for landmark in pose_landmarks:
    #         print("x: %.2f, y: %.2f, z: %.2f, visibility: %.2f, presence: %.2f" % (landmark.x, landmark.y,
    #               landmark.z, landmark.visibility, landmark.presence))
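# A minimal illustrative helper, not called in the main loop: it shows how one field of a
# PoseLandmarkerResult could be read safely. The helper name and the default index 0
# (the nose in MediaPipe's 33-point pose topology) are chosen only for this sketch.
def get_landmark_xy(result, landmark_index=0):
    # Return the normalized (x, y) of one landmark from the first detected pose,
    # or None when no result has arrived yet or no person was detected.
    if result is None or not result.pose_landmarks:
        return None
    landmark = result.pose_landmarks[0][landmark_index]
    return landmark.x, landmark.y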
def draw_landmarks_on_image(rgb_image, detection_result, bg_black):
    # Black background if requested, otherwise draw on a copy of the input frame.
    if bg_black:
        annotated_image = np.zeros_like(rgb_image)
    else:
        annotated_image = np.copy(rgb_image)
    # Because the asynchronous detect_async() is used, detection_result may be None for the
    # first few frames. Also, when there is no person in the image (frame), the
    # detection_result.pose_landmarks list is empty. Without handling these cases, an error occurs.
    if detection_result is None or not detection_result.pose_landmarks:
        return annotated_image
    pose_landmarks_list = detection_result.pose_landmarks
    # Loop through the detected poses to visualize.
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]
        # Draw the pose landmarks.
        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
        ])
        solutions.drawing_utils.draw_landmarks(
            annotated_image,
            pose_landmarks_proto,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_styles.get_default_pose_landmarks_style())
    return annotated_image
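# A small sketch, not used by the drawing code above, of how a NormalizedLandmark maps to
# pixel coordinates; it roughly mirrors what drawing_utils does internally, and the helper
# name is just a placeholder for illustration.
def normalized_to_pixel(landmark, image_width, image_height):
    # NormalizedLandmark x/y are fractions of the image width/height in [0, 1],
    # so scaling and clamping gives a pixel position inside the image.
    x_px = min(max(int(landmark.x * image_width), 0), image_width - 1)
    y_px = min(max(int(landmark.y * image_height), 0), image_height - 1)
    return x_px, y_px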
base_options = python.BaseOptions(model_asset_path='pose_landmarker_full.task')
options = vision.PoseLandmarkerOptions(base_options=base_options,
                                       running_mode=mp.tasks.vision.RunningMode.LIVE_STREAM,
                                       result_callback=print_result,
                                       output_segmentation_masks=False)
# The running mode of the task (defaults to the image mode). PoseLandmarker has three running modes:
# 1) The image mode for detecting pose landmarks on single image inputs.
# 2) The video mode for detecting pose landmarks on the decoded frames of a video.
# 3) The live stream mode for detecting pose landmarks on a live stream of input data, such as from a camera.
#    In this mode, "result_callback" must be specified to receive the detection results asynchronously.
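# For contrast with the live-stream flow used in this script, a minimal sketch of the
# synchronous IMAGE running mode; it is not called here, it reuses base_options from above,
# and 'example.jpg' is only a placeholder path assumed for illustration.
def detect_on_single_image(image_path='example.jpg'):
    image_options = vision.PoseLandmarkerOptions(
        base_options=base_options,
        running_mode=mp.tasks.vision.RunningMode.IMAGE)
    with vision.PoseLandmarker.create_from_options(image_options) as image_detector:
        mp_image = mp.Image.create_from_file(image_path)
        # detect() blocks and returns the PoseLandmarkerResult directly (no callback needed).
        return image_detector.detect(mp_image)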
detector = vision.PoseLandmarker.create_from_options(options)
cap = cv2.VideoCapture(0)
while True:
    ret, cv_image = cap.read()
    if not ret:
        break
    frame = mp.Image(image_format=mp.ImageFormat.SRGB, data=cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB))
    # Sends live image data to perform pose landmarks detection.
    # The results will be available via the "result_callback" provided in the PoseLandmarkerOptions.
    # Only use this method when the PoseLandmarker is created with the live stream running mode.
    # The input timestamps should be monotonically increasing for adjacent calls of this method.
    # This method will return immediately after the input image is accepted. The results will be available via
    # the result_callback provided in the PoseLandmarkerOptions. The detect_async method is designed to process
    # live stream data such as camera input. To lower the overall latency, pose landmarker may drop the input
    # images if needed. In other words, it's not guaranteed to have output per input image.
    detector.detect_async(frame, int(time.time()*1000))
    annotated_image = draw_landmarks_on_image(frame.numpy_view(), landmark_result, True)
    cv2.imshow('sean', cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))
    key = cv2.waitKey(25)
    if key == 27:  # ESC
        break
if cap.isOpened():
    cap.release()
cv2.destroyAllWindows()
detector.close()