Reorganize the structure of the source code

This commit is contained in:
notch
2020-12-30 11:10:00 +08:00
parent 8c22b17375
commit e8bb343b3a
54 changed files with 263 additions and 263 deletions

View File

@@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package av
package codec
// 帧类型
const (

View File

@@ -2,15 +2,15 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package av
package codec
// VideoMeta 视频元数据
type VideoMeta struct {
Codec string `json:"codec"`
DataRate float64 `json:"datarate,omitempty"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
FrameRate float64 `json:"framerate,omitempty"`
DataRate float64 `json:"datarate,omitempty"`
Sps []byte `json:"-"`
Pps []byte `json:"-"`
Vps []byte `json:"-"`
@@ -19,9 +19,9 @@ type VideoMeta struct {
// AudioMeta 音频元数据
type AudioMeta struct {
Codec string `json:"codec"`
DataRate float64 `json:"datarate,omitempty"`
SampleRate int `json:"samplerate,omitempty"`
SampleSize int `json:"samplesize,omitempty"`
Channels int `json:"channels,omitempty"`
DataRate float64 `json:"datarate,omitempty"`
Sps []byte `json:"-"` // sps
}

View File

@@ -8,9 +8,9 @@ import (
"runtime/debug"
"time"
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/protos/amf"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264"
"github.com/cnotch/ipchub/av/format/amf"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
@@ -24,8 +24,8 @@ const (
// MuxerAvcAac flv muxer from av.Frame(H264[+AAC])
type MuxerAvcAac struct {
videoMeta av.VideoMeta
audioMeta av.AudioMeta
videoMeta codec.VideoMeta
audioMeta codec.AudioMeta
typeFlags byte
audioDataTemplate *AudioData
recvQueue *queue.SyncQueue
@@ -39,7 +39,7 @@ type MuxerAvcAac struct {
}
// NewMuxerAvcAac .
func NewMuxerAvcAac(videoMeta av.VideoMeta, audioMeta av.AudioMeta, tagWriter TagWriter, logger *xlog.Logger) *MuxerAvcAac {
func NewMuxerAvcAac(videoMeta codec.VideoMeta, audioMeta codec.AudioMeta, tagWriter TagWriter, logger *xlog.Logger) *MuxerAvcAac {
muxer := &MuxerAvcAac{
recvQueue: queue.NewSyncQueue(),
videoMeta: videoMeta,
@@ -64,7 +64,7 @@ func NewMuxerAvcAac(videoMeta av.VideoMeta, audioMeta av.AudioMeta, tagWriter Ta
}
// WriteFrame .
func (muxer *MuxerAvcAac) WriteFrame(frame *av.Frame) error {
func (muxer *MuxerAvcAac) WriteFrame(frame *codec.Frame) error {
muxer.recvQueue.Push(frame)
return nil
}
@@ -111,12 +111,12 @@ func (muxer *MuxerAvcAac) process() {
continue
}
frame := f.(*av.Frame)
frame := f.(*codec.Frame)
if muxer.basePts == 0 {
muxer.basePts = frame.AbsTimestamp
}
if frame.FrameType == av.FrameVideo {
if frame.FrameType == codec.FrameVideo {
if err := muxer.muxVideoTag(frame); err != nil {
muxer.logger.Errorf("flvmuxer: muxVideoTag error - %s", err.Error())
}
@@ -128,7 +128,7 @@ func (muxer *MuxerAvcAac) process() {
}
}
func (muxer *MuxerAvcAac) muxVideoTag(frame *av.Frame) error {
func (muxer *MuxerAvcAac) muxVideoTag(frame *codec.Frame) error {
if frame.Payload[0]&0x1F == h264.NalSps {
if len(muxer.videoMeta.Sps) == 0 {
muxer.videoMeta.Sps = frame.Payload
@@ -181,10 +181,10 @@ func (muxer *MuxerAvcAac) muxVideoTag(frame *av.Frame) error {
Data: data,
}
return muxer.tagWriter.WriteTag(tag)
return muxer.tagWriter.WriteFlvTag(tag)
}
func (muxer *MuxerAvcAac) muxAudioTag(frame *av.Frame) error {
func (muxer *MuxerAvcAac) muxAudioTag(frame *codec.Frame) error {
audioData := *muxer.audioDataTemplate
audioData.Body = frame.Payload
data, _ := audioData.Marshal()
@@ -196,7 +196,7 @@ func (muxer *MuxerAvcAac) muxAudioTag(frame *av.Frame) error {
StreamID: 0,
Data: data,
}
return muxer.tagWriter.WriteTag(tag)
return muxer.tagWriter.WriteFlvTag(tag)
}
func (muxer *MuxerAvcAac) muxMetadataTag() error {
@@ -269,7 +269,7 @@ func (muxer *MuxerAvcAac) muxMetadataTag() error {
Data: data,
}
return muxer.tagWriter.WriteTag(tag)
return muxer.tagWriter.WriteFlvTag(tag)
}
func (muxer *MuxerAvcAac) muxSequenceHeaderTag() error {
@@ -304,7 +304,7 @@ func (muxer *MuxerAvcAac) muxSequenceHeaderTag() error {
Data: data,
}
if err := muxer.tagWriter.WriteTag(tag); err != nil {
if err := muxer.tagWriter.WriteFlvTag(tag); err != nil {
return err
}
@@ -328,7 +328,7 @@ func (muxer *MuxerAvcAac) muxAudioSequenceHeaderTag() error {
StreamID: 0,
Data: data,
}
return muxer.tagWriter.WriteTag(tag)
return muxer.tagWriter.WriteFlvTag(tag)
}
func (muxer *MuxerAvcAac) prepareTemplate() {

View File

@@ -7,7 +7,7 @@ package flv
import (
"bytes"
"github.com/cnotch/ipchub/protos/amf"
"github.com/cnotch/ipchub/av/format/amf"
)
// 数据名称常量,如元数据

View File

@@ -9,7 +9,7 @@ import (
"encoding/binary"
"io"
"github.com/cnotch/ipchub/protos/amf"
"github.com/cnotch/ipchub/av/format/amf"
)
// flv 标记类型ID
@@ -42,7 +42,7 @@ type Tag struct {
// TagWriter 包装 WriteTag 方法的接口
type TagWriter interface {
WriteTag(tag *Tag) error
WriteFlvTag(tag *Tag) error
}
// Size tag 的总大小(包括 Header + Data

View File

@@ -6,7 +6,7 @@
package hls
import "github.com/cnotch/ipchub/av/aac"
import "github.com/cnotch/ipchub/av/codec/aac"
const (
// in ms, for HLS aac sync time.

View File

@@ -16,7 +16,7 @@ import (
"sync"
"time"
"github.com/cnotch/ipchub/protos/mpegts"
"github.com/cnotch/ipchub/av/format/mpegts"
"github.com/cnotch/ipchub/utils/murmur"
"github.com/cnotch/xlog"
)

View File

@@ -14,7 +14,7 @@ import (
"os"
"sync"
"github.com/cnotch/ipchub/protos/mpegts"
"github.com/cnotch/ipchub/av/format/mpegts"
)
type segmentFile interface {

View File

@@ -7,8 +7,8 @@
package mpegts
import (
"github.com/cnotch/ipchub/av/aac"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/av/codec/aac"
"github.com/cnotch/ipchub/av/codec/h264"
)
// the mpegts header specifed the video/audio pid.

View File

@@ -9,9 +9,9 @@ import (
"io"
"runtime/debug"
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/aac"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac"
"github.com/cnotch/ipchub/av/codec/h264"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
@@ -25,8 +25,8 @@ const (
// MuxerAvcAac flv muxer from av.Frame(H264[+AAC])
type MuxerAvcAac struct {
videoMeta av.VideoMeta
audioMeta av.AudioMeta
videoMeta codec.VideoMeta
audioMeta codec.AudioMeta
hasAudio bool
audioSps aac.RawSPS
recvQueue *queue.SyncQueue
@@ -39,7 +39,7 @@ type MuxerAvcAac struct {
}
// NewMuxerAvcAac .
func NewMuxerAvcAac(videoMeta av.VideoMeta, audioMeta av.AudioMeta, tsframeWriter FrameWriter, logger *xlog.Logger) (*MuxerAvcAac, error) {
func NewMuxerAvcAac(videoMeta codec.VideoMeta, audioMeta codec.AudioMeta, tsframeWriter FrameWriter, logger *xlog.Logger) (*MuxerAvcAac, error) {
muxer := &MuxerAvcAac{
recvQueue: queue.NewSyncQueue(),
videoMeta: videoMeta,
@@ -84,7 +84,7 @@ func (muxer *MuxerAvcAac) prepareAacSps() (err error) {
}
// WriteFrame .
func (muxer *MuxerAvcAac) WriteFrame(frame *av.Frame) error {
func (muxer *MuxerAvcAac) WriteFrame(frame *codec.Frame) error {
muxer.recvQueue.Push(frame)
return nil
}
@@ -126,12 +126,12 @@ func (muxer *MuxerAvcAac) process() {
continue
}
frame := f.(*av.Frame)
frame := f.(*codec.Frame)
if muxer.basePts == 0 {
muxer.basePts = frame.AbsTimestamp
}
if frame.FrameType == av.FrameVideo {
if frame.FrameType == codec.FrameVideo {
if err := muxer.muxVideoTag(frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxVideoFrame error - %s", err.Error())
}
@@ -143,7 +143,7 @@ func (muxer *MuxerAvcAac) process() {
}
}
func (muxer *MuxerAvcAac) muxVideoTag(frame *av.Frame) (err error) {
func (muxer *MuxerAvcAac) muxVideoTag(frame *codec.Frame) (err error) {
if frame.Payload[0]&0x1F == h264.NalSps {
if len(muxer.videoMeta.Sps) == 0 {
muxer.videoMeta.Sps = frame.Payload
@@ -190,7 +190,7 @@ func (muxer *MuxerAvcAac) muxVideoTag(frame *av.Frame) (err error) {
return muxer.tsframeWriter.WriteMpegtsFrame(tsframe)
}
func (muxer *MuxerAvcAac) muxAudioTag(frame *av.Frame) error {
func (muxer *MuxerAvcAac) muxAudioTag(frame *codec.Frame) error {
pts := frame.AbsTimestamp - muxer.basePts + ptsDelay
pts *= 90

View File

@@ -2,9 +2,9 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package protos
package format
// Pack 表示流媒体包
type Pack interface {
// Package 表示流媒体包
type Package interface {
Size() int // 包长度
}

104
av/format/rtp/demuxer.go Normal file
View File

@@ -0,0 +1,104 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package rtp
import (
"runtime/debug"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
// Depacketizer 解包器
type Depacketizer interface {
Control(p *Packet) error
Depacketize(p *Packet) error
}
// Demuxer 帧转换器
type Demuxer struct {
closed bool
recvQueue *queue.SyncQueue
depacketizeFuncs [4]func(packet *Packet) error
logger *xlog.Logger
}
func emptyDepacketize(*Packet) error { return nil }
// NewDemuxer 创建 rtp.Packet 解封装处理器。
func NewDemuxer(videoDepacketizer Depacketizer, audioDepacketizer Depacketizer, logger *xlog.Logger) *Demuxer {
fc := &Demuxer{
recvQueue: queue.NewSyncQueue(),
closed: false,
logger: logger,
}
if videoDepacketizer != nil {
fc.depacketizeFuncs[ChannelVideo] = videoDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelVideoControl] = videoDepacketizer.Control
} else {
fc.depacketizeFuncs[ChannelVideo] = emptyDepacketize
fc.depacketizeFuncs[ChannelVideoControl] = emptyDepacketize
}
if audioDepacketizer != nil {
fc.depacketizeFuncs[ChannelAudio] = audioDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelAudioControl] = audioDepacketizer.Control
} else {
fc.depacketizeFuncs[ChannelAudio] = emptyDepacketize
fc.depacketizeFuncs[ChannelAudioControl] = emptyDepacketize
}
go fc.convert()
return fc
}
func (dm *Demuxer) convert() {
defer func() {
defer func() { // 避免 handler 再 panic
recover()
}()
if r := recover(); r != nil {
dm.logger.Errorf("FrameConverter routine panicr = %v \n %s", r, debug.Stack())
}
// 尽早通知GC回收内存
dm.recvQueue.Reset()
}()
for !dm.closed {
p := dm.recvQueue.Pop()
if p == nil {
if !dm.closed {
dm.logger.Warn("FrameConverter:receive nil packet")
}
continue
}
packet := p.(*Packet)
if err := dm.depacketizeFuncs[int(packet.Channel)](packet); err != nil {
dm.logger.Errorf("FrameConverter: extract rtp frame error :%s", err.Error())
// break
}
}
}
// Close .
func (dm *Demuxer) Close() error {
if dm.closed {
return nil
}
dm.closed = true
dm.recvQueue.Signal()
return nil
}
// WritePacket .
func (fc *Demuxer) WriteRtpPacket(packet *Packet) error {
fc.recvQueue.Push(packet)
return nil
}

View File

@@ -7,19 +7,19 @@ package rtp
import (
"fmt"
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264"
)
type h264FrameExtractor struct {
type h264Depacketizer struct {
fragments []*Packet // 分片包
w av.FrameWriter
w codec.FrameWriter
syncClock SyncClock
}
// NewH264FrameExtractor 实例化 H264 帧提取器
func NewH264FrameExtractor(w av.FrameWriter) FrameExtractor {
fe := &h264FrameExtractor{
// NewH264Depacketize 实例化 H264 帧提取器
func NewH264Depacketize(w codec.FrameWriter) Depacketizer {
fe := &h264Depacketizer{
fragments: make([]*Packet, 0, 16),
w: w,
}
@@ -27,13 +27,13 @@ func NewH264FrameExtractor(w av.FrameWriter) FrameExtractor {
return fe
}
func (fe *h264FrameExtractor) Control(p *Packet) error {
fe.syncClock.Decode(p.Data)
func (h264dp *h264Depacketizer) Control(p *Packet) error {
h264dp.syncClock.Decode(p.Data)
return nil
}
func (fe *h264FrameExtractor) Extract(packet *Packet) (err error) {
if fe.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) {
if h264dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return
}
@@ -66,23 +66,23 @@ func (fe *h264FrameExtractor) Extract(packet *Packet) (err error) {
if payload[0]&0x1f == h264.NalFillerData {
return
}
frame := &av.Frame{
FrameType: av.FrameVideo,
AbsTimestamp: fe.rtp2ntp(packet.Timestamp),
frame := &codec.Frame{
FrameType: codec.FrameVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp),
Payload: payload,
}
err = fe.w.WriteFrame(frame)
err = h264dp.w.WriteFrame(frame)
case naluType == h264.NalStapaInRtp:
err = fe.extractStapa(packet)
err = h264dp.depacketizeStapa(packet)
case naluType == h264.NalFuAInRtp:
err = fe.extractFuA(packet)
err = h264dp.depacketizeFuA(packet)
default:
err = fmt.Errorf("nalu type %d is currently not handled", naluType)
}
return
}
func (fe *h264FrameExtractor) extractStapa(packet *Packet) (err error) {
func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) {
payload := packet.Payload()
header := payload[0]
@@ -112,14 +112,14 @@ func (fe *h264FrameExtractor) extractStapa(packet *Packet) (err error) {
off += 2
if payload[off]&0x1f != h264.NalFillerData {
frame := &av.Frame{
FrameType: av.FrameVideo,
AbsTimestamp: fe.rtp2ntp(packet.Timestamp),
frame := &codec.Frame{
FrameType: codec.FrameVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp),
Payload: make([]byte, nalSize),
}
copy(frame.Payload, payload[off:])
frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F)
if err = fe.w.WriteFrame(frame); err != nil {
if err = h264dp.w.WriteFrame(frame); err != nil {
return
}
}
@@ -131,7 +131,7 @@ func (fe *h264FrameExtractor) extractStapa(packet *Packet) (err error) {
return
}
func (fe *h264FrameExtractor) extractFuA(packet *Packet) (err error) {
func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
payload := packet.Payload()
header := payload[0]
@@ -157,45 +157,45 @@ func (fe *h264FrameExtractor) extractFuA(packet *Packet) (err error) {
}
if (fuHeader>>7)&1 == 1 { // 第一个分片包
fe.fragments = fe.fragments[:0]
h264dp.fragments = h264dp.fragments[:0]
}
if len(fe.fragments) != 0 &&
fe.fragments[len(fe.fragments)-1].SequenceNumber != packet.SequenceNumber-1 {
if len(h264dp.fragments) != 0 &&
h264dp.fragments[len(h264dp.fragments)-1].SequenceNumber != packet.SequenceNumber-1 {
// Packet loss ?
fe.fragments = fe.fragments[:0]
h264dp.fragments = h264dp.fragments[:0]
return
}
// 缓存片段
fe.fragments = append(fe.fragments, packet)
h264dp.fragments = append(h264dp.fragments, packet)
if (fuHeader>>6)&1 == 1 { // 最后一个片段
frameLen := 1 // 计数帧总长,初始 naluType header len
for _, fragment := range fe.fragments {
for _, fragment := range h264dp.fragments {
frameLen += len(fragment.Payload()) - 2
}
frame := &av.Frame{
FrameType: av.FrameVideo,
AbsTimestamp: fe.rtp2ntp(packet.Timestamp),
frame := &codec.Frame{
FrameType: codec.FrameVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp),
Payload: make([]byte, frameLen)}
frame.Payload[0] = (header & 0x60) | (fuHeader & 0x1F)
offset := 1
for _, fragment := range fe.fragments {
for _, fragment := range h264dp.fragments {
payload := fragment.Payload()[2:]
copy(frame.Payload[offset:], payload)
offset += len(payload)
}
// 清空分片缓存
fe.fragments = fe.fragments[:0]
h264dp.fragments = h264dp.fragments[:0]
err = fe.w.WriteFrame(frame)
err = h264dp.w.WriteFrame(frame)
}
return
}
func (fe *h264FrameExtractor) rtp2ntp(timestamp uint32) int64 {
return fe.syncClock.Rtp2Ntp(timestamp)
func (h264dp *h264Depacketizer) rtp2ntp(timestamp uint32) int64 {
return h264dp.syncClock.Rtp2Ntp(timestamp)
}

View File

@@ -5,21 +5,21 @@
package rtp
import (
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/aac"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac"
)
type mpesFrameExtractor struct {
w av.FrameWriter
type aacDepacketizer struct {
w codec.FrameWriter
sizeLength int
indexLength int
// extractFunc func(packet *Packet) error
syncClock SyncClock
}
// NewMPESFrameExtractor 实例化 MPES 帧提取
func NewMPESFrameExtractor(w av.FrameWriter, rtpTimeUnit int) FrameExtractor {
fe := &mpesFrameExtractor{
// NewAacDepacketizer 实例化 AAC 解包
func NewAacDepacketizer(w codec.FrameWriter, rtpTimeUnit int) Depacketizer {
fe := &aacDepacketizer{
w: w,
sizeLength: 13,
indexLength: 3,
@@ -28,8 +28,8 @@ func NewMPESFrameExtractor(w av.FrameWriter, rtpTimeUnit int) FrameExtractor {
return fe
}
func (fe *mpesFrameExtractor) Control(p *Packet) error {
fe.syncClock.Decode(p.Data)
func (aacdp *aacDepacketizer) Control(p *Packet) error {
aacdp.syncClock.Decode(p.Data)
return nil
}
@@ -51,14 +51,14 @@ func (fe *mpesFrameExtractor) Control(p *Packet) error {
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// 当 sizelength=6;indexlength=2;indexdeltalength=2 时
// 单帧封装时rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度
func (fe *mpesFrameExtractor) Extract(packet *Packet) (err error) {
if fe.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
func (aacdp *aacDepacketizer) Depacketize(packet *Packet) (err error) {
if aacdp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return
}
return fe.extractFor2ByteAUHeader(packet)
return aacdp.depacketizeFor2ByteAUHeader(packet)
}
func (fe *mpesFrameExtractor) extractFor2ByteAUHeader(packet *Packet) (err error) {
func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err error) {
payload := packet.Payload()
// AU-headers-length 2bytes
@@ -73,13 +73,13 @@ func (fe *mpesFrameExtractor) extractFor2ByteAUHeader(packet *Packet) (err error
frameTimeStamp := packet.Timestamp
for i := 0; i < int(auHeadersCount); i++ {
auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1])
frameSize := auHeader >> fe.indexLength
frame := &av.Frame{
FrameType: av.FrameAudio,
AbsTimestamp: fe.rtp2ntp(frameTimeStamp),
frameSize := auHeader >> aacdp.indexLength
frame := &codec.Frame{
FrameType: codec.FrameAudio,
AbsTimestamp: aacdp.rtp2ntp(frameTimeStamp),
Payload: framesPayload[:frameSize],
}
if err = fe.w.WriteFrame(frame); err != nil {
if err = aacdp.w.WriteFrame(frame); err != nil {
return
}
@@ -92,7 +92,7 @@ func (fe *mpesFrameExtractor) extractFor2ByteAUHeader(packet *Packet) (err error
return
}
func (fe *mpesFrameExtractor) extractFor1ByteAUHeader(packet *Packet) (err error) {
func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err error) {
payload := packet.Payload()
// AU-headers-length 2bytes
@@ -107,13 +107,13 @@ func (fe *mpesFrameExtractor) extractFor1ByteAUHeader(packet *Packet) (err error
frameTimeStamp := packet.Timestamp
for i := 0; i < int(auHeadersCount); i++ {
auHeader := auHeaders[0]
frameSize := auHeader >> fe.indexLength
frame := &av.Frame{
FrameType: av.FrameAudio,
AbsTimestamp: fe.rtp2ntp(frameTimeStamp),
frameSize := auHeader >> aacdp.indexLength
frame := &codec.Frame{
FrameType: codec.FrameAudio,
AbsTimestamp: aacdp.rtp2ntp(frameTimeStamp),
Payload: framesPayload[:frameSize],
}
if err = fe.w.WriteFrame(frame); err != nil {
if err = aacdp.w.WriteFrame(frame); err != nil {
return
}
@@ -126,6 +126,6 @@ func (fe *mpesFrameExtractor) extractFor1ByteAUHeader(packet *Packet) (err error
return
}
func (fe *mpesFrameExtractor) rtp2ntp(timestamp uint32) int64 {
return fe.syncClock.Rtp2Ntp(timestamp)
func (aacdp *aacDepacketizer) rtp2ntp(timestamp uint32) int64 {
return aacdp.syncClock.Rtp2Ntp(timestamp)
}

View File

@@ -60,7 +60,7 @@ type Packet struct {
// PacketWriter 包装 WritePacket 方法的接口
type PacketWriter interface {
WritePacket(packet *Packet) error
WriteRtpPacket(packet *Packet) error
}
// ReadPacket 根据规范从 r 中读取 rtp 包.

View File

@@ -7,7 +7,7 @@ package cache
import (
"sync"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/av/format/flv"
"github.com/cnotch/queue"
)

View File

@@ -7,8 +7,8 @@ package cache
import (
"sync"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/av/codec/h264"
"github.com/cnotch/ipchub/av/format/rtp"
"github.com/cnotch/queue"
)

View File

@@ -7,8 +7,8 @@ package cache
import (
"sync"
"github.com/cnotch/ipchub/av/hevc"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/av/codec/hevc"
"github.com/cnotch/ipchub/av/format/rtp"
"github.com/cnotch/queue"
)

View File

@@ -4,7 +4,7 @@
package cache
import "github.com/cnotch/ipchub/protos"
import "github.com/cnotch/ipchub/av/format"
// Pack .
type Pack = protos.Pack
type Pack = format.Package

View File

@@ -9,14 +9,14 @@ import (
"encoding/hex"
"strings"
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/aac"
"github.com/cnotch/ipchub/av/h264"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac"
"github.com/cnotch/ipchub/av/codec/h264"
"github.com/cnotch/ipchub/utils/scan"
"github.com/pixelbender/go-sdp/sdp"
)
func parseMeta(rawsdp string, video *av.VideoMeta, audio *av.AudioMeta) {
func parseMeta(rawsdp string, video *codec.VideoMeta, audio *codec.AudioMeta) {
sdp, err := sdp.ParseString(rawsdp)
if err != nil {
return
@@ -53,7 +53,7 @@ func parseMeta(rawsdp string, video *av.VideoMeta, audio *av.AudioMeta) {
}
}
func parseAudioMeta(m *sdp.Format, audio *av.AudioMeta) {
func parseAudioMeta(m *sdp.Format, audio *codec.AudioMeta) {
audio.SampleRate = 44100
audio.Channels = 2
audio.SampleSize = 16
@@ -97,7 +97,7 @@ func parseAudioMeta(m *sdp.Format, audio *av.AudioMeta) {
}
}
func parseVideoMeta(m *sdp.Format, video *av.VideoMeta) {
func parseVideoMeta(m *sdp.Format, video *codec.VideoMeta) {
if len(m.Params) == 0 {
return
}
@@ -124,7 +124,7 @@ func parseVideoMeta(m *sdp.Format, video *av.VideoMeta) {
}
}
func parseH264SpsPps(s string, video *av.VideoMeta) {
func parseH264SpsPps(s string, video *codec.VideoMeta) {
ppsStr, spsStr, ok := scan.Comma.Scan(s)
if !ok {
return

View File

@@ -7,14 +7,14 @@ package media
import (
"io"
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/protos"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/format"
"github.com/cnotch/ipchub/av/format/rtp"
"github.com/cnotch/queue"
)
// Pack .
type Pack = protos.Pack
type Pack = format.Package
type packCache interface {
CachePack(pack Pack)
@@ -33,7 +33,7 @@ func (emptyCache) Reset() {}
type flvMuxer interface {
TypeFlags() byte
av.FrameWriter
codec.FrameWriter
io.Closer
}
@@ -42,7 +42,7 @@ var _ flvMuxer = emptyFlvMuxer{}
type emptyFlvMuxer struct{}
func (emptyFlvMuxer) TypeFlags() byte { return 0 }
func (emptyFlvMuxer) WriteFrame(frame *av.Frame) error { return nil }
func (emptyFlvMuxer) WriteFrame(frame *codec.Frame) error { return nil }
func (emptyFlvMuxer) Close() error { return nil }
type frameConverter interface {
@@ -55,5 +55,5 @@ var _ frameConverter = emptyFrameConverter{}
type emptyFrameConverter struct{}
func (emptyFrameConverter) TypeFlags() byte { return 0 }
func (emptyFrameConverter) WritePacket(*rtp.Packet) error { return nil }
func (emptyFrameConverter) WriteRtpPacket(*rtp.Packet) error { return nil }
func (emptyFrameConverter) Close() error { return nil }

View File

@@ -10,13 +10,13 @@ import (
"sync/atomic"
"time"
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/format/flv"
"github.com/cnotch/ipchub/av/format/hls"
"github.com/cnotch/ipchub/av/format/mpegts"
"github.com/cnotch/ipchub/av/format/rtp"
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/protos/hls"
"github.com/cnotch/ipchub/protos/mpegts"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/ipchub/utils"
"github.com/cnotch/queue"
@@ -60,8 +60,8 @@ type Stream struct {
multicast Multicastable
hls Hlsable
logger *xlog.Logger // 日志对象
Video av.VideoMeta
Audio av.AudioMeta
Video codec.VideoMeta
Audio codec.AudioMeta
}
// NewStream 创建新的流
@@ -101,17 +101,17 @@ func NewStream(path string, rawsdp string, options ...Option) *Stream {
func (s *Stream) prepareOtherStream() {
// steam(rtp)->frameconverter->stream(frame)->flvmuxer->stream(tag)
// prepare rtp.Packet -> av.Frame
var videoExtractor, audioExtractor rtp.FrameExtractor
var videoExtractor, audioExtractor rtp.Depacketizer
if s.Video.Codec == "H264" {
videoExtractor = rtp.NewH264FrameExtractor(s)
videoExtractor = rtp.NewH264Depacketize(s)
}
if s.Audio.Codec == "AAC" {
audioExtractor = rtp.NewMPESFrameExtractor(s, s.Audio.SampleRate)
audioExtractor = rtp.NewAacDepacketizer(s, s.Audio.SampleRate)
}
if videoExtractor == nil && audioExtractor == nil {
s.frameConverter = emptyFrameConverter{}
} else {
s.frameConverter = rtp.NewFrameConverter(videoExtractor, audioExtractor,
s.frameConverter = rtp.NewDemuxer(videoExtractor, audioExtractor,
s.logger.With(xlog.Fields(xlog.F("extra", "rtp2frame"))))
}
@@ -197,8 +197,8 @@ func (s *Stream) close(status int32) error {
return nil
}
// WritePacket 向流写入一个媒体包
func (s *Stream) WritePacket(packet *rtp.Packet) error {
// WriteRtpPacket 向流写入一个媒体包
func (s *Stream) WriteRtpPacket(packet *rtp.Packet) error {
status := atomic.LoadInt32(&s.status)
if status != StreamOK {
return statusErrors[status]
@@ -209,12 +209,12 @@ func (s *Stream) WritePacket(packet *rtp.Packet) error {
s.cache.CachePack(packet)
s.consumptions.SendToAll(packet)
s.frameConverter.WritePacket(packet)
s.frameConverter.WriteRtpPacket(packet)
return nil
}
// WriteFrame .
func (s *Stream) WriteFrame(frame *av.Frame) error {
func (s *Stream) WriteFrame(frame *codec.Frame) error {
if err := s.flvMuxer.WriteFrame(frame); err != nil {
s.logger.Error(err.Error())
}
@@ -227,7 +227,7 @@ func (s *Stream) WriteFrame(frame *av.Frame) error {
}
// WriteTag .
func (s *Stream) WriteTag(tag *flv.Tag) error {
func (s *Stream) WriteFlvTag(tag *flv.Tag) error {
status := atomic.LoadInt32(&s.status)
if status != StreamOK {
return statusErrors[status]
@@ -319,8 +319,8 @@ type StreamInfo struct {
Path string `json:"path"`
Addr string `json:"addr"`
Size int `json:"size"`
Video *av.VideoMeta `json:"video,omitempty"`
Audio *av.AudioMeta `json:"audio,omitempty"`
Video *codec.VideoMeta `json:"video,omitempty"`
Audio *codec.AudioMeta `json:"audio,omitempty"`
ConsumptionCount int `json:"cc"`
Consumptions []ConsumptionInfo `json:"cs,omitempty"`
}

View File

@@ -9,7 +9,7 @@ import (
"testing"
"time"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/av/format/rtp"
"github.com/stretchr/testify/assert"
)
@@ -88,7 +88,7 @@ func Test_Consumption_Consume(t *testing.T) {
closed := false
go func() {
for !closed {
s.WritePacket(&rtp.Packet{})
s.WriteRtpPacket(&rtp.Packet{})
<-time.After(time.Millisecond * 1)
}
}()
@@ -113,7 +113,7 @@ func Test_Consumption_ConsumePanic(t *testing.T) {
closed := false
go func() {
for !closed {
s.WritePacket(&rtp.Packet{})
s.WriteRtpPacket(&rtp.Packet{})
<-time.After(time.Millisecond * 1)
}
}()
@@ -136,7 +136,7 @@ func benchDispatch(n int, b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
s.WritePacket(&rtp.Packet{})
s.WriteRtpPacket(&rtp.Packet{})
}
})
s.Close()

View File

@@ -1,104 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package rtp
import (
"runtime/debug"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
// FrameExtractor 帧提取器
type FrameExtractor interface {
Control(p *Packet) error
Extract(p *Packet) error
}
// FrameConverter 帧转换器
type FrameConverter struct {
closed bool
recvQueue *queue.SyncQueue
extractFuncs [4]func(packet *Packet) error
logger *xlog.Logger
}
func emptyExtract(*Packet) error { return nil }
// NewFrameConverter 创建 Packet 到 Frame 的转换器。
func NewFrameConverter(videoExtractor FrameExtractor, audioExtractor FrameExtractor, logger *xlog.Logger) *FrameConverter {
fc := &FrameConverter{
recvQueue: queue.NewSyncQueue(),
closed: false,
logger: logger,
}
if videoExtractor != nil {
fc.extractFuncs[ChannelVideo] = videoExtractor.Extract
fc.extractFuncs[ChannelVideoControl] = videoExtractor.Control
} else {
fc.extractFuncs[ChannelVideo] = emptyExtract
fc.extractFuncs[ChannelVideoControl] = emptyExtract
}
if audioExtractor != nil {
fc.extractFuncs[ChannelAudio] = audioExtractor.Extract
fc.extractFuncs[ChannelAudioControl] = audioExtractor.Control
} else {
fc.extractFuncs[ChannelAudio] = emptyExtract
fc.extractFuncs[ChannelAudioControl] = emptyExtract
}
go fc.convert()
return fc
}
func (fc *FrameConverter) convert() {
defer func() {
defer func() { // 避免 handler 再 panic
recover()
}()
if r := recover(); r != nil {
fc.logger.Errorf("FrameConverter routine panicr = %v \n %s", r, debug.Stack())
}
// 尽早通知GC回收内存
fc.recvQueue.Reset()
}()
for !fc.closed {
p := fc.recvQueue.Pop()
if p == nil {
if !fc.closed {
fc.logger.Warn("FrameConverter:receive nil packet")
}
continue
}
packet := p.(*Packet)
if err := fc.extractFuncs[int(packet.Channel)](packet); err != nil {
fc.logger.Errorf("FrameConverter: extract rtp frame error :%s", err.Error())
// break
}
}
}
// Close .
func (fc *FrameConverter) Close() error {
if fc.closed {
return nil
}
fc.closed = true
fc.recvQueue.Signal()
return nil
}
// WritePacket .
func (fc *FrameConverter) WritePacket(packet *Packet) error {
fc.recvQueue.Push(packet)
return nil
}

View File

@@ -11,7 +11,7 @@ import (
"runtime/debug"
"github.com/cnotch/ipchub/media"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/av/format/flv"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/xlog"
)

View File

@@ -4,7 +4,7 @@
package flv
import "github.com/cnotch/ipchub/protos"
import "github.com/cnotch/ipchub/av/format"
// Pack .
type Pack = protos.Pack
type Pack = format.Package

View File

@@ -10,7 +10,7 @@ import (
"github.com/cnotch/ipchub/media"
"github.com/cnotch/ipchub/network/websocket"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/av/format/flv"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/xlog"
)

View File

@@ -363,7 +363,7 @@ func (c *PullClient) playStream() {
}
func (c *PullClient) onPack(p *RTPPack) error {
return c.stream.WritePacket(p)
return c.stream.WriteRtpPacket(p)
}
func (c *PullClient) onRequest(r *Request) (err error) {

View File

@@ -60,7 +60,7 @@ func (s *tcpPushStream) Close() error {
}
func (s *tcpPushStream) WritePacket(p *RTPPack) error {
return s.stream.WritePacket(p)
return s.stream.WriteRtpPacket(p)
}
type tcpConsumer struct {

View File

@@ -5,13 +5,13 @@
package rtsp
import (
"github.com/cnotch/ipchub/protos"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/protos/rtsp"
"github.com/cnotch/ipchub/av/format"
"github.com/cnotch/ipchub/av/format/rtp"
"github.com/cnotch/ipchub/av/format/rtsp"
)
// Pack .
type Pack = protos.Pack
type Pack = format.Package
// .
var (