mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-07 01:22:51 +08:00
rtp补完
This commit is contained in:
@@ -17,10 +17,41 @@ import (
|
||||
// Macroblock layer1 -> mb_type + PCM Data
|
||||
// Macroblock layer2 -> mb_type + Sub_mb_pred or mb_pred + Residual Data
|
||||
// Residual Data ->
|
||||
type H264NALUType byte
|
||||
|
||||
func (b H264NALUType) Or(b2 byte) byte {
|
||||
return byte(b) | b2
|
||||
}
|
||||
|
||||
func (b H264NALUType) Offset() int {
|
||||
switch b {
|
||||
case NALU_STAPA:
|
||||
return 1
|
||||
case NALU_STAPB:
|
||||
return 3
|
||||
case NALU_FUA:
|
||||
return 2
|
||||
case NALU_FUB:
|
||||
return 4
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (b H264NALUType) Byte() byte {
|
||||
return byte(b)
|
||||
}
|
||||
|
||||
func (H264NALUType) Parse(b byte) H264NALUType {
|
||||
return H264NALUType(b & 0x1F)
|
||||
}
|
||||
|
||||
func (H264NALUType) ParseBytes(bs []byte) H264NALUType {
|
||||
return H264NALUType(bs[0] & 0x1F)
|
||||
}
|
||||
|
||||
const (
|
||||
// NALU Type
|
||||
NALU_Unspecified byte = iota
|
||||
NALU_Unspecified H264NALUType = iota
|
||||
NALU_Non_IDR_Picture // 1
|
||||
NALU_Data_Partition_A // 2
|
||||
NALU_Data_Partition_B // 3
|
||||
|
@@ -8,6 +8,12 @@ import (
|
||||
"github.com/q191201771/naza/pkg/nazabits"
|
||||
)
|
||||
|
||||
type H265NALUType byte
|
||||
|
||||
func (H265NALUType) Parse(b byte) H265NALUType {
|
||||
return H265NALUType(b & 0x7E >> 1)
|
||||
}
|
||||
|
||||
const (
|
||||
// HEVC_VPS = 0x40
|
||||
// HEVC_SPS = 0x42
|
||||
@@ -16,7 +22,7 @@ const (
|
||||
// HEVC_IDR = 0x26
|
||||
// HEVC_PSLICE = 0x02
|
||||
|
||||
NAL_UNIT_CODED_SLICE_TRAIL_N byte = iota // 0
|
||||
NAL_UNIT_CODED_SLICE_TRAIL_N H265NALUType = iota // 0
|
||||
NAL_UNIT_CODED_SLICE_TRAIL_R // 1
|
||||
NAL_UNIT_CODED_SLICE_TSA_N // 2
|
||||
NAL_UNIT_CODED_SLICE_TLA // 3 // Current name in the spec: TSA_R
|
||||
@@ -64,8 +70,8 @@ const (
|
||||
NAL_UNIT_RESERVED_45
|
||||
NAL_UNIT_RESERVED_46
|
||||
NAL_UNIT_RESERVED_47
|
||||
NAL_UNIT_UNSPECIFIED_48
|
||||
NAL_UNIT_UNSPECIFIED_49
|
||||
NAL_UNIT_RTP_AP
|
||||
NAL_UNIT_RTP_FU
|
||||
NAL_UNIT_UNSPECIFIED_50
|
||||
NAL_UNIT_UNSPECIFIED_51
|
||||
NAL_UNIT_UNSPECIFIED_52
|
||||
|
@@ -4,10 +4,11 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
)
|
||||
|
||||
// ios13818-1-CN.pdf 45/166
|
||||
@@ -582,7 +583,7 @@ func CheckPESPacketIsKeyFrame(packet MpegTsPESPacket) bool {
|
||||
nalus := bytes.SplitN(packet.Payload, codec.NALU_Delimiter1, -1)
|
||||
|
||||
for _, v := range nalus {
|
||||
if v[0]&0x1f == codec.NALU_IDR_Picture {
|
||||
if codec.H264NALUType.ParseBytes(codec.NALU_IDR_Picture, v) == codec.NALU_IDR_Picture {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type NALUSlice net.Buffers
|
||||
@@ -29,11 +30,31 @@ type RawSlice interface {
|
||||
// func (nalu *H264NALU) Append(slice ...NALUSlice) {
|
||||
// *nalu = append(*nalu, slice...)
|
||||
// }
|
||||
func (nalu NALUSlice) H264Type() byte {
|
||||
return nalu[0][0] & 0x1F
|
||||
func (nalu NALUSlice) H264Type() (naluType codec.H264NALUType) {
|
||||
return naluType.Parse(nalu[0][0])
|
||||
}
|
||||
func (nalu NALUSlice) H265Type() byte {
|
||||
return nalu[0][0] & 0x7E >> 1
|
||||
func (nalu NALUSlice) RefIdc() byte {
|
||||
return nalu[0][0] & 0x60
|
||||
}
|
||||
func (nalu NALUSlice) H265Type() (naluType codec.H265NALUType) {
|
||||
return naluType.Parse(nalu[0][0])
|
||||
}
|
||||
func (nalu NALUSlice) Bytes() (b []byte) {
|
||||
for _, slice := range nalu {
|
||||
b = append(b, slice...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (nalu *NALUSlice) Reset() *NALUSlice{
|
||||
if len(*nalu) > 0 {
|
||||
*nalu = (*nalu)[:0]
|
||||
}
|
||||
return nalu
|
||||
}
|
||||
|
||||
func (nalu *NALUSlice) Append(b ...[]byte) {
|
||||
*nalu = append(*nalu, b...)
|
||||
}
|
||||
|
||||
// func (nalu *H265NALU) Append(slice ...NALUSlice) {
|
||||
@@ -54,6 +75,31 @@ func (nalu NALUSlice) H265Type() byte {
|
||||
|
||||
type AVCCFrame []byte // 一帧AVCC格式的数据
|
||||
type AnnexBFrame []byte // 一帧AnnexB格式数据
|
||||
type RTPFrame struct {
|
||||
rtp.Packet
|
||||
Raw []byte // 序列化后的数据,避免反复序列化
|
||||
}
|
||||
|
||||
func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) {
|
||||
return naluType.Parse(rtp.Payload[0])
|
||||
}
|
||||
func (rtp *RTPFrame) H265Type() (naluType codec.H265NALUType) {
|
||||
return naluType.Parse(rtp.Payload[0])
|
||||
}
|
||||
func (rtp *RTPFrame) Marshal() *RTPFrame {
|
||||
rtp.Raw, _ = rtp.Packet.Marshal()
|
||||
return rtp
|
||||
}
|
||||
|
||||
func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
|
||||
rtp.Raw = raw
|
||||
if err := rtp.Packet.Unmarshal(raw); err != nil {
|
||||
logrus.Errorln(err)
|
||||
return nil
|
||||
}
|
||||
return rtp
|
||||
}
|
||||
|
||||
type BaseFrame struct {
|
||||
DeltaTime uint32 // 相对上一帧时间戳,毫秒
|
||||
SeqInStream uint32 //在一个流中的总序号
|
||||
@@ -73,8 +119,7 @@ type AVFrame[T RawSlice] struct {
|
||||
DTS uint32
|
||||
FLV net.Buffers // 打包好的FLV Tag
|
||||
AVCC net.Buffers // 打包好的AVCC格式
|
||||
RTP net.Buffers // 打包好的RTP格式
|
||||
RTPPackets []rtp.Packet
|
||||
RTP []RTPFrame
|
||||
Raw []T //裸数据
|
||||
canRead bool
|
||||
}
|
||||
@@ -89,18 +134,14 @@ func (av *AVFrame[T]) FillFLV(t byte, ts uint32) {
|
||||
func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) {
|
||||
av.AVCC = append(av.AVCC, avcc...)
|
||||
}
|
||||
func (av *AVFrame[T]) AppendRTP(rtp []byte) {
|
||||
av.RTP = append(av.RTP, rtp)
|
||||
}
|
||||
func (av *AVFrame[T]) AppendRTPPackets(rtp rtp.Packet) {
|
||||
av.RTPPackets = append(av.RTPPackets, rtp)
|
||||
func (av *AVFrame[T]) AppendRTP(rtp ...RTPFrame) {
|
||||
av.RTP = append(av.RTP, rtp...)
|
||||
}
|
||||
|
||||
func (av *AVFrame[T]) Reset() {
|
||||
av.FLV = nil
|
||||
av.AVCC = nil
|
||||
av.RTP = nil
|
||||
av.RTPPackets = nil
|
||||
av.Raw = nil
|
||||
}
|
||||
|
||||
@@ -147,6 +188,7 @@ func (avcc AVCCFrame) AudioCodecID() byte {
|
||||
// return
|
||||
// }
|
||||
type DecoderConfiguration[T RawSlice] struct {
|
||||
PayloadType byte
|
||||
AVCC T
|
||||
Raw T
|
||||
FLV net.Buffers
|
||||
|
@@ -12,4 +12,5 @@ type IStream interface {
|
||||
AddTrack(Track)
|
||||
IsClosed() bool
|
||||
log.Ext1FieldLogger
|
||||
SSRC() uint32
|
||||
}
|
||||
|
@@ -16,14 +16,21 @@ type Subscribe struct {
|
||||
}
|
||||
|
||||
type Pull struct {
|
||||
AutoReconnect bool // 自动重连
|
||||
Reconnect int // 自动重连,0 表示不自动重连,-1 表示无限重连,高于0 的数代表最大重连次数
|
||||
PullOnStart bool // 启动时拉流
|
||||
PullOnSubscribe bool // 订阅时自动拉流
|
||||
AutoPullList map[string]string // 自动拉流列表
|
||||
PullList map[string]string // 自动拉流列表,以streamPath为key,url为value
|
||||
}
|
||||
|
||||
func (p *Pull) AddPull(streamPath string, url string) {
|
||||
if p.PullList == nil {
|
||||
p.PullList = make(map[string]string)
|
||||
}
|
||||
p.PullList[streamPath] = url
|
||||
}
|
||||
|
||||
type Push struct {
|
||||
AutoPushList map[string]string // 自动推流列表
|
||||
PushList map[string]string // 自动推流列表
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
|
@@ -27,7 +27,7 @@ func InstallPlugin(config config.Plugin) *Plugin {
|
||||
_, pluginFilePath, _, _ := runtime.Caller(1)
|
||||
configDir := filepath.Dir(pluginFilePath)
|
||||
if parts := strings.Split(configDir, "@"); len(parts) > 1 {
|
||||
plugin.Version = parts[len(parts)-1]
|
||||
plugin.Version = util.LastElement(parts)
|
||||
}
|
||||
if _, ok := Plugins[name]; ok {
|
||||
return nil
|
||||
@@ -83,6 +83,7 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter,
|
||||
}
|
||||
|
||||
// 读取独立配置合并入总配置中
|
||||
// TODO: 覆盖逻辑有待商榷
|
||||
func (opt *Plugin) assign() {
|
||||
f, err := os.Open(opt.settingPath())
|
||||
if err == nil {
|
||||
@@ -128,7 +129,7 @@ func (opt *Plugin) autoPull() {
|
||||
if t.Field(i).Name == "Pull" {
|
||||
var pullConfig config.Pull
|
||||
reflect.ValueOf(&pullConfig).Elem().Set(v.Field(i))
|
||||
for streamPath, url := range pullConfig.AutoPullList {
|
||||
for streamPath, url := range pullConfig.PullList {
|
||||
puller := Puller{RemoteURL: url, Config: &pullConfig}
|
||||
if pullConfig.PullOnStart {
|
||||
opt.Config.(PullPlugin).PullStream(streamPath, puller)
|
||||
|
@@ -74,6 +74,11 @@ type Puller struct {
|
||||
pullCount int
|
||||
}
|
||||
|
||||
// 是否需要重连
|
||||
func (pub *Puller) reconnect() bool {
|
||||
return pub.Config.Reconnect == -1 || pub.pullCount <= pub.Config.Reconnect
|
||||
}
|
||||
|
||||
func (pub *Puller) pull() {
|
||||
pub.specific.(IPuller).Pull(pub.pullCount)
|
||||
pub.pullCount++
|
||||
@@ -84,7 +89,7 @@ func (pub *Puller) OnStateChanged(oldState StreamState, newState StreamState) {
|
||||
case STATE_WAITTRACK:
|
||||
go pub.pull()
|
||||
case STATE_WAITPUBLISH:
|
||||
if pub.Config.AutoReconnect && pub.Publish(pub.Path, pub.specific, *pub.Publisher.Config) {
|
||||
if pub.reconnect() && pub.Publish(pub.Path, pub.specific, *pub.Publisher.Config) {
|
||||
go pub.pull()
|
||||
}
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/Monibuca/engine/v4/track"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
@@ -105,6 +106,10 @@ type Stream struct {
|
||||
*log.Entry `json:"-"`
|
||||
}
|
||||
|
||||
func (s *Stream) SSRC() uint32 {
|
||||
return uint32(uintptr(unsafe.Pointer(s)))
|
||||
}
|
||||
|
||||
func (s *Stream) UnPublish() {
|
||||
if !s.IsClosed() {
|
||||
s.actionChan <- UnPublishAction{}
|
||||
@@ -130,7 +135,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
|
||||
s = &Stream{
|
||||
URL: u,
|
||||
AppName: p[0],
|
||||
StreamName: p[len(p)-1],
|
||||
StreamName: util.LastElement(p),
|
||||
Entry: log.WithField("stream", u.Path),
|
||||
}
|
||||
s.Infoln("created:", streamPath)
|
||||
|
38
track/aac.go
38
track/aac.go
@@ -6,6 +6,8 @@ import (
|
||||
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
. "github.com/Monibuca/engine/v4/common"
|
||||
"github.com/Monibuca/engine/v4/config"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
)
|
||||
|
||||
func NewAAC(stream IStream) (aac *AAC) {
|
||||
@@ -15,11 +17,26 @@ func NewAAC(stream IStream) (aac *AAC) {
|
||||
aac.CodecID = codec.CodecID_AAC
|
||||
aac.Init(stream, 32)
|
||||
aac.Poll = time.Millisecond * 20
|
||||
aac.DecoderConfiguration.PayloadType = 97
|
||||
return
|
||||
}
|
||||
|
||||
type AAC Audio
|
||||
|
||||
func (aac *AAC) WriteRTP(raw []byte) {
|
||||
var packet RTPFrame
|
||||
if frame := packet.Unmarshal(raw); frame == nil {
|
||||
return
|
||||
}
|
||||
for _, payload := range codec.ParseRTPAAC(packet.Payload) {
|
||||
aac.WriteSlice(payload)
|
||||
}
|
||||
aac.Value.AppendRTP(packet)
|
||||
if packet.Marker {
|
||||
aac.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
if frame.IsSequence() {
|
||||
aac.DecoderConfiguration.AVCC = AudioSlice(frame)
|
||||
@@ -35,5 +52,26 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2}
|
||||
} else {
|
||||
(*Audio)(aac).WriteAVCC(ts, frame)
|
||||
aac.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (aac *AAC) Flush() {
|
||||
// RTP格式补完
|
||||
// TODO: MTU 分割
|
||||
if aac.Value.RTP == nil && config.Global.EnableRTP {
|
||||
l := util.SizeOfBuffers(aac.Value.Raw)
|
||||
o := make([]byte, 4, l+4)
|
||||
//AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。
|
||||
o[0] = 0x00 //高位
|
||||
o[1] = 0x10 //低位
|
||||
//AU_HEADER
|
||||
o[2] = (byte)((l & 0x1fe0) >> 5) //高位
|
||||
o[3] = (byte)((l & 0x1f) << 3) //低位
|
||||
for _, raw := range aac.Value.Raw {
|
||||
o = append(o, raw...)
|
||||
}
|
||||
aac.PacketizeRTP(o)
|
||||
}
|
||||
(*Audio)(aac).Flush()
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
. "github.com/Monibuca/engine/v4/common"
|
||||
"github.com/Monibuca/engine/v4/config"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
)
|
||||
|
||||
@@ -39,6 +40,7 @@ func (at *Audio) Play(onAudio func(*AVFrame[AudioSlice]) error) {
|
||||
ar.MoveNext()
|
||||
}
|
||||
}
|
||||
|
||||
func (at *Audio) WriteADTS(adts []byte) {
|
||||
profile := ((adts[2] & 0xc0) >> 6) + 1
|
||||
sampleRate := (adts[2] & 0x3c) >> 2
|
||||
@@ -47,27 +49,34 @@ func (at *Audio) WriteADTS(adts []byte) {
|
||||
config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
|
||||
at.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
|
||||
at.Channels = channel
|
||||
at.DecoderConfiguration.AVCC = []byte{0xAF, 0x00, config1, config2}
|
||||
at.DecoderConfiguration.Raw = at.DecoderConfiguration.AVCC[:2]
|
||||
at.DecoderConfiguration.FLV = net.Buffers{adcflv1, at.DecoderConfiguration.AVCC, adcflv2}
|
||||
avcc := []byte{0xAF, 0x00, config1, config2}
|
||||
at.DecoderConfiguration = DecoderConfiguration[AudioSlice]{
|
||||
97,
|
||||
avcc,
|
||||
avcc[:2],
|
||||
net.Buffers{adcflv1, at.DecoderConfiguration.AVCC, adcflv2},
|
||||
}
|
||||
|
||||
func (at *Audio) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
at.Media.WriteAVCC(ts, frame)
|
||||
at.Flush()
|
||||
}
|
||||
|
||||
func (at *Audio) Flush() {
|
||||
if at.Value.AVCC == nil {
|
||||
// AVCC 格式补完
|
||||
if at.Value.AVCC == nil && (config.Global.EnableAVCC || config.Global.EnableFLV) {
|
||||
at.Value.AppendAVCC(at.avccHead)
|
||||
for _, raw := range at.Value.Raw {
|
||||
at.Value.AppendAVCC(raw)
|
||||
}
|
||||
}
|
||||
// FLV tag 补完
|
||||
if at.Value.FLV == nil {
|
||||
if at.Value.FLV == nil && config.Global.EnableFLV {
|
||||
at.Value.FillFLV(codec.FLV_TAG_TYPE_AUDIO, at.Value.DTS/90)
|
||||
}
|
||||
if at.Value.RTP == nil && config.Global.EnableRTP {
|
||||
var o []byte
|
||||
for _, raw := range at.Value.Raw {
|
||||
o = append(o, raw...)
|
||||
}
|
||||
at.PacketizeRTP(o)
|
||||
}
|
||||
at.Media.Flush()
|
||||
}
|
||||
|
||||
|
@@ -2,7 +2,6 @@ package track
|
||||
|
||||
import (
|
||||
. "github.com/Monibuca/engine/v4/common"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
@@ -30,20 +29,9 @@ type Media[T RawSlice] struct {
|
||||
SampleRate uint32
|
||||
SampleSize byte
|
||||
DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
|
||||
util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用
|
||||
// util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用
|
||||
lastAvccTS uint32 //上一个avcc帧的时间戳
|
||||
}
|
||||
|
||||
func (av *Media[T]) WriteRTP(raw []byte) {
|
||||
av.Value.AppendRTP(raw)
|
||||
var packet rtp.Packet
|
||||
if err := packet.Unmarshal(raw); err != nil {
|
||||
return
|
||||
}
|
||||
av.Value.AppendRTPPackets(packet)
|
||||
if packet.Marker {
|
||||
av.Flush()
|
||||
}
|
||||
rtpSequence uint16
|
||||
}
|
||||
|
||||
func (av *Media[T]) WriteSlice(slice T) {
|
||||
@@ -68,3 +56,25 @@ func (av *Media[T]) Flush() {
|
||||
av.Base.Flush(&av.Value.BaseFrame)
|
||||
av.Step()
|
||||
}
|
||||
|
||||
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
|
||||
func (av *Media[T]) PacketizeRTP(payloads ...[]byte) {
|
||||
for i, pp := range payloads {
|
||||
av.rtpSequence++
|
||||
var frame = RTPFrame{Packet: rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Padding: false,
|
||||
Extension: false,
|
||||
Marker: i == len(payloads)-1,
|
||||
PayloadType: av.DecoderConfiguration.PayloadType,
|
||||
SequenceNumber: av.rtpSequence,
|
||||
Timestamp: av.Value.DTS, // Figure out how to do timestamps
|
||||
SSRC: av.Stream.SSRC(),
|
||||
},
|
||||
Payload: pp,
|
||||
}}
|
||||
frame.Marshal()
|
||||
av.Value.AppendRTP(frame)
|
||||
}
|
||||
}
|
||||
|
@@ -17,12 +17,26 @@ func NewG711(stream IStream, alaw bool) (g711 *G711) {
|
||||
}
|
||||
g711.Init(stream, 32)
|
||||
g711.Poll = time.Millisecond * 20
|
||||
g711.DecoderConfiguration.PayloadType = 97
|
||||
return
|
||||
}
|
||||
|
||||
type G711 Audio
|
||||
|
||||
func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
g711.Value.AppendRaw(AudioSlice(frame[1:]))
|
||||
g711.WriteSlice(AudioSlice(frame[1:]))
|
||||
(*Audio)(g711).WriteAVCC(ts, frame)
|
||||
g711.Flush()
|
||||
}
|
||||
|
||||
func (g711 *G711) WriteRTP(raw []byte) {
|
||||
var packet RTPFrame
|
||||
if frame := packet.Unmarshal(raw); frame == nil {
|
||||
return
|
||||
}
|
||||
g711.WriteSlice(packet.Payload)
|
||||
g711.Value.AppendRTP(packet)
|
||||
if packet.Marker {
|
||||
g711.Flush()
|
||||
}
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
. "github.com/Monibuca/engine/v4/common"
|
||||
"github.com/Monibuca/engine/v4/config"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
)
|
||||
|
||||
@@ -19,6 +20,7 @@ func NewH264(stream IStream) (vt *H264) {
|
||||
vt.Stream = stream
|
||||
vt.Init(stream, 256)
|
||||
vt.Poll = time.Millisecond * 20
|
||||
vt.DecoderConfiguration.PayloadType = 96
|
||||
return
|
||||
}
|
||||
func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
|
||||
@@ -28,29 +30,22 @@ func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
|
||||
func (vt *H264) WriteSlice(slice NALUSlice) {
|
||||
switch slice.H264Type() {
|
||||
case codec.NALU_SPS:
|
||||
if len(vt.DecoderConfiguration.Raw) > 0 {
|
||||
vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0]
|
||||
}
|
||||
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
|
||||
vt.DecoderConfiguration.Raw.Reset().Append(slice[0])
|
||||
case codec.NALU_PPS:
|
||||
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
|
||||
vt.DecoderConfiguration.Raw.Append(slice[0])
|
||||
vt.SPSInfo, _ = codec.ParseSPS(slice[0])
|
||||
if len(vt.DecoderConfiguration.Raw) > 0 {
|
||||
vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0]
|
||||
}
|
||||
lenSPS := len(vt.DecoderConfiguration.Raw[0])
|
||||
lenPPS := len(vt.DecoderConfiguration.Raw[1])
|
||||
if len(vt.DecoderConfiguration.AVCC) > 0 {
|
||||
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
|
||||
}
|
||||
vt.DecoderConfiguration.AVCC.Reset()
|
||||
if lenSPS > 3 {
|
||||
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][1:4])
|
||||
vt.DecoderConfiguration.AVCC.Append(codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][1:4])
|
||||
} else {
|
||||
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD)
|
||||
vt.DecoderConfiguration.AVCC.Append(codec.RTMP_AVC_HEAD)
|
||||
}
|
||||
tmp := []byte{0xE1, 0, 0, 0x01, 0, 0}
|
||||
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1])
|
||||
vt.DecoderConfiguration.AVCC.Append(tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1])
|
||||
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
|
||||
|
||||
case codec.NALU_IDR_Picture:
|
||||
vt.Value.IFrame = true
|
||||
fallthrough
|
||||
@@ -62,15 +57,12 @@ func (vt *H264) WriteSlice(slice NALUSlice) {
|
||||
|
||||
func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
if frame.IsSequence() {
|
||||
if len(vt.DecoderConfiguration.AVCC) > 0 {
|
||||
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
|
||||
}
|
||||
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame)
|
||||
vt.DecoderConfiguration.AVCC.Reset().Append(frame)
|
||||
var info codec.AVCDecoderConfigurationRecord
|
||||
if _, err := info.Unmarshal(frame[5:]); err == nil {
|
||||
vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit)
|
||||
vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1)
|
||||
vt.DecoderConfiguration.Raw = NALUSlice{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit}
|
||||
vt.DecoderConfiguration.Raw.Append(info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit)
|
||||
}
|
||||
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
|
||||
} else {
|
||||
@@ -80,6 +72,37 @@ func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
}
|
||||
}
|
||||
|
||||
func (vt *H264) WriteRTP(raw []byte) {
|
||||
var frame RTPFrame
|
||||
if packet := frame.Unmarshal(raw); packet == nil {
|
||||
return
|
||||
}
|
||||
if naluType := frame.H264Type(); naluType < 24 {
|
||||
vt.WriteSlice(NALUSlice{frame.Payload})
|
||||
} else {
|
||||
switch naluType {
|
||||
case codec.NALU_STAPA, codec.NALU_STAPB:
|
||||
for buffer := util.Buffer(frame.Payload[naluType.Offset():]); buffer.CanRead(); {
|
||||
vt.WriteSlice(NALUSlice{buffer.ReadN(int(buffer.ReadUint16()))})
|
||||
}
|
||||
case codec.NALU_FUA, codec.NALU_FUB:
|
||||
if util.Bit1(frame.Payload[1], 0) {
|
||||
vt.Value.AppendRaw(NALUSlice{[]byte{naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)}})
|
||||
}
|
||||
lastIndex := len(vt.Value.Raw) - 1
|
||||
vt.Value.Raw[lastIndex].Append(frame.Payload[naluType.Offset():])
|
||||
if util.Bit1(frame.Payload[1], 1) {
|
||||
vt.Value.Raw = vt.Value.Raw[:lastIndex]
|
||||
vt.WriteSlice(vt.Value.Raw[lastIndex])
|
||||
}
|
||||
}
|
||||
}
|
||||
vt.Value.AppendRTP(frame)
|
||||
if frame.Marker {
|
||||
vt.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (vt *H264) Flush() {
|
||||
if vt.Value.IFrame {
|
||||
if vt.IDRing == nil {
|
||||
@@ -88,8 +111,36 @@ func (vt *H264) Flush() {
|
||||
(*Video)(vt).ComputeGOP()
|
||||
}
|
||||
// RTP格式补完
|
||||
if vt.Value.RTP == nil {
|
||||
|
||||
if vt.Value.RTP == nil && config.Global.EnableRTP {
|
||||
var out [][]byte
|
||||
for _, nalu := range vt.Value.Raw {
|
||||
buffers := util.SplitBuffers(nalu, 1200)
|
||||
firstBuffer := NALUSlice(buffers[0])
|
||||
if l := len(buffers); l == 1 {
|
||||
out = append(out, firstBuffer.Bytes())
|
||||
} else {
|
||||
naluType := firstBuffer.H264Type()
|
||||
firstByte := codec.NALU_FUA.Or(firstBuffer.RefIdc())
|
||||
buf := []byte{firstByte, naluType.Or(1 << 7)}
|
||||
for i, sp := range firstBuffer {
|
||||
if i == 0 {
|
||||
sp = sp[1:]
|
||||
}
|
||||
buf = append(buf, sp...)
|
||||
}
|
||||
out = append(out, buf)
|
||||
for _, bufs := range buffers[1:] {
|
||||
buf := []byte{firstByte, naluType.Byte()}
|
||||
for _, sp := range bufs {
|
||||
buf = append(buf, sp...)
|
||||
}
|
||||
out = append(out, buf)
|
||||
}
|
||||
lastBuf := out[len(out)-1]
|
||||
lastBuf[1] |= 1 << 6 // set end bit
|
||||
}
|
||||
}
|
||||
vt.PacketizeRTP(out...)
|
||||
}
|
||||
(*Video)(vt).Flush()
|
||||
}
|
||||
|
@@ -6,6 +6,8 @@ import (
|
||||
|
||||
"github.com/Monibuca/engine/v4/codec"
|
||||
. "github.com/Monibuca/engine/v4/common"
|
||||
"github.com/Monibuca/engine/v4/config"
|
||||
"github.com/Monibuca/engine/v4/util"
|
||||
)
|
||||
|
||||
type H265 Video
|
||||
@@ -18,6 +20,7 @@ func NewH265(stream IStream) (vt *H265) {
|
||||
vt.Stream = stream
|
||||
vt.Init(stream, 256)
|
||||
vt.Poll = time.Millisecond * 20
|
||||
vt.DecoderConfiguration.PayloadType = 96
|
||||
return
|
||||
}
|
||||
func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
|
||||
@@ -27,21 +30,15 @@ func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
|
||||
func (vt *H265) WriteSlice(slice NALUSlice) {
|
||||
switch slice.H265Type() {
|
||||
case codec.NAL_UNIT_VPS:
|
||||
if len(vt.DecoderConfiguration.Raw) > 0 {
|
||||
vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0]
|
||||
}
|
||||
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
|
||||
vt.DecoderConfiguration.Raw.Reset().Append(slice[0])
|
||||
case codec.NAL_UNIT_SPS:
|
||||
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
|
||||
vt.DecoderConfiguration.Raw.Append(slice[0])
|
||||
vt.SPSInfo, _ = codec.ParseHevcSPS(slice[0])
|
||||
case codec.NAL_UNIT_PPS:
|
||||
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
|
||||
vt.DecoderConfiguration.Raw.Append(slice[0])
|
||||
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.DecoderConfiguration.Raw[0], vt.DecoderConfiguration.Raw[1], vt.DecoderConfiguration.Raw[2])
|
||||
if err == nil {
|
||||
if len(vt.DecoderConfiguration.AVCC) > 0 {
|
||||
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
|
||||
}
|
||||
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, extraData)
|
||||
vt.DecoderConfiguration.AVCC.Reset().Append(extraData)
|
||||
}
|
||||
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
|
||||
case
|
||||
@@ -59,14 +56,11 @@ func (vt *H265) WriteSlice(slice NALUSlice) {
|
||||
}
|
||||
func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
if frame.IsSequence() {
|
||||
if len(vt.DecoderConfiguration.AVCC) > 0 {
|
||||
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
|
||||
}
|
||||
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame)
|
||||
vt.DecoderConfiguration.AVCC.Reset().Append(frame)
|
||||
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame); err == nil {
|
||||
vt.SPSInfo, _ = codec.ParseHevcSPS(frame)
|
||||
vt.nalulenSize = int(frame[26]) & 0x03
|
||||
vt.DecoderConfiguration.Raw = NALUSlice{vps, sps, pps}
|
||||
vt.DecoderConfiguration.Raw.Append(vps, sps, pps)
|
||||
}
|
||||
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
|
||||
} else {
|
||||
@@ -75,7 +69,43 @@ func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
vt.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (vt *H265) WriteRTP(raw []byte) {
|
||||
var frame RTPFrame
|
||||
if packet := frame.Unmarshal(raw); packet == nil {
|
||||
return
|
||||
}
|
||||
// TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream.
|
||||
var usingDonlField bool
|
||||
var buffer = util.Buffer(frame.Payload)
|
||||
switch frame.H265Type() {
|
||||
case codec.NAL_UNIT_RTP_AP:
|
||||
buffer.ReadUint16()
|
||||
if usingDonlField {
|
||||
buffer.ReadUint16()
|
||||
}
|
||||
for buffer.CanRead() {
|
||||
vt.WriteSlice(NALUSlice{buffer.ReadN(int(buffer.ReadUint16()))})
|
||||
if usingDonlField {
|
||||
buffer.ReadByte()
|
||||
}
|
||||
}
|
||||
case codec.NAL_UNIT_RTP_FU:
|
||||
first3 := buffer.ReadN(3)
|
||||
fuHeader := first3[2]
|
||||
if usingDonlField {
|
||||
buffer.ReadUint16()
|
||||
}
|
||||
if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) {
|
||||
vt.Value.AppendRaw(NALUSlice{[]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]}})
|
||||
}
|
||||
lastIndex := len(vt.Value.Raw) - 1
|
||||
vt.Value.Raw[lastIndex].Append(buffer)
|
||||
if util.Bit1(fuHeader, 1) {
|
||||
vt.Value.Raw = vt.Value.Raw[:lastIndex]
|
||||
vt.WriteSlice(vt.Value.Raw[lastIndex])
|
||||
}
|
||||
}
|
||||
}
|
||||
func (vt *H265) Flush() {
|
||||
if vt.Value.IFrame {
|
||||
if vt.IDRing == nil {
|
||||
@@ -84,8 +114,36 @@ func (vt *H265) Flush() {
|
||||
(*Video)(vt).ComputeGOP()
|
||||
}
|
||||
// RTP格式补完
|
||||
if vt.Value.RTP == nil {
|
||||
|
||||
if vt.Value.RTP == nil && config.Global.EnableRTP {
|
||||
var out [][]byte
|
||||
for _, nalu := range vt.Value.Raw {
|
||||
buffers := util.SplitBuffers(nalu, 1200)
|
||||
firstBuffer := NALUSlice(buffers[0])
|
||||
if l := len(buffers); l == 1 {
|
||||
out = append(out, firstBuffer.Bytes())
|
||||
} else {
|
||||
naluType := firstBuffer.H265Type()
|
||||
firstByte := (byte(codec.NAL_UNIT_RTP_FU) << 1) | (firstBuffer[0][0] & 0b10000001)
|
||||
buf := []byte{firstByte, firstBuffer[0][1], (1 << 7) | (byte(naluType) >> 1)}
|
||||
for i, sp := range firstBuffer {
|
||||
if i == 0 {
|
||||
sp = sp[2:]
|
||||
}
|
||||
buf = append(buf, sp...)
|
||||
}
|
||||
out = append(out, buf)
|
||||
for _, bufs := range buffers[1:] {
|
||||
buf := []byte{firstByte, firstBuffer[0][1], byte(naluType) >> 1}
|
||||
for _, sp := range bufs {
|
||||
buf = append(buf, sp...)
|
||||
}
|
||||
out = append(out, buf)
|
||||
}
|
||||
lastBuf := out[len(out)-1]
|
||||
lastBuf[2] |= 1 << 6 // set end bit
|
||||
}
|
||||
}
|
||||
vt.PacketizeRTP(out...)
|
||||
}
|
||||
(*Video)(vt).Flush()
|
||||
}
|
||||
|
@@ -93,6 +93,11 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) {
|
||||
}
|
||||
|
||||
func (vt *Video) Flush() {
|
||||
// 没有实际媒体数据
|
||||
if vt.Value.Raw == nil {
|
||||
vt.Value.Reset()
|
||||
return
|
||||
}
|
||||
// AVCC格式补完
|
||||
if vt.Value.AVCC == nil && (config.Global.EnableAVCC || config.Global.EnableFLV) {
|
||||
b := []byte{vt.CodecID, 1, 0, 0, 0}
|
||||
|
@@ -3,7 +3,6 @@ package util
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"net"
|
||||
)
|
||||
|
||||
type Buffer []byte
|
||||
@@ -57,6 +56,9 @@ func (b *Buffer) Write(a []byte) (n int, err error) {
|
||||
func (b Buffer) Len() int {
|
||||
return len(b)
|
||||
}
|
||||
func (b Buffer) CanRead() bool {
|
||||
return b.Len() > 0
|
||||
}
|
||||
func (b Buffer) Cap() int {
|
||||
return cap(b)
|
||||
}
|
||||
@@ -86,23 +88,20 @@ func (b *Buffer) Glow(n int) {
|
||||
}
|
||||
|
||||
// SizeOfBuffers 计算Buffers的内容长度
|
||||
func SizeOfBuffers(buf net.Buffers) (size int) {
|
||||
func SizeOfBuffers[T ~[]byte](buf []T) (size int) {
|
||||
for _, b := range buf {
|
||||
size += len(b)
|
||||
}
|
||||
return
|
||||
}
|
||||
func CutBuffers(buf net.Buffers, size int) {
|
||||
|
||||
}
|
||||
|
||||
// SplitBuffers 按照一定大小分割 Buffers
|
||||
func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) {
|
||||
func SplitBuffers[T ~[]byte](buf []T, size int) (result [][]T) {
|
||||
for total := SizeOfBuffers(buf); total > 0; {
|
||||
if total <= size {
|
||||
return append(result, buf)
|
||||
} else {
|
||||
var before net.Buffers
|
||||
var before []T
|
||||
sizeOfBefore := 0
|
||||
for _, b := range buf {
|
||||
need := size - sizeOfBefore
|
||||
@@ -123,5 +122,3 @@ func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
|
@@ -1,17 +0,0 @@
|
||||
package util
|
||||
|
||||
type BytesPool [][]byte
|
||||
|
||||
func (pool *BytesPool) Get(size int) (result []byte) {
|
||||
if l := len(*pool); l > 0 {
|
||||
result = (*pool)[l-1]
|
||||
*pool = (*pool)[:l-1]
|
||||
} else {
|
||||
result = make([]byte, size, 10)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pool *BytesPool) Put(b []byte) {
|
||||
*pool = append(*pool, b)
|
||||
}
|
@@ -29,3 +29,8 @@ func Exist(filename string) bool {
|
||||
func ConvertNum[F constraints.Integer, T constraints.Integer](from F, to T) T {
|
||||
return T(from)
|
||||
}
|
||||
|
||||
// Bit1 检查字节中的某一位是否为1 |0|1|2|3|4|5|6|7|
|
||||
func Bit1(b byte, index int) bool {
|
||||
return b&(1<<(7-index)) != 0
|
||||
}
|
||||
|
@@ -30,3 +30,7 @@ func (s *Slice[T]) ResetAppend(first T) {
|
||||
s.Reset()
|
||||
s.Add(first)
|
||||
}
|
||||
|
||||
func LastElement[T any](s []T) T {
|
||||
return s[len(s)-1]
|
||||
}
|
||||
|
Reference in New Issue
Block a user