generate services in go and extract a helper

This commit is contained in:
Keyvan Fatehi
2023-02-19 14:31:54 -08:00
parent 2ff94c73fa
commit 318f100987
3 changed files with 145 additions and 112 deletions

52
cereal_helper.go Normal file
View File

@@ -0,0 +1,52 @@
package main
import (
"fmt"
"capnproto.org/go/capnp/v3"
"github.com/commaai/cereal"
zmq "github.com/pebbe/zmq4"
)
func GetServicePort(name string) int {
port := -1
for _, s := range cereal.GetServices() {
if s.Name == name {
port = s.Port
break
}
}
if port < 0 {
panic("invalid endpoint")
}
return port
}
func DrainSock(socket *zmq.Socket, waitForOne bool) []*capnp.Message {
var ret []*capnp.Message
for {
var dat []byte
var err error
if waitForOne && len(ret) == 0 {
dat, err = socket.RecvBytes(0)
} else {
dat, err = socket.RecvBytes(zmq.DONTWAIT)
}
if err != nil {
break
}
msg, err := capnp.Unmarshal(dat)
if err != nil {
panic(err)
}
ret = append(ret, msg)
}
return ret
}
func GetServiceURI(name string) string {
port := GetServicePort(name)
return fmt.Sprintf("tcp://tici:%d", port)
}

View File

