mirror of
https://github.com/comma-hacks/webrtc.git
synced 2025-09-27 04:26:31 +08:00
269 lines
6.1 KiB
Go
269 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/asticode/go-astiav"
|
|
"github.com/commaai/cereal"
|
|
zmq "github.com/pebbe/zmq4"
|
|
)
|
|
|
|
// V4L2_BUF_FLAG_KEYFRAME is the bit tested against the encode index flags to
// decide whether an incoming packet is a keyframe.
const V4L2_BUF_FLAG_KEYFRAME uint32 = 8
|
|
|
|
type Stream struct {
|
|
decCodec *astiav.Codec
|
|
decCodecContext *astiav.CodecContext
|
|
inputStream *astiav.Stream
|
|
}
|
|
|
|
type VisionIpcTrack struct {
|
|
name string
|
|
stream *Stream
|
|
context *zmq.Context
|
|
subscriber *zmq.Socket
|
|
lastIdx int64
|
|
seenIframe bool
|
|
pkt *astiav.Packet
|
|
f *astiav.Frame
|
|
|
|
networkLatency float64
|
|
frameLatency float64
|
|
processLatency float64
|
|
pcLatency float64
|
|
timeQ []int64
|
|
|
|
Frame chan *Frame
|
|
}
|
|
|
|
type Frame struct {
|
|
Frame *astiav.Frame
|
|
Roll string
|
|
}
|
|
|
|
// Open reports whether a VisionIpcTrack receive loop is running. Start sets
// it and Stop clears it; it is package-wide, not per track.
var Open bool
|
|
|
|
func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
|
|
// Create a new context
|
|
context, err := zmq.NewContext()
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("main: new zmq context failed: %w", err))
|
|
return nil, err
|
|
}
|
|
|
|
// Connect to the socket
|
|
subscriber, err := context.NewSocket(zmq.SUB)
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("main: new zmq socket failed: %w", err))
|
|
return nil, err
|
|
}
|
|
subscriber.SetSubscribe("")
|
|
subscriber.Connect(GetServiceURI(name))
|
|
|
|
// Create stream
|
|
s := &Stream{inputStream: nil}
|
|
|
|
// Find decoder
|
|
if s.decCodec = astiav.FindDecoder(astiav.CodecIDHevc); s.decCodec == nil {
|
|
return nil, errors.New("main: codec is nil")
|
|
}
|
|
|
|
// Alloc codec context
|
|
if s.decCodecContext = astiav.AllocCodecContext(s.decCodec); s.decCodecContext == nil {
|
|
return nil, errors.New("main: codec context is nil")
|
|
}
|
|
|
|
// Open codec context
|
|
if err := s.decCodecContext.Open(s.decCodec, nil); err != nil {
|
|
log.Fatal(fmt.Errorf("main: opening codec context failed: %w", err))
|
|
return nil, err
|
|
}
|
|
|
|
// Alloc packet
|
|
pkt := astiav.AllocPacket()
|
|
|
|
// Alloc frame
|
|
f := astiav.AllocFrame()
|
|
|
|
return &VisionIpcTrack{
|
|
name: name,
|
|
stream: s,
|
|
context: context,
|
|
subscriber: subscriber,
|
|
lastIdx: -1,
|
|
seenIframe: false,
|
|
f: f,
|
|
pkt: pkt,
|
|
networkLatency: 0.0,
|
|
frameLatency: 0.0,
|
|
processLatency: 0.0,
|
|
pcLatency: 0.0,
|
|
timeQ: []int64{},
|
|
|
|
Frame: make(chan *Frame),
|
|
}, nil
|
|
}
|
|
|
|
func (v *VisionIpcTrack) Stop() {
|
|
if !Open {
|
|
return
|
|
}
|
|
Open = false
|
|
defer func() {
|
|
v.f.Free()
|
|
v.pkt.Free()
|
|
v.stream.decCodecContext.Free()
|
|
v.subscriber.Close()
|
|
v.context.Term()
|
|
}()
|
|
<-v.Frame
|
|
}
|
|
|
|
func (v *VisionIpcTrack) Start() {
|
|
if Open {
|
|
return
|
|
}
|
|
Open = true
|
|
for Open {
|
|
msgs := DrainSock(v.subscriber, false)
|
|
if len(msgs) > 0 {
|
|
for _, msg := range msgs {
|
|
|
|
evt, err := cereal.ReadRootEvent(msg)
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("cereal read root event failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
encodeData, err := evt.RoadEncodeData()
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("cereal read road encode data failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
encodeIndex, err := encodeData.Idx()
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("cereal read encode index failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
encodeId := encodeIndex.EncodeId()
|
|
idxFlags := encodeIndex.Flags()
|
|
|
|
if encodeId != 0 && encodeId != uint32(v.lastIdx+1) {
|
|
fmt.Println("DROP PACKET!")
|
|
}
|
|
|
|
v.lastIdx = int64(encodeId)
|
|
|
|
if !v.seenIframe && (idxFlags&V4L2_BUF_FLAG_KEYFRAME) == 0 {
|
|
fmt.Println("waiting for iframe")
|
|
continue
|
|
}
|
|
v.timeQ = append(v.timeQ, (time.Now().UnixNano() / 1e6))
|
|
|
|
timestampEof := encodeIndex.TimestampEof()
|
|
timestampSof := encodeIndex.TimestampSof()
|
|
|
|
logMonoTime := evt.LogMonoTime()
|
|
|
|
ts := int64(encodeData.UnixTimestampNanos())
|
|
v.networkLatency = float64(time.Now().UnixNano()-ts) / 1e6
|
|
|
|
v.frameLatency = ((float64(timestampEof) / 1e9) - (float64(timestampSof) / 1e9)) * 1000
|
|
v.processLatency = ((float64(logMonoTime) / 1e9) - (float64(timestampEof) / 1e9)) * 1000
|
|
|
|
if !v.seenIframe {
|
|
// Decode video frame
|
|
|
|
// AvPacketFromData
|
|
header, err := encodeData.Header()
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("cereal read encode header failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
if err := v.pkt.FromData(header); err != nil {
|
|
log.Fatal(fmt.Errorf("main: packet header load failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
// Send packet
|
|
if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil {
|
|
log.Fatal(fmt.Errorf("main: sending packet failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
v.seenIframe = true
|
|
}
|
|
|
|
// AvPacketFromData
|
|
data, err := encodeData.Data()
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("cereal read encode data failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
if err := v.pkt.FromData(data); err != nil {
|
|
log.Fatal(fmt.Errorf("main: packet header load failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
// Send packet
|
|
if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil {
|
|
log.Fatal(fmt.Errorf("main: sending packet failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
if err := v.stream.decCodecContext.ReceiveFrame(v.f); err != nil {
|
|
log.Fatal(fmt.Errorf("main: receiving frame failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
v.pcLatency = ((float64(time.Now().UnixNano()) / 1e6) - float64(v.timeQ[0]))
|
|
v.timeQ = v.timeQ[1:]
|
|
roll := fmt.Sprintf("%2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms %d %s",
|
|
len(msgs),
|
|
encodeId,
|
|
(float64(logMonoTime) / 1e9),
|
|
(float64(encodeIndex.TimestampEof()) / 1e6),
|
|
v.frameLatency,
|
|
v.processLatency,
|
|
v.networkLatency,
|
|
v.pcLatency,
|
|
v.processLatency+v.networkLatency+v.pcLatency,
|
|
len(data),
|
|
v.name,
|
|
)
|
|
v.Frame <- &Frame{Frame: v.f, Roll: roll}
|
|
|
|
}
|
|
}
|
|
}
|
|
close(v.Frame)
|
|
}
|
|
|
|
func TestVisionIPCTrack(name string) {
|
|
track, err := NewVisionIpcTrack(name)
|
|
if err != nil {
|
|
log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
|
|
}
|
|
defer track.Stop()
|
|
|
|
// Handle ffmpeg logs
|
|
astiav.SetLogLevel(astiav.LogLevelError)
|
|
astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) {
|
|
log.Printf("ffmpeg log: %s (level: %d)\n", strings.TrimSpace(msg), l)
|
|
})
|
|
|
|
go track.Start()
|
|
|
|
for frame := range track.Frame {
|
|
// Do something with decoded frame
|
|
fmt.Println(frame.Roll)
|
|
}
|
|
}
|