works. 300ms

This commit is contained in:
Keyvan Fatehi
2023-02-25 23:57:17 -08:00
parent 6012db727e
commit 7a60a05e12
7 changed files with 211 additions and 302 deletions

5
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"files.associations": {
"opt.h": "c"
}
}

View File

@@ -1,29 +0,0 @@
package main
import "io"
type Adapter struct {
data []byte
readPos int
}
func NewAdapter() *Adapter {
return &Adapter{
data: []byte{},
readPos: 0,
}
}
func (r *Adapter) Read(p []byte) (n int, err error) {
if r.readPos >= len(r.data) {
return 0, io.EOF
}
n = copy(p, r.data[r.readPos:])
r.readPos += n
return n, nil
}
func (r *Adapter) Fill(data []byte) {
r.data = data
r.readPos = 0
}

View File

@@ -63,5 +63,5 @@ popd # leave ./cereal
export CGO_LDFLAGS="-L$PWD/go-astiav/tmp/n4.4.1/lib/"
export CGO_CXXFLAGS="-I$PWD/go-astiav/tmp/n4.4.1/include/"
export PKG_CONFIG_PATH="$PWD/go-astiav/tmp/n4.4.1/lib/pkgconfig"
mkdir dist
mkdir -p dist
go build -o dist/go-webrtc-body .

View File

@@ -1,101 +0,0 @@
package main
import (
"errors"
"fmt"
"log"
"github.com/asticode/go-astiav"
"github.com/asticode/go-astikit"
)
type EncoderStream struct {
encCodec *astiav.Codec
encCodecContext *astiav.CodecContext
encPkt *astiav.Packet
}
type Encoder struct {
Stream *EncoderStream
Closer *astikit.Closer
}
type EncoderParams struct {
Height int
Width int
TimeBase astiav.Rational
AspectRatio astiav.Rational
FrameRate int
PixelFormat astiav.PixelFormat
}
func NewEncoder(params EncoderParams) (ts *Encoder, err error) {
ts = &Encoder{}
s := &EncoderStream{}
c := astikit.NewCloser()
ts.Closer = c
ts.Stream = s
// Get codec id
codecID := astiav.CodecIDMpeg4
// Find encoder
if s.encCodec = astiav.FindEncoder(codecID); s.encCodec == nil {
err = errors.New("main: codec is nil")
return
}
// Alloc codec context
if s.encCodecContext = astiav.AllocCodecContext(s.encCodec); s.encCodecContext == nil {
err = errors.New("main: codec context is nil")
return
}
c.Add(s.encCodecContext.Free)
s.encCodecContext.SetHeight(params.Height)
s.encCodecContext.SetPixelFormat(params.PixelFormat)
s.encCodecContext.SetBitRate(200_000)
s.encCodecContext.SetSampleAspectRatio(params.AspectRatio)
s.encCodecContext.SetTimeBase(params.TimeBase)
s.encCodecContext.SetGopSize(params.FrameRate * 2)
s.encCodecContext.SetWidth(params.Width)
// Update flags
s.encCodecContext.SetFlags(s.encCodecContext.Flags().Add(astiav.CodecContextFlagGlobalHeader))
// Open codec context
if err = s.encCodecContext.Open(s.encCodec, nil); err != nil {
err = fmt.Errorf("encoder: opening codec context failed: %w", err)
return
}
// Allocate packet
s.encPkt = astiav.AllocPacket()
c.Add(s.encPkt.Free)
return ts, nil
}
func (ts *Encoder) Close() {
ts.Closer.Close()
}
func (ts *Encoder) Encode(f *astiav.Frame) (pkt *astiav.Packet, err error) {
s := ts.Stream
// Unref packet
s.encPkt.Unref()
// Send frame
if err = s.encCodecContext.SendFrame(f); err != nil {
err = fmt.Errorf("encoder: sending frame failed: %w", err)
return nil, err
}
if err := s.encCodecContext.ReceivePacket(s.encPkt); err != nil {
log.Fatal(fmt.Errorf("encoder: receiving frame failed: %w", err))
return nil, err
}
return s.encPkt, nil
}

