From a59c0727f646f26f5a32ffdce3409bacf5af23e2 Mon Sep 17 00:00:00 2001 From: Keyvan Fatehi Date: Tue, 24 Jan 2023 23:25:39 -0800 Subject: [PATCH] controls and camera switching --- desktop_stream_track.py | 10 ++--- dummy_streams.py | 50 +++++++++++++++++++++ server.py | 99 +++++++++++++++++++++++++++++++++-------- 3 files changed, 136 insertions(+), 23 deletions(-) create mode 100644 dummy_streams.py diff --git a/desktop_stream_track.py b/desktop_stream_track.py index 0e7500b..7c1780c 100755 --- a/desktop_stream_track.py +++ b/desktop_stream_track.py @@ -42,11 +42,11 @@ class DesktopStreamTrack(VideoStreamTrack): x = numpy.interp(data["cursorPositionX"], (0, data["displayWidth"]), (0, self.resolution.width)) y = numpy.interp(data["cursorPositionY"], (0, data["displayHeight"]), (0, self.resolution.height)) pyautogui.moveTo(x, y, _pause=False) - elif action == "joystick": - x = numpy.interp(data["x"], (-38, 38), (0, self.resolution.width)) - y = numpy.interp(data["y"], (-38, 38), (self.resolution.height, 0)) - print(f'{data["y"]} {self.resolution.height} {y}') - pyautogui.moveTo(x, y, _pause=False) + # elif action == "joystick": + # x = numpy.interp(data["x"], (-38, 38), (0, self.resolution.width)) + # y = numpy.interp(data["y"], (-38, 38), (self.resolution.height, 0)) + # print(f'{data["y"]} {self.resolution.height} {y}') + # pyautogui.moveTo(x, y, _pause=False) elif action == "click": pyautogui.click() elif action == "rightclick": diff --git a/dummy_streams.py b/dummy_streams.py new file mode 100644 index 0000000..2d159d0 --- /dev/null +++ b/dummy_streams.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +import av +import numpy as np +import cv2 +import asyncio +from aiortc import VideoStreamTrack +from aiortc.mediastreams import AudioStreamTrack + +class DummyVideoStreamTrack(VideoStreamTrack): + async def recv(self): + pts, time_base = await self.next_timestamp() + frame = None + while frame is None: + numpy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + cv2.putText(numpy_frame, "hello", (200, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) + frame = av.VideoFrame.from_ndarray(numpy_frame, format="bgr24") + + frame.pts = pts + frame.time_base = time_base + return frame + +class DummyAudioStreamTrack(AudioStreamTrack): + pass + +if __name__ == "__main__": + from time import time_ns + import sys + + async def test(): + frame_count=0 + start_time=time_ns() + track = DummyVideoStreamTrack() + while True: + await track.recv() + now = time_ns() + playtime = now - start_time + playtime_sec = playtime * 0.000000001 + if playtime_sec >= 1: + print(f'fps: {frame_count}') + frame_count = 0 + start_time = time_ns() + else: + frame_count+=1 + + # Run event loop + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(test()) + except KeyboardInterrupt: + sys.exit(0) \ No newline at end of file diff --git a/server.py b/server.py index 14f5fdf..05c4015 100755 --- a/server.py +++ b/server.py @@ -9,11 +9,14 @@ from typing import OrderedDict from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCRtpCodecCapability, RTCConfiguration, RTCIceServer from compressed_vipc_track import VisionIpcTrack from desktop_stream_track import DesktopStreamTrack +from dummy_streams import DummyAudioStreamTrack, DummyVideoStreamTrack + from aiortc.contrib.signaling import BYE from secureput.secureput_signaling import SecureputSignaling from aiortc.contrib.media import MediaBlackhole - +import requests +import numpy # optional, for better performance # try: # import uvloop @@ -40,9 +43,21 @@ async def heap_snapshot(): await asyncio.sleep(10) +def webjoystick(x, y): + x = numpy.interp(x, (-150, 150), (1, -1)) + y = numpy.interp(y, (-150, 150), (1, -1)) + request_url = f"http://tici:5000/control/{x:.4f}/{y:.4f}" + print(request_url) + response = requests.get(request_url) + # if response.status_code != 200: + # print(f"Error: {response.status_code}") + + +pc = None cams = ["roadEncodeData","wideRoadEncodeData","driverEncodeData"] cam = 2 +video_sender = None desktop_track = None recorder = None @@ -53,8 +68,38 @@ def get_desktop_track(): desktop_track = DesktopStreamTrack() return desktop_track -async def signal(pc, signaling): +current_track_name = None +current_track = None +track_map = { + "dummy": lambda : DummyVideoStreamTrack(), + "pc": lambda : get_desktop_track(), + "cam1": lambda : VisionIpcTrack(cams[int(0)], "tici"), + "cam2": lambda : VisionIpcTrack(cams[int(1)], "tici"), + "cam3": lambda : VisionIpcTrack(cams[int(2)], "tici") +} + +async def change_tracks(name): + global current_track_name + global current_track + global video_sender + if current_track_name != name: + mktrack = track_map[name] + track = mktrack() + if current_track_name == None: + video_sender = pc.addTrack(track) + else: + print(f"Changing track to {name}") + video_sender.replaceTrack(track) + current_track = track + current_track_name = name + return current_track + + +async def signal(signaling): + global pc global recorder + global current_track + global current_track_name await signaling.connect() print("Connected to signaling server") @@ -70,17 +115,34 @@ async def signal(pc, signaling): if isinstance(obj, RTCSessionDescription): print("Got SDP") - - if pc != None and pc.iceConnectionState != "failed": - print("Closing previous connection") + + if pc != None: await pc.close() - pc = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.secureput.com:3478")])) + current_track = None + current_track_name = None + print("Closed previous peer connection") + + pc = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.secureput.com:3478")])) + + if recorder != None: + await recorder.stop() + print("Stopped previous recorder") + + recorder = MediaBlackhole() + @pc.on("connectionstatechange") async def on_connectionstatechange(): print("Connection state is %s" % pc.iceConnectionState) if pc.iceConnectionState == "failed": - await pc.close() + print("ICE connection failed.") + if pc != None: + await pc.close() + pc = None + if recorder != None: + await recorder.stop() + recorder = None + @pc.on('datachannel') def on_datachannel(channel): @@ -91,7 +153,15 @@ async def signal(pc, signaling): if "type" in data: if data["type"] == "wrapped": data = json.loads(signaling.decrypt(data["payload"]["data"])) - if desktop_track and data["type"] in desktop_track.valid_actions: + elif data["type"] == "request_track": + if data["name"] in track_map: + await change_tracks(data["name"]) + else: + print("unknown track requested") + print(data) + elif data["type"] == "joystick": + webjoystick(data["x"], data["y"]) + elif desktop_track and data["type"] in desktop_track.valid_actions: desktop_track.handle_action(data["type"], data) else: print("ignored message") @@ -113,10 +183,8 @@ async def signal(pc, signaling): # TODO: stream the microphone audio = None - # video = VisionIpcTrack(cams[int(cam)], "tici") - - # pc.addTrack(video) - pc.addTrack(get_desktop_track()) + await change_tracks("cam2") + answer = await pc.createAnswer() await pc.setLocalDescription(answer) await signaling.send(pc.localDescription) @@ -133,18 +201,15 @@ if __name__ == "__main__": parser.add_argument("--verbose", "-v", action="count") args = parser.parse_args() - recorder = MediaBlackhole() - if args.verbose: logging.basicConfig(level=logging.DEBUG) # if uvloop is not None: # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - pc = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer(args.stun_server)])) signaling = SecureputSignaling(args.signaling_server) - coro = signal(pc, signaling) + coro = signal(signaling) # coro = asyncio.gather(watchdog.check_memory(), signal(pc, signaling)) # coro = asyncio.gather(heap_snapshot(), signal(pc, signaling)) @@ -156,6 +221,4 @@ if __name__ == "__main__": except KeyboardInterrupt: pass finally: - loop.run_until_complete(recorder.stop()) - loop.run_until_complete(pc.close()) loop.run_until_complete(signaling.close())