diff --git a/main.go b/main.go index 67f7089..db0181a 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( ) var visionTrack *VisionIpcTrack +var peerConnection *webrtc.PeerConnection func main() { signal := signaling.Create("go-webrtc-body") @@ -20,20 +21,32 @@ func main() { log.Println("Waiting to pair.") <-signal.PairWaitChannel } + signal.OnPeerConnectionCreated = func(pc *webrtc.PeerConnection) { - ReplaceTrack("road", pc) + peerConnection = pc + InitStream(pc) + } + signal.OnMessage = func(msg signaling.Message) { + switch msg.PayloadType { + case "request_track": + { + visionTrack.ChangeCamera() + } + default: + log.Printf("unimplemented message type: %s\n", msg.PayloadType) + } } for { select {} } } -func ReplaceTrack(prefix string, peerConnection *webrtc.PeerConnection) { +func InitStream(peerConnection *webrtc.PeerConnection) { var err error if visionTrack != nil { visionTrack.Stop() } - visionTrack, err = NewVisionIpcTrack(prefix + "EncodeData") + visionTrack, err = NewVisionIpcTrack() if err != nil { log.Fatal(fmt.Errorf("main: creating track failed: %w", err)) } diff --git a/signaling b/signaling index eb06131..d47c556 160000 --- a/signaling +++ b/signaling @@ -1 +1 @@ -Subproject commit eb061314f5d3308062e20ea33b6f6941a11ff836 +Subproject commit d47c556754baa933241fb884304a16b802cbd214 diff --git a/vision_ipc_track.go b/vision_ipc_track.go index ab7b54e..e27c659 100644 --- a/vision_ipc_track.go +++ b/vision_ipc_track.go @@ -15,6 +15,12 @@ import ( const V4L2_BUF_FLAG_KEYFRAME = uint32(8) +var cams = []string{ + "roadEncodeData", + "wideRoadEncodeData", + "driverEncodeData", +} + type VisionIpcTrackTranscodeStream struct { decCodecID astiav.CodecID decCodec *astiav.Codec @@ -30,6 +36,7 @@ type VisionIpcTrackTranscodeStream struct { type VisionIpcTrack struct { name string stream *VisionIpcTrackTranscodeStream + camIdx int context *zmq.Context subscriber *zmq.Socket lastIdx int64 @@ -48,6 +55,8 @@ type VisionIpcTrack struct { logMonoTime uint64 timestampEof uint64 dataSize int + + counter int } type Frame struct { @@ -57,7 +66,7 @@ type Frame struct { var Open = false -func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { +func NewVisionIpcTrack() (track *VisionIpcTrack, err error) { // Create a new context context, err := zmq.NewContext() if err != nil { @@ -65,15 +74,6 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { return nil, err } - // Connect to the socket - subscriber, err := context.NewSocket(zmq.SUB) - if err != nil { - log.Fatal(fmt.Errorf("vision_ipc_track: new zmq socket failed: %w", err)) - return nil, err - } - subscriber.SetSubscribe("") - subscriber.Connect(GetServiceURI(name)) - // Create stream s := &VisionIpcTrackTranscodeStream{} @@ -129,7 +129,7 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { s.encCodecContext.SetHeight(s.decCodecContext.Height()) s.encCodecContext.SetFramerate(s.decCodecContext.Framerate()) s.encCodecContext.SetPixelFormat(s.decCodecContext.PixelFormat()) - s.encCodecContext.SetBitRate(500_000) + s.encCodecContext.SetBitRate(1_500_000) // s.encCodecContext.SetGopSize(0) s.encCodecContext.SetSampleAspectRatio(s.decCodecContext.SampleAspectRatio()) s.encCodecContext.SetTimeBase(s.decCodecContext.TimeBase()) @@ -166,10 +166,9 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { } return &VisionIpcTrack{ - name: name, + camIdx: 0, stream: s, context: context, - subscriber: subscriber, lastIdx: -1, seenIframe: false, networkLatency: 0.0, @@ -185,6 +184,8 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { logMonoTime: 0, timestampEof: 0, dataSize: 0, + + counter: 0, }, nil } @@ -222,7 +223,6 @@ func (v *VisionIpcTrack) Stop() { v.stream.decPkt.Free() v.stream.encCodecContext.Free() v.stream.decCodecContext.Free() - v.subscriber.Close() v.context.Term() } @@ -231,19 +231,18 @@ func (v *VisionIpcTrack) Start() { return } - // adapter := NewAdapter() + v.Subscribe() + defer v.subscriber.Close() - // h264, h264Err := h264reader.NewReader(adapter) - // if h264Err != nil { - // panic(h264Err) - // } - - counter := 0 + v.counter = 0 duration := 50 * time.Millisecond Open = true for Open { + // if v.subscriber == nil { + // continue + // } msgs := DrainSock(v.subscriber, false) v.msgCount = len(msgs) if v.msgCount > 0 { @@ -253,7 +252,7 @@ func (v *VisionIpcTrack) Start() { v.stream.decPkt.Unref() v.stream.decFrame.Unref() - pts := int64(counter * 50) // 50ms per frame + pts := int64(v.counter * 50) // 50ms per frame evt, err := cereal.ReadRootEvent(msg) if err != nil { @@ -261,7 +260,15 @@ func (v *VisionIpcTrack) Start() { continue } - encodeData, err := evt.RoadEncodeData() + var encodeData cereal.EncodeData + switch evt.Which() { + case cereal.Event_Which_roadEncodeData: + encodeData, err = evt.RoadEncodeData() + case cereal.Event_Which_wideRoadEncodeData: + encodeData, err = evt.WideRoadEncodeData() + case cereal.Event_Which_driverEncodeData: + encodeData, err = evt.DriverEncodeData() + } if err != nil { log.Println(fmt.Errorf("cereal read road encode data failed: %w", err)) continue @@ -353,6 +360,16 @@ func (v *VisionIpcTrack) Start() { v.pcLatency = ((float64(time.Now().UnixNano()) / 1e6) - float64(v.timeQ[0])) v.timeQ = v.timeQ[1:] + // this didn't work + // // if counter is 0, we changed camera source, so encode a keyframe! + // if v.counter == 0 { + // // v.stream.encPkt.SetFlags(v.stream.encPkt.Flags().Add(astiav.PacketFlagKey)) + // // av_opt_set(codec_ctx->priv_data, "x264opts", "keyint=1", 0); + // // v.stream.encCodecContext.SetOpt("keyint") + // // v.stream.encCodecContext.SetGopSize(1) + // v.stream.encCodecContext.SetOpt("x264opts", "keyint", 1) + // } + // Send frame if err = v.stream.encCodecContext.SendFrame(v.stream.decFrame); err != nil { log.Println(fmt.Errorf("vision_ipc_track encoder: sending frame failed: %w", err)) @@ -368,6 +385,16 @@ func (v *VisionIpcTrack) Start() { continue } + // this didn't work + // // if counter is 0, we changed camera source, so encode a keyframe! + // if v.counter == 0 { + // // v.stream.encPkt.SetFlags(v.stream.encPkt.Flags().Add(astiav.PacketFlagKey)) + // // av_opt_set(codec_ctx->priv_data, "x264opts", "keyint=1", 0); + // // v.stream.encCodecContext.SetOpt("keyint") + // // v.stream.encCodecContext.SetGopSize(0) + // v.stream.encCodecContext.SetOpt("x264opts", "keyint", 0) + // } + if err != nil { log.Println(fmt.Errorf("vision_ipc_track: encode error: %w", err)) continue @@ -378,7 +405,7 @@ func (v *VisionIpcTrack) Start() { continue } - counter++ + v.counter++ v.PrintStats() } @@ -401,3 +428,35 @@ func (v *VisionIpcTrack) PrintStats() { v.name, ) } + +func (v *VisionIpcTrack) ChangeCamera() { + v.camIdx = (v.camIdx + 1) % len(cams) + v.Subscribe() +} + +func (v *VisionIpcTrack) NewSocket() (socket *zmq.Socket, err error) { + socket, err = v.context.NewSocket(zmq.SUB) + if err != nil { + return nil, err + } + v.name = cams[v.camIdx] + err = socket.Connect(GetServiceURI(v.name)) + if err != nil { + return nil, err + } + return socket, nil +} + +func (v *VisionIpcTrack) Subscribe() { + oldSub := v.subscriber + subscriber, err := v.NewSocket() + if err != nil { + log.Println(fmt.Errorf("zmq subscribe failed: %w", err)) + } + subscriber.SetSubscribe("") + v.counter = 0 + v.subscriber = subscriber + if oldSub != nil { + oldSub.Close() + } +}