Rewrite ivideon source

This commit is contained in:
Alex X
2025-04-04 19:55:19 +03:00
parent d99bf122ea
commit be3a1c5b5f
7 changed files with 486 additions and 441 deletions

View File

@@ -2,12 +2,9 @@ package ivideon
import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/ivideon"
)
func Init() {
streams.HandleFunc("ivideon", func(source string) (core.Producer, error) {
return ivideon.Dial(source)
})
streams.HandleFunc("ivideon", ivideon.Dial)
}

View File

@@ -89,6 +89,12 @@ func (r *Reader) ReadBits64(n byte) (res uint64) {
return
}
func (r *Reader) ReadFloat32() float64 {
i := r.ReadUint16()
f := r.ReadUint16()
return float64(i) + float64(f)/65536
}
func (r *Reader) ReadBytes(n int) (b []byte) {
if r.bits == 0 {
if r.pos+n > len(r.buf) {

View File

@@ -1,6 +1,7 @@
package iso
import (
"bytes"
"encoding/binary"
"io"
@@ -10,89 +11,192 @@ import (
type Atom struct {
Name string
Data []byte
DecodeTime uint64
SamplesDuration []uint32
SamplesSize []uint32
}
func DecodeAtoms(b []byte) ([]*Atom, error) {
var atoms []*Atom
for len(b) > 8 {
size := binary.BigEndian.Uint32(b)
if uint32(len(b)) < size {
return nil, io.EOF
type AtomTkhd struct {
TrackID uint32
}
type AtomMdhd struct {
TimeScale uint32
}
type AtomVideo struct {
Name string
Config []byte
}
type AtomAudio struct {
Name string
Channels uint16
SampleRate uint32
Config []byte
}
type AtomMfhd struct {
Sequence uint32
}
type AtomMdat struct {
Data []byte
}
type AtomTfhd struct {
TrackID uint32
SampleDuration uint32
SampleSize uint32
SampleFlags uint32
}
type AtomTfdt struct {
DecodeTime uint64
}
type AtomTrun struct {
DataOffset uint32
FirstSampleFlags uint32
SamplesDuration []uint32
SamplesSize []uint32
SamplesFlags []uint32
SamplesCTS []uint32
}
func DecodeAtom(b []byte) (any, error) {
size := binary.BigEndian.Uint32(b)
if len(b) < int(size) {
return nil, io.EOF
}
name := string(b[4:8])
data := b[8:size]
switch name {
// useful containers
case Moov, MoovTrak, MoovTrakMdia, MoovTrakMdiaMinf, MoovTrakMdiaMinfStbl, Moof, MoofTraf:
return DecodeAtoms(data)
case MoovTrakTkhd:
return &AtomTkhd{TrackID: binary.BigEndian.Uint32(data[1+3+4+4:])}, nil
case MoovTrakMdiaMdhd:
return &AtomMdhd{TimeScale: binary.BigEndian.Uint32(data[1+3+4+4:])}, nil
case MoovTrakMdiaMinfStblStsd:
// support only 1 codec entry
if n := binary.BigEndian.Uint32(data[1+3:]); n == 1 {
return DecodeAtom(data[1+3+4:])
}
name := string(b[4:8])
data := b[8:size]
case "avc1", "hev1":
b = data[6+2+2+2+4+4+4+2+2+4+4+4+2+32+2+2:]
atom, err := DecodeAtom(b)
if err != nil {
return nil, err
}
if conf, ok := atom.(*Atom); ok {
return &AtomVideo{Name: name, Config: conf.Data}, nil
}
b = b[size:]
case "mp4a":
atom := &AtomAudio{Name: name}
switch name {
case Moof, MoofTraf:
childs, err := DecodeAtoms(data)
if err != nil {
return nil, err
rd := bits.NewReader(data)
rd.ReadBytes(6 + 2 + 2 + 2 + 4) // skip
atom.Channels = rd.ReadUint16()
rd.ReadBytes(2 + 2 + 2) // skip
atom.SampleRate = uint32(rd.ReadFloat32())
atom2, _ := DecodeAtom(rd.Left())
if conf, ok := atom2.(*Atom); ok {
_, b, _ = bytes.Cut(conf.Data, []byte{5, 0x80, 0x80, 0x80})
if n := len(b); n > 0 && n > 1+int(b[0]) {
atom.Config = b[1 : 1+b[0]]
}
}
return atom, nil
case MoofMfhd:
return &AtomMfhd{Sequence: binary.BigEndian.Uint32(data[4:])}, nil
case MoofTrafTfhd:
rd := bits.NewReader(data)
_ = rd.ReadByte() // version
flags := rd.ReadUint24()
atom := &AtomTfhd{
TrackID: rd.ReadUint32(),
}
if flags&TfhdDefaultSampleDuration != 0 {
atom.SampleDuration = rd.ReadUint32()
}
if flags&TfhdDefaultSampleSize != 0 {
atom.SampleSize = rd.ReadUint32()
}
if flags&TfhdDefaultSampleFlags != 0 {
atom.SampleFlags = rd.ReadUint32() // skip
}
return atom, nil
case MoofTrafTfdt:
return &AtomTfdt{DecodeTime: binary.BigEndian.Uint64(data[4:])}, nil
case MoofTrafTrun:
rd := bits.NewReader(data)
_ = rd.ReadByte() // version
flags := rd.ReadUint24()
samples := rd.ReadUint32()
atom := &AtomTrun{}
if flags&TrunDataOffset != 0 {
atom.DataOffset = rd.ReadUint32()
}
if flags&TrunFirstSampleFlags != 0 {
atom.FirstSampleFlags = rd.ReadUint32()
}
for i := uint32(0); i < samples; i++ {
if flags&TrunSampleDuration != 0 {
atom.SamplesDuration = append(atom.SamplesDuration, rd.ReadUint32())
}
if flags&TrunSampleSize != 0 {
atom.SamplesSize = append(atom.SamplesSize, rd.ReadUint32())
}
if flags&TrunSampleFlags != 0 {
atom.SamplesFlags = append(atom.SamplesFlags, rd.ReadUint32())
}
if flags&TrunSampleCTS != 0 {
atom.SamplesCTS = append(atom.SamplesCTS, rd.ReadUint32())
}
}
return atom, nil
case Mdat:
return &AtomMdat{Data: data}, nil
}
return &Atom{Name: name, Data: data}, nil
}
func DecodeAtoms(b []byte) (atoms []any, err error) {
for len(b) > 0 {
atom, err := DecodeAtom(b)
if err != nil {
return nil, err
}
if childs, ok := atom.([]any); ok {
atoms = append(atoms, childs...)
case MoofMfhd, MoofTrafTfhd:
continue
case MoofTrafTfdt:
if len(data) < 8 {
return nil, io.EOF
}
dt := binary.BigEndian.Uint64(data[4:])
atoms = append(atoms, &Atom{Name: name, DecodeTime: dt})
case MoofTrafTrun:
rd := bits.NewReader(data)
_ = rd.ReadByte() // version
flags := rd.ReadUint24()
samples := rd.ReadUint32()
if flags&TrunDataOffset != 0 {
_ = rd.ReadUint32() // skip
}
if flags&TrunFirstSampleFlags != 0 {
_ = rd.ReadUint32() // skip
}
atom := &Atom{Name: name}
for i := uint32(0); i < samples; i++ {
if flags&TrunSampleDuration != 0 {
atom.SamplesDuration = append(atom.SamplesDuration, rd.ReadUint32())
}
if flags&TrunSampleSize != 0 {
atom.SamplesSize = append(atom.SamplesSize, rd.ReadUint32())
}
if flags&TrunSampleFlags != 0 {
_ = rd.ReadUint32() // skip
}
if flags&TrunSampleCTS != 0 {
_ = rd.ReadUint32() // skip
}
}
if rd.EOF {
return nil, io.EOF
}
} else {
atoms = append(atoms, atom)
case Mdat:
atoms = append(atoms, &Atom{Name: name, Data: data})
default:
println("iso: unsupported atom: " + name)
}
size := binary.BigEndian.Uint32(b)
b = b[size:]
}
return atoms, nil

View File

@@ -1,314 +0,0 @@
package ivideon
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/iso"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
)
type State byte
const (
StateNone State = iota
StateConn
StateHandle
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
ID string
conn *websocket.Conn
medias []*core.Media
receiver *core.Receiver
msg *message
t0 time.Time
buffer chan []byte
state State
mu sync.Mutex
recv int
}
func Dial(source string) (*Client, error) {
id := strings.Replace(source[8:], "/", ":", 1)
client := &Client{ID: id}
if err := client.Dial(); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) Dial() (err error) {
resp, err := http.Get(
"https://openapi-alpha.ivideon.com/cameras/" + c.ID +
"/live_stream?op=GET&access_token=public&q=2&" +
"video_codecs=h264&format=ws-fmp4",
)
data, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
var v liveResponse
if err = json.Unmarshal(data, &v); err != nil {
return err
}
if !v.Success {
return fmt.Errorf("wrong response: %s", data)
}
c.conn, _, err = websocket.DefaultDialer.Dial(v.Result.URL, nil)
if err != nil {
return err
}
if err = c.getTracks(); err != nil {
_ = c.conn.Close()
return err
}
c.state = StateConn
return nil
}
func (c *Client) Handle() error {
// add delay to the stream for smooth playing (not a best solution)
c.t0 = time.Now().Add(time.Second)
c.mu.Lock()
if c.state == StateConn {
c.buffer = make(chan []byte, 5)
c.state = StateHandle
// processing stream in separate thread for lower delay between packets
go c.worker(c.buffer)
}
c.mu.Unlock()
_, data, err := c.conn.ReadMessage()
if err != nil {
return err
}
if c.receiver != nil && c.receiver.ID == c.msg.Track {
c.mu.Lock()
if c.state == StateHandle {
c.buffer <- data
c.recv += len(data)
}
c.mu.Unlock()
}
// we have one unprocessed msg after getTracks
for {
_, data, err = c.conn.ReadMessage()
if err != nil {
return err
}
var msg message
if err = json.Unmarshal(data, &msg); err != nil {
return err
}
switch msg.Type {
case "stream-init":
continue
case "metadata":
continue
case "fragment":
_, data, err = c.conn.ReadMessage()
if err != nil {
return err
}
if c.receiver != nil && c.receiver.ID == msg.Track {
c.mu.Lock()
if c.state == StateHandle {
c.buffer <- data
c.recv += len(data)
}
c.mu.Unlock()
}
default:
return fmt.Errorf("wrong message type: %s", data)
}
}
}
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
switch c.state {
case StateNone:
return nil
case StateConn:
case StateHandle:
close(c.buffer)
}
c.state = StateNone
return c.conn.Close()
}
func (c *Client) getTracks() error {
for {
_, data, err := c.conn.ReadMessage()
if err != nil {
return err
}
var msg message
if err = json.Unmarshal(data, &msg); err != nil {
return err
}
switch msg.Type {
case "metadata":
continue
case "stream-init":
s := msg.CodecString
i := strings.IndexByte(s, '.')
if i > 0 {
s = s[:i]
}
switch s {
case "avc1": // avc1.4d0029
// skip multiple identical init
if c.receiver != nil {
continue
}
i = bytes.Index(msg.Data, []byte("avcC")) - 4
if i < 0 {
return fmt.Errorf("ivideon: wrong AVC: %s", msg.Data)
}
avccLen := binary.BigEndian.Uint32(msg.Data[i:])
data = msg.Data[i+8 : i+int(avccLen)]
codec := h264.ConfigToCodec(data)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
c.receiver = core.NewReceiver(media, codec)
c.receiver.ID = msg.TrackID
case "mp4a": // mp4a.40.2
}
case "fragment":
c.msg = &msg
return nil
default:
return fmt.Errorf("wrong message type: %s", data)
}
}
}
func (c *Client) worker(buffer chan []byte) {
for data := range buffer {
atoms, err := iso.DecodeAtoms(data)
if err != nil {
continue
}
var trun *iso.Atom
var ts uint32
for _, atom := range atoms {
switch atom.Name {
case iso.MoofTrafTrun:
trun = atom
case iso.MoofTrafTfdt:
ts = uint32(atom.DecodeTime)
case iso.Mdat:
data = atom.Data
}
}
if trun == nil || trun.SamplesDuration == nil || trun.SamplesSize == nil {
continue
}
for i := 0; i < len(trun.SamplesDuration); i++ {
duration := trun.SamplesDuration[i]
size := trun.SamplesSize[i]
// synchronize framerate for WebRTC and MSE
d := time.Duration(ts)*time.Millisecond - time.Since(c.t0)
if d < 0 {
d = time.Duration(duration) * time.Millisecond / 2
}
time.Sleep(d)
// can be SPS, PPS and IFrame in one packet
packet := &rtp.Packet{
// ivideon clockrate=1000, RTP clockrate=90000
Header: rtp.Header{Timestamp: ts * 90},
Payload: data[:size],
}
c.receiver.WriteRTP(packet)
data = data[size:]
ts += duration
}
}
}
type liveResponse struct {
Result struct {
URL string `json:"url"`
} `json:"result"`
Success bool `json:"success"`
}
type message struct {
Type string `json:"type"`
CodecString string `json:"codec_string"`
Data []byte `json:"data"`
TrackID byte `json:"track_id"`
Track byte `json:"track"`
StartTime float32 `json:"start_time"`
Duration float32 `json:"duration"`
IsKey bool `json:"is_key"`
DataOffset uint32 `json:"data_offset"`
}

187
pkg/ivideon/ivideon.go Normal file
View File

@@ -0,0 +1,187 @@
package ivideon
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/gorilla/websocket"
)
type Producer struct {
core.Connection
conn *websocket.Conn
buf []byte
dem *mp4.Demuxer
}
func Dial(source string) (core.Producer, error) {
id := strings.Replace(source[8:], "/", ":", 1)
url, err := GetLiveStream(id)
if err != nil {
return nil, err
}
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "ivideon",
Protocol: core.Before(url, ":"), // wss
RemoteAddr: conn.RemoteAddr().String(),
Source: source,
URL: url,
Transport: conn,
},
conn: conn,
}
if err = prod.probe(); err != nil {
_ = conn.Close()
return nil, err
}
return prod, nil
}
func GetLiveStream(id string) (string, error) {
// &video_codecs=h264,h265&audio_codecs=aac,mp3,pcma,pcmu,none
resp, err := http.Get(
"https://openapi-alpha.ivideon.com/cameras/" + id +
"/live_stream?op=GET&access_token=public&q=2&video_codecs=h264&format=ws-fmp4",
)
if err != nil {
return "", err
}
var v struct {
Message string `json:"message"`
Result struct {
URL string `json:"url"`
} `json:"result"`
Success bool `json:"success"`
}
if err = json.NewDecoder(resp.Body).Decode(&v); err != nil {
return "", err
}
if !v.Success {
return "", fmt.Errorf("ivideon: can't get live_stream: " + v.Message)
}
return v.Result.URL, nil
}
func (p *Producer) Start() error {
receivers := make(map[uint32]*core.Receiver)
for _, receiver := range p.Receivers {
trackID := p.dem.GetTrackID(receiver.Codec)
receivers[trackID] = receiver
}
ch := make(chan []byte, 10)
defer close(ch)
ch <- p.buf
go func() {
// add delay to the stream for smooth playing (not a best solution)
t0 := time.Now()
for data := range ch {
trackID, packets := p.dem.Demux(data)
if receiver := receivers[trackID]; receiver != nil {
clockRate := time.Duration(receiver.Codec.ClockRate)
for _, packet := range packets {
// synchronize framerate for WebRTC and MSE
ts := time.Second * time.Duration(packet.Timestamp) / clockRate
d := ts - time.Since(t0)
if d < 0 {
d = 10 * time.Millisecond
}
time.Sleep(d)
receiver.WriteRTP(packet)
}
}
}
}()
for {
var msg message
if err := p.conn.ReadJSON(&msg); err != nil {
return err
}
switch msg.Type {
case "stream-init", "metadata":
continue
case "fragment":
_, b, err := p.conn.ReadMessage()
if err != nil {
return err
}
p.Recv += len(b)
ch <- b
default:
return errors.New("ivideon: wrong message type: " + msg.Type)
}
}
}
func (p *Producer) probe() (err error) {
p.dem = &mp4.Demuxer{}
for {
var msg message
if err = p.conn.ReadJSON(&msg); err != nil {
return err
}
switch msg.Type {
case "metadata":
continue
case "stream-init":
// it's difficult to maintain audio
if strings.HasPrefix(msg.CodecString, "avc1") {
medias := p.dem.Probe(msg.Data)
p.Medias = append(p.Medias, medias...)
}
case "fragment":
_, p.buf, err = p.conn.ReadMessage()
return
default:
return errors.New("ivideon: wrong message type: " + msg.Type)
}
}
}
type message struct {
Type string `json:"type"`
CodecString string `json:"codec_string"`
Data []byte `json:"data"`
//TrackID byte `json:"track_id"`
//Track byte `json:"track"`
//StartTime float32 `json:"start_time"`
//Duration float32 `json:"duration"`
//IsKey bool `json:"is_key"`
//DataOffset uint32 `json:"data_offset"`
}

View File

@@ -1,51 +0,0 @@
package ivideon
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver != nil {
return c.receiver, nil
}
return nil, core.ErrCantGetTrack
}
func (c *Client) Start() error {
err := c.Handle()
if c.buffer == nil {
return nil
}
return err
}
func (c *Client) Stop() error {
if c.receiver != nil {
c.receiver.Close()
}
return c.Close()
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Connection{
ID: core.ID(c),
FormatName: "ivideon",
Protocol: "ws",
URL: c.ID,
Medias: c.medias,
Recv: c.recv,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
if c.receiver != nil {
info.Receivers = []*core.Receiver{c.receiver}
}
return json.Marshal(info)
}

116
pkg/mp4/demuxer.go Normal file
View File

@@ -0,0 +1,116 @@
package mp4
import (
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/iso"
"github.com/pion/rtp"
)
type Demuxer struct {
codecs map[uint32]*core.Codec
timeScales map[uint32]float32
}
func (d *Demuxer) Probe(init []byte) (medias []*core.Media) {
var trackID, timeScale uint32
if d.codecs == nil {
d.codecs = make(map[uint32]*core.Codec)
d.timeScales = make(map[uint32]float32)
}
atoms, _ := iso.DecodeAtoms(init)
for _, atom := range atoms {
var codec *core.Codec
switch atom := atom.(type) {
case *iso.AtomTkhd:
trackID = atom.TrackID
case *iso.AtomMdhd:
timeScale = atom.TimeScale
case *iso.AtomVideo:
switch atom.Name {
case "avc1":
codec = h264.ConfigToCodec(atom.Config)
}
case *iso.AtomAudio:
switch atom.Name {
case "mp4a":
codec = aac.ConfigToCodec(atom.Config)
}
}
if codec != nil {
d.codecs[trackID] = codec
d.timeScales[trackID] = float32(codec.ClockRate) / float32(timeScale)
medias = append(medias, &core.Media{
Kind: codec.Kind(),
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
})
}
}
return
}
func (d *Demuxer) GetTrackID(codec *core.Codec) uint32 {
for trackID, c := range d.codecs {
if c == codec {
return trackID
}
}
return 0
}
func (d *Demuxer) Demux(data2 []byte) (trackID uint32, packets []*core.Packet) {
atoms, err := iso.DecodeAtoms(data2)
if err != nil {
return 0, nil
}
var ts uint32
var trun *iso.AtomTrun
var data []byte
for _, atom := range atoms {
switch atom := atom.(type) {
case *iso.AtomTfhd:
trackID = atom.TrackID
case *iso.AtomTfdt:
ts = uint32(atom.DecodeTime)
case *iso.AtomTrun:
trun = atom
case *iso.AtomMdat:
data = atom.Data
}
}
timeScale := d.timeScales[trackID]
if timeScale == 0 {
return 0, nil
}
n := len(trun.SamplesDuration)
packets = make([]*core.Packet, n)
for i := 0; i < n; i++ {
duration := trun.SamplesDuration[i]
size := trun.SamplesSize[i]
// can be SPS, PPS and IFrame in one packet
timestamp := uint32(float32(ts) * timeScale)
packets[i] = &rtp.Packet{
Header: rtp.Header{Timestamp: timestamp},
Payload: data[:size],
}
data = data[size:]
ts += duration
}
return
}