mirror of
https://github.com/comma-hacks/webrtc.git
synced 2025-09-26 20:21:26 +08:00
209 lines
7.0 KiB
Python
Executable File
209 lines
7.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import ssl
|
|
from typing import OrderedDict
|
|
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCRtpCodecCapability, RTCConfiguration, RTCIceServer
|
|
from compressed_vipc_track import VisionIpcTrack
|
|
from desktop_stream_track import DesktopStreamTrack
|
|
from dummy_streams import DummyAudioStreamTrack, DummyVideoStreamTrack
|
|
|
|
from aiortc.contrib.signaling import BYE
|
|
from signaling.python.secureput_signaling import SecureputSignaling
|
|
|
|
from aiortc.contrib.media import MediaBlackhole
|
|
import requests
|
|
import numpy
|
|
# optional, for better performance
|
|
try:
|
|
import uvloop
|
|
except ImportError:
|
|
uvloop = None
|
|
|
|
|
|
def webjoystick(x, y):
|
|
outx = (1, -1)
|
|
outy = (-0.5, 0.5)
|
|
if current_track_name == "cam3":
|
|
# outx = (-1, 1)
|
|
outy = (0.5, -0.5)
|
|
x = numpy.interp(x, (-150, 150), outx)
|
|
y = numpy.interp(y, (-150, 150), outy)
|
|
request_url = f"http://tici:5000/control/{x:.4f}/{y:.4f}"
|
|
print(request_url)
|
|
requests.get(request_url)
|
|
|
|
|
|
pc = None
|
|
cams = ["roadEncodeData","wideRoadEncodeData","driverEncodeData"]
|
|
cam = 2
|
|
|
|
video_sender = None
|
|
desktop_track = None
|
|
recorder = None
|
|
|
|
def get_desktop_track():
|
|
global desktop_track
|
|
if desktop_track != None:
|
|
desktop_track.stop()
|
|
desktop_track = DesktopStreamTrack()
|
|
return desktop_track
|
|
|
|
current_track_name = None
|
|
current_track = None
|
|
track_map = {
|
|
"dummy": lambda : DummyVideoStreamTrack(),
|
|
"pc": lambda : get_desktop_track(),
|
|
"cam1": lambda : VisionIpcTrack(cams[int(0)], "tici"),
|
|
"cam2": lambda : VisionIpcTrack(cams[int(1)], "tici"),
|
|
"cam3": lambda : VisionIpcTrack(cams[int(2)], "tici")
|
|
}
|
|
|
|
async def change_tracks(name):
|
|
global current_track_name
|
|
global current_track
|
|
global video_sender
|
|
if current_track_name != name:
|
|
mktrack = track_map[name]
|
|
track = mktrack()
|
|
if current_track_name == None:
|
|
video_sender = pc.addTrack(track)
|
|
else:
|
|
print(f"Changing track to {name}")
|
|
video_sender.replaceTrack(track)
|
|
current_track = track
|
|
current_track_name = name
|
|
return current_track
|
|
|
|
|
|
async def signal(signaling):
|
|
global pc
|
|
global recorder
|
|
global current_track
|
|
global current_track_name
|
|
|
|
await signaling.connect()
|
|
print("Connected to signaling server")
|
|
while True:
|
|
obj = await signaling.receive()
|
|
|
|
# The peer trickles, but aiortc doesn't https://github.com/aiortc/aiortc/issues/227
|
|
# > aioice, the library which aiortc uses for ICE does not trickle ICE candidates:
|
|
# > you get all the candidates in one go. As such once you have called setLocalDescription()
|
|
# > for your offer or answer, all your ICE candidates are listed in pc.localDescription.
|
|
if isinstance(obj, RTCIceCandidate):
|
|
pc.addIceCandidate(obj)
|
|
|
|
if isinstance(obj, RTCSessionDescription):
|
|
print("Got SDP")
|
|
|
|
if pc != None:
|
|
await pc.close()
|
|
current_track = None
|
|
current_track_name = None
|
|
print("Closed previous peer connection")
|
|
|
|
pc = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.secureput.com:3478")]))
|
|
|
|
if recorder != None:
|
|
await recorder.stop()
|
|
print("Stopped previous recorder")
|
|
|
|
recorder = MediaBlackhole()
|
|
|
|
|
|
@pc.on("connectionstatechange")
|
|
async def on_connectionstatechange():
|
|
print("Connection state is %s" % pc.iceConnectionState)
|
|
if pc.iceConnectionState == "failed":
|
|
print("ICE connection failed.")
|
|
if pc != None:
|
|
await pc.close()
|
|
pc = None
|
|
if recorder != None:
|
|
await recorder.stop()
|
|
recorder = None
|
|
|
|
|
|
@pc.on('datachannel')
|
|
def on_datachannel(channel):
|
|
print("data channel!")
|
|
@channel.on('message')
|
|
async def on_message(message):
|
|
data = json.loads(message)
|
|
if "type" in data:
|
|
if data["type"] == "wrapped":
|
|
data = json.loads(signaling.decrypt(data["payload"]["data"]))
|
|
if data["type"] == "request_track":
|
|
if data["name"] in track_map:
|
|
await change_tracks(data["name"])
|
|
else:
|
|
print("unknown track requested")
|
|
print(data)
|
|
elif data["type"] == "joystick":
|
|
webjoystick(data["x"], data["y"])
|
|
elif desktop_track and desktop_track.input.supports(data["type"]):
|
|
desktop_track.input.handle_action(data["type"], data)
|
|
else:
|
|
print("ignored message")
|
|
print(data)
|
|
else:
|
|
print("unsupported message")
|
|
print(data)
|
|
|
|
@pc.on("track")
|
|
def on_track(track):
|
|
print("Receiving %s" % track.kind)
|
|
recorder.addTrack(track)
|
|
|
|
|
|
await pc.setRemoteDescription(obj)
|
|
await recorder.start()
|
|
|
|
if obj.type == 'offer':
|
|
# TODO: stream the microphone
|
|
audio = None
|
|
|
|
await change_tracks("pc")
|
|
|
|
answer = await pc.createAnswer()
|
|
await pc.setLocalDescription(answer)
|
|
await signaling.send(pc.localDescription)
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Comma Body WebRTC Service")
|
|
parser.add_argument("--addr", default='tici', help="Address of comma three")
|
|
|
|
# Not implemented (yet?). Geo already made the PoC for this, it should be possible.
|
|
# parser.add_argument("--nvidia", action="store_true", help="Use nvidia instead of ffmpeg")
|
|
|
|
parser.add_argument("--signaling-server", default="wss://signal.secureput.com", help="Signaling server to use")
|
|
parser.add_argument("--stun-server", default="stun:stun.secureput.com:3478", help="STUN server to use")
|
|
parser.add_argument("--verbose", "-v", action="count")
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
if uvloop is not None:
|
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
|
|
|
signaling = SecureputSignaling(args.signaling_server)
|
|
|
|
coro = signal(signaling)
|
|
# coro = asyncio.gather(watchdog.check_memory(), signal(pc, signaling))
|
|
# coro = asyncio.gather(heap_snapshot(), signal(pc, signaling))
|
|
|
|
|
|
# run event loop
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
loop.run_until_complete(coro)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
loop.run_until_complete(signaling.close())
|