Files
2024-11-19 15:09:40 -05:00

134 lines
5.6 KiB
Python

import os
import sys
import cv2
import itertools
import numpy as np
import mediapipe as mp
import socket
import struct
mp_face_mesh = mp.solutions.face_mesh
face_mesh_videos = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5, min_tracking_confidence=0.3)
# eye = cv2.imread('/home/epl/Desktop/WebRTC_research/ar-filters/filter_imgs/eye.jpg')
# mouth = cv2.imread('/home/epl/Desktop/WebRTC_research/ar-filters/filter_imgs/smile.png')
eye = cv2.imread('/home/kalit/Desktop/GeorgiaTech/Fall_2024/CS_8903/WebRTC_research/ar-filters/filter_imgs/eye.jpg')
mouth = cv2.imread('/home/kalit/Desktop/GeorgiaTech/Fall_2024/CS_8903/WebRTC_research/ar-filters/filter_imgs/smile.png')
def detectFacialLandmarks(image, face_mesh):
return face_mesh.process(image[:,:,::-1])
def getSize(image, face_landmarks, INDEXES):
img_height, img_width, _ = image.shape
INDEXES_LIST = list(itertools.chain(*INDEXES))
landmarks = []
for INDEX in INDEXES_LIST:
landmarks.append([int(face_landmarks.landmark[INDEX].x*img_width), int(face_landmarks.landmark[INDEX].y*img_height)])
landmarks = np.array(landmarks)
_, _, width, height = cv2.boundingRect(landmarks)
return width, height, landmarks
def isOpen(image, face_mesh_results, face_part, threshold=5):
img_height, img_width, _ = image.shape
status = {}
if face_part == "MOUTH":
INDEXES = mp_face_mesh.FACEMESH_LIPS
elif face_part == "LEFT EYE":
INDEXES = mp_face_mesh.FACEMESH_LEFT_EYE
else:
INDEXES = mp_face_mesh.FACEMESH_RIGHT_EYE
for face_no, face_landmarks in enumerate(face_mesh_results.multi_face_landmarks):
_, height, _ = getSize(image, face_landmarks, INDEXES)
_, face_height, _ = getSize(image, face_landmarks, mp_face_mesh.FACEMESH_FACE_OVAL)
if (height/face_height)*100 > threshold:
status[face_no]='OPEN'
else:
status[face_no]='CLOSE'
return status
def overlay(image, filter_img, face_landmarks, face_part, INDEXES):
try:
annotated_img = image.copy()
_, face_part_height, landmarks = getSize(image, face_landmarks, INDEXES)
filter_img_height, filter_img_width, _ = filter_img.shape
required_height = int(face_part_height*2)
resized_filter_img = cv2.resize(filter_img, (int(filter_img_width*(required_height/filter_img_height)), required_height))
filter_img_height, filter_img_width, _ = resized_filter_img.shape
_, filter_img_mask = cv2.threshold(cv2.cvtColor(resized_filter_img, cv2.COLOR_BGR2GRAY), 25, 255, cv2.THRESH_BINARY_INV)
center = landmarks.mean(axis=0).astype('int')
location = (int(center[0]-filter_img_width/2), int(center[1]-filter_img_height/2))
ROI = image[location[1]: location[1]+filter_img_height, location[0]:location[0]+filter_img_width]
resultant_img = cv2.bitwise_and(ROI, ROI, mask=filter_img_mask)
resultant_img = cv2.add(resultant_img, resized_filter_img)
annotated_img[location[1]: location[1]+filter_img_height, location[0]:location[0]+filter_img_width] = resultant_img
return annotated_img
except Exception as e:
print(f"Failed to overlay filter on top of the image frame: {e}")
raise e
def add_filter_on_frame(frame):
frame = cv2.flip(frame, 1)
face_mesh_results = detectFacialLandmarks(frame, face_mesh_videos)
if face_mesh_results.multi_face_landmarks:
left_eye_status = isOpen(frame, face_mesh_results, 'LEFT EYE', threshold=5)
right_eye_status = isOpen(frame, face_mesh_results, 'RIGHT EYE', threshold=5)
mouth_status = isOpen(frame, face_mesh_results, 'MOUTH', threshold=15)
for face_num, face_landmarks in enumerate(face_mesh_results.multi_face_landmarks):
if left_eye_status[face_num] == 'OPEN':
frame = overlay(frame, eye, face_landmarks, 'LEFT EYE', mp_face_mesh.FACEMESH_LEFT_EYE)
if right_eye_status[face_num] == 'OPEN':
frame = overlay(frame, eye, face_landmarks, 'RIGHT EYE', mp_face_mesh.FACEMESH_RIGHT_EYE)
if mouth_status[face_num] == 'OPEN':
frame = overlay(frame, mouth, face_landmarks, 'MOUTH', mp_face_mesh.FACEMESH_LIPS)
return frame
def main():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 5005))
server_socket.listen(1)
print("Server is listening for incoming frames...")
conn, addr = server_socket.accept()
print(f"Connection established with {addr}")
while True:
raw_size = conn.recv(4)
if not raw_size:
break
frame_size = struct.unpack('!I', raw_size)[0]
frame_data = bytearray()
while len(frame_data) < frame_size:
packet = conn.recv(frame_size - len(frame_data))
if not packet:
break
frame_data.extend(packet)
nparr = np.frombuffer(frame_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
try:
new_frame = add_filter_on_frame(frame)
except Exception as e:
print("Failed to add filter: "+str(e))
new_frame = frame
_, buffer = cv2.imencode('.jpg', new_frame)
# output_file_path = 'output.jpg'
# with open(output_file_path, 'wb') as f:
# f.write(buffer)
# print("Sending back processed image")
conn.sendall(struct.pack('!I', len(buffer)) + buffer.tobytes())
conn.close()
server_socket.close()
if __name__ == "__main__":
main()