mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-10-16 21:30:51 +08:00
Rework RTSP and RTMP processing
This commit is contained in:
@@ -1,3 +1,11 @@
|
||||
# H264
|
||||
|
||||
Access Unit (AU) can contain one or multiple NAL Unit:
|
||||
|
||||
1. [SEI,] SPS, PPS, IFrame, [IFrame...]
|
||||
2. BFrame, [BFrame...]
|
||||
3. IFrame, [IFrame...]
|
||||
|
||||
## RTP H264
|
||||
|
||||
Camera | NALu
|
||||
|
@@ -12,49 +12,33 @@ func IsAVC(codec *streamer.Codec) bool {
|
||||
return codec.PayloadType == PayloadTypeAVC
|
||||
}
|
||||
|
||||
func EncodeAVC(raw []byte) (avc []byte) {
|
||||
avc = make([]byte, len(raw)+4)
|
||||
binary.BigEndian.PutUint32(avc, uint32(len(raw)))
|
||||
copy(avc[4:], raw)
|
||||
func EncodeAVC(nals ...[]byte) (avc []byte) {
|
||||
n := 4 * len(nals)
|
||||
for _, nal := range nals {
|
||||
n += len(nal)
|
||||
}
|
||||
|
||||
avc = make([]byte, n)
|
||||
|
||||
var i int
|
||||
for _, nal := range nals {
|
||||
binary.BigEndian.PutUint32(avc[i:], uint32(len(nal)))
|
||||
i += 4 + copy(avc[i+4:], nal)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func RepairAVC(track *streamer.Track) streamer.WrapperFunc {
|
||||
sps, pps := GetParameterSet(track.Codec.FmtpLine)
|
||||
sps = EncodeAVC(sps)
|
||||
pps = EncodeAVC(pps)
|
||||
ps := EncodeAVC(sps, pps)
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) (err error) {
|
||||
naluType := NALUType(packet.Payload)
|
||||
switch naluType {
|
||||
case NALUTypeSPS:
|
||||
sps = packet.Payload
|
||||
return
|
||||
case NALUTypePPS:
|
||||
pps = packet.Payload
|
||||
return
|
||||
if NALUType(packet.Payload) == NALUTypeIFrame {
|
||||
packet.Payload = Join(ps, packet.Payload)
|
||||
}
|
||||
|
||||
var clone rtp.Packet
|
||||
|
||||
if naluType == NALUTypeIFrame {
|
||||
clone = *packet
|
||||
clone.Payload = sps
|
||||
if err = push(&clone); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Payload = pps
|
||||
if err = push(&clone); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Payload = packet.Payload
|
||||
return push(&clone)
|
||||
return push(packet)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@ package h264
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
)
|
||||
@@ -18,8 +19,31 @@ func NALUType(b []byte) byte {
|
||||
return b[4] & 0x1F
|
||||
}
|
||||
|
||||
// IsKeyframe - check if any NALU in one AU is Keyframe
|
||||
func IsKeyframe(b []byte) bool {
|
||||
return NALUType(b) == NALUTypeIFrame
|
||||
for {
|
||||
switch NALUType(b) {
|
||||
case NALUTypePFrame:
|
||||
return false
|
||||
case NALUTypeIFrame:
|
||||
return true
|
||||
}
|
||||
|
||||
size := int(binary.BigEndian.Uint32(b)) + 4
|
||||
if size < len(b) {
|
||||
b = b[size:]
|
||||
continue
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Join(ps, iframe []byte) []byte {
|
||||
b := make([]byte, len(ps)+len(iframe))
|
||||
i := copy(b, ps)
|
||||
copy(b[i:], iframe)
|
||||
return b
|
||||
}
|
||||
|
||||
func GetProfileLevelID(fmtp string) string {
|
||||
|
@@ -1,7 +1,6 @@
|
||||
package h264
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/rtp/codecs"
|
||||
@@ -13,8 +12,7 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
depack := &codecs.H264Packet{IsAVC: true}
|
||||
|
||||
sps, pps := GetParameterSet(track.Codec.FmtpLine)
|
||||
sps = EncodeAVC(sps)
|
||||
pps = EncodeAVC(pps)
|
||||
ps := EncodeAVC(sps, pps)
|
||||
|
||||
var buffer []byte
|
||||
|
||||
@@ -27,86 +25,35 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
// packet.PayloadType, packet.SSRC, packet.SequenceNumber,
|
||||
//)
|
||||
|
||||
data, err := depack.Unmarshal(packet.Payload)
|
||||
if len(data) == 0 || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
unitType := NALUType(data)
|
||||
//fmt.Printf("[H264] nalu: %2d, size: %6d\n", unitType, len(data))
|
||||
|
||||
// multiple 5 and 1 in one payload is OK
|
||||
if unitType != NALUTypeIFrame && unitType != NALUTypePFrame {
|
||||
i := int(binary.BigEndian.Uint32(data)) + 4
|
||||
if i < len(data) {
|
||||
data0 := data[:i] // NAL Unit with AVC header
|
||||
data = data[i:]
|
||||
switch unitType {
|
||||
case NALUTypeSPS:
|
||||
sps = data0
|
||||
continue
|
||||
case NALUTypePPS:
|
||||
pps = data0
|
||||
continue
|
||||
case NALUTypeSEI:
|
||||
// some unnecessary text information
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch unitType {
|
||||
case NALUTypeSPS:
|
||||
sps = data
|
||||
return nil
|
||||
case NALUTypePPS:
|
||||
pps = data
|
||||
return nil
|
||||
case NALUTypeSEI:
|
||||
// some unnecessary text information
|
||||
payload, err := depack.Unmarshal(packet.Payload)
|
||||
if len(payload) == 0 || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
|
||||
// and every NALU will be sliced to multiple NALUs
|
||||
if !packet.Marker {
|
||||
buffer = append(buffer, data...)
|
||||
buffer = append(buffer, payload...)
|
||||
return nil
|
||||
}
|
||||
|
||||
if buffer != nil {
|
||||
buffer = append(buffer, data...)
|
||||
data = buffer
|
||||
payload = append(buffer, payload...)
|
||||
buffer = nil
|
||||
}
|
||||
|
||||
var clone rtp.Packet
|
||||
|
||||
if unitType == NALUTypeIFrame {
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = sps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
switch NALUType(payload) {
|
||||
case NALUTypeIFrame:
|
||||
payload = Join(ps, payload)
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone := *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = pps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = data
|
||||
clone.Payload = payload
|
||||
return push(&clone)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RTPPay(mtu uint16) streamer.WrapperFunc {
|
||||
payloader := &Payloader{IsAVC: true}
|
||||
@@ -117,11 +64,12 @@ func RTPPay(mtu uint16) streamer.WrapperFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
if packet.Version == RTPPacketVersionAVC {
|
||||
payloads := payloader.Payload(mtu, packet.Payload)
|
||||
last := len(payloads) - 1
|
||||
for i, payload := range payloads {
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: i == len(payloads)-1,
|
||||
Marker: i == last,
|
||||
//PayloadType: packet.PayloadType,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
|
@@ -245,14 +245,12 @@ func (c *Client) worker() {
|
||||
time.Sleep(d)
|
||||
|
||||
// can be SPS, PPS and IFrame in one packet
|
||||
for _, payload := range h264.SplitAVC(data[:entry.Size]) {
|
||||
packet := &rtp.Packet{
|
||||
// ivideon clockrate=1000, RTP clockrate=90000
|
||||
Header: rtp.Header{Timestamp: ts * 90},
|
||||
Payload: payload,
|
||||
Payload: data[:entry.Size],
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
}
|
||||
|
||||
data = data[entry.Size:]
|
||||
ts += entry.Duration
|
||||
|
@@ -53,16 +53,17 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
||||
return nil
|
||||
}
|
||||
|
||||
switch h264.NALUType(packet.Payload) {
|
||||
case h264.NALUTypeIFrame:
|
||||
c.start = true
|
||||
case h264.NALUTypePFrame:
|
||||
if !c.start {
|
||||
if c.muxer == nil {
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
|
||||
if !c.start {
|
||||
if h264.IsKeyframe(packet.Payload) {
|
||||
c.start = true
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(packet)
|
||||
c.send += len(buf)
|
||||
@@ -71,10 +72,13 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
var wrapper streamer.WrapperFunc
|
||||
if h264.IsAVC(codec) {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
}
|
||||
push = wrapper(push)
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
|
@@ -136,22 +136,13 @@ func (c *Client) Handle() (err error) {
|
||||
// convert seconds to RTP timestamp
|
||||
timestamp := uint32(pkt.Time * time.Duration(track.Codec.ClockRate) / time.Second)
|
||||
|
||||
var payloads [][]byte
|
||||
if track.Codec.Name == streamer.CodecH264 {
|
||||
payloads = h264.SplitAVC(pkt.Data)
|
||||
} else {
|
||||
payloads = [][]byte{pkt.Data}
|
||||
}
|
||||
|
||||
for _, payload := range payloads {
|
||||
packet := &rtp.Packet{
|
||||
Header: rtp.Header{Timestamp: timestamp},
|
||||
Payload: payload,
|
||||
Payload: pkt.Data,
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
if c.conn == nil {
|
||||
|
Reference in New Issue
Block a user