22
main.go
View File

@@ -3,7 +3,7 @@ package main
import (
"fmt"
"log"
"secureput"
signaling "secureput"
"github.com/pion/webrtc/v3"
)
@@ -11,7 +11,7 @@ import (
var visionTrack *VisionIpcTrack
func main() {
signal := secureput.Create("go-webrtc-body")
signal := signaling.Create("go-webrtc-body")
signal.DeviceMetadata = map[string]interface{}{"isRobot": true}
signal.Gui = &Face{app: &signal}
go signal.RunDaemonMode()
@@ -23,11 +23,6 @@ func main() {
signal.OnPeerConnectionCreated = func(pc *webrtc.PeerConnection) {
ReplaceTrack("road", pc)
}
// visionTrack, _ = NewVisionIpcTrack("roadEncodeData")
// _, _ = visionTrack.NewTrackRTP()
// visionTrack.StartRTP()
for {
select {}
}
@@ -43,15 +38,20 @@ func ReplaceTrack(prefix string, peerConnection *webrtc.PeerConnection) {
log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
}
videoTrack, err := visionTrack.NewVideoTrack()
rtpSender, err := peerConnection.AddTrack(visionTrack.videoTrack)
if err != nil {
log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
}
rtpSender, err := peerConnection.AddTrack(videoTrack)
_, err = peerConnection.AddTransceiverFromTrack(visionTrack.videoTrack,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
},
)
if err != nil {
log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
log.Fatal(fmt.Errorf("main: creating transceiver failed: %w", err))
}
// Later on, we will use rtpSender.ReplaceTrack() for graceful track replacement
// Read incoming RTCP packets
@@ -78,7 +78,7 @@ func ReplaceTrack(prefix string, peerConnection *webrtc.PeerConnection) {
log.Println(fmt.Errorf("main: peer connection closed due to error: %w", err))
}
} else if connectionState.String() == "connected" {
go visionTrack.StartRTP()
go visionTrack.Start()
}
})
}

View File

