diff --git a/internal/flussonic/flussonic.go b/internal/flussonic/flussonic.go new file mode 100644 index 00000000..6e874285 --- /dev/null +++ b/internal/flussonic/flussonic.go @@ -0,0 +1,10 @@ +package flussonic + +import ( + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/flussonic" +) + +func Init() { + streams.HandleFunc("flussonic", flussonic.Dial) +} diff --git a/main.go b/main.go index 8a62bdb6..f3589ebf 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/exec" "github.com/AlexxIT/go2rtc/internal/expr" "github.com/AlexxIT/go2rtc/internal/ffmpeg" + "github.com/AlexxIT/go2rtc/internal/flussonic" "github.com/AlexxIT/go2rtc/internal/gopro" "github.com/AlexxIT/go2rtc/internal/hass" "github.com/AlexxIT/go2rtc/internal/hls" @@ -88,6 +89,7 @@ func main() { gopro.Init() // gopro source doorbird.Init() // doorbird source v4l2.Init() // v4l2 source + flussonic.Init() // 6. Helper modules diff --git a/pkg/flussonic/flussonic.go b/pkg/flussonic/flussonic.go new file mode 100644 index 00000000..70b6e9d4 --- /dev/null +++ b/pkg/flussonic/flussonic.go @@ -0,0 +1,176 @@ +package flussonic + +import ( + "strings" + + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/iso" + "github.com/gorilla/websocket" + "github.com/pion/rtp" +) + +type Producer struct { + core.Connection + conn *websocket.Conn + + videoTrackID, audioTrackID uint32 + videoTimeScale, audioTimeScale float32 +} + +func Dial(source string) (core.Producer, error) { + url, _ := strings.CutPrefix(source, "flussonic:") + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + prod := &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "flussonic", + Protocol: core.Before(url, ":"), // wss + RemoteAddr: conn.RemoteAddr().String(), + URL: url, + Transport: conn, + }, + conn: conn, + } + + if err = prod.probe(); err != nil { + _ = conn.Close() + return nil, err + } + + return prod, nil +} + +func (p *Producer) probe() error { + var init struct { + //Metadata struct { + // Tracks []struct { + // Width int `json:"width,omitempty"` + // Height int `json:"height,omitempty"` + // Fps int `json:"fps,omitempty"` + // Content string `json:"content"` + // TrackId string `json:"trackId"` + // Bitrate int `json:"bitrate"` + // } `json:"tracks"` + //} `json:"metadata"` + Tracks []struct { + Content string `json:"content"` + Id uint32 `json:"id"` + Payload []byte `json:"payload"` + } `json:"tracks"` + //Type string `json:"type"` + } + + if err := p.conn.ReadJSON(&init); err != nil { + return err + } + + var timeScale uint32 + + for _, track := range init.Tracks { + atoms, _ := iso.DecodeAtoms(track.Payload) + for _, atom := range atoms { + switch atom := atom.(type) { + case *iso.AtomMdhd: + timeScale = atom.TimeScale + case *iso.AtomVideo: + switch atom.Name { + case "avc1": + codec := h264.AVCCToCodec(atom.Config) + p.Medias = append(p.Medias, &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + }) + p.videoTrackID = track.Id + p.videoTimeScale = float32(codec.ClockRate) / float32(timeScale) + } + case *iso.AtomAudio: + switch atom.Name { + case "mp4a": + codec := aac.ConfigToCodec(atom.Config) + p.Medias = append(p.Medias, &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + }) + p.audioTrackID = track.Id + p.audioTimeScale = float32(codec.ClockRate) / float32(timeScale) + } + } + } + } + + return nil +} + +func (p *Producer) Start() error { + if err := p.conn.WriteMessage(websocket.TextMessage, []byte("resume")); err != nil { + return err + } + + receivers := make(map[uint32]*core.Receiver) + timeScales := make(map[uint32]float32) + + for _, receiver := range p.Receivers { + switch receiver.Codec.Kind() { + case core.KindVideo: + receivers[p.videoTrackID] = receiver + timeScales[p.videoTrackID] = p.videoTimeScale + case core.KindAudio: + receivers[p.audioTrackID] = receiver + timeScales[p.audioTrackID] = p.audioTimeScale + } + } + + ch := make(chan []byte, 10) + defer close(ch) + + go func() { + for b := range ch { + atoms, err := iso.DecodeAtoms(b) + if err != nil { + continue + } + + var trackID uint32 + var decodeTime uint64 + + for _, atom := range atoms { + switch atom := atom.(type) { + case *iso.AtomTfhd: + trackID = atom.TrackID + case *iso.AtomTfdt: + decodeTime = atom.DecodeTime + case *iso.AtomMdat: + b = atom.Data + } + } + + if recv := receivers[trackID]; recv != nil { + timestamp := uint32(float32(decodeTime) * timeScales[trackID]) + packet := &rtp.Packet{ + Header: rtp.Header{Timestamp: timestamp}, + Payload: b, + } + recv.WriteRTP(packet) + } + } + }() + + for { + mType, b, err := p.conn.ReadMessage() + if err != nil { + return err + } + if mType == websocket.BinaryMessage { + p.Recv += len(b) + ch <- b + } + } +}