Rewrite mpegts producer and consumer

This commit is contained in:
Alexey Khit
2023-08-19 16:37:52 +03:00
parent 24039218a1
commit f67f6e5b9f
17 changed files with 993 additions and 1030 deletions

View File

@@ -12,7 +12,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -79,10 +78,10 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
Medias: medias,
}
} else {
cons = &mpegts.Consumer{
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
}
//cons = &mpegts.Consumer{
// RemoteAddr: tcp.RemoteAddr(r),
// UserAgent: r.UserAgent(),
//}
}
if err := stream.AddConsumer(cons); err != nil {

View File

@@ -53,3 +53,20 @@ func ConfigToCodec(conf []byte) *core.Codec {
return codec
}
func DecodeConfig(b []byte) (objType, sampleFreqIdx, channels byte, sampleRate uint32) {
rd := bits.NewReader(b)
objType = rd.ReadBits8(5)
if objType == 0b11111 {
objType = 32 + rd.ReadBits8(6)
}
sampleFreqIdx = rd.ReadBits8(4)
if sampleFreqIdx == 0b1111 {
sampleRate = rd.ReadBits(24)
}
channels = rd.ReadBits8(4)
return
}

View File

@@ -57,5 +57,45 @@ func ADTSToCodec(b []byte) *core.Codec {
func ReadADTSSize(b []byte) uint16 {
// AAAAAAAA AAAABCCD EEFFFFGH HHIJKLMM MMMMMMMM MMMOOOOO OOOOOOPP (QQQQQQQQ QQQQQQQQ)
_ = b[5] // bounds
return uint16(b[3]&0x03)<<(8+3) | uint16(b[4])<<3 | uint16(b[5]>>5)
}
func WriteADTSSize(b []byte, size uint16) {
// AAAAAAAA AAAABCCD EEFFFFGH HHIJKLMM MMMMMMMM MMMOOOOO OOOOOOPP (QQQQQQQQ QQQQQQQQ)
_ = b[5] // bounds
b[3] |= byte(size >> (8 + 3))
b[4] = byte(size >> 3)
b[5] |= byte(size << 5)
return
}
func CodecToADTS(codec *core.Codec) []byte {
s := core.Between(codec.FmtpLine, "config=", ";")
conf, err := hex.DecodeString(s)
if err != nil {
return nil
}
objType, sampleFreqIdx, channels, _ := DecodeConfig(conf)
profile := objType - 1
wr := bits.NewWriter(nil)
wr.WriteAllBits(1, 12) // Syncword, all bits must be set to 1
wr.WriteBit(0) // MPEG Version, set to 0 for MPEG-4 and 1 for MPEG-2
wr.WriteBits8(0, 2) // Layer, always set to 0
wr.WriteBit(1) // Protection absence, set to 1 if there is no CRC and 0 if there is CRC
wr.WriteBits8(profile, 2) // Profile, the MPEG-4 Audio Object Type minus 1
wr.WriteBits8(sampleFreqIdx, 4) // MPEG-4 Sampling Frequency Index
wr.WriteBit(0) // Private bit, guaranteed never to be used by MPEG, set to 0 when encoding, ignore when decoding
wr.WriteBits8(channels, 3) // MPEG-4 Channel Configuration
wr.WriteBit(0) // Originality, set to 1 to signal originality of the audio and 0 otherwise
wr.WriteBit(0) // Home, set to 1 to signal home usage of the audio and 0 otherwise
wr.WriteBit(0) // Copyright ID bit
wr.WriteBit(0) // Copyright ID start
wr.WriteBits16(0, 13) // Frame length
wr.WriteAllBits(1, 11) // Buffer fullness (variable bitrate)
wr.WriteBits8(0, 2) // Number of AAC frames (Raw Data Blocks) in ADTS frame minus 1
return wr.Bytes()
}

27
pkg/aac/adts_test.go Normal file
View File

@@ -0,0 +1,27 @@
package aac
import (
"encoding/hex"
"testing"
"github.com/stretchr/testify/require"
)
func TestADTS(t *testing.T) {
// FFmpeg MPEG-TS AAC (one packet)
s := "fff15080021ffc210049900219002380fff15080021ffc212049900219002380" //...
src, err := hex.DecodeString(s)
require.Nil(t, err)
codec := ADTSToCodec(src)
require.Equal(t, uint32(44100), codec.ClockRate)
require.Equal(t, uint16(2), codec.Channels)
size := ReadADTSSize(src)
require.Equal(t, uint16(16), size)
dst := CodecToADTS(codec)
WriteADTSSize(dst, size)
require.Equal(t, src[:len(dst)], dst)
}

View File

@@ -77,16 +77,55 @@ func RTPPay(handler core.HandlerFunc) core.HandlerFunc {
}
}
func ADTStoRTP(b []byte) []byte {
header := make([]byte, 2)
for i := 0; i < len(b); {
auSize := ReadADTSSize(b[i:])
header = append(header, byte(auSize>>5), byte(auSize<<3)) // size in bits
func ADTStoRTP(src []byte) (dst []byte) {
dst = make([]byte, 2) // header bytes
for i := 0; i < len(src); {
auSize := ReadADTSSize(src[i:])
dst = append(dst, byte(auSize>>5), byte(auSize<<3)) // size in bits
i += int(auSize)
}
hdrSize := uint16(len(header) - 2)
binary.BigEndian.PutUint16(header, hdrSize<<3) // size in bits
return append(header, b...)
hdrSize := uint16(len(dst) - 2)
binary.BigEndian.PutUint16(dst, hdrSize<<3) // size in bits
return append(dst, src...)
}
func RTPTimeSize(b []byte) uint32 {
// convert RTP header size to units count
units := binary.BigEndian.Uint16(b) >> 4
return 1024 * uint32(units)
}
func RTPToADTS(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc {
adts := CodecToADTS(codec)
return func(packet *rtp.Packet) {
src := packet.Payload
dst := make([]byte, 0, len(src))
headersSize := binary.BigEndian.Uint16(src) >> 3
headers := src[2 : 2+headersSize]
units := src[2+headersSize:]
for len(headers) > 0 {
unitSize := binary.BigEndian.Uint16(headers) >> 3
headers = headers[2:]
unit := units[:unitSize]
units = units[unitSize:]
if !IsADTS(unit) {
i := len(dst)
dst = append(dst, adts...)
WriteADTSSize(dst[i:], ADTSHeaderSize+uint16(len(unit)))
}
dst = append(dst, unit...)
}
clone := *packet
clone.Version = RTPPacketVersionAAC
clone.Payload = dst
handler(&clone)
}
}
func RTPToCodec(b []byte) *core.Codec {

View File

@@ -1,3 +1,16 @@
## MPEG-TS
FFmpeg:
- PMTID=4096
- H264: PESID=256, StreamType=27, StreamID=224
- H265: PESID=256, StreamType=36, StreamID=224
- AAC: PESID=257, StreamType=15, StreamID=192
Tapo:
- PMTID=18
- H264: PESID=68, StreamType=27, StreamID=224
- AAC: PESID=69, StreamType=144, StreamID=192
## Useful links
- https://github.com/theREDspace/video-onboarding/blob/main/MPEGTS%20Knowledge.md

View File

@@ -1,126 +0,0 @@
package mpegts
import (
"bytes"
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265"
)
type Client struct {
URL string
rd *core.ReadBuffer
medias []*core.Media
receivers []*core.Receiver
recv int
}
func Open(rd io.Reader) (*Client, error) {
client := &Client{rd: core.NewReadBuffer(rd)}
if err := client.describe(); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) describe() error {
c.rd.BufferSize = core.ProbeSize
defer c.rd.Reset()
rd := NewReader()
// Strategy:
// 1. Wait packet with metadata, init other packets for wait
// 2. Wait other packets
// 3. Stop after timeout
waitType := []byte{metadataType}
timeout := time.Now().Add(core.ProbeTimeout)
for len(waitType) != 0 && time.Now().Before(timeout) {
pkt, err := rd.ReadPacket(c.rd)
if err != nil {
return err
}
// check if we wait this type
if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 {
continue
} else {
waitType = append(waitType[:i], waitType[i+1:]...)
}
switch pkt.PayloadType {
case metadataType:
for _, streamType := range pkt.Payload {
switch streamType {
case StreamTypeH264, StreamTypeH265, StreamTypeAAC:
waitType = append(waitType, streamType)
}
}
case StreamTypeH264:
codec := h264.AVCCToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
case StreamTypeH265:
codec := h265.AVCCToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
case StreamTypeAAC:
codec := aac.RTPToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
}
}
return nil
}
func (c *Client) play() error {
rd := NewReader()
for {
pkt, err := rd.ReadPacket(c.rd)
if err != nil {
return err
}
//log.Printf("[mpegts] size: %6d, ts: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType)
for _, receiver := range c.receivers {
if receiver.ID == pkt.PayloadType {
pkt.Timestamp = PTSToTimestamp(pkt.Timestamp, receiver.Codec.ClockRate)
receiver.WriteRTP(pkt)
break
}
}
}
}
func (c *Client) Close() error {
if closer, ok := c.rd.Reader.(io.Closer); ok {
return closer.Close()
}
return nil
}

113
pkg/mpegts/consumer.go Normal file
View File

@@ -0,0 +1,113 @@
package mpegts
import (
"io"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/pion/rtp"
)
type Consumer struct {
core.SuperConsumer
muxer *Muxer
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
c := &Consumer{
muxer: NewMuxer(),
wr: core.NewWriteBuffer(nil),
}
c.Medias = []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
{Name: core.CodecH265},
},
},
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecAAC},
},
},
}
return c
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
sender := core.NewSender(media, track.Codec)
switch track.Codec.Name {
case core.CodecH264:
pid := c.muxer.AddTrack(StreamTypeH264)
sender.Handler = func(pkt *rtp.Packet) {
b := c.muxer.GetPayload(pid, pkt.Timestamp, pkt.Payload)
if n, err := c.wr.Write(b); err == nil {
c.Send += n
}
}
if track.Codec.IsRTP() {
sender.Handler = h264.RTPDepay(track.Codec, sender.Handler)
} else {
sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler)
}
case core.CodecH265:
pid := c.muxer.AddTrack(StreamTypeH265)
sender.Handler = func(pkt *rtp.Packet) {
b := c.muxer.GetPayload(pid, pkt.Timestamp, pkt.Payload)
if n, err := c.wr.Write(b); err == nil {
c.Send += n
}
}
if track.Codec.IsRTP() {
sender.Handler = h265.RTPDepay(track.Codec, sender.Handler)
}
case core.CodecAAC:
pid := c.muxer.AddTrack(StreamTypeAAC)
sender.Handler = func(pkt *rtp.Packet) {
pts := pkt.Timestamp * 90000 / track.Codec.ClockRate
b := c.muxer.GetPayload(pid, pts, pkt.Payload)
if n, err := c.wr.Write(b); err == nil {
c.Send += n
}
}
if track.Codec.IsRTP() {
sender.Handler = aac.RTPToADTS(track.Codec, sender.Handler)
} else {
panic("todo")
}
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
b := c.muxer.GetHeader()
if _, err := wr.Write(b); err != nil {
return 0, err
}
return c.wr.WriteTo(wr)
}
func (c *Consumer) Close() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

