From bfdd30bb1a70d54ade5301aae23904b09e0aaed1 Mon Sep 17 00:00:00 2001 From: Keyvan Fatehi Date: Mon, 20 Feb 2023 18:49:02 -0800 Subject: [PATCH] graceful go channels --- main.go | 16 +++++++------ vision_ipc_track.go | 58 +++++++++++++++++++++++++++++---------------- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/main.go b/main.go index a3096c3..6316531 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,7 @@ func main() { if err != nil { log.Fatal(fmt.Errorf("main: creating track failed: %w", err)) } - defer track.Close() + defer track.Stop() // Handle ffmpeg logs astiav.SetLogLevel(astiav.LogLevelError) @@ -21,15 +21,17 @@ func main() { log.Printf("ffmpeg log: %s (level: %d)\n", strings.TrimSpace(msg), l) }) - for { - frame, err := track.ReceiveFrame() - if err != nil { - log.Fatal(fmt.Errorf("main: receive frame failed: %w", err)) - continue - } + go track.Start() + count := 0 + for frame := range track.frameChannel { // Do something with decoded frame // log.Printf("%d, new frame: width: %d", track.networkLatency, frame.Width()) frame.Pts() + count++ + + if count > 10 { + track.Stop() + } } } diff --git a/vision_ipc_track.go b/vision_ipc_track.go index 372838f..7985b3f 100644 --- a/vision_ipc_track.go +++ b/vision_ipc_track.go @@ -34,8 +34,12 @@ type VisionIpcTrack struct { processLatency float64 pcLatency float64 timeQ []int64 + + frameChannel chan *astiav.Frame } +var Open = false + func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { // Create a new context context, err := zmq.NewContext() @@ -92,20 +96,32 @@ func NewVisionIpcTrack(name string) (track *VisionIpcTrack, err error) { processLatency: 0.0, pcLatency: 0.0, timeQ: []int64{}, + + frameChannel: make(chan *astiav.Frame), }, nil } -func (v *VisionIpcTrack) Close() { - v.f.Free() - v.pkt.Free() - v.stream.decCodecContext.Free() - v.subscriber.Close() - v.context.Term() +func (v *VisionIpcTrack) Stop() { + if !Open { + return + } + Open = false + defer func() { + v.f.Free() + v.pkt.Free() + v.stream.decCodecContext.Free() + v.subscriber.Close() + v.context.Term() + }() + <-v.frameChannel } -func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) { - for { - +func (v *VisionIpcTrack) Start() { + if Open { + return + } + Open = true + for Open { msgs := DrainSock(v.subscriber, true) if len(msgs) > 0 { for _, msg := range msgs { @@ -113,19 +129,19 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) { evt, err := cereal.ReadRootEvent(msg) if err != nil { log.Fatal(fmt.Errorf("cereal read root event failed: %w", err)) - return nil, err + continue } encodeData, err := evt.RoadEncodeData() if err != nil { log.Fatal(fmt.Errorf("cereal read road encode data failed: %w", err)) - return nil, err + continue } encodeIndex, err := encodeData.Idx() if err != nil { log.Fatal(fmt.Errorf("cereal read encode index failed: %w", err)) - return nil, err + continue } encodeId := encodeIndex.EncodeId() @@ -161,18 +177,18 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) { header, err := encodeData.Header() if err != nil { log.Fatal(fmt.Errorf("cereal read encode header failed: %w", err)) - return nil, err + continue } if err := v.pkt.FromData(header); err != nil { log.Fatal(fmt.Errorf("main: packet header load failed: %w", err)) - return nil, err + continue } // Send packet if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil { log.Fatal(fmt.Errorf("main: sending packet failed: %w", err)) - return nil, err + continue } v.seenIframe = true @@ -182,28 +198,27 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) { data, err := encodeData.Data() if err != nil { log.Fatal(fmt.Errorf("cereal read encode data failed: %w", err)) - return nil, err + continue } if err := v.pkt.FromData(data); err != nil { log.Fatal(fmt.Errorf("main: packet header load failed: %w", err)) - return nil, err + continue } // Send packet if err := v.stream.decCodecContext.SendPacket(v.pkt); err != nil { log.Fatal(fmt.Errorf("main: sending packet failed: %w", err)) - return nil, err + continue } if err := v.stream.decCodecContext.ReceiveFrame(v.f); err != nil { log.Fatal(fmt.Errorf("main: receiving frame failed: %w", err)) - return nil, err + continue } v.pcLatency = ((float64(time.Now().UnixNano()) / 1e6) - float64(v.timeQ[0])) v.timeQ = v.timeQ[1:] - fmt.Printf("%2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms %d %s\n", len(msgs), encodeId, @@ -217,9 +232,10 @@ func (v *VisionIpcTrack) ReceiveFrame() (frame *astiav.Frame, err error) { len(data), v.name, ) + v.frameChannel <- v.f - return v.f, nil } } } + close(v.frameChannel) }