diff --git a/TODO b/TODO index 32807db..ad86287 100644 --- a/TODO +++ b/TODO @@ -3,7 +3,9 @@ - [x] camera switcher: discovered that it is broken https://github.com/aiortc/aiortc/issues/304 meanwhile Pion has been supporting it for 3 years https://github.com/pion/webrtc/pull/1527 beginning of port from python/aiortc to go/pion ... maybe other issues around performance will be resolved too by doing so. - [x] write a go version of the compressed_vipc decoder demo -- [ ] write a Pion-compatible vision IPC track by looking at https://github.com/pion/webrtc/blob/master/examples/rtp-to-webrtc/main.go and https://github.com/asticode/go-astiav/blob/master/examples/transcoding/main.go +- [x] write a Pion-compatible vision IPC track by looking at https://github.com/pion/webrtc/blob/master/examples/rtp-to-webrtc/main.go and https://github.com/asticode/go-astiav/blob/master/examples/transcoding/main.go +- [ ] debug why we dont see anything on the iOS side even though it appears RTP packets are flowing out +- [ ] integrate bandwidth estimation https://github.com/pion/webrtc/blob/master/examples/bandwidth-estimation-from-disk/main.go - battery view - lowcam view - multicam diff --git a/encoder.go b/encoder.go index ba6c50e..8e4e03b 100644 --- a/encoder.go +++ b/encoder.go @@ -53,11 +53,14 @@ func NewEncoder(params EncoderParams) (ts *Encoder, err error) { s.encCodecContext.SetHeight(params.Height) s.encCodecContext.SetPixelFormat(params.PixelFormat) - s.encCodecContext.SetBitRate(1000) + s.encCodecContext.SetBitRate(200_000) s.encCodecContext.SetSampleAspectRatio(params.AspectRatio) s.encCodecContext.SetTimeBase(params.TimeBase) s.encCodecContext.SetWidth(params.Width) + // Update flags + s.encCodecContext.SetFlags(s.encCodecContext.Flags().Add(astiav.CodecContextFlagGlobalHeader)) + // Open codec context if err = s.encCodecContext.Open(s.encCodec, nil); err != nil { err = fmt.Errorf("encoder: opening codec context failed: %w", err) diff --git a/main.go b/main.go index f78fc65..3f2b516 100644 --- a/main.go +++ b/main.go @@ -21,12 +21,12 @@ func main() { <-signal.PairWaitChannel } signal.OnPeerConnectionCreated = func(pc *webrtc.PeerConnection) { - // ReplaceTrack("road", pc) + ReplaceTrack("road", pc) } - visionTrack, _ = NewVisionIpcTrack("roadEncodeData") - videoTrack, _ := visionTrack.NewTrackRTP() - visionTrack.StartRTP(videoTrack) + // visionTrack, _ = NewVisionIpcTrack("roadEncodeData") + // _, _ = visionTrack.NewTrackRTP() + // visionTrack.StartRTP() for { select {} @@ -78,7 +78,7 @@ func ReplaceTrack(prefix string, peerConnection *webrtc.PeerConnection) { log.Println(fmt.Errorf("main: peer connection closed due to error: %w", err)) } } else if connectionState.String() == "connected" { - go visionTrack.StartRTP(videoTrack) + go visionTrack.StartRTP() } }) } diff --git a/vision_ipc_track.go b/vision_ipc_track.go index 2e82f96..8220ee5 100644 --- a/vision_ipc_track.go +++ b/vision_ipc_track.go @@ -5,13 +5,12 @@ import ( "fmt" "io" "log" - "strings" "time" "github.com/asticode/go-astiav" "github.com/commaai/cereal" zmq "github.com/pebbe/zmq4" - "github.com/pion/rtp/v2" + "github.com/pion/rtp" "github.com/pion/rtp/v2/codecs" "github.com/pion/webrtc/v3" ) @@ -33,14 +32,15 @@ type VisionIpcTrack struct { seenIframe bool pkt *astiav.Packet f *astiav.Frame + encoder *Encoder + packetizer rtp.Packetizer + videoTrack *webrtc.TrackLocalStaticRTP networkLatency float64 frameLatency float64 processLatency float64 pcLatency float64 timeQ []int64 - - Frame chan *Frame } type Frame struct { @@ -112,8 +112,6 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { processLatency: 0.0, pcLatency: 0.0, timeQ: []int64{}, - - Frame: make(chan *Frame), }, nil } @@ -146,17 +144,14 @@ func (v *VisionIpcTrack) Stop() { return } Open = false - defer func() { - v.f.Free() - v.pkt.Free() - v.stream.decCodecContext.Free() - v.subscriber.Close() - v.context.Term() - }() - <-v.Frame + v.f.Free() + v.pkt.Free() + v.stream.decCodecContext.Free() + v.subscriber.Close() + v.context.Term() } -func (v *VisionIpcTrack) Start() { +func (v *VisionIpcTrack) Start(OnFrame func(outFrame *Frame)) { if Open { return } @@ -166,6 +161,9 @@ func (v *VisionIpcTrack) Start() { if len(msgs) > 0 { for _, msg := range msgs { + v.pkt.Unref() + v.f.Unref() + evt, err := cereal.ReadRootEvent(msg) if err != nil { log.Fatal(fmt.Errorf("cereal read root event failed: %w", err)) @@ -272,33 +270,11 @@ func (v *VisionIpcTrack) Start() { len(data), v.name, ) - v.Frame <- &Frame{Frame: v.f, Roll: roll} - + outFrame := &Frame{Frame: v.f, Roll: roll} + OnFrame(outFrame) } } } - close(v.Frame) -} - -func TestVisionIPCTrack(name string) { - track, err := NewVisionIpcTrack(name) - if err != nil { - log.Fatal(fmt.Errorf("main: creating track failed: %w", err)) - } - defer track.Stop() - - // Handle ffmpeg logs - astiav.SetLogLevel(astiav.LogLevelError) - astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) { - log.Printf("ffmpeg log: %s (level: %d)\n", strings.TrimSpace(msg), l) - }) - - go track.Start() - - for frame := range track.Frame { - // Do something with decoded frame - fmt.Println(frame.Roll) - } } func (v *VisionIpcTrack) NewTrackRTP() (videoTrack *webrtc.TrackLocalStaticRTP, err error) { @@ -307,10 +283,11 @@ func (v *VisionIpcTrack) NewTrackRTP() (videoTrack *webrtc.TrackLocalStaticRTP, MimeType: webrtc.MimeTypeH264, ClockRate: 90000, }, "video", "pion") + v.videoTrack = videoTrack return videoTrack, err } -func (visionTrack *VisionIpcTrack) StartRTP(videoTrack *webrtc.TrackLocalStaticRTP) { +func (visionTrack *VisionIpcTrack) StartRTP() { encoderParams := EncoderParams{ Width: visionTrack.Width(), Height: visionTrack.Height(), @@ -327,67 +304,59 @@ func (visionTrack *VisionIpcTrack) StartRTP(videoTrack *webrtc.TrackLocalStaticR } defer encoder.Close() - go visionTrack.Start() - defer visionTrack.Stop() + visionTrack.encoder = encoder // Create a H.264 payloader payloader := &codecs.H264Payloader{} // Create a RTP packetizer with some parameters - packetizer := rtp.NewPacketizer( - 1600, - 96, - 1, + visionTrack.packetizer = rtp.NewPacketizer( + 1200, + 0, // Value is handled when writing + 0, // Value is handled when writing payloader, rtp.NewRandomSequencer(), + visionTrack.videoTrack.Codec().ClockRate, ) - for visionTrack != nil { - for frame := range visionTrack.Frame { - fmt.Println(frame.Roll) + visionTrack.Start(visionTrack.HandleFrameRTP) +} - // so right now frame is a raw decoded AVFrame - // https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/frame.h#L317 - avframe := frame.Frame +func (v *VisionIpcTrack) HandleFrameRTP(frame *Frame) { + fmt.Println(frame.Roll) - // we need to: - // 1. transcode to h264 with adaptive bitrate using astiav - outFrame, err := encoder.Encode(avframe) - if err != nil { - log.Println(fmt.Errorf("encode error: %w", err)) - - continue - } - - // 2. create an RTP packet with h264 data inside using pion's rtp packetizer - log.Println(outFrame.Duration()) - rtpPackets := packetizer.Packetize(outFrame.Data(), uint32(outFrame.Duration())) - - // 3. write the RTP packet to the videoTrack - // Loop over the RTP packets and send them to an output stream - for rtpPacketNo, rtpPacket := range rtpPackets { - // Write the RTP packet to output - data, err := rtpPacket.Marshal() - if err != nil { - log.Println(fmt.Errorf("rtp marshall error: %w", err)) - - continue - } - - bytesWritten, err := videoTrack.Write(data) - if err != nil { - if errors.Is(err, io.ErrClosedPipe) { - // The peerConnection has been closed. - return - } - - log.Println(fmt.Errorf("rtp write error: %w", err)) - } - - log.Printf("RTP(%d:%d)", rtpPacketNo, bytesWritten) + // so right now frame is a raw decoded AVFrame + // https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/frame.h#L317 + avframe := frame.Frame + // we need to: + // 1. transcode to h264 with adaptive bitrate using astiav + outFrame, err := v.encoder.Encode(avframe) + if err != nil { + log.Println(fmt.Errorf("encode error: %w", err)) + + return + } + + // 2. create an RTP packet with h264 data inside using pion's rtp packetizer + samples := uint32((1.0 / v.encoder.Stream.encCodecContext.TimeBase().ToDouble()) * float64(visionTrack.videoTrack.Codec().ClockRate)) + rtpPackets := v.packetizer.Packetize(outFrame.Data(), samples) + + // 3. write the RTP packet to the videoTrack + // Loop over the RTP packets and send them to an output stream + for _, rtpPacket := range rtpPackets { + + err = v.videoTrack.WriteRTP(rtpPacket) + if err != nil { + if errors.Is(err, io.ErrClosedPipe) { + // The peerConnection has been closed. + return } + log.Println(fmt.Errorf("rtp write error: %w", err)) } + + // log.Printf("RTP(%d)", rtpPacketNo) + } }