393
pkg/mpegts/demuxer.go Normal file
View File

@@ -0,0 +1,393 @@
package mpegts
import (
"errors"
"io"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/bits"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/pion/rtp"
)
type Demuxer struct {
buf [PacketSize]byte // total buf
byte byte // current byte
bits byte // bits left in byte
pos byte // current pos in buf
end byte // end position
pmtID uint16 // Program Map Table (PMT) PID
pes map[uint16]*PES
}
func NewDemuxer() *Demuxer {
return &Demuxer{}
}
const skipRead = 0xFF
func (d *Demuxer) ReadPacket(rd io.Reader) (*rtp.Packet, error) {
for {
if d.pos != skipRead {
if _, err := io.ReadFull(rd, d.buf[:]); err != nil {
return nil, err
}
}
pid, start, err := d.readPacketHeader()
if err != nil {
return nil, err
}
if d.pes == nil {
switch pid {
case 0: // PAT ID
d.readPAT() // PAT: Program Association Table
case d.pmtID:
d.readPMT() // PMT : Program Map Table
pkt := &rtp.Packet{
Payload: make([]byte, 0, len(d.pes)),
}
for _, pes := range d.pes {
pkt.Payload = append(pkt.Payload, pes.StreamType)
}
return pkt, nil
}
continue
}
if pkt := d.readPES(pid, start); pkt != nil {
return pkt, nil
}
}
}
func (d *Demuxer) readPacketHeader() (pid uint16, start bool, err error) {
d.reset()
sb := d.readByte() // Sync byte
if sb != SyncByte {
return 0, false, errors.New("mpegts: wrong sync byte")
}
_ = d.readBit() // Transport error indicator (TEI)
pusi := d.readBit() // Payload unit start indicator (PUSI)
_ = d.readBit() // Transport priority
pid = d.readBits16(13) // PID
_ = d.readBits(2) // Transport scrambling control (TSC)
af := d.readBit() // Adaptation field
_ = d.readBit() // Payload
_ = d.readBits(4) // Continuity counter
if af != 0 {
adSize := d.readByte() // Adaptation field length
if adSize > PacketSize-6 {
return 0, false, errors.New("mpegts: wrong adaptation size")
}
d.skip(adSize)
}
return pid, pusi != 0, nil
}
func (d *Demuxer) skip(i byte) {
d.pos += i
}
func (d *Demuxer) readPSIHeader() {
// https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections
pointer := d.readByte() // Pointer field
d.skip(pointer) // Pointer filler bytes
_ = d.readByte() // Table ID
_ = d.readBit() // Section syntax indicator
_ = d.readBit() // Private bit
_ = d.readBits(2) // Reserved bits
_ = d.readBits(2) // Section length unused bits
size := d.readBits(10) // Section length
d.setSize(byte(size))
_ = d.readBits(16) // Table ID extension
_ = d.readBits(2) // Reserved bits
_ = d.readBits(5) // Version number
_ = d.readBit() // Current/next indicator
_ = d.readByte() // Section number
_ = d.readByte() // Last section number
}
// ReadPAT (Program Association Table)
func (d *Demuxer) readPAT() {
// https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table)
d.readPSIHeader()
const CRCSize = 4
for d.left() > CRCSize {
num := d.readBits(16) // Program num
_ = d.readBits(3) // Reserved bits
pid := d.readBits16(13) // Program map PID
if num != 0 {
d.pmtID = pid
}
}
d.skip(4) // CRC32
}
// ReadPMT (Program map specific data)
func (d *Demuxer) readPMT() {
// https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data)
d.readPSIHeader()
_ = d.readBits(3) // Reserved bits
_ = d.readBits(13) // PCR PID
_ = d.readBits(4) // Reserved bits
_ = d.readBits(2) // Program info length unused bits
size := d.readBits(10) // Program info length
d.skip(byte(size))
d.pes = map[uint16]*PES{}
const CRCSize = 4
for d.left() > CRCSize {
streamType := d.readByte() // Stream type
_ = d.readBits(3) // Reserved bits
pid := d.readBits16(13) // Elementary PID
_ = d.readBits(4) // Reserved bits
_ = d.readBits(2) // ES Info length unused bits
size = d.readBits(10) // ES Info length
d.skip(byte(size))
d.pes[pid] = &PES{StreamType: streamType}
}
d.skip(4) // CRC32
}
func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet {
pes := d.pes[pid]
if pes == nil {
return nil
}
// if new payload beging
if start {
if pes.Payload != nil {
d.pos = skipRead
return pes.GetPacket() // finish previous packet
}
// https://en.wikipedia.org/wiki/Packetized_elementary_stream
// Packet start code prefix
if d.readByte() != 0 || d.readByte() != 0 || d.readByte() != 1 {
return nil
}
pes.StreamID = d.readByte() // Stream id
packetSize := d.readBits16(16) // PES Packet length
_ = d.readBits(2) // Marker bits
_ = d.readBits(2) // Scrambling control
_ = d.readBit() // Priority
_ = d.readBit() // Data alignment indicator
_ = d.readBit() // Copyright
_ = d.readBit() // Original or Copy
pts := d.readBit() // PTS indicator
_ = d.readBit() // DTS indicator
_ = d.readBit() // ESCR flag
_ = d.readBit() // ES rate flag
_ = d.readBit() // DSM trick mode flag
_ = d.readBit() // Additional copy info flag
_ = d.readBit() // CRC flag
_ = d.readBit() // extension flag
headerSize := d.readByte() // PES header length
//log.Printf("[mpegts] pes=%d size=%d header=%d", pes.StreamID, packetSize, headerSize)
if packetSize != 0 {
packetSize -= uint16(3 + headerSize)
}
if pts != 0 {
pes.PTS = d.readTime()
headerSize -= 5
}
d.skip(headerSize)
pes.SetBuffer(packetSize, d.bytes())
} else {
pes.AppendBuffer(d.bytes())
}
if pes.Size != 0 && len(pes.Payload) >= pes.Size {
return pes.GetPacket() // finish current packet
}
return nil
}
func (d *Demuxer) reset() {
d.pos = 0
d.end = PacketSize
d.bits = 0
}
//goland:noinspection GoStandardMethods
func (d *Demuxer) readByte() byte {
if d.bits != 0 {
return byte(d.readBits(8))
}
b := d.buf[d.pos]
d.pos++
return b
}
func (d *Demuxer) readBit() byte {
if d.bits == 0 {
d.byte = d.readByte()
d.bits = 7
} else {
d.bits--
}
return (d.byte >> d.bits) & 0b1
}
func (d *Demuxer) readBits(n byte) (res uint32) {
for i := n - 1; i != 255; i-- {
res |= uint32(d.readBit()) << i
}
return
}
func (d *Demuxer) readBits16(n byte) (res uint16) {
for i := n - 1; i != 255; i-- {
res |= uint16(d.readBit()) << i
}
return
}
func (d *Demuxer) readTime() uint32 {
// https://en.wikipedia.org/wiki/Packetized_elementary_stream
// xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx
_ = d.readBits(4) // 0010b or 0011b or 0001b
ts := d.readBits(3) << 30
_ = d.readBits(1) // 1b
ts |= d.readBits(15) << 15
_ = d.readBits(1) // 1b
ts |= d.readBits(15)
_ = d.readBits(1) // 1b
return ts
}
func (d *Demuxer) bytes() []byte {
return d.buf[d.pos:PacketSize]
}
func (d *Demuxer) left() byte {
return d.end - d.pos
}
func (d *Demuxer) setSize(size byte) {
d.end = d.pos + size
}
const (
PacketSize = 188
SyncByte = 0x47 // Uppercase G
)
// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types
const (
StreamTypeMetadata = 0 // Reserved
StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg
StreamTypeAAC = 0x0F
StreamTypeH264 = 0x1B
StreamTypeH265 = 0x24
StreamTypePCMATapo = 0x90
)
// PES - Packetized Elementary Stream
type PES struct {
StreamID byte // from each PES header
StreamType byte // from PMT table
Sequence uint16 // manual
Timestamp uint32 // manual
PTS uint32 // from PTS extra header, always 90000Hz
Payload []byte // from PTS body
Size int // from PTS header, can be 0
wr *bits.Writer
}
func (p *PES) SetBuffer(size uint16, b []byte) {
p.Payload = make([]byte, 0, size)
p.Payload = append(p.Payload, b...)
p.Size = int(size)
}
func (p *PES) AppendBuffer(b []byte) {
p.Payload = append(p.Payload, b...)
}
func (p *PES) GetPacket() (pkt *rtp.Packet) {
switch p.StreamType {
case StreamTypeH264, StreamTypeH265:
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: p.PTS, // PTS is ok, because 90000Hz
},
Payload: annexb.EncodeToAVCC(p.Payload, false),
}
case StreamTypeAAC:
p.Sequence++
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.Timestamp,
},
Payload: aac.ADTStoRTP(p.Payload),
}
p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp!
case StreamTypePCMATapo:
p.Sequence++
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.Timestamp,
},
Payload: p.Payload,
}
p.Timestamp += uint32(len(p.Payload)) // update next timestamp!
}
p.Payload = nil
return
}
// PTSToTimestamp - convert PTS from 90000 to custom clock rate
//func PTSToTimestamp(pts, clockRate uint32) uint32 {
// if clockRate == 90000 {
// return pts
// }
// return uint32(uint64(pts) * uint64(clockRate) / 90000)
//}

