diff --git a/internal/http/http.go b/internal/http/http.go index f1b3c227..356279ba 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -53,11 +53,8 @@ func handleHTTP(url string) (core.Producer, error) { return multipart.NewClient(res) case "video/x-flv": - var client *flv.Client - if client, err = flv.NewClient(res.Body); err != nil { - return nil, err - } - if err = client.Describe(); err != nil { + client, err := flv.Open(res.Body) + if err != nil { return nil, err } client.URL = url diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index efbeb0df..1cf7c16b 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -23,9 +23,6 @@ func streamsHandle(url string) (core.Producer, error) { if err != nil { return nil, err } - if err = client.Describe(); err != nil { - return nil, err - } return client, nil } @@ -42,17 +39,12 @@ func apiHandle(w http.ResponseWriter, r *http.Request) { return } - client, err := flv.NewClient(r.Body) + client, err := flv.Open(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - if err = client.Describe(); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - stream.AddProducer(client) if err = client.Start(); err != nil && err != io.EOF { diff --git a/pkg/core/readseeker.go b/pkg/core/readseeker.go index 49894622..5eff128f 100644 --- a/pkg/core/readseeker.go +++ b/pkg/core/readseeker.go @@ -5,7 +5,7 @@ import ( "io" ) -const ProbeSize = 5 * 1024 * 1024 // 5MB +const ProbeSize = 1024 * 1024 // 1MB const ( BufferDisable = 0 @@ -95,7 +95,11 @@ func (r *ReadSeeker) Peek(n int) ([]byte, error) { if _, err := io.ReadAtLeast(r, b, n); err != nil { return nil, err } - r.BufferSize = BufferDrainAndClear - r.pos = 0 + r.Rewind() return b, nil } + +func (r *ReadSeeker) Rewind() { + r.BufferSize = BufferDrainAndClear + r.pos = 0 +} diff --git a/pkg/flv/client.go b/pkg/flv/client.go index ad447780..d75df85f 100644 --- a/pkg/flv/client.go +++ 
b/pkg/flv/client.go @@ -2,6 +2,8 @@ package flv import ( "bytes" + "encoding/binary" + "errors" "io" "time" @@ -12,10 +14,10 @@ import ( ) type Client struct { - Transport - URL string + rd *core.ReadSeeker + medias []*core.Media receivers []*core.Receiver @@ -24,15 +26,33 @@ type Client struct { recv int } -func NewClient(rd io.Reader) (*Client, error) { - tr, err := NewTransport(rd) - if err != nil { +func Open(rd io.Reader) (*Client, error) { + client := &Client{ + rd: core.NewReadSeeker(rd), + } + if err := client.describe(); err != nil { return nil, err } - return &Client{Transport: tr}, nil + return client, nil } -func (c *Client) Describe() error { +const ( + TagAudio = 8 + TagVideo = 9 + TagData = 18 + + CodecAAC = 10 + CodecAVC = 7 +) + +func (c *Client) describe() error { + if err := c.readHeader(); err != nil { + return err + } + + c.rd.BufferSize = core.ProbeSize + defer c.rd.Rewind() + // Normal software sends: // 1. Video/audio flag in header // 2. MetaData as first tag (with video/audio codec info) @@ -42,40 +62,39 @@ func (c *Client) Describe() error { // 1. Empty video/audio flag // 2. MedaData without stereo key for AAC // 3. Audio header after Video keyframe tag - waitVideo := true - waitAudio := true + waitType := []byte{TagData} timeout := time.Now().Add(core.ProbeTimeout) - for (waitVideo || waitAudio) && time.Now().Before(timeout) { - tagType, _, b, err := c.Transport.ReadTag() + for len(waitType) != 0 && time.Now().Before(timeout) { + pkt, err := c.readPacket() if err != nil { return err } - c.recv += len(b) + if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 { + continue + } else { + waitType = append(waitType[:i], waitType[i+1:]...) 
+ } - switch tagType { + switch pkt.PayloadType { case TagAudio: - if !waitAudio { - continue - } + _ = pkt.Payload[1] // bounds - waitAudio = false - - codecID := b[0] >> 4 // SoundFormat - _ = b[0] & 0b1100 // SoundRate - _ = b[0] & 0b0010 // SoundSize - _ = b[0] & 0b0001 // SoundType + codecID := pkt.Payload[0] >> 4 // SoundFormat + _ = pkt.Payload[0] & 0b1100 // SoundRate + _ = pkt.Payload[0] & 0b0010 // SoundSize + _ = pkt.Payload[0] & 0b0001 // SoundType if codecID != CodecAAC { continue } - if b[1] != 0 { // check if header + if pkt.Payload[1] != 0 { // check if header continue } - codec := aac.ConfigToCodec(b[2:]) + codec := aac.ConfigToCodec(pkt.Payload[2:]) media := &core.Media{ Kind: core.KindAudio, Direction: core.DirectionRecvonly, @@ -84,24 +103,20 @@ func (c *Client) Describe() error { c.medias = append(c.medias, media) case TagVideo: - if !waitVideo { - continue - } + _ = pkt.Payload[1] // bounds - waitVideo = false - - _ = b[0] >> 4 // FrameType - codecID := b[0] & 0b1111 // CodecID + _ = pkt.Payload[0] >> 4 // FrameType + codecID := pkt.Payload[0] & 0b1111 // CodecID if codecID != CodecAVC { continue } - if b[1] != 0 { // check if header + if pkt.Payload[1] != 0 { // check if header continue } - codec := h264.ConfigToCodec(b[5:]) + codec := h264.ConfigToCodec(pkt.Payload[5:]) media := &core.Media{ Kind: core.KindVideo, Direction: core.DirectionRecvonly, @@ -110,53 +125,100 @@ func (c *Client) Describe() error { c.medias = append(c.medias, media) case TagData: - if !bytes.Contains(b, []byte("onMetaData")) { - continue + if !bytes.Contains(pkt.Payload, []byte("onMetaData")) { + waitType = append(waitType, TagData) + } + if bytes.Contains(pkt.Payload, []byte("videocodecid")) { + waitType = append(waitType, TagVideo) + } + if bytes.Contains(pkt.Payload, []byte("audiocodecid")) { + waitType = append(waitType, TagAudio) } - waitVideo = bytes.Contains(b, []byte("videocodecid")) - waitAudio = bytes.Contains(b, []byte("audiocodecid")) } } return nil } 
-func (c *Client) Play() error { +func (c *Client) play() error { for { - tagType, timeMS, b, err := c.Transport.ReadTag() + pkt, err := c.readPacket() if err != nil { return err } - c.recv += len(b) + c.recv += len(pkt.Payload) - switch tagType { + switch pkt.PayloadType { case TagAudio: - if c.audio == nil || b[1] == 0 { + if c.audio == nil || pkt.Payload[1] == 0 { continue } - pkt := &rtp.Packet{ - Header: rtp.Header{ - Timestamp: TimeToRTP(timeMS, c.audio.Codec.ClockRate), - }, - Payload: b[2:], - } + pkt.Timestamp = TimeToRTP(pkt.Timestamp, c.audio.Codec.ClockRate) + pkt.Payload = pkt.Payload[2:] c.audio.WriteRTP(pkt) case TagVideo: // frame type 4b, codecID 4b, avc packet type 8b, composition time 24b - if c.video == nil || b[1] == 0 { + if c.video == nil || pkt.Payload[1] == 0 { continue } - pkt := &rtp.Packet{ - Header: rtp.Header{ - Timestamp: TimeToRTP(timeMS, c.video.Codec.ClockRate), - }, - Payload: b[5:], - } + pkt.Timestamp = TimeToRTP(pkt.Timestamp, c.video.Codec.ClockRate) + pkt.Payload = pkt.Payload[5:] c.video.WriteRTP(pkt) } } } + +func (c *Client) readHeader() error { + b := make([]byte, 9) + if _, err := io.ReadFull(c.rd, b); err != nil { + return err + } + + if string(b[:3]) != "FLV" { + return errors.New("flv: wrong header") + } + + _ = b[4] // flags (skip because unsupported by Reolink cameras) + + if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 { + if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil { + return err + } + } + + return nil +} + +func (c *Client) readPacket() (*rtp.Packet, error) { + // https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf + b := make([]byte, 4+11) + if _, err := io.ReadFull(c.rd, b); err != nil { + return nil, err + } + + b = b[4 : 4+11] // skip previous tag size + + size := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]) + + pkt := &rtp.Packet{ + Header: rtp.Header{ + PayloadType: b[0], + Timestamp: uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) | uint32(b[7])<<24, + }, + 
Payload: make([]byte, size), + } + + if _, err := io.ReadFull(c.rd, pkt.Payload); err != nil { + return nil, err + } + + return pkt, nil +} + +func TimeToRTP(timeMS uint32, clockRate uint32) uint32 { + return timeMS * clockRate / 1000 +} diff --git a/pkg/flv/flv.go b/pkg/flv/flv.go deleted file mode 100644 index 9557334b..00000000 --- a/pkg/flv/flv.go +++ /dev/null @@ -1,87 +0,0 @@ -package flv - -import ( - "encoding/binary" - "errors" - "io" -) - -const ( - TagAudio = 8 - TagVideo = 9 - TagData = 18 - - CodecAAC = 10 - CodecAVC = 7 -) - -// Transport - it is recommended to implement io.Closer -type Transport interface { - ReadTag() (byte, uint32, []byte, error) -} - -// NewTransport - it is recommended to use bufio.Reader -func NewTransport(rd io.Reader) (Transport, error) { - c := &flv{rd: rd} - if err := c.readHeader(); err != nil { - return nil, err - } - return c, nil -} - -type flv struct { - rd io.Reader -} - -func (c *flv) ReadTag() (byte, uint32, []byte, error) { - // https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf - b := make([]byte, 4+11) - if _, err := io.ReadFull(c.rd, b); err != nil { - return 0, 0, nil, err - } - - b = b[4 : 4+11] // skip previous tag size - - tagType := b[0] - size := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]) - timeMS := uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) | uint32(b[7])<<24 - - b = make([]byte, size) - if _, err := io.ReadFull(c.rd, b); err != nil { - return 0, 0, nil, err - } - - return tagType, timeMS, b, nil -} - -func (c *flv) Close() error { - if closer, ok := c.rd.(io.Closer); ok { - return closer.Close() - } - return nil -} - -func (c *flv) readHeader() error { - b := make([]byte, 9) - if _, err := io.ReadFull(c.rd, b); err != nil { - return err - } - - if string(b[:3]) != "FLV" { - return errors.New("flv: wrong header") - } - - _ = b[4] // flags (skip because unsupported by Reolink cameras) - - if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 { - if _, err := io.ReadFull(c.rd, 
make([]byte, skip)); err != nil { - return err - } - } - - return nil -} - -func TimeToRTP(timeMS uint32, clockRate uint32) uint32 { - return timeMS * clockRate / 1000 -} diff --git a/pkg/flv/producer.go b/pkg/flv/producer.go index 3aa3c10a..7f4693e8 100644 --- a/pkg/flv/producer.go +++ b/pkg/flv/producer.go @@ -28,11 +28,11 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) Start() error { - return c.Play() + return c.play() } func (c *Client) Stop() error { - if closer, ok := c.Transport.(io.Closer); ok { + if closer, ok := c.rd.Reader.(io.Closer); ok { return closer.Close() } return nil diff --git a/pkg/magic/mjpeg/client.go b/pkg/magic/mjpeg/client.go index ff8a3278..4a5732a3 100644 --- a/pkg/magic/mjpeg/client.go +++ b/pkg/magic/mjpeg/client.go @@ -79,6 +79,8 @@ func (c *Client) Start() error { } c.receiver.WriteRTP(pkt) + //log.Printf("[mjpeg] ts=%d size=%d", pkt.Header.Timestamp, len(pkt.Payload)) + buf = buf[i:] } } diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index dd7926d8..ba9454b0 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -26,25 +26,42 @@ func Dial(rawURL string) (*flv.Client, error) { return nil, err } - tr := &rtmp{ - url: rawURL, - conn: conn, - rd: bufio.NewReaderSize(conn, core.BufferSize), + rd := &rtmp{ + url: rawURL, + headers: map[uint32]*header{}, + conn: conn, + rd: bufio.NewReaderSize(conn, core.BufferSize), } if args := strings.Split(u.Path, "/"); len(args) >= 2 { - tr.app = args[1] + rd.app = args[1] if len(args) >= 3 { - tr.stream = args[2] + rd.stream = args[2] if u.RawQuery != "" { - tr.stream += "?" + u.RawQuery + rd.stream += "?" 
+ u.RawQuery } } } - if err = tr.init(); err != nil { + if err = rd.handshake(); err != nil { + return nil, err + } + if err = rd.sendConfig(); err != nil { + return nil, err + } + if err = rd.sendConnect(); err != nil { + return nil, err + } + if err = rd.sendPlay(); err != nil { return nil, err } - return &flv.Client{Transport: tr, URL: rawURL}, nil + rd.buf = []byte{ + 'F', 'L', 'V', // signature + 1, // version + 0, // flags (has video/audio) + 0, 0, 0, 9, // header size + } + + return flv.Open(rd) } diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index b91516f4..5baf00af 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -37,6 +37,55 @@ type rtmp struct { conn net.Conn rd io.Reader + + buf []byte +} + +func (c *rtmp) Read(p []byte) (n int, err error) { + // 1. Check temporary buffer + if len(c.buf) == 0 { + msgType, timeMS, payload, err2 := c.readMessage() + if err2 != nil { + return 0, err2 + } + + payloadSize := len(payload) + + // previous tag size (4 byte) + header (11 byte) + payload + n = 4 + 11 + payloadSize + + // 2. Check if the message fits in the buffer + if n <= len(p) { + encodeFLV(p, msgType, timeMS, uint32(payloadSize), payload) + return + } + + // 3. Put the message into a temporary buffer + c.buf = make([]byte, n) + encodeFLV(c.buf, msgType, timeMS, uint32(payloadSize), payload) + } + + // 4. 
Send temporary buffer + n = copy(p, c.buf) + c.buf = c.buf[n:] + return +} + +func (c *rtmp) Close() error { + return c.conn.Close() +} + +func encodeFLV(b []byte, msgType byte, time, size uint32, payload []byte) { + b[0] = 0 + b[1] = 0 + b[2] = 0 + b[3] = 0 + b[4+0] = msgType + PutUint24(b[4+1:], size) + PutUint24(b[4+4:], time) + b[4+7] = byte(time >> 24) + + copy(b[4+11:], payload) } type header struct { @@ -45,7 +94,7 @@ type header struct { msgType byte } -func (c *rtmp) ReadTag() (byte, uint32, []byte, error) { +func (c *rtmp) readMessage() (byte, uint32, []byte, error) { hdrType, sid, err := c.readHeader() if err != nil { return 0, 0, nil, err @@ -143,30 +192,6 @@ func (c *rtmp) ReadTag() (byte, uint32, []byte, error) { return hdr.msgType, timeMS, b, nil } -func (c *rtmp) Close() error { - return c.conn.Close() -} - -func (c *rtmp) init() error { - if err := c.handshake(); err != nil { - return err - } - if err := c.sendConfig(); err != nil { - return err - } - - c.headers = map[uint32]*header{} - - if err := c.sendConnect(); err != nil { - return err - } - if err := c.sendPlay(); err != nil { - return err - } - - return nil -} - func (c *rtmp) handshake() error { // simple handshake without real random and check response const randomSize = 4 + 4 + 1528 @@ -236,7 +261,7 @@ func (c *rtmp) sendConnect() error { return err } - s, err := c.waitCode() + s, err := c.waitCode("_result", float64(1)) // result with same ID if err != nil { return err } @@ -258,7 +283,7 @@ func (c *rtmp) sendPlay() error { return err } - args, err := c.waitResponse() + args, err := c.waitResponse("_result", float64(2)) // result with same ID if err != nil { return err } @@ -271,7 +296,7 @@ func (c *rtmp) sendPlay() error { msg = amf.NewWriter() msg.WriteString("play") - msg.WriteNumber(3) + msg.WriteNumber(0) msg.WriteNull() msg.WriteString(c.stream) @@ -279,7 +304,7 @@ func (c *rtmp) sendPlay() error { return err } - s, err := c.waitCode() + s, err := c.waitCode("onStatus", 
float64(0)) // events have zero transaction ID if err != nil { return err } @@ -354,9 +379,9 @@ func (c *rtmp) readSize(n uint32) ([]byte, error) { return b, nil } -func (c *rtmp) waitResponse() ([]any, error) { +func (c *rtmp) waitResponse(cmd any, tid any) ([]any, error) { for { - msgType, _, b, err := c.ReadTag() + msgType, _, b, err := c.readMessage() if err != nil { return nil, err } @@ -366,13 +391,24 @@ func (c *rtmp) waitResponse() ([]any, error) { c.pktSize = binary.BigEndian.Uint32(b) case MsgCommand: - return amf.NewReader(b).ReadItems() + var v []any + if v, err = amf.NewReader(b).ReadItems(); err != nil { + return nil, err + } + + if len(v) < 4 { + return nil, ErrResponse + } + + if v[0] == cmd && v[1] == tid { + return v, nil + } } } } -func (c *rtmp) waitCode() (string, error) { - args, err := c.waitResponse() +func (c *rtmp) waitCode(cmd any, tid any) (string, error) { + args, err := c.waitResponse(cmd, tid) if err != nil { return "", err }