mysterious

This commit is contained in:
Keyvan Fatehi
2023-02-23 02:35:25 -08:00
parent 8b73c2ec4d
commit 9f9a70122a
4 changed files with 68 additions and 94 deletions

4
TODO
View File

@@ -3,7 +3,9 @@
- [x] camera switcher: discovered that it is broken https://github.com/aiortc/aiortc/issues/304 meanwhile Pion has been supporting it for 3 years https://github.com/pion/webrtc/pull/1527 - [x] camera switcher: discovered that it is broken https://github.com/aiortc/aiortc/issues/304 meanwhile Pion has been supporting it for 3 years https://github.com/pion/webrtc/pull/1527
beginning of port from python/aiortc to go/pion ... maybe other issues around performance will be resolved too by doing so. beginning of port from python/aiortc to go/pion ... maybe other issues around performance will be resolved too by doing so.
- [x] write a go version of the compressed_vipc decoder demo - [x] write a go version of the compressed_vipc decoder demo
- [ ] write a Pion-compatible vision IPC track by looking at https://github.com/pion/webrtc/blob/master/examples/rtp-to-webrtc/main.go and https://github.com/asticode/go-astiav/blob/master/examples/transcoding/main.go - [x] write a Pion-compatible vision IPC track by looking at https://github.com/pion/webrtc/blob/master/examples/rtp-to-webrtc/main.go and https://github.com/asticode/go-astiav/blob/master/examples/transcoding/main.go
- [ ] debug why we dont see anything on the iOS side even though it appears RTP packets are flowing out
- [ ] integrate bandwidth estimation https://github.com/pion/webrtc/blob/master/examples/bandwidth-estimation-from-disk/main.go
- battery view - battery view
- lowcam view - lowcam view
- multicam - multicam

View File

@@ -53,11 +53,14 @@ func NewEncoder(params EncoderParams) (ts *Encoder, err error) {
s.encCodecContext.SetHeight(params.Height) s.encCodecContext.SetHeight(params.Height)
s.encCodecContext.SetPixelFormat(params.PixelFormat) s.encCodecContext.SetPixelFormat(params.PixelFormat)
s.encCodecContext.SetBitRate(1000) s.encCodecContext.SetBitRate(200_000)
s.encCodecContext.SetSampleAspectRatio(params.AspectRatio) s.encCodecContext.SetSampleAspectRatio(params.AspectRatio)
s.encCodecContext.SetTimeBase(params.TimeBase) s.encCodecContext.SetTimeBase(params.TimeBase)
s.encCodecContext.SetWidth(params.Width) s.encCodecContext.SetWidth(params.Width)
// Update flags
s.encCodecContext.SetFlags(s.encCodecContext.Flags().Add(astiav.CodecContextFlagGlobalHeader))
// Open codec context // Open codec context
if err = s.encCodecContext.Open(s.encCodec, nil); err != nil { if err = s.encCodecContext.Open(s.encCodec, nil); err != nil {
err = fmt.Errorf("encoder: opening codec context failed: %w", err) err = fmt.Errorf("encoder: opening codec context failed: %w", err)

10
main.go
View File

@@ -21,12 +21,12 @@ func main() {
<-signal.PairWaitChannel <-signal.PairWaitChannel
} }
signal.OnPeerConnectionCreated = func(pc *webrtc.PeerConnection) { signal.OnPeerConnectionCreated = func(pc *webrtc.PeerConnection) {
// ReplaceTrack("road", pc) ReplaceTrack("road", pc)
} }
visionTrack, _ = NewVisionIpcTrack("roadEncodeData") // visionTrack, _ = NewVisionIpcTrack("roadEncodeData")
videoTrack, _ := visionTrack.NewTrackRTP() // _, _ = visionTrack.NewTrackRTP()
visionTrack.StartRTP(videoTrack) // visionTrack.StartRTP()
for { for {
select {} select {}
@@ -78,7 +78,7 @@ func ReplaceTrack(prefix string, peerConnection *webrtc.PeerConnection) {
log.Println(fmt.Errorf("main: peer connection closed due to error: %w", err)) log.Println(fmt.Errorf("main: peer connection closed due to error: %w", err))
} }
} else if connectionState.String() == "connected" { } else if connectionState.String() == "connected" {
go visionTrack.StartRTP(videoTrack) go visionTrack.StartRTP()
} }
}) })
} }