View File

@@ -1,112 +0,0 @@
package mpegts
import (
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/pion/rtp"
)
const (
PacketSize = 188
SyncByte = 0x47 // Uppercase G
)
// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types
const (
metadataType = 0
StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg
StreamTypeAAC = 0x0F
StreamTypeH264 = 0x1B
StreamTypeH265 = 0x24
StreamTypePCMATapo = 0x90
)
// PES - Packetized Elementary Stream
type PES struct {
StreamType byte
StreamID byte
Payload []byte
Size int
PTS uint32 // PTS always 90000Hz
Sequence uint16
decodeStream func([]byte) ([]byte, int)
}
func (p *PES) SetBuffer(size uint16, b []byte) {
p.Payload = make([]byte, 0, size)
p.Payload = append(p.Payload, b...)
p.Size = int(size)
}
func (p *PES) AppendBuffer(b []byte) {
p.Payload = append(p.Payload, b...)
}
func (p *PES) GetPacket() (pkt *rtp.Packet) {
switch p.StreamType {
case StreamTypeH264, StreamTypeH265:
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: p.PTS,
},
Payload: annexb.EncodeToAVCC(p.Payload, false),
}
case StreamTypeAAC:
p.Sequence++
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.PTS,
},
Payload: aac.ADTStoRTP(p.Payload),
}
case StreamTypePCMATapo:
p.Sequence++
p.PTS += uint32(len(p.Payload))
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.PTS,
},
Payload: p.Payload,
}
}
p.Payload = nil
return
}
func StreamType(codec *core.Codec) uint8 {
switch codec.Name {
case core.CodecH264:
return StreamTypeH264
case core.CodecH265:
return StreamTypeH265
case core.CodecAAC:
return StreamTypeAAC
case core.CodecPCMA:
return StreamTypePCMATapo
}
return 0
}
// PTSToTimestamp - convert PTS from 90000 to custom clock rate
func PTSToTimestamp(pts, clockRate uint32) uint32 {
if clockRate == 90000 {
return pts
}
return uint32(uint64(pts) * uint64(clockRate) / 90000)
}

