graceful go channels

This commit is contained in:
Keyvan Fatehi
2023-02-20 18:49:02 -08:00
parent 8a127fb9c4
commit bfdd30bb1a
2 changed files with 46 additions and 28 deletions

16
main.go
View File

@@ -13,7 +13,7 @@ func main() {
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("main: creating track failed: %w", err)) log.Fatal(fmt.Errorf("main: creating track failed: %w", err))
} }
defer track.Close() defer track.Stop()
// Handle ffmpeg logs // Handle ffmpeg logs
astiav.SetLogLevel(astiav.LogLevelError) astiav.SetLogLevel(astiav.LogLevelError)
@@ -21,15 +21,17 @@ func main() {
log.Printf("ffmpeg log: %s (level: %d)\n", strings.TrimSpace(msg), l) log.Printf("ffmpeg log: %s (level: %d)\n", strings.TrimSpace(msg), l)
}) })
for { go track.Start()
frame, err := track.ReceiveFrame()
if err != nil {
log.Fatal(fmt.Errorf("main: receive frame failed: %w", err))
continue
}
count := 0
for frame := range track.frameChannel {
// Do something with decoded frame // Do something with decoded frame
// log.Printf("%d, new frame: width: %d", track.networkLatency, frame.Width()) // log.Printf("%d, new frame: width: %d", track.networkLatency, frame.Width())
frame.Pts() frame.Pts()
count++
if count > 10 {
track.Stop()
}
} }
} }

View File

@@ -34,8 +34,12 @@ type VisionIpcTrack struct {
processLatency float64 processLatency float64
pcLatency float64 pcLatency float64
timeQ []int64 timeQ []int64
frameChannel chan *astiav.Frame
} }
var Open = false
func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
// Create a new context // Create a new context
context, err := zmq.NewContext() context, err := zmq.NewContext()
@@ -92,20 +96,32 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) {
processLatency: 0.0, processLatency: 0.0,
pcLatency: 0.0, pcLatency: 0.0,
timeQ: []int64{}, timeQ: []int64{},
frameChannel: make(chan *astiav.Frame),
}, nil }, nil
} }
func (v *VisionIpcTrack) Close() { func (v *VisionIpcTrack) Stop() {
if !Open {
return
}
Open = false
defer func() {
v.f.Free() v.f.Free()
v.pkt.Free() v.pkt.Free()
v.stream.decCodecContext.Free() v.stream.decCodecContext.Free()
v.subscriber.Close() v.subscriber.Close()
v.context.Term() v.context.Term()
}()
<-v.frameChannel
} }
func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) { func (v *VisionIpcTrack) Start() {
for { if Open {
return
}
Open = true
for Open {
msgs := DrainSock(v.subscriber, true) msgs := DrainSock(v.subscriber, true)
if len(msgs) > 0 { if len(msgs) > 0 {
for _, msg := range msgs { for _, msg := range msgs {
@@ -113,19 +129,19 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) {
evt, err := cereal.ReadRootEvent(msg) evt, err := cereal.ReadRootEvent(msg)
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("cereal read root event failed: %w", err)) log.Fatal(fmt.Errorf("cereal read root event failed: %w", err))
return nil, err continue
} }
encodeData, err := evt.RoadEncodeData() encodeData, err := evt.RoadEncodeData()
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("cereal read road encode data failed: %w", err)) log.Fatal(fmt.Errorf("cereal read road encode data failed: %w", err))
return nil, err continue
} }
encodeIndex, err := encodeData.Idx() encodeIndex, err := encodeData.Idx()
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("cereal read encode index failed: %w", err)) log.Fatal(fmt.Errorf("cereal read encode index failed: %w", err))
return nil, err continue
} }
encodeId := encodeIndex.EncodeId() encodeId := encodeIndex.EncodeId()
@@ -161,18 +177,18 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) {
header, err := encodeData.Header() header, err := encodeData.Header()
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("cereal read encode header failed: %w", err)) log.Fatal(fmt.Errorf("cereal read encode header failed: %w", err))
return nil, err continue
} }
if err := v.pkt.FromData(header); err != nil { if err := v.pkt.FromData(header); err != nil {
log.Fatal(fmt.Errorf("main: packet header load failed: %w", err)) log.Fatal(fmt.Errorf("main: packet header load failed: %w", err))
return nil, err continue
} }
// Send packet // Send packet
if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil { if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil {
log.Fatal(fmt.Errorf("main: sending packet failed: %w", err)) log.Fatal(fmt.Errorf("main: sending packet failed: %w", err))
return nil, err continue
} }
v.seenIframe = true v.seenIframe = true
@@ -182,28 +198,27 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) {
data, err := encodeData.Data() data, err := encodeData.Data()
if err != nil { if err != nil {
log.Fatal(fmt.Errorf("cereal read encode data failed: %w", err)) log.Fatal(fmt.Errorf("cereal read encode data failed: %w", err))
return nil, err continue
} }
if err := v.pkt.FromData(data); err != nil { if err := v.pkt.FromData(data); err != nil {
log.Fatal(fmt.Errorf("main: packet header load failed: %w", err)) log.Fatal(fmt.Errorf("main: packet header load failed: %w", err))
return nil, err continue
} }
// Send packet // Send packet
if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil { if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil {
log.Fatal(fmt.Errorf("main: sending packet failed: %w", err)) log.Fatal(fmt.Errorf("main: sending packet failed: %w", err))
return nil, err continue
} }
if err := v.stream.decCodecContext.ReceiveFrame(v.f); err != nil { if err := v.stream.decCodecContext.ReceiveFrame(v.f); err != nil {
log.Fatal(fmt.Errorf("main: receiving frame failed: %w", err)) log.Fatal(fmt.Errorf("main: receiving frame failed: %w", err))
return nil, err continue
} }
v.pcLatency = ((float64(time.Now().UnixNano()) / 1e6) - float64(v.timeQ[0])) v.pcLatency = ((float64(time.Now().UnixNano()) / 1e6) - float64(v.timeQ[0]))
v.timeQ = v.timeQ[1:] v.timeQ = v.timeQ[1:]
fmt.Printf("%2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms %d %s\n", fmt.Printf("%2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms %d %s\n",
len(msgs), len(msgs),
encodeId, encodeId,
@@ -217,9 +232,10 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) {
len(data), len(data),
v.name, v.name,
) )
v.frameChannel <- v.f
return v.f, nil
} }
} }
} }
close(v.frameChannel)
} }