View File

@@ -5,13 +5,12 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"strings"
"time" "time"
"github.com/asticode/go-astiav" "github.com/asticode/go-astiav"
"github.com/commaai/cereal" "github.com/commaai/cereal"
zmq "github.com/pebbe/zmq4" zmq "github.com/pebbe/zmq4"
"github.com/pion/rtp/v2" "github.com/pion/rtp"
"github.com/pion/rtp/v2/codecs" "github.com/pion/rtp/v2/codecs"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
) )
@@ -33,14 +32,15 @@ type VisionIpcTrack struct {
seenIframe bool seenIframe bool
pkt *astiav.Packet pkt *astiav.Packet
f *astiav.Frame f *astiav.Frame
encoder *Encoder
packetizer rtp.Packetizer
videoTrack *webrtc.TrackLocalStaticRTP
networkLatency float64 networkLatency float64
frameLatency float64 frameLatency float64
processLatency float64 processLatency float64
pcLatency float64 pcLatency float64
timeQ []int64 timeQ []int64
Frame chan *Frame
} }
type Frame struct { type Frame struct {
@@ -112,8 +112,6 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
processLatency: 0.0, processLatency: 0.0,
pcLatency: 0.0, pcLatency: 0.0,
timeQ: []int64{}, timeQ: []int64{},
Frame: make(chan *Frame),
}, nil }, nil
} }
@@ -146,17 +144,14 @@ func (v *VisionIpcTrack) Stop() {
return return
} }
Open = false Open = false
defer func() { v.f.Free()
v.f.Free() v.pkt.Free()
v.pkt.Free() v.stream.decCodecContext.Free()
v.stream.decCodecContext.Free() v.subscriber.Close()
v.subscriber.Close() v.context.Term()
v.context.Term()
}()
<-v.Frame
} }
func (v *VisionIpcTrack) Start() { func (v *VisionIpcTrack) Start(OnFrame func(outFrame *Frame)) {
if Open { if Open {
return return
} }
@@ -166,6 +161,9 @@ func (v *VisionIpcTrack) Start() {
if len(msgs) > 0 { if len(msgs) > 0 {
for _, msg := range msgs { for _, msg := range msgs {
v.pkt.Unref()
v.f.Unref()
evt, err := cereal.ReadRootEvent(msg) evt, err := cereal.ReadRootEvent(msg)
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("cereal read root event failed: %w", err)) log.Fatal(fmt.Errorf("cereal read root event failed: %w", err))
@@ -272,33 +270,11 @@ func (v *VisionIpcTrack) Start() {
len(data), len(data),
v.name, v.name,
) )
v.Frame <- &Frame{Frame: v.f, Roll: roll} outFrame := &Frame{Frame: v.f, Roll: roll}
OnFrame(outFrame)
} }
} }
} }
close(v.Frame)
}
func TestVisionIPCTrack(name string) {
track, err := NewVisionIpcTrack(name)
if err != nil {
log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
}
defer track.Stop()
// Handle ffmpeg logs
astiav.SetLogLevel(astiav.LogLevelError)
astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) {
log.Printf("ffmpeg log: %s (level: %d)\n", strings.TrimSpace(msg), l)
})
go track.Start()
for frame := range track.Frame {
// Do something with decoded frame
fmt.Println(frame.Roll)
}
} }
func (v *VisionIpcTrack) NewTrackRTP() (videoTrack *webrtc.TrackLocalStaticRTP, err error) { func (v *VisionIpcTrack) NewTrackRTP() (videoTrack *webrtc.TrackLocalStaticRTP, err error) {
@@ -307,10 +283,11 @@ func (v *VisionIpcTrack) NewTrackRTP() (videoTrack *webrtc.TrackLocalStaticRTP,
MimeType: webrtc.MimeTypeH264, MimeType: webrtc.MimeTypeH264,
ClockRate: 90000, ClockRate: 90000,
}, "video", "pion") }, "video", "pion")
v.videoTrack = videoTrack
return videoTrack, err return videoTrack, err
} }
func (visionTrack *VisionIpcTrack) StartRTP(videoTrack *webrtc.TrackLocalStaticRTP) { func (visionTrack *VisionIpcTrack) StartRTP() {
encoderParams := EncoderParams{ encoderParams := EncoderParams{
Width: visionTrack.Width(), Width: visionTrack.Width(),
Height: visionTrack.Height(), Height: visionTrack.Height(),
@@ -327,67 +304,59 @@ func (visionTrack *VisionIpcTrack) StartRTP(videoTrack *webrtc.TrackLocalStaticR
} }
defer encoder.Close() defer encoder.Close()
go visionTrack.Start() visionTrack.encoder = encoder
defer visionTrack.Stop()
// Create a H.264 payloader // Create a H.264 payloader
payloader := &codecs.H264Payloader{} payloader := &codecs.H264Payloader{}
// Create a RTP packetizer with some parameters // Create a RTP packetizer with some parameters
packetizer := rtp.NewPacketizer( visionTrack.packetizer = rtp.NewPacketizer(
1600, 1200,
96, 0, // Value is handled when writing
1, 0, // Value is handled when writing
payloader, payloader,
rtp.NewRandomSequencer(), rtp.NewRandomSequencer(),
visionTrack.videoTrack.Codec().ClockRate,
) )
for visionTrack != nil { visionTrack.Start(visionTrack.HandleFrameRTP)
for frame := range visionTrack.Frame { }
fmt.Println(frame.Roll)
// so right now frame is a raw decoded AVFrame func (v *VisionIpcTrack) HandleFrameRTP(frame *Frame) {
// https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/frame.h#L317 fmt.Println(frame.Roll)
avframe := frame.Frame
// we need to: // so right now frame is a raw decoded AVFrame
// 1. transcode to h264 with adaptive bitrate using astiav // https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/frame.h#L317
outFrame, err := encoder.Encode(avframe) avframe := frame.Frame
if err != nil {
log.Println(fmt.Errorf("encode error: %w", err))
continue
}
// 2. create an RTP packet with h264 data inside using pion's rtp packetizer
log.Println(outFrame.Duration())
rtpPackets := packetizer.Packetize(outFrame.Data(), uint32(outFrame.Duration()))
// 3. write the RTP packet to the videoTrack
// Loop over the RTP packets and send them to an output stream
for rtpPacketNo, rtpPacket := range rtpPackets {
// Write the RTP packet to output
data, err := rtpPacket.Marshal()
if err != nil {
log.Println(fmt.Errorf("rtp marshall error: %w", err))
continue
}
bytesWritten, err := videoTrack.Write(data)
if err != nil {
if errors.Is(err, io.ErrClosedPipe) {
// The peerConnection has been closed.
return
}
log.Println(fmt.Errorf("rtp write error: %w", err))
}
log.Printf("RTP(%d:%d)", rtpPacketNo, bytesWritten)
// we need to:
// 1. transcode to h264 with adaptive bitrate using astiav
outFrame, err := v.encoder.Encode(avframe)
if err != nil {
log.Println(fmt.Errorf("encode error: %w", err))
return
}
// 2. create an RTP packet with h264 data inside using pion's rtp packetizer
samples := uint32((1.0 / v.encoder.Stream.encCodecContext.TimeBase().ToDouble()) * float64(visionTrack.videoTrack.Codec().ClockRate))
rtpPackets := v.packetizer.Packetize(outFrame.Data(), samples)
// 3. write the RTP packet to the videoTrack
// Loop over the RTP packets and send them to an output stream
for _, rtpPacket := range rtpPackets {
err = v.videoTrack.WriteRTP(rtpPacket)
if err != nil {
if errors.Is(err, io.ErrClosedPipe) {
// The peerConnection has been closed.
return
} }
log.Println(fmt.Errorf("rtp write error: %w", err))
} }
// log.Printf("RTP(%d)", rtpPacketNo)
} }
} }