209
pkg/mpegts/muxer.go Normal file
View File

@@ -0,0 +1,209 @@
package mpegts
import (
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/bits"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
)
type Muxer struct {
pes map[uint16]*PES
}
func NewMuxer() *Muxer {
return &Muxer{
pes: map[uint16]*PES{},
}
}
func (m *Muxer) AddTrack(streamType byte) (pid uint16) {
pes := &PES{StreamType: streamType}
// Audio streams (0xC0-0xDF), Video streams (0xE0-0xEF)
switch streamType {
case StreamTypeH264, StreamTypeH265:
pes.StreamID = 0xE0
case StreamTypeAAC, StreamTypePCMATapo:
pes.StreamID = 0xC0
}
pid = startPID + 1 + uint16(len(m.pes))
m.pes[pid] = pes
return
}
func (m *Muxer) GetHeader() []byte {
bw := bits.NewWriter(nil)
m.writePAT(bw)
m.writePMT(bw)
return bw.Bytes()
}
// GetPayload - safe to run concurently with different pid
func (m *Muxer) GetPayload(pid uint16, pts uint32, payload []byte) []byte {
pes := m.pes[pid]
b := make([]byte, 14+len(payload))
_ = b[14] // bounds
b[0] = 0
b[1] = 0
b[2] = 1
b[3] = pes.StreamID
binary.BigEndian.PutUint16(b[4:], 8+uint16(len(payload)))
b[6] = 0x80 // Marker bits (binary)
b[7] = 0x80 // PTS indicator
b[8] = 5 // PES header length
WriteTime(b[9:], pts)
copy(b[14:], payload)
switch pes.StreamType {
case StreamTypeH264, StreamTypeH265:
annexb.DecodeAVCC(b[14:], false) // no need to safe clone after copy
}
pes.Payload = b
pes.Size = 1 // set PUSI in first PES
if pes.wr == nil {
pes.wr = bits.NewWriter(nil)
} else {
pes.wr.Reset()
}
for len(pes.Payload) > 0 {
m.writePES(pes.wr, pid, pes)
pes.Sequence++
pes.Size = 0
}
return pes.wr.Bytes()
}
const patPID = 0
const startPID = 0x20
func (m *Muxer) writePAT(wr *bits.Writer) {
m.writeHeader(wr, patPID)
i := wr.Len() + 1 // start for CRC32
m.writePSIHeader(wr, 0, 4)
wr.WriteUint16(1) // Program num
wr.WriteBits8(0b111, 3) // Reserved bits (all to 1)
wr.WriteBits16(startPID, 13) // Program map PID
crc := checksum(wr.Bytes()[i:])
wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian)
m.WriteTail(wr)
}
func (m *Muxer) writePMT(wr *bits.Writer) {
m.writeHeader(wr, startPID)
i := wr.Len() + 1 // start for CRC32
m.writePSIHeader(wr, 2, 4+uint16(len(m.pes))*5) // 4 bytes below + 5 bytes each PES
wr.WriteBits8(0b111, 3) // Reserved bits (all to 1)
wr.WriteBits16(0x1FFF, 13) // Program map PID (not used)
wr.WriteBits8(0b1111, 4) // Reserved bits (all to 1)
wr.WriteBits8(0, 2) // Program info length unused bits (all to 0)
wr.WriteBits16(0, 10) // Program info length
for pid, pes := range m.pes {
wr.WriteByte(pes.StreamType) // Stream type
wr.WriteBits8(0b111, 3) // Reserved bits (all to 1)
wr.WriteBits16(pid, 13) // Elementary PID
wr.WriteBits8(0b1111, 4) // Reserved bits (all to 1)
wr.WriteBits(0, 2) // ES Info length unused bits
wr.WriteBits16(0, 10) // ES Info length
}
crc := checksum(wr.Bytes()[i:])
wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian)
m.WriteTail(wr)
}
func (m *Muxer) writePES(wr *bits.Writer, pid uint16, pes *PES) {
const flagPUSI = 0b01000000_00000000
const flagAdaptation = 0b00100000
const flagPayload = 0b00010000
wr.WriteByte(SyncByte)
if pes.Size != 0 {
pid |= flagPUSI // Payload unit start indicator (PUSI)
}
wr.WriteUint16(pid)
counter := byte(pes.Sequence) & 0xF
if size := len(pes.Payload); size < PacketSize-4 {
wr.WriteByte(flagAdaptation | flagPayload | counter) // adaptation + payload
// for 183 payload will be zero
adSize := PacketSize - 4 - 1 - byte(size)
wr.WriteByte(adSize)
wr.WriteBytes(make([]byte, adSize)...)
wr.WriteBytes(pes.Payload...)
pes.Payload = nil
} else {
wr.WriteByte(flagPayload | counter) // only payload
wr.WriteBytes(pes.Payload[:PacketSize-4]...)
pes.Payload = pes.Payload[PacketSize-4:]
}
}
func (m *Muxer) writeHeader(wr *bits.Writer, pid uint16) {
wr.WriteByte(SyncByte)
wr.WriteBit(0) // Transport error indicator (TEI)
wr.WriteBit(1) // Payload unit start indicator (PUSI)
wr.WriteBit(0) // Transport priority
wr.WriteBits16(pid, 13) // PID
wr.WriteBits8(0, 2) // Transport scrambling control (TSC)
wr.WriteBit(0) // Adaptation field
wr.WriteBit(1) // Payload
wr.WriteBits8(0, 4) // Continuity counter
}
func (m *Muxer) writePSIHeader(wr *bits.Writer, tableID byte, size uint16) {
wr.WriteByte(0) // Pointer field
wr.WriteByte(tableID) // Table ID
wr.WriteBit(1) // Section syntax indicator
wr.WriteBit(0) // Private bit
wr.WriteBits8(0b11, 2) // Reserved bits (all to 1)
wr.WriteBits8(0, 2) // Section length unused bits (all to 0)
wr.WriteBits16(5+size+4, 10) // Section length (5 bytes below + content + 4 bytes CRC32)
wr.WriteUint16(1) // Table ID extension
wr.WriteBits8(0b11, 2) // Reserved bits (all to 1)
wr.WriteBits8(0, 5) // Version number
wr.WriteBit(1) // Current/next indicator
wr.WriteByte(0) // Section number
wr.WriteByte(0) // Last section number
}
func (m *Muxer) WriteTail(wr *bits.Writer) {
size := PacketSize - wr.Len()%PacketSize
wr.WriteBytes(make([]byte, size)...)
}
func WriteTime(b []byte, t uint32) {
_ = b[4] // bounds
const onlyPTS = 0x20
b[0] = onlyPTS | byte(t>>(32-3)) | 1
b[1] = byte(t >> (24 - 2))
b[2] = byte(t>>(16-2)) | 1
b[3] = byte(t >> (8 - 1))
b[4] = byte(t<<1) | 1 // t>>(0-1)
}

