mirror of
https://github.com/Kalit31/AR-Video-Streaming-over-WebRTC.git
synced 2025-10-05 07:06:52 +08:00
134 lines
5.6 KiB
Python
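# AR filter server: receives JPEG-encoded video frames over a TCP socket,
# overlays eye and mouth filters using MediaPipe Face Mesh, and sends the
# processed frames back to the client.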
import os
import sys
import cv2
import itertools
import numpy as np
import mediapipe as mp
import socket
import struct

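# Initialise MediaPipe Face Mesh for video streams (one face, tracking enabled)
# and load the filter images that get pasted over the eyes and mouth.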
mp_face_mesh = mp.solutions.face_mesh
face_mesh_videos = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1,
                                         min_detection_confidence=0.5, min_tracking_confidence=0.3)

# eye = cv2.imread('/home/epl/Desktop/WebRTC_research/ar-filters/filter_imgs/eye.jpg')
# mouth = cv2.imread('/home/epl/Desktop/WebRTC_research/ar-filters/filter_imgs/smile.png')
eye = cv2.imread('/home/kalit/Desktop/GeorgiaTech/Fall_2024/CS_8903/WebRTC_research/ar-filters/filter_imgs/eye.jpg')
mouth = cv2.imread('/home/kalit/Desktop/GeorgiaTech/Fall_2024/CS_8903/WebRTC_research/ar-filters/filter_imgs/smile.png')

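# Run Face Mesh on a BGR frame; the channel reversal converts it to RGB first.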
def detectFacialLandmarks(image, face_mesh):
    return face_mesh.process(image[:, :, ::-1])

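# Return the bounding-box width/height and pixel coordinates of the landmarks
# listed in INDEXES (a set of MediaPipe landmark-index pairs).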
def getSize(image, face_landmarks, INDEXES):
    img_height, img_width, _ = image.shape
    INDEXES_LIST = list(itertools.chain(*INDEXES))
    landmarks = []
    for INDEX in INDEXES_LIST:
        landmarks.append([int(face_landmarks.landmark[INDEX].x * img_width),
                          int(face_landmarks.landmark[INDEX].y * img_height)])

    landmarks = np.array(landmarks)
    _, _, width, height = cv2.boundingRect(landmarks)
    return width, height, landmarks

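# Classify a face part as 'OPEN' or 'CLOSE' by comparing its height to the
# height of the face oval, expressed as a percentage against the threshold.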
def isOpen(image, face_mesh_results, face_part, threshold=5):
    img_height, img_width, _ = image.shape
    status = {}
    if face_part == "MOUTH":
        INDEXES = mp_face_mesh.FACEMESH_LIPS
    elif face_part == "LEFT EYE":
        INDEXES = mp_face_mesh.FACEMESH_LEFT_EYE
    else:
        INDEXES = mp_face_mesh.FACEMESH_RIGHT_EYE

    for face_no, face_landmarks in enumerate(face_mesh_results.multi_face_landmarks):
        _, height, _ = getSize(image, face_landmarks, INDEXES)
        _, face_height, _ = getSize(image, face_landmarks, mp_face_mesh.FACEMESH_FACE_OVAL)
        if (height / face_height) * 100 > threshold:
            status[face_no] = 'OPEN'
        else:
            status[face_no] = 'CLOSE'

    return status

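# Resize the filter image to roughly twice the detected face part's height,
# mask out its dark background, and blend it into a copy of the frame centred
# on the face part.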
def overlay(image, filter_img, face_landmarks, face_part, INDEXES):
    try:
        annotated_img = image.copy()
        _, face_part_height, landmarks = getSize(image, face_landmarks, INDEXES)
        filter_img_height, filter_img_width, _ = filter_img.shape
        required_height = int(face_part_height * 2)
        resized_filter_img = cv2.resize(filter_img,
                                        (int(filter_img_width * (required_height / filter_img_height)),
                                         required_height))
        filter_img_height, filter_img_width, _ = resized_filter_img.shape
        _, filter_img_mask = cv2.threshold(cv2.cvtColor(resized_filter_img, cv2.COLOR_BGR2GRAY),
                                           25, 255, cv2.THRESH_BINARY_INV)
        center = landmarks.mean(axis=0).astype('int')

        location = (int(center[0] - filter_img_width / 2), int(center[1] - filter_img_height / 2))
        ROI = image[location[1]: location[1] + filter_img_height,
                    location[0]: location[0] + filter_img_width]
        resultant_img = cv2.bitwise_and(ROI, ROI, mask=filter_img_mask)
        resultant_img = cv2.add(resultant_img, resized_filter_img)
        annotated_img[location[1]: location[1] + filter_img_height,
                      location[0]: location[0] + filter_img_width] = resultant_img
        return annotated_img
    except Exception as e:
        print(f"Failed to overlay filter on top of the image frame: {e}")
        raise e

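# Mirror the frame, detect landmarks, and overlay the eye/mouth filters on
# every face whose eyes or mouth are open.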
def add_filter_on_frame(frame):
    frame = cv2.flip(frame, 1)
    face_mesh_results = detectFacialLandmarks(frame, face_mesh_videos)

    if face_mesh_results.multi_face_landmarks:
        left_eye_status = isOpen(frame, face_mesh_results, 'LEFT EYE', threshold=5)
        right_eye_status = isOpen(frame, face_mesh_results, 'RIGHT EYE', threshold=5)
        mouth_status = isOpen(frame, face_mesh_results, 'MOUTH', threshold=15)

        for face_num, face_landmarks in enumerate(face_mesh_results.multi_face_landmarks):
            if left_eye_status[face_num] == 'OPEN':
                frame = overlay(frame, eye, face_landmarks, 'LEFT EYE', mp_face_mesh.FACEMESH_LEFT_EYE)
            if right_eye_status[face_num] == 'OPEN':
                frame = overlay(frame, eye, face_landmarks, 'RIGHT EYE', mp_face_mesh.FACEMESH_RIGHT_EYE)
            if mouth_status[face_num] == 'OPEN':
                frame = overlay(frame, mouth, face_landmarks, 'MOUTH', mp_face_mesh.FACEMESH_LIPS)

    return frame

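# TCP server: accept a single client, then repeatedly receive a frame,
# apply the filters, and send the processed frame back.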
def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('localhost', 5005))
    server_socket.listen(1)
    print("Server is listening for incoming frames...")
    conn, addr = server_socket.accept()
    print(f"Connection established with {addr}")

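    # Each frame is framed as a 4-byte network-order length prefix followed by
    # the JPEG-encoded payload; the reply uses the same framing.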
    while True:
        # Read the 4-byte length prefix in full; a short read means the peer closed.
        raw_size = bytearray()
        while len(raw_size) < 4:
            chunk = conn.recv(4 - len(raw_size))
            if not chunk:
                break
            raw_size.extend(chunk)
        if len(raw_size) < 4:
            break

        frame_size = struct.unpack('!I', raw_size)[0]
        frame_data = bytearray()

        while len(frame_data) < frame_size:
            packet = conn.recv(frame_size - len(frame_data))
            if not packet:
                break
            frame_data.extend(packet)

        nparr = np.frombuffer(frame_data, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
        try:
            new_frame = add_filter_on_frame(frame)
        except Exception as e:
            print("Failed to add filter: " + str(e))
            new_frame = frame

        _, buffer = cv2.imencode('.jpg', new_frame)
        # output_file_path = 'output.jpg'
        # with open(output_file_path, 'wb') as f:
        #     f.write(buffer)
        # print("Sending back processed image")
        conn.sendall(struct.pack('!I', len(buffer)) + buffer.tobytes())

    conn.close()
    server_socket.close()

if __name__ == "__main__":
    main()