From f67f6e5b9f90ad03e9a0e3973e9569f7741cb1fb Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sat, 19 Aug 2023 16:37:52 +0300 Subject: [PATCH] Rewrite mpegts producer and consumer --- internal/hls/hls.go | 9 +- pkg/aac/aac.go | 17 ++ pkg/aac/adts.go | 40 +++++ pkg/aac/adts_test.go | 27 +++ pkg/aac/rtp.go | 55 +++++- pkg/mpegts/README.md | 13 ++ pkg/mpegts/client.go | 126 ------------- pkg/mpegts/consumer.go | 113 ++++++++++++ pkg/mpegts/demuxer.go | 393 +++++++++++++++++++++++++++++++++++++++++ pkg/mpegts/helpers.go | 112 ------------ pkg/mpegts/muxer.go | 209 ++++++++++++++++++++++ pkg/mpegts/producer.go | 142 ++++++++++++--- pkg/mpegts/reader.go | 297 ------------------------------- pkg/mpegts/ts.go | 225 ----------------------- pkg/mpegts/writer.go | 219 ----------------------- pkg/tapo/client.go | 8 +- pkg/tapo/consumer.go | 18 +- 17 files changed, 993 insertions(+), 1030 deletions(-) create mode 100644 pkg/aac/adts_test.go delete mode 100644 pkg/mpegts/client.go create mode 100644 pkg/mpegts/consumer.go create mode 100644 pkg/mpegts/demuxer.go delete mode 100644 pkg/mpegts/helpers.go create mode 100644 pkg/mpegts/muxer.go delete mode 100644 pkg/mpegts/reader.go delete mode 100644 pkg/mpegts/ts.go delete mode 100644 pkg/mpegts/writer.go diff --git a/internal/hls/hls.go b/internal/hls/hls.go index 3a551200..66e467c9 100644 --- a/internal/hls/hls.go +++ b/internal/hls/hls.go @@ -12,7 +12,6 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mp4" - "github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog" ) @@ -79,10 +78,10 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { Medias: medias, } } else { - cons = &mpegts.Consumer{ - RemoteAddr: tcp.RemoteAddr(r), - UserAgent: r.UserAgent(), - } + //cons = &mpegts.Consumer{ + // RemoteAddr: tcp.RemoteAddr(r), + // UserAgent: r.UserAgent(), + //} } if err := stream.AddConsumer(cons); err != nil { diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index 7fc143c1..43fb5ebf 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -53,3 +53,20 @@ func ConfigToCodec(conf []byte) *core.Codec { return codec } + +func DecodeConfig(b []byte) (objType, sampleFreqIdx, channels byte, sampleRate uint32) { + rd := bits.NewReader(b) + + objType = rd.ReadBits8(5) + if objType == 0b11111 { + objType = 32 + rd.ReadBits8(6) + } + + sampleFreqIdx = rd.ReadBits8(4) + if sampleFreqIdx == 0b1111 { + sampleRate = rd.ReadBits(24) + } + + channels = rd.ReadBits8(4) + return +} diff --git a/pkg/aac/adts.go b/pkg/aac/adts.go index e38b4c42..c5553b29 100644 --- a/pkg/aac/adts.go +++ b/pkg/aac/adts.go @@ -57,5 +57,45 @@ func ADTSToCodec(b []byte) *core.Codec { func ReadADTSSize(b []byte) uint16 { // AAAAAAAA AAAABCCD EEFFFFGH HHIJKLMM MMMMMMMM MMMOOOOO OOOOOOPP (QQQQQQQQ QQQQQQQQ) + _ = b[5] // bounds return uint16(b[3]&0x03)<<(8+3) | uint16(b[4])<<3 | uint16(b[5]>>5) } + +func WriteADTSSize(b []byte, size uint16) { + // AAAAAAAA AAAABCCD EEFFFFGH HHIJKLMM MMMMMMMM MMMOOOOO OOOOOOPP (QQQQQQQQ QQQQQQQQ) + _ = b[5] // bounds + b[3] |= byte(size >> (8 + 3)) + b[4] = byte(size >> 3) + b[5] |= byte(size << 5) + return +} + +func CodecToADTS(codec *core.Codec) []byte { + s := core.Between(codec.FmtpLine, "config=", ";") + conf, err := hex.DecodeString(s) + if err != nil { + return nil + } + + objType, sampleFreqIdx, channels, _ := DecodeConfig(conf) + profile := objType - 1 + + wr := bits.NewWriter(nil) + wr.WriteAllBits(1, 12) // Syncword, all bits must be set to 1 + wr.WriteBit(0) // MPEG Version, set to 0 for MPEG-4 and 1 for MPEG-2 + wr.WriteBits8(0, 2) // Layer, always set to 0 + wr.WriteBit(1) // Protection absence, set to 1 if there is no CRC and 0 if there is CRC + wr.WriteBits8(profile, 2) // Profile, the MPEG-4 Audio Object Type minus 1 + wr.WriteBits8(sampleFreqIdx, 4) // MPEG-4 Sampling Frequency Index + wr.WriteBit(0) // Private bit, guaranteed never to be used by MPEG, set to 0 when encoding, ignore when decoding + wr.WriteBits8(channels, 3) // MPEG-4 Channel Configuration + wr.WriteBit(0) // Originality, set to 1 to signal originality of the audio and 0 otherwise + wr.WriteBit(0) // Home, set to 1 to signal home usage of the audio and 0 otherwise + wr.WriteBit(0) // Copyright ID bit + wr.WriteBit(0) // Copyright ID start + wr.WriteBits16(0, 13) // Frame length + wr.WriteAllBits(1, 11) // Buffer fullness (variable bitrate) + wr.WriteBits8(0, 2) // Number of AAC frames (Raw Data Blocks) in ADTS frame minus 1 + + return wr.Bytes() +} diff --git a/pkg/aac/adts_test.go b/pkg/aac/adts_test.go new file mode 100644 index 00000000..7b4d69f6 --- /dev/null +++ b/pkg/aac/adts_test.go @@ -0,0 +1,27 @@ +package aac + +import ( + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestADTS(t *testing.T) { + // FFmpeg MPEG-TS AAC (one packet) + s := "fff15080021ffc210049900219002380fff15080021ffc212049900219002380" //... + src, err := hex.DecodeString(s) + require.Nil(t, err) + + codec := ADTSToCodec(src) + require.Equal(t, uint32(44100), codec.ClockRate) + require.Equal(t, uint16(2), codec.Channels) + + size := ReadADTSSize(src) + require.Equal(t, uint16(16), size) + + dst := CodecToADTS(codec) + WriteADTSSize(dst, size) + + require.Equal(t, src[:len(dst)], dst) +} diff --git a/pkg/aac/rtp.go b/pkg/aac/rtp.go index 50f713f6..dd9de0e8 100644 --- a/pkg/aac/rtp.go +++ b/pkg/aac/rtp.go @@ -77,16 +77,55 @@ func RTPPay(handler core.HandlerFunc) core.HandlerFunc { } } -func ADTStoRTP(b []byte) []byte { - header := make([]byte, 2) - for i := 0; i < len(b); { - auSize := ReadADTSSize(b[i:]) - header = append(header, byte(auSize>>5), byte(auSize<<3)) // size in bits +func ADTStoRTP(src []byte) (dst []byte) { + dst = make([]byte, 2) // header bytes + for i := 0; i < len(src); { + auSize := ReadADTSSize(src[i:]) + dst = append(dst, byte(auSize>>5), byte(auSize<<3)) // size in bits i += int(auSize) } - hdrSize := uint16(len(header) - 2) - binary.BigEndian.PutUint16(header, hdrSize<<3) // size in bits - return append(header, b...) + hdrSize := uint16(len(dst) - 2) + binary.BigEndian.PutUint16(dst, hdrSize<<3) // size in bits + return append(dst, src...) +} + +func RTPTimeSize(b []byte) uint32 { + // convert RTP header size to units count + units := binary.BigEndian.Uint16(b) >> 4 + return 1024 * uint32(units) +} + +func RTPToADTS(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc { + adts := CodecToADTS(codec) + + return func(packet *rtp.Packet) { + src := packet.Payload + dst := make([]byte, 0, len(src)) + + headersSize := binary.BigEndian.Uint16(src) >> 3 + headers := src[2 : 2+headersSize] + units := src[2+headersSize:] + + for len(headers) > 0 { + unitSize := binary.BigEndian.Uint16(headers) >> 3 + headers = headers[2:] + unit := units[:unitSize] + units = units[unitSize:] + + if !IsADTS(unit) { + i := len(dst) + dst = append(dst, adts...) + WriteADTSSize(dst[i:], ADTSHeaderSize+uint16(len(unit))) + } + + dst = append(dst, unit...) + } + + clone := *packet + clone.Version = RTPPacketVersionAAC + clone.Payload = dst + handler(&clone) + } } func RTPToCodec(b []byte) *core.Codec { diff --git a/pkg/mpegts/README.md b/pkg/mpegts/README.md index 1b00017a..65fc7a8b 100644 --- a/pkg/mpegts/README.md +++ b/pkg/mpegts/README.md @@ -1,3 +1,16 @@ +## MPEG-TS + +FFmpeg: +- PMTID=4096 +- H264: PESID=256, StreamType=27, StreamID=224 +- H265: PESID=256, StreamType=36, StreamID=224 +- AAC: PESID=257, StreamType=15, StreamID=192 + +Tapo: +- PMTID=18 +- H264: PESID=68, StreamType=27, StreamID=224 +- AAC: PESID=69, StreamType=144, StreamID=192 + ## Useful links - https://github.com/theREDspace/video-onboarding/blob/main/MPEGTS%20Knowledge.md diff --git a/pkg/mpegts/client.go b/pkg/mpegts/client.go deleted file mode 100644 index 6d925276..00000000 --- a/pkg/mpegts/client.go +++ /dev/null @@ -1,126 +0,0 @@ -package mpegts - -import ( - "bytes" - "io" - "time" - - "github.com/AlexxIT/go2rtc/pkg/aac" - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/AlexxIT/go2rtc/pkg/h265" -) - -type Client struct { - URL string - - rd *core.ReadBuffer - - medias []*core.Media - receivers []*core.Receiver - - recv int -} - -func Open(rd io.Reader) (*Client, error) { - client := &Client{rd: core.NewReadBuffer(rd)} - if err := client.describe(); err != nil { - return nil, err - } - return client, nil -} - -func (c *Client) describe() error { - c.rd.BufferSize = core.ProbeSize - defer c.rd.Reset() - - rd := NewReader() - - // Strategy: - // 1. Wait packet with metadata, init other packets for wait - // 2. Wait other packets - // 3. Stop after timeout - waitType := []byte{metadataType} - timeout := time.Now().Add(core.ProbeTimeout) - - for len(waitType) != 0 && time.Now().Before(timeout) { - pkt, err := rd.ReadPacket(c.rd) - if err != nil { - return err - } - - // check if we wait this type - if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 { - continue - } else { - waitType = append(waitType[:i], waitType[i+1:]...) - } - - switch pkt.PayloadType { - case metadataType: - for _, streamType := range pkt.Payload { - switch streamType { - case StreamTypeH264, StreamTypeH265, StreamTypeAAC: - waitType = append(waitType, streamType) - } - } - - case StreamTypeH264: - codec := h264.AVCCToCodec(pkt.Payload) - media := &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - c.medias = append(c.medias, media) - - case StreamTypeH265: - codec := h265.AVCCToCodec(pkt.Payload) - media := &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - c.medias = append(c.medias, media) - - case StreamTypeAAC: - codec := aac.RTPToCodec(pkt.Payload) - media := &core.Media{ - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - c.medias = append(c.medias, media) - } - } - - return nil -} - -func (c *Client) play() error { - rd := NewReader() - - for { - pkt, err := rd.ReadPacket(c.rd) - if err != nil { - return err - } - - //log.Printf("[mpegts] size: %6d, ts: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType) - - for _, receiver := range c.receivers { - if receiver.ID == pkt.PayloadType { - pkt.Timestamp = PTSToTimestamp(pkt.Timestamp, receiver.Codec.ClockRate) - receiver.WriteRTP(pkt) - break - } - } - } -} - -func (c *Client) Close() error { - if closer, ok := c.rd.Reader.(io.Closer); ok { - return closer.Close() - } - return nil -} diff --git a/pkg/mpegts/consumer.go b/pkg/mpegts/consumer.go new file mode 100644 index 00000000..13a98d09 --- /dev/null +++ b/pkg/mpegts/consumer.go @@ -0,0 +1,113 @@ +package mpegts + +import ( + "io" + + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" + "github.com/pion/rtp" +) + +type Consumer struct { + core.SuperConsumer + muxer *Muxer + wr *core.WriteBuffer +} + +func NewConsumer() *Consumer { + c := &Consumer{ + muxer: NewMuxer(), + wr: core.NewWriteBuffer(nil), + } + c.Medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecH264}, + {Name: core.CodecH265}, + }, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecAAC}, + }, + }, + } + return c +} + +func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, track.Codec) + + switch track.Codec.Name { + case core.CodecH264: + pid := c.muxer.AddTrack(StreamTypeH264) + + sender.Handler = func(pkt *rtp.Packet) { + b := c.muxer.GetPayload(pid, pkt.Timestamp, pkt.Payload) + if n, err := c.wr.Write(b); err == nil { + c.Send += n + } + } + + if track.Codec.IsRTP() { + sender.Handler = h264.RTPDepay(track.Codec, sender.Handler) + } else { + sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler) + } + + case core.CodecH265: + pid := c.muxer.AddTrack(StreamTypeH265) + + sender.Handler = func(pkt *rtp.Packet) { + b := c.muxer.GetPayload(pid, pkt.Timestamp, pkt.Payload) + if n, err := c.wr.Write(b); err == nil { + c.Send += n + } + } + + if track.Codec.IsRTP() { + sender.Handler = h265.RTPDepay(track.Codec, sender.Handler) + } + + case core.CodecAAC: + pid := c.muxer.AddTrack(StreamTypeAAC) + + sender.Handler = func(pkt *rtp.Packet) { + pts := pkt.Timestamp * 90000 / track.Codec.ClockRate + b := c.muxer.GetPayload(pid, pts, pkt.Payload) + if n, err := c.wr.Write(b); err == nil { + c.Send += n + } + } + + if track.Codec.IsRTP() { + sender.Handler = aac.RTPToADTS(track.Codec, sender.Handler) + } else { + panic("todo") + } + } + + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { + b := c.muxer.GetHeader() + if _, err := wr.Write(b); err != nil { + return 0, err + } + + return c.wr.WriteTo(wr) +} + +func (c *Consumer) Close() error { + _ = c.SuperConsumer.Close() + return c.wr.Close() +} diff --git a/pkg/mpegts/demuxer.go b/pkg/mpegts/demuxer.go new file mode 100644 index 00000000..aade3e0e --- /dev/null +++ b/pkg/mpegts/demuxer.go @@ -0,0 +1,393 @@ +package mpegts + +import ( + "errors" + "io" + + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/bits" + "github.com/AlexxIT/go2rtc/pkg/h264/annexb" + "github.com/pion/rtp" +) + +type Demuxer struct { + buf [PacketSize]byte // total buf + + byte byte // current byte + bits byte // bits left in byte + pos byte // current pos in buf + end byte // end position + + pmtID uint16 // Program Map Table (PMT) PID + pes map[uint16]*PES +} + +func NewDemuxer() *Demuxer { + return &Demuxer{} +} + +const skipRead = 0xFF + +func (d *Demuxer) ReadPacket(rd io.Reader) (*rtp.Packet, error) { + for { + if d.pos != skipRead { + if _, err := io.ReadFull(rd, d.buf[:]); err != nil { + return nil, err + } + } + + pid, start, err := d.readPacketHeader() + if err != nil { + return nil, err + } + + if d.pes == nil { + switch pid { + case 0: // PAT ID + d.readPAT() // PAT: Program Association Table + case d.pmtID: + d.readPMT() // PMT : Program Map Table + + pkt := &rtp.Packet{ + Payload: make([]byte, 0, len(d.pes)), + } + for _, pes := range d.pes { + pkt.Payload = append(pkt.Payload, pes.StreamType) + } + return pkt, nil + } + continue + } + + if pkt := d.readPES(pid, start); pkt != nil { + return pkt, nil + } + } +} + +func (d *Demuxer) readPacketHeader() (pid uint16, start bool, err error) { + d.reset() + + sb := d.readByte() // Sync byte + if sb != SyncByte { + return 0, false, errors.New("mpegts: wrong sync byte") + } + + _ = d.readBit() // Transport error indicator (TEI) + pusi := d.readBit() // Payload unit start indicator (PUSI) + _ = d.readBit() // Transport priority + pid = d.readBits16(13) // PID + + _ = d.readBits(2) // Transport scrambling control (TSC) + af := d.readBit() // Adaptation field + _ = d.readBit() // Payload + _ = d.readBits(4) // Continuity counter + + if af != 0 { + adSize := d.readByte() // Adaptation field length + if adSize > PacketSize-6 { + return 0, false, errors.New("mpegts: wrong adaptation size") + } + d.skip(adSize) + } + + return pid, pusi != 0, nil +} + +func (d *Demuxer) skip(i byte) { + d.pos += i +} + +func (d *Demuxer) readPSIHeader() { + // https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections + pointer := d.readByte() // Pointer field + d.skip(pointer) // Pointer filler bytes + + _ = d.readByte() // Table ID + _ = d.readBit() // Section syntax indicator + _ = d.readBit() // Private bit + _ = d.readBits(2) // Reserved bits + _ = d.readBits(2) // Section length unused bits + size := d.readBits(10) // Section length + d.setSize(byte(size)) + + _ = d.readBits(16) // Table ID extension + _ = d.readBits(2) // Reserved bits + _ = d.readBits(5) // Version number + _ = d.readBit() // Current/next indicator + _ = d.readByte() // Section number + _ = d.readByte() // Last section number +} + +// ReadPAT (Program Association Table) +func (d *Demuxer) readPAT() { + // https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table) + d.readPSIHeader() + + const CRCSize = 4 + for d.left() > CRCSize { + num := d.readBits(16) // Program num + _ = d.readBits(3) // Reserved bits + pid := d.readBits16(13) // Program map PID + if num != 0 { + d.pmtID = pid + } + } + + d.skip(4) // CRC32 +} + +// ReadPMT (Program map specific data) +func (d *Demuxer) readPMT() { + // https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data) + d.readPSIHeader() + + _ = d.readBits(3) // Reserved bits + _ = d.readBits(13) // PCR PID + _ = d.readBits(4) // Reserved bits + _ = d.readBits(2) // Program info length unused bits + size := d.readBits(10) // Program info length + d.skip(byte(size)) + + d.pes = map[uint16]*PES{} + + const CRCSize = 4 + for d.left() > CRCSize { + streamType := d.readByte() // Stream type + _ = d.readBits(3) // Reserved bits + pid := d.readBits16(13) // Elementary PID + _ = d.readBits(4) // Reserved bits + _ = d.readBits(2) // ES Info length unused bits + size = d.readBits(10) // ES Info length + d.skip(byte(size)) + + d.pes[pid] = &PES{StreamType: streamType} + } + + d.skip(4) // CRC32 +} + +func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet { + pes := d.pes[pid] + if pes == nil { + return nil + } + + // if new payload beging + if start { + if pes.Payload != nil { + d.pos = skipRead + return pes.GetPacket() // finish previous packet + } + + // https://en.wikipedia.org/wiki/Packetized_elementary_stream + // Packet start code prefix + if d.readByte() != 0 || d.readByte() != 0 || d.readByte() != 1 { + return nil + } + + pes.StreamID = d.readByte() // Stream id + packetSize := d.readBits16(16) // PES Packet length + + _ = d.readBits(2) // Marker bits + _ = d.readBits(2) // Scrambling control + _ = d.readBit() // Priority + _ = d.readBit() // Data alignment indicator + _ = d.readBit() // Copyright + _ = d.readBit() // Original or Copy + + pts := d.readBit() // PTS indicator + _ = d.readBit() // DTS indicator + _ = d.readBit() // ESCR flag + _ = d.readBit() // ES rate flag + _ = d.readBit() // DSM trick mode flag + _ = d.readBit() // Additional copy info flag + _ = d.readBit() // CRC flag + _ = d.readBit() // extension flag + + headerSize := d.readByte() // PES header length + + //log.Printf("[mpegts] pes=%d size=%d header=%d", pes.StreamID, packetSize, headerSize) + + if packetSize != 0 { + packetSize -= uint16(3 + headerSize) + } + + if pts != 0 { + pes.PTS = d.readTime() + headerSize -= 5 + } + + d.skip(headerSize) + + pes.SetBuffer(packetSize, d.bytes()) + } else { + pes.AppendBuffer(d.bytes()) + } + + if pes.Size != 0 && len(pes.Payload) >= pes.Size { + return pes.GetPacket() // finish current packet + } + + return nil +} + +func (d *Demuxer) reset() { + d.pos = 0 + d.end = PacketSize + d.bits = 0 +} + +//goland:noinspection GoStandardMethods +func (d *Demuxer) readByte() byte { + if d.bits != 0 { + return byte(d.readBits(8)) + } + + b := d.buf[d.pos] + d.pos++ + return b +} + +func (d *Demuxer) readBit() byte { + if d.bits == 0 { + d.byte = d.readByte() + d.bits = 7 + } else { + d.bits-- + } + + return (d.byte >> d.bits) & 0b1 +} + +func (d *Demuxer) readBits(n byte) (res uint32) { + for i := n - 1; i != 255; i-- { + res |= uint32(d.readBit()) << i + } + return +} + +func (d *Demuxer) readBits16(n byte) (res uint16) { + for i := n - 1; i != 255; i-- { + res |= uint16(d.readBit()) << i + } + return +} + +func (d *Demuxer) readTime() uint32 { + // https://en.wikipedia.org/wiki/Packetized_elementary_stream + // xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx + _ = d.readBits(4) // 0010b or 0011b or 0001b + ts := d.readBits(3) << 30 + _ = d.readBits(1) // 1b + ts |= d.readBits(15) << 15 + _ = d.readBits(1) // 1b + ts |= d.readBits(15) + _ = d.readBits(1) // 1b + return ts +} + +func (d *Demuxer) bytes() []byte { + return d.buf[d.pos:PacketSize] +} + +func (d *Demuxer) left() byte { + return d.end - d.pos +} + +func (d *Demuxer) setSize(size byte) { + d.end = d.pos + size +} + +const ( + PacketSize = 188 + SyncByte = 0x47 // Uppercase G +) + +// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types +const ( + StreamTypeMetadata = 0 // Reserved + StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg + StreamTypeAAC = 0x0F + StreamTypeH264 = 0x1B + StreamTypeH265 = 0x24 + StreamTypePCMATapo = 0x90 +) + +// PES - Packetized Elementary Stream +type PES struct { + StreamID byte // from each PES header + StreamType byte // from PMT table + Sequence uint16 // manual + Timestamp uint32 // manual + PTS uint32 // from PTS extra header, always 90000Hz + Payload []byte // from PTS body + Size int // from PTS header, can be 0 + + wr *bits.Writer +} + +func (p *PES) SetBuffer(size uint16, b []byte) { + p.Payload = make([]byte, 0, size) + p.Payload = append(p.Payload, b...) + p.Size = int(size) +} + +func (p *PES) AppendBuffer(b []byte) { + p.Payload = append(p.Payload, b...) +} + +func (p *PES) GetPacket() (pkt *rtp.Packet) { + switch p.StreamType { + case StreamTypeH264, StreamTypeH265: + pkt = &rtp.Packet{ + Header: rtp.Header{ + PayloadType: p.StreamType, + Timestamp: p.PTS, // PTS is ok, because 90000Hz + }, + Payload: annexb.EncodeToAVCC(p.Payload, false), + } + + case StreamTypeAAC: + p.Sequence++ + + pkt = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: p.StreamType, + SequenceNumber: p.Sequence, + Timestamp: p.Timestamp, + }, + Payload: aac.ADTStoRTP(p.Payload), + } + + p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp! + + case StreamTypePCMATapo: + p.Sequence++ + + pkt = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: p.StreamType, + SequenceNumber: p.Sequence, + Timestamp: p.Timestamp, + }, + Payload: p.Payload, + } + + p.Timestamp += uint32(len(p.Payload)) // update next timestamp! + } + + p.Payload = nil + + return +} + +// PTSToTimestamp - convert PTS from 90000 to custom clock rate +//func PTSToTimestamp(pts, clockRate uint32) uint32 { +// if clockRate == 90000 { +// return pts +// } +// return uint32(uint64(pts) * uint64(clockRate) / 90000) +//} diff --git a/pkg/mpegts/helpers.go b/pkg/mpegts/helpers.go deleted file mode 100644 index 31c481f5..00000000 --- a/pkg/mpegts/helpers.go +++ /dev/null @@ -1,112 +0,0 @@ -package mpegts - -import ( - "github.com/AlexxIT/go2rtc/pkg/aac" - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264/annexb" - "github.com/pion/rtp" -) - -const ( - PacketSize = 188 - SyncByte = 0x47 // Uppercase G -) - -// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types -const ( - metadataType = 0 - StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg - StreamTypeAAC = 0x0F - StreamTypeH264 = 0x1B - StreamTypeH265 = 0x24 - StreamTypePCMATapo = 0x90 -) - -// PES - Packetized Elementary Stream -type PES struct { - StreamType byte - StreamID byte - Payload []byte - Size int - PTS uint32 // PTS always 90000Hz - - Sequence uint16 - - decodeStream func([]byte) ([]byte, int) -} - -func (p *PES) SetBuffer(size uint16, b []byte) { - p.Payload = make([]byte, 0, size) - p.Payload = append(p.Payload, b...) - p.Size = int(size) -} - -func (p *PES) AppendBuffer(b []byte) { - p.Payload = append(p.Payload, b...) -} - -func (p *PES) GetPacket() (pkt *rtp.Packet) { - switch p.StreamType { - case StreamTypeH264, StreamTypeH265: - pkt = &rtp.Packet{ - Header: rtp.Header{ - PayloadType: p.StreamType, - Timestamp: p.PTS, - }, - Payload: annexb.EncodeToAVCC(p.Payload, false), - } - - case StreamTypeAAC: - p.Sequence++ - - pkt = &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - PayloadType: p.StreamType, - SequenceNumber: p.Sequence, - Timestamp: p.PTS, - }, - Payload: aac.ADTStoRTP(p.Payload), - } - - case StreamTypePCMATapo: - p.Sequence++ - p.PTS += uint32(len(p.Payload)) - - pkt = &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - PayloadType: p.StreamType, - SequenceNumber: p.Sequence, - Timestamp: p.PTS, - }, - Payload: p.Payload, - } - } - - p.Payload = nil - - return -} - -func StreamType(codec *core.Codec) uint8 { - switch codec.Name { - case core.CodecH264: - return StreamTypeH264 - case core.CodecH265: - return StreamTypeH265 - case core.CodecAAC: - return StreamTypeAAC - case core.CodecPCMA: - return StreamTypePCMATapo - } - return 0 -} - -// PTSToTimestamp - convert PTS from 90000 to custom clock rate -func PTSToTimestamp(pts, clockRate uint32) uint32 { - if clockRate == 90000 { - return pts - } - return uint32(uint64(pts) * uint64(clockRate) / 90000) -} diff --git a/pkg/mpegts/muxer.go b/pkg/mpegts/muxer.go new file mode 100644 index 00000000..39c96ac6 --- /dev/null +++ b/pkg/mpegts/muxer.go @@ -0,0 +1,209 @@ +package mpegts + +import ( + "encoding/binary" + + "github.com/AlexxIT/go2rtc/pkg/bits" + "github.com/AlexxIT/go2rtc/pkg/h264/annexb" +) + +type Muxer struct { + pes map[uint16]*PES +} + +func NewMuxer() *Muxer { + return &Muxer{ + pes: map[uint16]*PES{}, + } +} + +func (m *Muxer) AddTrack(streamType byte) (pid uint16) { + pes := &PES{StreamType: streamType} + + // Audio streams (0xC0-0xDF), Video streams (0xE0-0xEF) + switch streamType { + case StreamTypeH264, StreamTypeH265: + pes.StreamID = 0xE0 + case StreamTypeAAC, StreamTypePCMATapo: + pes.StreamID = 0xC0 + } + + pid = startPID + 1 + uint16(len(m.pes)) + m.pes[pid] = pes + + return +} + +func (m *Muxer) GetHeader() []byte { + bw := bits.NewWriter(nil) + m.writePAT(bw) + m.writePMT(bw) + return bw.Bytes() +} + +// GetPayload - safe to run concurently with different pid +func (m *Muxer) GetPayload(pid uint16, pts uint32, payload []byte) []byte { + pes := m.pes[pid] + + b := make([]byte, 14+len(payload)) + _ = b[14] // bounds + b[0] = 0 + b[1] = 0 + b[2] = 1 + b[3] = pes.StreamID + binary.BigEndian.PutUint16(b[4:], 8+uint16(len(payload))) + b[6] = 0x80 // Marker bits (binary) + b[7] = 0x80 // PTS indicator + b[8] = 5 // PES header length + WriteTime(b[9:], pts) + copy(b[14:], payload) + + switch pes.StreamType { + case StreamTypeH264, StreamTypeH265: + annexb.DecodeAVCC(b[14:], false) // no need to safe clone after copy + } + + pes.Payload = b + pes.Size = 1 // set PUSI in first PES + + if pes.wr == nil { + pes.wr = bits.NewWriter(nil) + } else { + pes.wr.Reset() + } + + for len(pes.Payload) > 0 { + m.writePES(pes.wr, pid, pes) + pes.Sequence++ + pes.Size = 0 + } + + return pes.wr.Bytes() +} + +const patPID = 0 +const startPID = 0x20 + +func (m *Muxer) writePAT(wr *bits.Writer) { + m.writeHeader(wr, patPID) + i := wr.Len() + 1 // start for CRC32 + m.writePSIHeader(wr, 0, 4) + + wr.WriteUint16(1) // Program num + wr.WriteBits8(0b111, 3) // Reserved bits (all to 1) + wr.WriteBits16(startPID, 13) // Program map PID + + crc := checksum(wr.Bytes()[i:]) + wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian) + + m.WriteTail(wr) +} + +func (m *Muxer) writePMT(wr *bits.Writer) { + m.writeHeader(wr, startPID) + i := wr.Len() + 1 // start for CRC32 + m.writePSIHeader(wr, 2, 4+uint16(len(m.pes))*5) // 4 bytes below + 5 bytes each PES + + wr.WriteBits8(0b111, 3) // Reserved bits (all to 1) + wr.WriteBits16(0x1FFF, 13) // Program map PID (not used) + + wr.WriteBits8(0b1111, 4) // Reserved bits (all to 1) + wr.WriteBits8(0, 2) // Program info length unused bits (all to 0) + wr.WriteBits16(0, 10) // Program info length + + for pid, pes := range m.pes { + wr.WriteByte(pes.StreamType) // Stream type + wr.WriteBits8(0b111, 3) // Reserved bits (all to 1) + wr.WriteBits16(pid, 13) // Elementary PID + wr.WriteBits8(0b1111, 4) // Reserved bits (all to 1) + wr.WriteBits(0, 2) // ES Info length unused bits + wr.WriteBits16(0, 10) // ES Info length + } + + crc := checksum(wr.Bytes()[i:]) + wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian) + + m.WriteTail(wr) +} + +func (m *Muxer) writePES(wr *bits.Writer, pid uint16, pes *PES) { + const flagPUSI = 0b01000000_00000000 + const flagAdaptation = 0b00100000 + const flagPayload = 0b00010000 + + wr.WriteByte(SyncByte) + + if pes.Size != 0 { + pid |= flagPUSI // Payload unit start indicator (PUSI) + } + + wr.WriteUint16(pid) + + counter := byte(pes.Sequence) & 0xF + + if size := len(pes.Payload); size < PacketSize-4 { + wr.WriteByte(flagAdaptation | flagPayload | counter) // adaptation + payload + + // for 183 payload will be zero + adSize := PacketSize - 4 - 1 - byte(size) + wr.WriteByte(adSize) + wr.WriteBytes(make([]byte, adSize)...) + + wr.WriteBytes(pes.Payload...) + pes.Payload = nil + } else { + wr.WriteByte(flagPayload | counter) // only payload + + wr.WriteBytes(pes.Payload[:PacketSize-4]...) + pes.Payload = pes.Payload[PacketSize-4:] + } +} + +func (m *Muxer) writeHeader(wr *bits.Writer, pid uint16) { + wr.WriteByte(SyncByte) + + wr.WriteBit(0) // Transport error indicator (TEI) + wr.WriteBit(1) // Payload unit start indicator (PUSI) + wr.WriteBit(0) // Transport priority + wr.WriteBits16(pid, 13) // PID + + wr.WriteBits8(0, 2) // Transport scrambling control (TSC) + wr.WriteBit(0) // Adaptation field + wr.WriteBit(1) // Payload + wr.WriteBits8(0, 4) // Continuity counter +} + +func (m *Muxer) writePSIHeader(wr *bits.Writer, tableID byte, size uint16) { + wr.WriteByte(0) // Pointer field + + wr.WriteByte(tableID) // Table ID + + wr.WriteBit(1) // Section syntax indicator + wr.WriteBit(0) // Private bit + wr.WriteBits8(0b11, 2) // Reserved bits (all to 1) + wr.WriteBits8(0, 2) // Section length unused bits (all to 0) + wr.WriteBits16(5+size+4, 10) // Section length (5 bytes below + content + 4 bytes CRC32) + + wr.WriteUint16(1) // Table ID extension + wr.WriteBits8(0b11, 2) // Reserved bits (all to 1) + wr.WriteBits8(0, 5) // Version number + wr.WriteBit(1) // Current/next indicator + + wr.WriteByte(0) // Section number + wr.WriteByte(0) // Last section number +} + +func (m *Muxer) WriteTail(wr *bits.Writer) { + size := PacketSize - wr.Len()%PacketSize + wr.WriteBytes(make([]byte, size)...) +} + +func WriteTime(b []byte, t uint32) { + _ = b[4] // bounds + const onlyPTS = 0x20 + b[0] = onlyPTS | byte(t>>(32-3)) | 1 + b[1] = byte(t >> (24 - 2)) + b[2] = byte(t>>(16-2)) | 1 + b[3] = byte(t >> (8 - 1)) + b[4] = byte(t<<1) | 1 // t>>(0-1) +} diff --git a/pkg/mpegts/producer.go b/pkg/mpegts/producer.go index 2a7dc6c6..2c8f5347 100644 --- a/pkg/mpegts/producer.go +++ b/pkg/mpegts/producer.go @@ -1,45 +1,137 @@ package mpegts import ( - "encoding/json" + "bytes" + "io" + "time" + "github.com/AlexxIT/go2rtc/pkg/aac" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" ) -func (c *Client) GetMedias() []*core.Media { - return c.medias +type Producer struct { + core.SuperProducer + rd *core.ReadBuffer } -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - for _, track := range c.receivers { - if track.Codec == codec { - return track, nil +func Open(rd io.Reader) (*Producer, error) { + prod := &Producer{rd: core.NewReadBuffer(rd)} + if err := prod.probe(); err != nil { + return nil, err + } + return prod, nil +} + +func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + receiver, _ := c.SuperProducer.GetTrack(media, codec) + receiver.ID = StreamType(codec) + return receiver, nil +} + +func (c *Producer) Start() error { + rd := NewDemuxer() + + for { + pkt, err := rd.ReadPacket(c.rd) + if err != nil { + return err + } + + //log.Printf("[mpegts] size: %6d, muxer: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType) + + for _, receiver := range c.Receivers { + if receiver.ID == pkt.PayloadType { + receiver.WriteRTP(pkt) + break + } } } - track := core.NewReceiver(media, codec) - track.ID = StreamType(codec) - c.receivers = append(c.receivers, track) - return track, nil } -func (c *Client) Start() error { - return c.play() +func (c *Producer) Stop() error { + _ = c.SuperProducer.Close() + return c.rd.Close() } -func (c *Client) Stop() error { - for _, receiver := range c.receivers { - receiver.Close() +func (c *Producer) probe() error { + c.rd.BufferSize = core.ProbeSize + defer c.rd.Reset() + + rd := NewDemuxer() + + // Strategy: + // 1. Wait packet with metadata, init other packets for wait + // 2. Wait other packets + // 3. Stop after timeout + waitType := []byte{StreamTypeMetadata} + timeout := time.Now().Add(core.ProbeTimeout) + + for len(waitType) != 0 && time.Now().Before(timeout) { + pkt, err := rd.ReadPacket(c.rd) + if err != nil { + return err + } + + // check if we wait this type + if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 { + continue + } else { + waitType = append(waitType[:i], waitType[i+1:]...) + } + + switch pkt.PayloadType { + case StreamTypeMetadata: + for _, streamType := range pkt.Payload { + switch streamType { + case StreamTypeH264, StreamTypeH265, StreamTypeAAC: + waitType = append(waitType, streamType) + } + } + + case StreamTypeH264: + codec := h264.AVCCToCodec(pkt.Payload) + media := &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.Medias = append(c.Medias, media) + + case StreamTypeH265: + codec := h265.AVCCToCodec(pkt.Payload) + media := &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.Medias = append(c.Medias, media) + + case StreamTypeAAC: + codec := aac.RTPToCodec(pkt.Payload) + media := &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.Medias = append(c.Medias, media) + } } - return c.Close() + + return nil } -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "MPEG-TS active producer", - URL: c.URL, - Medias: c.medias, - Receivers: c.receivers, - Recv: c.recv, +func StreamType(codec *core.Codec) uint8 { + switch codec.Name { + case core.CodecH264: + return StreamTypeH264 + case core.CodecH265: + return StreamTypeH265 + case core.CodecAAC: + return StreamTypeAAC + case core.CodecPCMA: + return StreamTypePCMATapo } - return json.Marshal(info) + return 0 } diff --git a/pkg/mpegts/reader.go b/pkg/mpegts/reader.go deleted file mode 100644 index 7cd2688e..00000000 --- a/pkg/mpegts/reader.go +++ /dev/null @@ -1,297 +0,0 @@ -package mpegts - -import ( - "errors" - "io" - - "github.com/pion/rtp" -) - -type Reader struct { - buf [PacketSize]byte // total buf - - byte byte // current byte - bits byte // bits left in byte - pos byte // current pos in buf - end byte // end position - - pmtID uint16 // Program Map Table (PMT) PID - pes map[uint16]*PES -} - -func NewReader() *Reader { - return &Reader{} -} - -const skipRead = 0xFF - -func (r *Reader) ReadPacket(rd io.Reader) (*rtp.Packet, error) { - for { - if r.pos != skipRead { - if _, err := io.ReadFull(rd, r.buf[:]); err != nil { - return nil, err - } - } - - pid, start, err := r.readPacketHeader() - if err != nil { - return nil, err - } - - if r.pes == nil { - switch pid { - case 0: // PAT ID - r.readPAT() // PAT: Program Association Table - case r.pmtID: - r.readPMT() // PMT : Program Map Table - - pkt := &rtp.Packet{ - Payload: make([]byte, 0, len(r.pes)), - } - for _, pes := range r.pes { - pkt.Payload = append(pkt.Payload, pes.StreamType) - } - return pkt, nil - } - continue - } - - if pkt := r.readPES(pid, start); pkt != nil { - return pkt, nil - } - } -} - -func (r *Reader) readPacketHeader() (pid uint16, start bool, err error) { - r.reset() - - sb := r.readByte() // Sync byte - if sb != SyncByte { - return 0, false, errors.New("mpegts: wrong sync byte") - } - - _ = r.readBit() // Transport error indicator (TEI) - pusi := r.readBit() // Payload unit start indicator (PUSI) - _ = r.readBit() // Transport priority - pid = r.readBits16(13) // PID - - _ = r.readBits(2) // Transport scrambling control (TSC) - af := r.readBit() // Adaptation field - _ = r.readBit() // Payload - _ = r.readBits(4) // Continuity counter - - if af != 0 { - adSize := r.readByte() // Adaptation field length - if adSize > PacketSize-6 { - return 0, false, errors.New("mpegts: wrong adaptation size") - } - r.skip(adSize) - } - - return pid, pusi != 0, nil -} - -func (r *Reader) skip(i byte) { - r.pos += i -} - -func (r *Reader) readPSIHeader() { - // https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections - pointer := r.readByte() // Pointer field - r.skip(pointer) // Pointer filler bytes - - _ = r.readByte() // Table ID - _ = r.readBit() // Section syntax indicator - _ = r.readBit() // Private bit - _ = r.readBits(2) // Reserved bits - _ = r.readBits(2) // Section length unused bits - size := r.readBits(10) // Section length - r.setSize(byte(size)) - - _ = r.readBits(16) // Table ID extension - _ = r.readBits(2) // Reserved bits - _ = r.readBits(5) // Version number - _ = r.readBit() // Current/next indicator - _ = r.readByte() // Section number - _ = r.readByte() // Last section number -} - -// ReadPAT (Program Association Table) -func (r *Reader) readPAT() { - // https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table) - r.readPSIHeader() - - const CRCSize = 4 - for r.left() > CRCSize { - num := r.readBits(16) // Program num - _ = r.readBits(3) // Reserved bits - pid := r.readBits16(13) // Program map PID - if num != 0 { - r.pmtID = pid - } - } - - r.skip(4) // CRC32 -} - -// ReadPMT (Program map specific data) -func (r *Reader) readPMT() { - // https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data) - r.readPSIHeader() - - _ = r.readBits(3) // Reserved bits - _ = r.readBits(13) // PCR PID - _ = r.readBits(4) // Reserved bits - _ = r.readBits(2) // Program info length unused bits - size := r.readBits(10) // Program info length - r.skip(byte(size)) - - r.pes = map[uint16]*PES{} - - const CRCSize = 4 - for r.left() > CRCSize { - streamType := r.readByte() // Stream type - _ = r.readBits(3) // Reserved bits - pid := r.readBits16(13) // Elementary PID - _ = r.readBits(4) // Reserved bits - _ = r.readBits(2) // ES Info length unused bits - size = r.readBits(10) // ES Info length - r.skip(byte(size)) - - r.pes[pid] = &PES{StreamType: streamType} - } - - r.skip(4) // CRC32 -} - -func (r *Reader) readPES(pid uint16, start bool) *rtp.Packet { - pes := r.pes[pid] - if pes == nil { - return nil - } - - // if new payload beging - if start { - if pes.Payload != nil { - r.pos = skipRead - return pes.GetPacket() // finish previous packet - } - - // https://en.wikipedia.org/wiki/Packetized_elementary_stream - // Packet start code prefix - if r.readByte() != 0 || r.readByte() != 0 || r.readByte() != 1 { - return nil - } - - pes.StreamID = r.readByte() // Stream id - packetSize := r.readBits16(16) // PES Packet length - - _ = r.readBits(2) // Marker bits - _ = r.readBits(2) // Scrambling control - _ = r.readBit() // Priority - _ = r.readBit() // Data alignment indicator - _ = r.readBit() // Copyright - _ = r.readBit() // Original or Copy - - pts := r.readBit() // PTS indicator - _ = r.readBit() // DTS indicator - _ = r.readBit() // ESCR flag - _ = r.readBit() // ES rate flag - _ = r.readBit() // DSM trick mode flag - _ = r.readBit() // Additional copy info flag - _ = r.readBit() // CRC flag - _ = r.readBit() // extension flag - - headerSize := r.readByte() // PES header length - - //log.Printf("[mpegts] pes=%d size=%d header=%d", pes.StreamID, packetSize, headerSize) - - if packetSize != 0 { - packetSize -= uint16(3 + headerSize) - } - - if pts != 0 { - pes.PTS = r.readTime() - headerSize -= 5 - } - - r.skip(headerSize) - - pes.SetBuffer(packetSize, r.bytes()) - } else { - pes.AppendBuffer(r.bytes()) - } - - if pes.Size != 0 && len(pes.Payload) >= pes.Size { - return pes.GetPacket() // finish current packet - } - - return nil -} - -func (r *Reader) reset() { - r.pos = 0 - r.end = PacketSize - r.bits = 0 -} - -//goland:noinspection GoStandardMethods -func (r *Reader) readByte() byte { - if r.bits != 0 { - return byte(r.readBits(8)) - } - - b := r.buf[r.pos] - r.pos++ - return b -} - -func (r *Reader) readBit() byte { - if r.bits == 0 { - r.byte = r.readByte() - r.bits = 7 - } else { - r.bits-- - } - - return (r.byte >> r.bits) & 0b1 -} - -func (r *Reader) readBits(n byte) (res uint32) { - for i := n - 1; i != 255; i-- { - res |= uint32(r.readBit()) << i - } - return -} - -func (r *Reader) readBits16(n byte) (res uint16) { - for i := n - 1; i != 255; i-- { - res |= uint16(r.readBit()) << i - } - return -} - -func (r *Reader) readTime() uint32 { - // https://en.wikipedia.org/wiki/Packetized_elementary_stream - // xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx - _ = r.readBits(4) // 0010b or 0011b or 0001b - ts := r.readBits(3) << 30 - _ = r.readBits(1) // 1b - ts |= r.readBits(15) << 15 - _ = r.readBits(1) // 1b - ts |= r.readBits(15) - _ = r.readBits(1) // 1b - return ts -} - -func (r *Reader) bytes() []byte { - return r.buf[r.pos:PacketSize] -} - -func (r *Reader) left() byte { - return r.end - r.pos -} - -func (r *Reader) setSize(size byte) { - r.end = r.pos + size -} diff --git a/pkg/mpegts/ts.go b/pkg/mpegts/ts.go deleted file mode 100644 index ae1b4b49..00000000 --- a/pkg/mpegts/ts.go +++ /dev/null @@ -1,225 +0,0 @@ -package mpegts - -import ( - "bytes" - "encoding/hex" - "encoding/json" - "time" - - "github.com/AlexxIT/go2rtc/pkg/aac" - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/deepch/vdk/av" - "github.com/deepch/vdk/codec/aacparser" - "github.com/deepch/vdk/codec/h264parser" - "github.com/deepch/vdk/format/ts" - "github.com/pion/rtp" -) - -type Consumer struct { - core.Listener - - UserAgent string - RemoteAddr string - - senders []*core.Sender - - buf *bytes.Buffer - muxer *ts.Muxer - mimeType string - streams []av.CodecData - start bool - init []byte - - send int -} - -func (c *Consumer) GetMedias() []*core.Media { - return []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecH264}, - }, - }, - //{ - // Kind: core.KindAudio, - // Direction: core.DirectionSendonly, - // Codecs: []*core.Codec{ - // {Name: core.CodecAAC}, - // }, - //}, - } -} - -func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { - trackID := int8(len(c.streams)) - - handler := core.NewSender(media, track.Codec) - - switch track.Codec.Name { - case core.CodecH264: - sps, pps := h264.GetParameterSet(track.Codec.FmtpLine) - // some dummy SPS and PPS not a problem - if len(sps) == 0 { - sps = []byte{0x67, 0x42, 0x00, 0x0a, 0xf8, 0x41, 0xa2} - } - if len(pps) == 0 { - pps = []byte{0x68, 0xce, 0x38, 0x80} - } - - stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps) - if err != nil { - return nil - } - - if len(c.mimeType) > 0 { - c.mimeType += "," - } - - c.mimeType += "avc1." + h264.GetProfileLevelID(track.Codec.FmtpLine) - - c.streams = append(c.streams, stream) - - pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond} - - ts2time := time.Second / time.Duration(track.Codec.ClockRate) - - handler.Handler = func(packet *rtp.Packet) { - if packet.Version != h264.RTPPacketVersionAVC { - return - } - - if !c.start { - return - } - - pkt.Data = packet.Payload - newTime := time.Duration(packet.Timestamp) * ts2time - if pkt.Time > 0 { - pkt.Duration = newTime - pkt.Time - } - pkt.Time = newTime - - if err = c.muxer.WritePacket(pkt); err != nil { - return - } - - // clone bytes from buffer, so next packet won't overwrite it - buf := append([]byte{}, c.buf.Bytes()...) - c.Fire(buf) - - c.send += len(buf) - - c.buf.Reset() - } - - if track.Codec.IsRTP() { - handler.Handler = h264.RTPDepay(track.Codec, handler.Handler) - } else { - handler.Handler = h264.RepairAVCC(track.Codec, handler.Handler) - } - - case core.CodecAAC: - s := core.Between(track.Codec.FmtpLine, "config=", ";") - - b, err := hex.DecodeString(s) - if err != nil { - return nil - } - - stream, err := aacparser.NewCodecDataFromMPEG4AudioConfigBytes(b) - if err != nil { - return nil - } - - if len(c.mimeType) > 0 { - c.mimeType += "," - } - - c.mimeType += "mp4a.40.2" - c.streams = append(c.streams, stream) - - pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond} - - ts2time := time.Second / time.Duration(track.Codec.ClockRate) - - handler.Handler = func(packet *rtp.Packet) { - if !c.start { - return - } - - pkt.Data = packet.Payload - newTime := time.Duration(packet.Timestamp) * ts2time - if pkt.Time > 0 { - pkt.Duration = newTime - pkt.Time - } - pkt.Time = newTime - - if err = c.muxer.WritePacket(pkt); err != nil { - return - } - - // clone bytes from buffer, so next packet won't overwrite it - buf := append([]byte{}, c.buf.Bytes()...) - c.Fire(buf) - - c.send += len(buf) - - c.buf.Reset() - } - - if track.Codec.IsRTP() { - handler.Handler = aac.RTPDepay(handler.Handler) - } - - default: - panic("unsupported codec") - } - - handler.HandleRTP(track) - c.senders = append(c.senders, handler) - - return nil -} - -func (c *Consumer) MimeCodecs() string { - return c.mimeType -} - -func (c *Consumer) Init() ([]byte, error) { - c.buf = bytes.NewBuffer(nil) - c.muxer = ts.NewMuxer(c.buf) - - // first packet will be with header, it's ok - if err := c.muxer.WriteHeader(c.streams); err != nil { - return nil, err - } - data := append([]byte{}, c.buf.Bytes()...) - - return data, nil -} - -func (c *Consumer) Start() { - c.start = true -} - -func (c *Consumer) Stop() error { - for _, sender := range c.senders { - sender.Close() - } - return nil -} - -func (c *Consumer) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "TS passive consumer", - RemoteAddr: c.RemoteAddr, - UserAgent: c.UserAgent, - Medias: c.GetMedias(), - Senders: c.senders, - Send: c.send, - } - return json.Marshal(info) -} diff --git a/pkg/mpegts/writer.go b/pkg/mpegts/writer.go deleted file mode 100644 index b8ad54c9..00000000 --- a/pkg/mpegts/writer.go +++ /dev/null @@ -1,219 +0,0 @@ -package mpegts - -type Writer struct { - b []byte // packets buffer - m int // crc start - - pid []uint16 - counter []byte - streamType []byte - timestamp []uint32 -} - -func NewWriter() *Writer { - return &Writer{} -} - -func (w *Writer) AddPES(pid uint16, streamType byte) { - w.pid = append(w.pid, pid) - w.streamType = append(w.streamType, streamType) - w.counter = append(w.counter, 0) - w.timestamp = append(w.timestamp, 0) -} - -func (w *Writer) WriteByte(b byte) { - w.b = append(w.b, b) -} - -func (w *Writer) WriteUint16(i uint16) { - w.b = append(w.b, byte(i>>8), byte(i)) -} - -func (w *Writer) WriteTime(t uint32) { - const onlyPTS = 0x20 - // [>>32 <<3] [>>24 <<2] [>>16 <<2] [>>8 <<1] [<<1] - w.b = append(w.b, onlyPTS|byte(t>>29)|1, byte(t>>22), byte(t>>14)|1, byte(t>>7), byte(t<<1)|1) -} - -func (w *Writer) WriteBytes(b []byte) { - w.b = append(w.b, b...) -} - -func (w *Writer) MarkChecksum() { - w.m = len(w.b) -} - -func (w *Writer) WriteChecksum() { - crc := checksum(w.b[w.m:]) - w.b = append(w.b, byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) -} - -func (w *Writer) FinishPacket() { - if n := len(w.b) % PacketSize; n != 0 { - w.b = append(w.b, make([]byte, PacketSize-n)...) - } -} - -func (w *Writer) Bytes() []byte { - if len(w.b)%PacketSize != 0 { - panic("wrong packet size") - } - return w.b -} - -func (w *Writer) Reset() { - w.b = nil -} - -const isUnitStart = 0x4000 -const flagHasAdaptation = 0x20 -const flagHasPayload = 0x10 -const lenIsProgramTable = 0xB000 -const tableFlags = 0xC1 -const tableHeader = 0xE000 -const tableLength = 0xF000 - -const patPID = 0 -const patTableID = 0 -const patTableExtID = 1 - -func (w *Writer) WritePAT() { - w.WriteByte(SyncByte) - w.WriteUint16(isUnitStart | patPID) // PAT PID - w.WriteByte(flagHasPayload) // flags... - - w.WriteByte(0) // Pointer field - - w.MarkChecksum() - w.WriteByte(patTableID) // Table ID - w.WriteUint16(lenIsProgramTable | 13) // Section length - w.WriteUint16(patTableExtID) // Table ID extension - w.WriteByte(tableFlags) // flags... - w.WriteByte(0) // Section number - w.WriteByte(0) // Last section number - - w.WriteUint16(1) // Program num (usual 1) - w.WriteUint16(tableHeader + pmtPID) - - w.WriteChecksum() - - w.FinishPacket() -} - -const pmtPID = 18 -const pmtTableID = 2 -const pmtTableExtID = 1 - -func (w *Writer) WritePMT() { - w.WriteByte(SyncByte) - w.WriteUint16(isUnitStart | pmtPID) // PMT PID - w.WriteByte(flagHasPayload) // flags... - - w.WriteByte(0) // Pointer field - - tableLen := 13 + uint16(len(w.pid))*5 - - w.MarkChecksum() - w.WriteByte(pmtTableID) // Table ID - w.WriteUint16(lenIsProgramTable | tableLen) // Section length - w.WriteUint16(pmtTableExtID) // Table ID extension - w.WriteByte(tableFlags) // flags... - w.WriteByte(0) // Section number - w.WriteByte(0) // Last section number - - w.WriteUint16(tableHeader | w.pid[0]) // PID - w.WriteUint16(tableLength | 0) // Info length - - for i, pid := range w.pid { - w.WriteByte(w.streamType[i]) - w.WriteUint16(tableHeader | pid) // PID - w.WriteUint16(tableLength | 0) // Info len - } - - w.WriteChecksum() - - w.FinishPacket() -} - -const pesHeaderSize = PacketSize - 18 - -func (w *Writer) WritePES(pid uint16, streamID byte, payload []byte) { - w.WriteByte(SyncByte) - w.WriteUint16(isUnitStart | pid) - - // check if payload lower then max first packet size - if len(payload) < PacketSize-18 { - w.WriteByte(flagHasAdaptation | flagHasPayload) - - // for 183 payload will be zero - adSize := PacketSize - 18 - 1 - byte(len(payload)) - w.WriteByte(adSize) - w.WriteBytes(make([]byte, adSize)) - } else { - w.WriteByte(flagHasPayload) - } - - w.WriteByte(0) - w.WriteByte(0) - w.WriteByte(1) - - w.WriteByte(streamID) - w.WriteUint16(uint16(8 + len(payload))) - - w.WriteByte(0x80) - w.WriteByte(0x80) // only PTS - w.WriteByte(5) // optional size - - switch w.streamType[0] { - case StreamTypePCMATapo: - w.timestamp[0] += uint32(len(payload) * 45 / 8) - } - - w.WriteTime(w.timestamp[0]) - - if len(payload) < PacketSize-18 { - w.WriteBytes(payload) - return - } - - w.WriteBytes(payload[:pesHeaderSize]) - - payload = payload[pesHeaderSize:] - var counter byte - - for { - counter++ - - if len(payload) > PacketSize-4 { - // payload more then maximum size - w.WriteByte(SyncByte) - w.WriteUint16(pid) - w.WriteByte(flagHasPayload | counter&0xF) - w.WriteBytes(payload[:PacketSize-4]) - - payload = payload[PacketSize-4:] - } else if len(payload) == PacketSize-4 { - // payload equal maximum size (last packet) - w.WriteByte(SyncByte) - w.WriteUint16(pid) - w.WriteByte(flagHasPayload | counter&0xF) - w.WriteBytes(payload) - - break - } else { - // payload lower than maximum size (last packet) - w.WriteByte(SyncByte) - w.WriteUint16(pid) - w.WriteByte(flagHasAdaptation | flagHasPayload | counter&0xF) - - // for 183 payload will be zero - adSize := PacketSize - 4 - 1 - byte(len(payload)) - w.WriteByte(adSize) - w.WriteBytes(make([]byte, adSize)) - - w.WriteBytes(payload) - - break - } - } -} diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go index 15a39b6e..db4e624a 100644 --- a/pkg/tapo/client.go +++ b/pkg/tapo/client.go @@ -145,11 +145,11 @@ func (c *Client) SetupStream() (err error) { // Handle - first run will be in probe state func (c *Client) Handle() error { - multipartRd := multipart.NewReader(c.conn1, "--device-stream-boundary--") - mpegtsRd := mpegts.NewReader() + rd := multipart.NewReader(c.conn1, "--device-stream-boundary--") + demux := mpegts.NewDemuxer() for { - p, err := multipartRd.NextRawPart() + p, err := rd.NextRawPart() if err != nil { return err } @@ -181,7 +181,7 @@ func (c *Client) Handle() error { bytesRd := bytes.NewReader(body) for { - pkt, err2 := mpegtsRd.ReadPacket(bytesRd) + pkt, err2 := demux.ReadPacket(bytesRd) if pkt == nil || err2 == io.EOF { break } diff --git a/pkg/tapo/consumer.go b/pkg/tapo/consumer.go index 0a6b7792..ad23af13 100644 --- a/pkg/tapo/consumer.go +++ b/pkg/tapo/consumer.go @@ -2,10 +2,11 @@ package tapo import ( "bytes" + "strconv" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/pion/rtp" - "strconv" ) func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { @@ -14,17 +15,16 @@ func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver return nil } - w := mpegts.NewWriter() - w.AddPES(68, mpegts.StreamTypePCMATapo) - w.WritePAT() - w.WritePMT() + muxer := mpegts.NewMuxer() + pid := muxer.AddTrack(mpegts.StreamTypePCMATapo) + if err := c.WriteBackchannel(muxer.GetHeader()); err != nil { + return err + } c.sender = core.NewSender(media, track.Codec) c.sender.Handler = func(packet *rtp.Packet) { - // don't know why 68 and 192 - w.WritePES(68, 192, packet.Payload) - _ = c.WriteBackchannel(w.Bytes()) - w.Reset() + b := muxer.GetPayload(pid, packet.Timestamp, packet.Payload) + _ = c.WriteBackchannel(b) } }