View File

@@ -1,45 +1,137 @@
package mpegts
import (
"encoding/json"
"bytes"
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
type Producer struct {
core.SuperProducer
rd *core.ReadBuffer
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
for _, track := range c.receivers {
if track.Codec == codec {
return track, nil
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
if err := prod.probe(); err != nil {
return nil, err
}
return prod, nil
}
func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
receiver, _ := c.SuperProducer.GetTrack(media, codec)
receiver.ID = StreamType(codec)
return receiver, nil
}
func (c *Producer) Start() error {
rd := NewDemuxer()
for {
pkt, err := rd.ReadPacket(c.rd)
if err != nil {
return err
}
//log.Printf("[mpegts] size: %6d, muxer: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType)
for _, receiver := range c.Receivers {
if receiver.ID == pkt.PayloadType {
receiver.WriteRTP(pkt)
break
}
}
}
track := core.NewReceiver(media, codec)
track.ID = StreamType(codec)
c.receivers = append(c.receivers, track)
return track, nil
}
func (c *Client) Start() error {
return c.play()
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
func (c *Client) Stop() error {
for _, receiver := range c.receivers {
receiver.Close()
func (c *Producer) probe() error {
c.rd.BufferSize = core.ProbeSize
defer c.rd.Reset()
rd := NewDemuxer()
// Strategy:
// 1. Wait packet with metadata, init other packets for wait
// 2. Wait other packets
// 3. Stop after timeout
waitType := []byte{StreamTypeMetadata}
timeout := time.Now().Add(core.ProbeTimeout)
for len(waitType) != 0 && time.Now().Before(timeout) {
pkt, err := rd.ReadPacket(c.rd)
if err != nil {
return err
}
// check if we wait this type
if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 {
continue
} else {
waitType = append(waitType[:i], waitType[i+1:]...)
}
switch pkt.PayloadType {
case StreamTypeMetadata:
for _, streamType := range pkt.Payload {
switch streamType {
case StreamTypeH264, StreamTypeH265, StreamTypeAAC:
waitType = append(waitType, streamType)
}
}
case StreamTypeH264:
codec := h264.AVCCToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.Medias = append(c.Medias, media)
case StreamTypeH265:
codec := h265.AVCCToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.Medias = append(c.Medias, media)
case StreamTypeAAC:
codec := aac.RTPToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.Medias = append(c.Medias, media)
}
}
return c.Close()
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "MPEG-TS active producer",
URL: c.URL,
Medias: c.medias,
Receivers: c.receivers,
Recv: c.recv,
func StreamType(codec *core.Codec) uint8 {
switch codec.Name {
case core.CodecH264:
return StreamTypeH264
case core.CodecH265:
return StreamTypeH265
case core.CodecAAC:
return StreamTypeAAC
case core.CodecPCMA:
return StreamTypePCMATapo
}
return json.Marshal(info)
return 0
}

View File

@@ -1,297 +0,0 @@
package mpegts
import (
"errors"
"io"
"github.com/pion/rtp"
)
type Reader struct {
buf [PacketSize]byte // total buf
byte byte // current byte
bits byte // bits left in byte
pos byte // current pos in buf
end byte // end position
pmtID uint16 // Program Map Table (PMT) PID
pes map[uint16]*PES
}
func NewReader() *Reader {
return &Reader{}
}
const skipRead = 0xFF
func (r *Reader) ReadPacket(rd io.Reader) (*rtp.Packet, error) {
for {
if r.pos != skipRead {
if _, err := io.ReadFull(rd, r.buf[:]); err != nil {
return nil, err
}
}
pid, start, err := r.readPacketHeader()
if err != nil {
return nil, err
}
if r.pes == nil {
switch pid {
case 0: // PAT ID
r.readPAT() // PAT: Program Association Table
case r.pmtID:
r.readPMT() // PMT : Program Map Table
pkt := &rtp.Packet{
Payload: make([]byte, 0, len(r.pes)),
}
for _, pes := range r.pes {
pkt.Payload = append(pkt.Payload, pes.StreamType)
}
return pkt, nil
}
continue
}
if pkt := r.readPES(pid, start); pkt != nil {
return pkt, nil
}
}
}
func (r *Reader) readPacketHeader() (pid uint16, start bool, err error) {
r.reset()
sb := r.readByte() // Sync byte
if sb != SyncByte {
return 0, false, errors.New("mpegts: wrong sync byte")
}
_ = r.readBit() // Transport error indicator (TEI)
pusi := r.readBit() // Payload unit start indicator (PUSI)
_ = r.readBit() // Transport priority
pid = r.readBits16(13) // PID
_ = r.readBits(2) // Transport scrambling control (TSC)
af := r.readBit() // Adaptation field
_ = r.readBit() // Payload
_ = r.readBits(4) // Continuity counter
if af != 0 {
adSize := r.readByte() // Adaptation field length
if adSize > PacketSize-6 {
return 0, false, errors.New("mpegts: wrong adaptation size")
}
r.skip(adSize)
}
return pid, pusi != 0, nil
}
func (r *Reader) skip(i byte) {
r.pos += i
}
func (r *Reader) readPSIHeader() {
// https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections
pointer := r.readByte() // Pointer field
r.skip(pointer) // Pointer filler bytes
_ = r.readByte() // Table ID
_ = r.readBit() // Section syntax indicator
_ = r.readBit() // Private bit
_ = r.readBits(2) // Reserved bits
_ = r.readBits(2) // Section length unused bits
size := r.readBits(10) // Section length
r.setSize(byte(size))
_ = r.readBits(16) // Table ID extension
_ = r.readBits(2) // Reserved bits
_ = r.readBits(5) // Version number
_ = r.readBit() // Current/next indicator
_ = r.readByte() // Section number
_ = r.readByte() // Last section number
}
// ReadPAT (Program Association Table)
func (r *Reader) readPAT() {
// https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table)
r.readPSIHeader()
const CRCSize = 4
for r.left() > CRCSize {
num := r.readBits(16) // Program num
_ = r.readBits(3) // Reserved bits
pid := r.readBits16(13) // Program map PID
if num != 0 {
r.pmtID = pid
}
}
r.skip(4) // CRC32
}
// ReadPMT (Program map specific data)
func (r *Reader) readPMT() {
// https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data)
r.readPSIHeader()
_ = r.readBits(3) // Reserved bits
_ = r.readBits(13) // PCR PID
_ = r.readBits(4) // Reserved bits
_ = r.readBits(2) // Program info length unused bits
size := r.readBits(10) // Program info length
r.skip(byte(size))
r.pes = map[uint16]*PES{}
const CRCSize = 4
for r.left() > CRCSize {
streamType := r.readByte() // Stream type
_ = r.readBits(3) // Reserved bits
pid := r.readBits16(13) // Elementary PID
_ = r.readBits(4) // Reserved bits
_ = r.readBits(2) // ES Info length unused bits
size = r.readBits(10) // ES Info length
r.skip(byte(size))
r.pes[pid] = &PES{StreamType: streamType}
}
r.skip(4) // CRC32
}
func (r *Reader) readPES(pid uint16, start bool) *rtp.Packet {
pes := r.pes[pid]
if pes == nil {
return nil
}
// if new payload beging
if start {
if pes.Payload != nil {
r.pos = skipRead
return pes.GetPacket() // finish previous packet
}
// https://en.wikipedia.org/wiki/Packetized_elementary_stream
// Packet start code prefix
if r.readByte() != 0 || r.readByte() != 0 || r.readByte() != 1 {
return nil
}
pes.StreamID = r.readByte() // Stream id
packetSize := r.readBits16(16) // PES Packet length
_ = r.readBits(2) // Marker bits
_ = r.readBits(2) // Scrambling control
_ = r.readBit() // Priority
_ = r.readBit() // Data alignment indicator
_ = r.readBit() // Copyright
_ = r.readBit() // Original or Copy
pts := r.readBit() // PTS indicator
_ = r.readBit() // DTS indicator
_ = r.readBit() // ESCR flag
_ = r.readBit() // ES rate flag
_ = r.readBit() // DSM trick mode flag
_ = r.readBit() // Additional copy info flag
_ = r.readBit() // CRC flag
_ = r.readBit() // extension flag
headerSize := r.readByte() // PES header length
//log.Printf("[mpegts] pes=%d size=%d header=%d", pes.StreamID, packetSize, headerSize)
if packetSize != 0 {
packetSize -= uint16(3 + headerSize)
}
if pts != 0 {
pes.PTS = r.readTime()
headerSize -= 5
}
r.skip(headerSize)
pes.SetBuffer(packetSize, r.bytes())
} else {
pes.AppendBuffer(r.bytes())
}
if pes.Size != 0 && len(pes.Payload) >= pes.Size {
return pes.GetPacket() // finish current packet
}
return nil
}
func (r *Reader) reset() {
r.pos = 0
r.end = PacketSize
r.bits = 0
}
//goland:noinspection GoStandardMethods
func (r *Reader) readByte() byte {
if r.bits != 0 {
return byte(r.readBits(8))
}
b := r.buf[r.pos]
r.pos++
return b
}
func (r *Reader) readBit() byte {
if r.bits == 0 {
r.byte = r.readByte()
r.bits = 7
} else {
r.bits--
}
return (r.byte >> r.bits) & 0b1
}
func (r *Reader) readBits(n byte) (res uint32) {
for i := n - 1; i != 255; i-- {
res |= uint32(r.readBit()) << i
}
return
}
func (r *Reader) readBits16(n byte) (res uint16) {
for i := n - 1; i != 255; i-- {
res |= uint16(r.readBit()) << i
}
return
}
func (r *Reader) readTime() uint32 {
// https://en.wikipedia.org/wiki/Packetized_elementary_stream
// xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx
_ = r.readBits(4) // 0010b or 0011b or 0001b
ts := r.readBits(3) << 30
_ = r.readBits(1) // 1b
ts |= r.readBits(15) << 15
_ = r.readBits(1) // 1b
ts |= r.readBits(15)
_ = r.readBits(1) // 1b
return ts
}
func (r *Reader) bytes() []byte {
return r.buf[r.pos:PacketSize]
}
func (r *Reader) left() byte {
return r.end - r.pos
}
func (r *Reader) setSize(size byte) {
r.end = r.pos + size
}

View File

@@ -1,225 +0,0 @@
package mpegts
import (
"bytes"
"encoding/hex"
"encoding/json"
"time"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/ts"
"github.com/pion/rtp"
)
type Consumer struct {
core.Listener
UserAgent string
RemoteAddr string
senders []*core.Sender
buf *bytes.Buffer
muxer *ts.Muxer
mimeType string
streams []av.CodecData
start bool
init []byte
send int
}
func (c *Consumer) GetMedias() []*core.Media {
return []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
//{
// Kind: core.KindAudio,
// Direction: core.DirectionSendonly,
// Codecs: []*core.Codec{
// {Name: core.CodecAAC},
// },
//},
}
}
func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
trackID := int8(len(c.streams))
handler := core.NewSender(media, track.Codec)
switch track.Codec.Name {
case core.CodecH264:
sps, pps := h264.GetParameterSet(track.Codec.FmtpLine)
// some dummy SPS and PPS not a problem
if len(sps) == 0 {
sps = []byte{0x67, 0x42, 0x00, 0x0a, 0xf8, 0x41, 0xa2}
}
if len(pps) == 0 {
pps = []byte{0x68, 0xce, 0x38, 0x80}
}
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
if err != nil {
return nil
}
if len(c.mimeType) > 0 {
c.mimeType += ","
}
c.mimeType += "avc1." + h264.GetProfileLevelID(track.Codec.FmtpLine)
c.streams = append(c.streams, stream)
pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond}
ts2time := time.Second / time.Duration(track.Codec.ClockRate)
handler.Handler = func(packet *rtp.Packet) {
if packet.Version != h264.RTPPacketVersionAVC {
return
}
if !c.start {
return
}
pkt.Data = packet.Payload
newTime := time.Duration(packet.Timestamp) * ts2time
if pkt.Time > 0 {
pkt.Duration = newTime - pkt.Time
}
pkt.Time = newTime
if err = c.muxer.WritePacket(pkt); err != nil {
return
}
// clone bytes from buffer, so next packet won't overwrite it
buf := append([]byte{}, c.buf.Bytes()...)
c.Fire(buf)
c.send += len(buf)
c.buf.Reset()
}
if track.Codec.IsRTP() {
handler.Handler = h264.RTPDepay(track.Codec, handler.Handler)
} else {
handler.Handler = h264.RepairAVCC(track.Codec, handler.Handler)
}
case core.CodecAAC:
s := core.Between(track.Codec.FmtpLine, "config=", ";")
b, err := hex.DecodeString(s)
if err != nil {
return nil
}
stream, err := aacparser.NewCodecDataFromMPEG4AudioConfigBytes(b)
if err != nil {
return nil
}
if len(c.mimeType) > 0 {
c.mimeType += ","
}
c.mimeType += "mp4a.40.2"
c.streams = append(c.streams, stream)
pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond}
ts2time := time.Second / time.Duration(track.Codec.ClockRate)
handler.Handler = func(packet *rtp.Packet) {
if !c.start {
return
}
pkt.Data = packet.Payload
newTime := time.Duration(packet.Timestamp) * ts2time
if pkt.Time > 0 {
pkt.Duration = newTime - pkt.Time
}
pkt.Time = newTime
if err = c.muxer.WritePacket(pkt); err != nil {
return
}
// clone bytes from buffer, so next packet won't overwrite it
buf := append([]byte{}, c.buf.Bytes()...)
c.Fire(buf)
c.send += len(buf)
c.buf.Reset()
}
if track.Codec.IsRTP() {
handler.Handler = aac.RTPDepay(handler.Handler)
}
default:
panic("unsupported codec")
}
handler.HandleRTP(track)
c.senders = append(c.senders, handler)
return nil
}
func (c *Consumer) MimeCodecs() string {
return c.mimeType
}
func (c *Consumer) Init() ([]byte, error) {
c.buf = bytes.NewBuffer(nil)
c.muxer = ts.NewMuxer(c.buf)
// first packet will be with header, it's ok
if err := c.muxer.WriteHeader(c.streams); err != nil {
return nil, err
}
data := append([]byte{}, c.buf.Bytes()...)
return data, nil
}
func (c *Consumer) Start() {
c.start = true
}
func (c *Consumer) Stop() error {
for _, sender := range c.senders {
sender.Close()
}
return nil
}
func (c *Consumer) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "TS passive consumer",
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Medias: c.GetMedias(),
Senders: c.senders,
Send: c.send,
}
return json.Marshal(info)
}