@@ -1,30 +1,56 @@
#!/bin/bash
set -euo pipefail
set -euxo pipefail
# Capn Proto
pushd cereal
git checkout *.capnp
rm -rf gen/go
for f in *.capnp; do
name=$(echo $f | sed 's/.capnp//')
mv $f $f.bak
echo "using Go = import \"/go.capnp\";" > $name.capnp
echo "\$Go.package(\"cereal\");" >> $name.capnp
echo "\$Go.import(\"cereal\");" >> $name.capnp
cat $f.bak >> $name.capnp
rm $f.bak
rm -rf gen/go tmp/go
mkdir -p gen/go tmp/go
pushd tmp/go
for f in ../../*.capnp; do
out=$(basename $f)
echo "using Go = import \"/go.capnp\";" > $out
echo "\$Go.package(\"cereal\");" >> $out
echo "\$Go.import(\"cereal\");" >> $out
cat $f | grep -v 'c++.capnp' | grep -v '$Cxx.namespace' >> $out
done
mkdir gen/go
go install capnproto.org/go/capnp/v3/capnpc-go@latest
capnp compile -I$(go env GOPATH)/src/capnproto.org/go/capnp/std -o go:gen/go *.capnp
git checkout *.capnp
capnp compile -I$(go env GOPATH)/src/capnproto.org/go/capnp/std -o go:../../gen/go *.capnp
popd # leave ./tmp/go
## Services
cat <<EOF > gen/go/services.go
// Code generated. DO NOT EDIT.
package cereal
type Service struct {
Name string
Port int
ShouldLog bool
Frequency int
Decimation int
}
var services = []Service{
EOF
grep ' {' services.h >> gen/go/services.go
cat <<EOF >> gen/go/services.go
}
func GetServices() []Service {
return services
}
EOF
cat <<EOF > gen/go/go.mod
// Code generated. DO NOT EDIT.
module github.com/commaai/cereal
go 1.19
EOF
popd
popd # leave ./cereal
# Go program
go build -o webrtc-body main.go
# go build -o webrtc-body main.go

145
main.go
View File

@@ -5,65 +5,13 @@ import (
"os"
"unsafe"
"capnproto.org/go/capnp/v3"
"github.com/giorgisio/goav/avcodec"
"github.com/giorgisio/goav/avutil"
zmq "github.com/pebbe/zmq4"
"github.com/commaai/cereal"
zmq "github.com/pebbe/zmq4"
)
type Service struct {
name string
port int
should_log bool
frequency int
decimation int
}
var services = []Service{
{"roadEncodeData", 8062, false, 20, -1},
{"driverEncodeData", 8063, false, 20, -1},
{"wideRoadEncodeData", 8064, false, 20, -1},
}
func getPort(endpoint string) int {
port := -1
for _, s := range services {
if s.name == endpoint {
port = s.port
break
}
}
if port < 0 {
panic("invalid endpoint")
}
return port
}
func drainSock(socket *zmq.Socket, waitForOne bool) [][]byte {
// XXX need to use captproto
// https://github.com/capnproto/go-capnproto2
var ret [][]byte
for {
var dat []byte
var err error
if waitForOne && len(ret) == 0 {
dat, err = socket.RecvBytes(0)
} else {
dat, err = socket.RecvBytes(zmq.DONTWAIT)
}
if err != nil {
break
}
ret = append(ret, dat)
}
return ret
}
func main() {
// Create a new context
context, _ := zmq.NewContext()
@@ -72,9 +20,8 @@ func main() {
// Connect to the socket
subscriber, _ := context.NewSocket(zmq.SUB)
defer subscriber.Close()
service := "roadEncodeData"
port := getPort(service)
out := fmt.Sprintf("tcp://tici:%d", port)
out := GetServiceURI("roadEncodeData")
fmt.Println(out)
subscriber.SetSubscribe("")
subscriber.Connect(out)
@@ -108,13 +55,9 @@ func main() {
pFrame := avutil.AvFrameAlloc()
var frame []byte
for frame == nil {
msgs := drainSock(subscriber, true)
msgs := DrainSock(subscriber, true)
if len(msgs) > 0 {
for _, rawMsg := range msgs {
msg, err := capnp.Unmarshal(rawMsg)
if err != nil {
panic(err)
}
for _, msg := range msgs {
evt, err := cereal.ReadRootEncodeData(msg)
if err != nil {
@@ -128,48 +71,60 @@ func main() {
panic(err)
}
encodeId := idx.EncodeId()
idxFlags := idx.Flags()
frameId := idx.FrameId()
segmentNum := idx.SegmentNum()
fmt.Printf("ts: %d,encodeId: %d\n", ts, encodeId)
fmt.Printf("ts: %d,frameId: %d,segmentNum: %d,encodeId: %d,idxFlags: %d\n", ts, frameId, segmentNum, encodeId, idxFlags)
continue
if encodeId != 0 && encodeId != uint32(lastIdx+1) {
fmt.Println("DROP PACKET!")
} else {
lastIdx = int(encodeId)
}
idxFlags := idx.Flags()
lastIdx = int(encodeId)
if !seenIframe && (idxFlags&V4L2_BUF_FLAG_KEYFRAME) != 0 {
fmt.Println("waiting for iframe")
continue
if !seenIframe && (idxFlags&V4L2_BUF_FLAG_KEYFRAME) == 0 {
fmt.Println("waiting for iframe")
continue
}
if !seenIframe {
// Decode video frame
pkt := avcodec.AvPacketAlloc()
if pkt == nil {
panic("cannot allocate packet")
}
if !seenIframe {
// Decode video frame
pkt := avcodec.AvPacketAlloc()
if pkt == nil {
panic("cannot allocate packet")
}
pkt.AvInitPacket()
pkt.SetFlags(pkt.Flags() | avcodec.AV_CODEC_FLAG_TRUNCATED)
response := codecContext.AvcodecSendPacket(packet)
if response < 0 {
fmt.Printf("Error while sending a packet to the decoder: %s\n", avutil.ErrorFromCode(response))
panic("decoding error")
}
if response >= 0 {
response = codecContext.AvcodecReceiveFrame((*avcodec.Frame)(unsafe.Pointer(pFrame)))
if response == avutil.AvErrorEAGAIN || response == avutil.AvErrorEOF {
break
} else if response < 0 {
fmt.Printf("Error while receiving a frame from the decoder: %s\n", avutil.ErrorFromCode(response))
fmt.Println("DROP SURFACE")
continue
}
seenIframe = true
}
// AvPacketFromData
header, err := evt.Header()
if err != nil {
panic(err)
}
headerPtr := unsafe.Pointer(&header[0])
pkt.AvPacketFromData((*uint8)(headerPtr), len(header))
// pkt.AvInitPacket()
pkt.SetFlags(pkt.Flags() | avcodec.AV_CODEC_FLAG_TRUNCATED)
response := codecContext.AvcodecSendPacket(pkt)
if response < 0 {
fmt.Printf("Error while sending a packet to the decoder: %s\n", avutil.ErrorFromCode(response))
// panic("decoding error")
}
if response >= 0 {
response = codecContext.AvcodecReceiveFrame((*avcodec.Frame)(unsafe.Pointer(pFrame)))
if response == avutil.AvErrorEAGAIN || response == avutil.AvErrorEOF {
fmt.Println("decode error")
break
} else if response < 0 {
fmt.Printf("Error while receiving a frame from the decoder: %s\n", avutil.ErrorFromCode(response))
fmt.Println("DROP SURFACE")
continue
}
fmt.Println("??")
seenIframe = true
}
fmt.Println("sss")
// frames, err := codec.Decode(av.Packet{Data: evt.Data()})
// if err != nil {