@@ -3,49 +3,51 @@ package main
import (
"errors"
"fmt"
"io"
"log"
"os"
"time"
"github.com/asticode/go-astiav"
"github.com/commaai/cereal"
zmq "github.com/pebbe/zmq4"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/h264reader"
)
const V4L2_BUF_FLAG_KEYFRAME = uint32(8)
type VisionIpcTrackDecoderStream struct {
type VisionIpcTrackTranscodeStream struct {
decCodecID astiav.CodecID
decCodec *astiav.Codec
decCodecContext *astiav.CodecContext
inputStream *astiav.Stream
encCodecID astiav.CodecID
encCodec *astiav.Codec
encCodecContext *astiav.CodecContext
decPkt *astiav.Packet
decFrame *astiav.Frame
encPkt *astiav.Packet
}
type VisionIpcTrack struct {
name string
stream *VisionIpcTrackDecoderStream
context *zmq.Context
subscriber *zmq.Socket
lastIdx int64
seenIframe bool
pkt *astiav.Packet
f *astiav.Frame
encoder *Encoder
packetizer rtp.Packetizer
videoTrack *webrtc.TrackLocalStaticSample
adapter *Adapter
h264 *h264reader.H264Reader
spsAndPpsCache []byte
name string
stream *VisionIpcTrackTranscodeStream
context *zmq.Context
subscriber *zmq.Socket
lastIdx int64
seenIframe bool
videoTrack *webrtc.TrackLocalStaticSample
networkLatency float64
frameLatency float64
processLatency float64
pcLatency float64
timeQ []int64
msgCount int
encodeId uint32
logMonoTime uint64
timestampEof uint64
dataSize int
}
type Frame struct {
@@ -59,30 +61,32 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
// Create a new context
context, err := zmq.NewContext()
if err != nil {
log.Fatal(fmt.Errorf("main: new zmq context failed: %w", err))
log.Fatal(fmt.Errorf("vision_ipc_track: new zmq context failed: %w", err))
return nil, err
}
// Connect to the socket
subscriber, err := context.NewSocket(zmq.SUB)
if err != nil {
log.Fatal(fmt.Errorf("main: new zmq socket failed: %w", err))
log.Fatal(fmt.Errorf("vision_ipc_track: new zmq socket failed: %w", err))
return nil, err
}
subscriber.SetSubscribe("")
subscriber.Connect(GetServiceURI(name))
// Create stream
s := &VisionIpcTrackDecoderStream{inputStream: nil}
s := &VisionIpcTrackTranscodeStream{}
s.decCodecID = astiav.CodecIDHevc
// Find decoder
if s.decCodec = astiav.FindDecoder(astiav.CodecIDHevc); s.decCodec == nil {
return nil, errors.New("main: codec is nil")
if s.decCodec = astiav.FindDecoder(s.decCodecID); s.decCodec == nil {
return nil, errors.New("vision_ipc_track: codec is nil")
}
// Alloc codec context
if s.decCodecContext = astiav.AllocCodecContext(s.decCodec); s.decCodecContext == nil {
return nil, errors.New("main: codec context is nil")
return nil, errors.New("vision_ipc_track: codec context is nil")
}
s.decCodecContext.SetHeight(track.Height())
@@ -93,15 +97,73 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
// Open codec context
if err := s.decCodecContext.Open(s.decCodec, nil); err != nil {
log.Fatal(fmt.Errorf("main: opening codec context failed: %w", err))
log.Fatal(fmt.Errorf("vision_ipc_track: opening codec context failed: %w", err))
return nil, err
}
// Alloc packet
pkt := astiav.AllocPacket()
s.decPkt = astiav.AllocPacket()
// Alloc frame
f := astiav.AllocFrame()
s.decFrame = astiav.AllocFrame()
//=========
// ENCODER
//=========
// Get codec id
s.encCodecID = astiav.CodecIDH264
// Find encoder
if s.encCodec = astiav.FindEncoder(s.encCodecID); s.encCodec == nil {
err = errors.New("vision_ipc_track: codec is nil")
return
}
// Alloc codec context
if s.encCodecContext = astiav.AllocCodecContext(s.encCodec); s.encCodecContext == nil {
err = errors.New("main: codec context is nil")
return
}
s.encCodecContext.SetHeight(s.decCodecContext.Height())
s.encCodecContext.SetFramerate(s.decCodecContext.Framerate())
s.encCodecContext.SetPixelFormat(s.decCodecContext.PixelFormat())
s.encCodecContext.SetBitRate(500_000)
// s.encCodecContext.SetGopSize(0)
s.encCodecContext.SetSampleAspectRatio(s.decCodecContext.SampleAspectRatio())
s.encCodecContext.SetTimeBase(s.decCodecContext.TimeBase())
s.encCodecContext.SetWidth(s.decCodecContext.Width())
s.encCodecContext.SetStrictStdCompliance(astiav.StrictStdComplianceExperimental)
// Update flags
flags := s.encCodecContext.Flags()
flags = flags.Add(astiav.CodecContextFlagLowDelay)
s.encCodecContext.SetFlags(flags)
// TOOD possibly PR to go-astiav with this SetOpt function
// av_opt_set(mCodecContext->priv_data, "preset", "ultrafast", 0);
// av_opt_set(mCodecContext->priv_data, "tune", "zerolatency", 0);
s.encCodecContext.SetOpt("preset", "ultrafast", 0)
s.encCodecContext.SetOpt("tune", "zerolatency", 0)
// from https://trac.ffmpeg.org/ticket/3354
// interesting: https://github.com/giorgisio/goav/blob/master/avcodec/context.go#L190
// Open codec context
if err = s.encCodecContext.Open(s.encCodec, nil); err != nil {
err = fmt.Errorf("encoder: opening codec context failed: %w", err)
return
}
// Allocate packet
s.encPkt = astiav.AllocPacket()
videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
}, "video", "pion")
if err != nil {
log.Fatal(fmt.Errorf("vision_ipc_track: creating track failed: %w", err))
}
return &VisionIpcTrack{
name: name,
@@ -110,18 +172,24 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
subscriber: subscriber,
lastIdx: -1,
seenIframe: false,
f: f,
pkt: pkt,
networkLatency: 0.0,
frameLatency: 0.0,
processLatency: 0.0,
pcLatency: 0.0,
timeQ: []int64{},
videoTrack: videoTrack,
msgCount: 0,
encodeId: 0,
logMonoTime: 0,
timestampEof: 0,
dataSize: 0,
}, nil
}
func (v *VisionIpcTrack) TimeBase() (t astiav.Rational) {
return astiav.NewRational(v.FrameRate(), 1)
return astiav.NewRational(1, v.FrameRate())
}
func (v *VisionIpcTrack) FrameRate() int {
@@ -149,52 +217,71 @@ func (v *VisionIpcTrack) Stop() {
return
}
Open = false
v.f.Free()
v.pkt.Free()
v.stream.decFrame.Free()
v.stream.encPkt.Free()
v.stream.decPkt.Free()
v.stream.encCodecContext.Free()
v.stream.decCodecContext.Free()
v.subscriber.Close()
v.context.Term()
}
func (v *VisionIpcTrack) Start(OnFrame func(outFrame *Frame)) {
func (v *VisionIpcTrack) Start() {
if Open {
return
}
// adapter := NewAdapter()
// h264, h264Err := h264reader.NewReader(adapter)
// if h264Err != nil {
// panic(h264Err)
// }
counter := 0
duration := 50 * time.Millisecond
Open = true
for Open {
msgs := DrainSock(v.subscriber, false)
if len(msgs) > 0 {
v.msgCount = len(msgs)
if v.msgCount > 0 {
for _, msg := range msgs {
v.pkt.Unref()
v.f.Unref()
v.stream.encPkt.Unref()
v.stream.decPkt.Unref()
v.stream.decFrame.Unref()
pts := int64(counter * 50) // 50ms per frame
evt, err := cereal.ReadRootEvent(msg)
if err != nil {
log.Fatal(fmt.Errorf("cereal read root event failed: %w", err))
log.Println(fmt.Errorf("cereal read root event failed: %w", err))
continue
}
encodeData, err := evt.RoadEncodeData()
if err != nil {
log.Fatal(fmt.Errorf("cereal read road encode data failed: %w", err))
log.Println(fmt.Errorf("cereal read road encode data failed: %w", err))
continue
}
encodeIndex, err := encodeData.Idx()
if err != nil {
log.Fatal(fmt.Errorf("cereal read encode index failed: %w", err))
log.Println(fmt.Errorf("cereal read encode index failed: %w", err))
continue
}
encodeId := encodeIndex.EncodeId()
v.encodeId = encodeIndex.EncodeId()
idxFlags := encodeIndex.Flags()
if encodeId != 0 && encodeId != uint32(v.lastIdx+1) {
if v.encodeId != 0 && v.encodeId != uint32(v.lastIdx+1) {
fmt.Println("DROP PACKET!")
}
v.lastIdx = int64(encodeId)
v.lastIdx = int64(v.encodeId)
if !v.seenIframe && (idxFlags&V4L2_BUF_FLAG_KEYFRAME) == 0 {
fmt.Println("waiting for iframe")
@@ -202,16 +289,16 @@ func (v *VisionIpcTrack) Start(OnFrame func(outFrame *Frame)) {
}
v.timeQ = append(v.timeQ, (time.Now().UnixNano() / 1e6))
timestampEof := encodeIndex.TimestampEof()
v.timestampEof = encodeIndex.TimestampEof()
timestampSof := encodeIndex.TimestampSof()
logMonoTime := evt.LogMonoTime()
v.logMonoTime = evt.LogMonoTime()
ts := int64(encodeData.UnixTimestampNanos())
v.networkLatency = float64(time.Now().UnixNano()-ts) / 1e6
v.frameLatency = ((float64(timestampEof) / 1e9) - (float64(timestampSof) / 1e9)) * 1000
v.processLatency = ((float64(logMonoTime) / 1e9) - (float64(timestampEof) / 1e9)) * 1000
v.frameLatency = ((float64(v.timestampEof) / 1e9) - (float64(timestampSof) / 1e9)) * 1000
v.processLatency = ((float64(v.logMonoTime) / 1e9) - (float64(v.timestampEof) / 1e9)) * 1000
if !v.seenIframe {
// Decode video frame
@@ -219,18 +306,18 @@ func (v *VisionIpcTrack) Start(OnFrame func(outFrame *Frame)) {
// AvPacketFromData
header, err := encodeData.Header()
if err != nil {
log.Fatal(fmt.Errorf("cereal read encode header failed: %w", err))
log.Println(fmt.Errorf("cereal read encode header failed: %w", err))
continue
}
if err := v.pkt.FromData(header); err != nil {
log.Fatal(fmt.Errorf("main: packet header load failed: %w", err))
if err := v.stream.decPkt.FromData(header); err != nil {
log.Println(fmt.Errorf("vision_ipc_track: packet header load failed: %w", err))
continue
}
// Send packet
if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil {
log.Fatal(fmt.Errorf("main: sending packet failed: %w", err))
if err := v.stream.decCodecContext.SendPacket(v.stream.decPkt); err != nil {
log.Println(fmt.Errorf("vision_ipc_track: sending packet failed: %w", err))
continue
}
@@ -240,130 +327,77 @@ func (v *VisionIpcTrack) Start(OnFrame func(outFrame *Frame)) {
// AvPacketFromData
data, err := encodeData.Data()
if err != nil {
log.Fatal(fmt.Errorf("cereal read encode data failed: %w", err))
log.Println(fmt.Errorf("cereal read encode data failed: %w", err))
continue
}
v.dataSize = len(data)
if err := v.pkt.FromData(data); err != nil {
log.Fatal(fmt.Errorf("main: packet header load failed: %w", err))
v.stream.decPkt.SetPts(pts)
if err := v.stream.decPkt.FromData(data); err != nil {
log.Println(fmt.Errorf("vision_ipc_track decoder: packet data load failed: %w", err))
continue
}
// Send packet
if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil {
log.Fatal(fmt.Errorf("main: sending packet failed: %w", err))
if err := v.stream.decCodecContext.SendPacket(v.stream.decPkt); err != nil {
log.Println(fmt.Errorf("vision_ipc_track decoder: sending packet failed: %w", err))
continue
}
if err := v.stream.decCodecContext.ReceiveFrame(v.f); err != nil {
log.Fatal(fmt.Errorf("main: receiving frame failed: %w", err))
if err := v.stream.decCodecContext.ReceiveFrame(v.stream.decFrame); err != nil {
log.Println(fmt.Errorf("vision_ipc_track decoder: receiving frame failed: %w", err))
continue
}
v.pcLatency = ((float64(time.Now().UnixNano()) / 1e6) - float64(v.timeQ[0]))
v.timeQ = v.timeQ[1:]
roll := fmt.Sprintf("%2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms %d %s",
len(msgs),
encodeId,
(float64(logMonoTime) / 1e9),
(float64(encodeIndex.TimestampEof()) / 1e6),
v.frameLatency,
v.processLatency,
v.networkLatency,
v.pcLatency,
v.processLatency+v.networkLatency+v.pcLatency,
len(data),
v.name,
)
outFrame := &Frame{Frame: v.f, Roll: roll}
OnFrame(outFrame)
// Send frame
if err = v.stream.encCodecContext.SendFrame(v.stream.decFrame); err != nil {
log.Println(fmt.Errorf("vision_ipc_track encoder: sending frame failed: %w", err))
continue
}
if err := v.stream.encCodecContext.ReceivePacket(v.stream.encPkt); err != nil {
if astiav.ErrEagain.Is(err) {
// Encoder might need a few frames on startup to get started. Keep going
} else {
log.Println(fmt.Errorf("vision_ipc_track encoder: receiving frame failed: %w", err))
}
continue
}
if err != nil {
log.Println(fmt.Errorf("vision_ipc_track: encode error: %w", err))
continue
}
if err = v.videoTrack.WriteSample(media.Sample{Data: v.stream.encPkt.Data(), Duration: duration}); err != nil {
log.Println(fmt.Errorf("vision_ipc_track: sample write error: %w", err))
continue
}
counter++
v.PrintStats()
}
}
}
}
func (v *VisionIpcTrack) NewVideoTrack() (videoTrack *webrtc.TrackLocalStaticSample, err error) {
// Create a video track
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
}, "video", "pion")
v.videoTrack = videoTrack
return videoTrack, err
}
func (visionTrack *VisionIpcTrack) StartRTP() {
encoderParams := EncoderParams{
Width: visionTrack.Width(),
Height: visionTrack.Height(),
TimeBase: visionTrack.TimeBase(),
AspectRatio: visionTrack.AspectRatio(),
PixelFormat: visionTrack.PixelFormat(),
FrameRate: visionTrack.FrameRate(),
}
encoder, err := NewEncoder(encoderParams)
if err != nil {
log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
return
}
defer encoder.Close()
visionTrack.encoder = encoder
visionTrack.adapter = NewAdapter()
h264, h264Err := h264reader.NewReader(visionTrack.adapter)
if h264Err != nil {
panic(h264Err)
}
visionTrack.h264 = h264
visionTrack.spsAndPpsCache = []byte{}
visionTrack.Start(visionTrack.HandleFrameRTP)
}
func (v *VisionIpcTrack) HandleFrameRTP(frame *Frame) {
fmt.Println(frame.Roll)
// so right now frame is a raw decoded AVFrame
// https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/frame.h#L317
avframe := frame.Frame
// we need to:
// 1. transcode to h264 with adaptive bitrate using astiav
outFrame, err := v.encoder.Encode(avframe)
if err != nil {
log.Println(fmt.Errorf("encode error: %w", err))
return
}
v.adapter.Fill(outFrame.Data())
nal, h264Err := v.h264.NextNAL()
if h264Err == io.EOF {
fmt.Printf("All video frames parsed and sent")
os.Exit(0)
}
if h264Err != nil {
panic(h264Err)
}
nal.Data = append([]byte{0x00, 0x00, 0x00, 0x01}, nal.Data...)
if nal.UnitType == h264reader.NalUnitTypeSPS || nal.UnitType == h264reader.NalUnitTypePPS {
v.spsAndPpsCache = append(v.spsAndPpsCache, nal.Data...)
return
} else if nal.UnitType == h264reader.NalUnitTypeCodedSliceIdr {
nal.Data = append(v.spsAndPpsCache, nal.Data...)
v.spsAndPpsCache = []byte{}
}
if h264Err = v.videoTrack.WriteSample(media.Sample{Data: nal.Data, Duration: time.Second}); h264Err != nil {
panic(h264Err)
}
func (v *VisionIpcTrack) PrintStats() {
fmt.Printf("%2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms %d %s\n",
v.msgCount,
v.encodeId,
(float64(v.logMonoTime) / 1e9),
(float64(v.timestampEof) / 1e6),
v.frameLatency,
v.processLatency,
v.networkLatency,
v.pcLatency,
v.processLatency+v.networkLatency+v.pcLatency,
v.dataSize,
v.name,
)
}