View File

@@ -1,219 +0,0 @@
package mpegts
type Writer struct {
b []byte // packets buffer
m int // crc start
pid []uint16
counter []byte
streamType []byte
timestamp []uint32
}
func NewWriter() *Writer {
return &Writer{}
}
func (w *Writer) AddPES(pid uint16, streamType byte) {
w.pid = append(w.pid, pid)
w.streamType = append(w.streamType, streamType)
w.counter = append(w.counter, 0)
w.timestamp = append(w.timestamp, 0)
}
func (w *Writer) WriteByte(b byte) {
w.b = append(w.b, b)
}
func (w *Writer) WriteUint16(i uint16) {
w.b = append(w.b, byte(i>>8), byte(i))
}
func (w *Writer) WriteTime(t uint32) {
const onlyPTS = 0x20
// [>>32 <<3] [>>24 <<2] [>>16 <<2] [>>8 <<1] [<<1]
w.b = append(w.b, onlyPTS|byte(t>>29)|1, byte(t>>22), byte(t>>14)|1, byte(t>>7), byte(t<<1)|1)
}
func (w *Writer) WriteBytes(b []byte) {
w.b = append(w.b, b...)
}
func (w *Writer) MarkChecksum() {
w.m = len(w.b)
}
func (w *Writer) WriteChecksum() {
crc := checksum(w.b[w.m:])
w.b = append(w.b, byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24))
}
func (w *Writer) FinishPacket() {
if n := len(w.b) % PacketSize; n != 0 {
w.b = append(w.b, make([]byte, PacketSize-n)...)
}
}
func (w *Writer) Bytes() []byte {
if len(w.b)%PacketSize != 0 {
panic("wrong packet size")
}
return w.b
}
func (w *Writer) Reset() {
w.b = nil
}
const isUnitStart = 0x4000
const flagHasAdaptation = 0x20
const flagHasPayload = 0x10
const lenIsProgramTable = 0xB000
const tableFlags = 0xC1
const tableHeader = 0xE000
const tableLength = 0xF000
const patPID = 0
const patTableID = 0
const patTableExtID = 1
func (w *Writer) WritePAT() {
w.WriteByte(SyncByte)
w.WriteUint16(isUnitStart | patPID) // PAT PID
w.WriteByte(flagHasPayload) // flags...
w.WriteByte(0) // Pointer field
w.MarkChecksum()
w.WriteByte(patTableID) // Table ID
w.WriteUint16(lenIsProgramTable | 13) // Section length
w.WriteUint16(patTableExtID) // Table ID extension
w.WriteByte(tableFlags) // flags...
w.WriteByte(0) // Section number
w.WriteByte(0) // Last section number
w.WriteUint16(1) // Program num (usual 1)
w.WriteUint16(tableHeader + pmtPID)
w.WriteChecksum()
w.FinishPacket()
}
const pmtPID = 18
const pmtTableID = 2
const pmtTableExtID = 1
func (w *Writer) WritePMT() {
w.WriteByte(SyncByte)
w.WriteUint16(isUnitStart | pmtPID) // PMT PID
w.WriteByte(flagHasPayload) // flags...
w.WriteByte(0) // Pointer field
tableLen := 13 + uint16(len(w.pid))*5
w.MarkChecksum()
w.WriteByte(pmtTableID) // Table ID
w.WriteUint16(lenIsProgramTable | tableLen) // Section length
w.WriteUint16(pmtTableExtID) // Table ID extension
w.WriteByte(tableFlags) // flags...
w.WriteByte(0) // Section number
w.WriteByte(0) // Last section number
w.WriteUint16(tableHeader | w.pid[0]) // PID
w.WriteUint16(tableLength | 0) // Info length
for i, pid := range w.pid {
w.WriteByte(w.streamType[i])
w.WriteUint16(tableHeader | pid) // PID
w.WriteUint16(tableLength | 0) // Info len
}
w.WriteChecksum()
w.FinishPacket()
}
const pesHeaderSize = PacketSize - 18
func (w *Writer) WritePES(pid uint16, streamID byte, payload []byte) {
w.WriteByte(SyncByte)
w.WriteUint16(isUnitStart | pid)
// check if payload lower then max first packet size
if len(payload) < PacketSize-18 {
w.WriteByte(flagHasAdaptation | flagHasPayload)
// for 183 payload will be zero
adSize := PacketSize - 18 - 1 - byte(len(payload))
w.WriteByte(adSize)
w.WriteBytes(make([]byte, adSize))
} else {
w.WriteByte(flagHasPayload)
}
w.WriteByte(0)
w.WriteByte(0)
w.WriteByte(1)
w.WriteByte(streamID)
w.WriteUint16(uint16(8 + len(payload)))
w.WriteByte(0x80)
w.WriteByte(0x80) // only PTS
w.WriteByte(5) // optional size
switch w.streamType[0] {
case StreamTypePCMATapo:
w.timestamp[0] += uint32(len(payload) * 45 / 8)
}
w.WriteTime(w.timestamp[0])
if len(payload) < PacketSize-18 {
w.WriteBytes(payload)
return
}
w.WriteBytes(payload[:pesHeaderSize])
payload = payload[pesHeaderSize:]
var counter byte
for {
counter++
if len(payload) > PacketSize-4 {
// payload more then maximum size
w.WriteByte(SyncByte)
w.WriteUint16(pid)
w.WriteByte(flagHasPayload | counter&0xF)
w.WriteBytes(payload[:PacketSize-4])
payload = payload[PacketSize-4:]
} else if len(payload) == PacketSize-4 {
// payload equal maximum size (last packet)
w.WriteByte(SyncByte)
w.WriteUint16(pid)
w.WriteByte(flagHasPayload | counter&0xF)
w.WriteBytes(payload)
break
} else {
// payload lower than maximum size (last packet)
w.WriteByte(SyncByte)
w.WriteUint16(pid)
w.WriteByte(flagHasAdaptation | flagHasPayload | counter&0xF)
// for 183 payload will be zero
adSize := PacketSize - 4 - 1 - byte(len(payload))
w.WriteByte(adSize)
w.WriteBytes(make([]byte, adSize))
w.WriteBytes(payload)
break
}
}
}

