Files
monibuca/pkg/format/ts/psi.go
langhuihui 8a9fffb987 refactor: frame converter and mp4 track improvements
- Refactor frame converter implementation
- Update mp4 track to use ICodex
- General refactoring and code improvements

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-28 19:55:37 +08:00

239 lines
6.8 KiB
Go

package mpegts
import (
"bytes"
"errors"
"fmt"
"io"
"m7s.live/v5/pkg/util"
)
//
// PSI
//
const (
PSI_TYPE_PAT = 1
PSI_TYPE_PMT = 2
PSI_TYPE_NIT = 3
PSI_TYPE_CAT = 4
PSI_TYPE_TST = 5
PSI_TYPE_IPMP_CIT = 6
)
type MpegTsPSI struct {
// PAT
// PMT
// CAT
// NIT
Pat MpegTsPAT
Pmt MpegTsPMT
}
// 当传输流包有效载荷包含 PSI 数据时,payload_unit_start_indicator 具有以下意义:
// 若传输流包承载 PSI分段的首字节,则 payload_unit_start_indicator 值必为 1,指示此传输流包的有效载荷的首字节承载pointer_field.
// 若传输流包不承载 PSI 分段的首字节,则 payload_unit_start_indicator 值必为 0,指示在此有效载荷中不存在 pointer_field
// 只要是PSI就一定会有pointer_field
func ReadPSI(r io.Reader, pt uint32) (lr *io.LimitedReader, psi MpegTsPSI, err error) {
// pointer field(8)
cr, ok := r.(*util.Crc32Reader)
if ok {
r = cr.R
}
pointer_field, err := util.ReadByteToUint8(r)
if err != nil {
return
}
if pointer_field != 0 {
// 无论如何都应该确保能将pointer_field读取到,并且io.Reader指针向下移动
// ioutil.Discard常用在,http中,如果Get请求,获取到了很大的Body,要丢弃Body,就用这个方法.
// 因为http默认重链接的时候,必须等body读取完成.
// 用于发送需要读取但不想存储的数据,目的是耗尽读取端的数据
if _, err = io.CopyN(io.Discard, r, int64(pointer_field)); err != nil {
return
}
}
if ok {
r = cr
}
// table id(8)
tableId, err := util.ReadByteToUint8(r)
if err != nil {
return
}
// sectionSyntaxIndicator(1) + zero(1) + reserved1(2) + sectionLength(12)
// sectionLength 前两个字节固定为00
sectionSyntaxIndicatorAndSectionLength, err := util.ReadByteToUint16(r, true)
if err != nil {
return
}
// 指定该分段的字节数,紧随 section_length 字段开始,并包括 CRC
// 因此剩下最多只能在读sectionLength长度的字节
lr = &io.LimitedReader{R: r, N: int64(sectionSyntaxIndicatorAndSectionLength & 0x3FF)}
// PAT TransportStreamID(16) or PMT ProgramNumber(16)
transportStreamIdOrProgramNumber, err := util.ReadByteToUint16(lr, true)
if err != nil {
return
}
// reserved2(2) + versionNumber(5) + currentNextIndicator(1)
versionNumberAndCurrentNextIndicator, err := util.ReadByteToUint8(lr)
if err != nil {
return
}
// sectionNumber(8)
sectionNumber, err := util.ReadByteToUint8(lr)
if err != nil {
return
}
// lastSectionNumber(8)
lastSectionNumber, err := util.ReadByteToUint8(lr)
if err != nil {
return
}
// 因为lr.N是从sectionLength开始计算,所以要减去 pointer_field(8) + table id(8) + sectionSyntaxIndicator(1) + zero(1) + reserved1(2) + sectionLength(12)
lr.N -= 4
switch pt {
case PSI_TYPE_PAT:
{
if tableId != TABLE_PAS {
err = errors.New(fmt.Sprintf("%s, id=%d", "read pmt table id != 2", tableId))
return
}
psi.Pat.TableID = tableId
psi.Pat.SectionSyntaxIndicator = uint8((sectionSyntaxIndicatorAndSectionLength & 0x8000) >> 15)
psi.Pat.SectionLength = sectionSyntaxIndicatorAndSectionLength & 0x3FF
psi.Pat.TransportStreamID = transportStreamIdOrProgramNumber
psi.Pat.VersionNumber = versionNumberAndCurrentNextIndicator & 0x3e
psi.Pat.CurrentNextIndicator = versionNumberAndCurrentNextIndicator & 0x01
psi.Pat.SectionNumber = sectionNumber
psi.Pat.LastSectionNumber = lastSectionNumber
}
case PSI_TYPE_PMT:
{
if tableId != TABLE_TSPMS {
err = errors.New(fmt.Sprintf("%s, id=%d", "read pmt table id != 2", tableId))
return
}
psi.Pmt.TableID = tableId
psi.Pmt.SectionSyntaxIndicator = uint8((sectionSyntaxIndicatorAndSectionLength & 0x8000) >> 15)
psi.Pmt.SectionLength = sectionSyntaxIndicatorAndSectionLength & 0x3FF
psi.Pmt.ProgramNumber = transportStreamIdOrProgramNumber
psi.Pmt.VersionNumber = versionNumberAndCurrentNextIndicator & 0x3e
psi.Pmt.CurrentNextIndicator = versionNumberAndCurrentNextIndicator & 0x01
psi.Pmt.SectionNumber = sectionNumber
psi.Pmt.LastSectionNumber = lastSectionNumber
}
}
return
}
func WritePSI(w io.Writer, pt uint32, psi MpegTsPSI, data []byte) (err error) {
var tableId, versionNumberAndCurrentNextIndicator, sectionNumber, lastSectionNumber uint8
var sectionSyntaxIndicatorAndSectionLength, transportStreamIdOrProgramNumber uint16
switch pt {
case PSI_TYPE_PAT:
{
if psi.Pat.TableID != TABLE_PAS {
err = errors.New(fmt.Sprintf("%s, id=%d", "write pmt table id != 0", tableId))
return
}
tableId = psi.Pat.TableID
sectionSyntaxIndicatorAndSectionLength = uint16(psi.Pat.SectionSyntaxIndicator)<<15 | 3<<12 | psi.Pat.SectionLength
transportStreamIdOrProgramNumber = psi.Pat.TransportStreamID
versionNumberAndCurrentNextIndicator = psi.Pat.VersionNumber<<1 | psi.Pat.CurrentNextIndicator
sectionNumber = psi.Pat.SectionNumber
lastSectionNumber = psi.Pat.LastSectionNumber
}
case PSI_TYPE_PMT:
{
if psi.Pmt.TableID != TABLE_TSPMS {
err = errors.New(fmt.Sprintf("%s, id=%d", "write pmt table id != 2", tableId))
return
}
tableId = psi.Pmt.TableID
sectionSyntaxIndicatorAndSectionLength = uint16(psi.Pmt.SectionSyntaxIndicator)<<15 | 3<<12 | psi.Pmt.SectionLength
transportStreamIdOrProgramNumber = psi.Pmt.ProgramNumber
versionNumberAndCurrentNextIndicator = psi.Pmt.VersionNumber<<1 | psi.Pmt.CurrentNextIndicator
sectionNumber = psi.Pmt.SectionNumber
lastSectionNumber = psi.Pmt.LastSectionNumber
}
}
// pointer field(8)
if err = util.WriteUint8ToByte(w, 0); err != nil {
return
}
// 使用buffer收集所有需要计算CRC32的数据
bw := &bytes.Buffer{}
// table id(8)
if err = util.WriteUint8ToByte(bw, tableId); err != nil {
return
}
// sectionSyntaxIndicator(1) + zero(1) + reserved1(2) + sectionLength(12)
// sectionLength 前两个字节固定为00
// 1 0 11 sectionLength
if err = util.WriteUint16ToByte(bw, sectionSyntaxIndicatorAndSectionLength, true); err != nil {
return
}
// PAT TransportStreamID(16) or PMT ProgramNumber(16)
if err = util.WriteUint16ToByte(bw, transportStreamIdOrProgramNumber, true); err != nil {
return
}
// reserved2(2) + versionNumber(5) + currentNextIndicator(1)
// 0x3 << 6 -> 1100 0000
// 0x3 << 6 | 1 -> 1100 0001
if err = util.WriteUint8ToByte(bw, versionNumberAndCurrentNextIndicator); err != nil {
return
}
// sectionNumber(8)
if err = util.WriteUint8ToByte(bw, sectionNumber); err != nil {
return
}
// lastSectionNumber(8)
if err = util.WriteUint8ToByte(bw, lastSectionNumber); err != nil {
return
}
// data
if _, err = bw.Write(data); err != nil {
return
}
// 写入PSI数据
if _, err = w.Write(bw.Bytes()); err != nil {
return
}
// 使用MPEG-TS CRC32算法计算CRC32
crc32 := GetCRC32(bw.Bytes())
if err = util.WriteUint32ToByte(w, crc32, true); err != nil {
return
}
return
}