View File

@@ -145,11 +145,11 @@ func (c *Client) SetupStream() (err error) {
// Handle - first run will be in probe state
func (c *Client) Handle() error {
multipartRd := multipart.NewReader(c.conn1, "--device-stream-boundary--")
mpegtsRd := mpegts.NewReader()
rd := multipart.NewReader(c.conn1, "--device-stream-boundary--")
demux := mpegts.NewDemuxer()
for {
p, err := multipartRd.NextRawPart()
p, err := rd.NextRawPart()
if err != nil {
return err
}
@@ -181,7 +181,7 @@ func (c *Client) Handle() error {
bytesRd := bytes.NewReader(body)
for {
pkt, err2 := mpegtsRd.ReadPacket(bytesRd)
pkt, err2 := demux.ReadPacket(bytesRd)
if pkt == nil || err2 == io.EOF {
break
}

View File

@@ -2,10 +2,11 @@ package tapo
import (
"bytes"
"strconv"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/pion/rtp"
"strconv"
)
func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
@@ -14,17 +15,16 @@ func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver
return nil
}
w := mpegts.NewWriter()
w.AddPES(68, mpegts.StreamTypePCMATapo)
w.WritePAT()
w.WritePMT()
muxer := mpegts.NewMuxer()
pid := muxer.AddTrack(mpegts.StreamTypePCMATapo)
if err := c.WriteBackchannel(muxer.GetHeader()); err != nil {
return err
}
c.sender = core.NewSender(media, track.Codec)
c.sender.Handler = func(packet *rtp.Packet) {
// don't know why 68 and 192
w.WritePES(68, 192, packet.Payload)
_ = c.WriteBackchannel(w.Bytes())
w.Reset()
b := muxer.GetPayload(pid, packet.Timestamp, packet.Payload)
_ = c.WriteBackchannel(b)
}
}