封装http-flv

This commit is contained in:
yangjiechina
2024-03-10 14:31:14 +08:00
parent 096493aa44
commit 596502d215
13 changed files with 484 additions and 210 deletions

65
api.go
View File

@@ -1,25 +1,30 @@
package main package main
import ( import (
"context"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/hls" "github.com/yangjiechina/live-server/flv"
"github.com/yangjiechina/live-server/stream" "github.com/yangjiechina/live-server/stream"
"net"
"net/http" "net/http"
"strings"
"time" "time"
) )
func startApiServer(addr string) { func startApiServer(addr string) {
r := mux.NewRouter() r := mux.NewRouter()
r.HandleFunc("/live/hls/{id}", onHLS) r.HandleFunc("/live/flv/{source}", onFLV)
r.HandleFunc("/live/hls/{source}", onHLS)
http.Handle("/", r) http.Handle("/", r)
srv := &http.Server{ srv := &http.Server{
Handler: r, Handler: r,
Addr: addr, Addr: addr,
// Good practice: enforce timeouts for servers you create! // Good practice: enforce timeouts for servers you create!
WriteTimeout: 15 * time.Second, WriteTimeout: 30 * time.Second,
ReadTimeout: 15 * time.Second, ReadTimeout: 30 * time.Second,
} }
err := srv.ListenAndServe() err := srv.ListenAndServe()
@@ -29,15 +34,21 @@ func startApiServer(addr string) {
} }
} }
func onHLS(w http.ResponseWriter, r *http.Request) { func onFLV(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
sourceId := vars["id"] source := vars["source"]
w.Header().Set("Content-Type", "video/x-flv")
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("Transfer-Encoding", "chunked")
hj, ok := w.(http.Hijacker) hj, ok := w.(http.Hijacker)
if !ok { if !ok {
http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError) http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
return return
} }
context_ := r.Context()
w.WriteHeader(http.StatusOK)
conn, _, err := hj.Hijack() conn, _, err := hj.Hijack()
if err != nil { if err != nil {
@@ -45,19 +56,43 @@ func onHLS(w http.ResponseWriter, r *http.Request) {
return return
} }
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") var sourceId string
sinkId := stream.GenerateSinkId(conn) if index := strings.LastIndex(source, "."); index > -1 {
sourceId = source[:index]
}
/* requestTS := strings.HasSuffix(r.URL.Path, ".ts") tcpAddr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
if requestTS { sinkId := stream.GenerateSinkId(tcpAddr)
stream.sink sink := flv.NewFLVSink(sinkId, sourceId, conn)
}*/
sink := hls.NewSink(sinkId, sourceId, w) go func(ctx context.Context) {
sink.(*stream.SinkImpl).Play(sink, func() { sink.(*stream.SinkImpl).Play(sink, func() {
//sink.(*stream.SinkImpl).PlayDone(sink, nil, nil)
}, func(state utils.HookState) { }, func(state utils.HookState) {
w.WriteHeader(http.StatusForbidden) conn.Close()
}) })
}(context_)
}
func onHLS(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
source := vars["source"]
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
//删除末尾的.ts/.m3u8, 请确保id中不存在.
//var sourceId string
//if index := strings.LastIndex(source, "."); index > -1 {
// sourceId = source[:index]
//}
//
//tcpAddr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
//sinkId := stream.GenerateSinkId(tcpAddr)
if strings.HasSuffix(source, ".m3u8") {
//查询是否存在hls流, 不存在-等生成后再响应m3u8文件. 存在-直接响应m3u8文件
http.ServeFile(w, r, "../tmp/"+source)
} else if strings.HasSuffix(source, ".ts") {
http.ServeFile(w, r, "../tmp/"+source)
}
} }

10
flv/flv_sink.go Normal file
View File

@@ -0,0 +1,10 @@
package flv
import (
"github.com/yangjiechina/live-server/stream"
"net"
)
func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.ISink {
return &stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: conn}
}

191
flv/http_flv.go Normal file
View File

@@ -0,0 +1,191 @@
package flv
import (
"encoding/binary"
"fmt"
"github.com/yangjiechina/avformat/libflv"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/stream"
)
const (
// HttpFlvBlockLengthSize 响应flv数据时需要添加flv块长度, 缓存时预留该大小字节
HttpFlvBlockLengthSize = 20
)
var separator []byte
func init() {
separator = make([]byte, 2)
separator[0] = 0x0D
separator[1] = 0x0A
}
type httpTransStream struct {
stream.CacheTransStream
muxer libflv.Muxer
header []byte
headerSize int
}
func NewHttpTransStream() stream.ITransStream {
return &httpTransStream{
muxer: libflv.NewMuxer(),
header: make([]byte, 1024),
headerSize: HttpFlvBlockLengthSize + 9,
}
}
func (t *httpTransStream) Input(packet utils.AVPacket) error {
var flvSize int
var data []byte
var videoKey bool
if utils.AVMediaTypeAudio == packet.MediaType() {
flvSize = 17 + len(packet.Data())
data = packet.Data()
} else if utils.AVMediaTypeVideo == packet.MediaType() {
flvSize = 20 + len(packet.AVCCPacketData())
data = packet.AVCCPacketData()
videoKey = packet.KeyFrame()
}
if videoKey {
head, _ := t.StreamBuffers[0].Data()
if len(head) > t.SegmentOffset {
t.StreamBuffers[0].Mark()
t.StreamBuffers[0].Allocate(2)
t.StreamBuffers[0].Fetch()
head, _ = t.StreamBuffers[0].Data()
t.writeSeparator(head[t.SegmentOffset:])
skip := t.computeSikCount(head[t.SegmentOffset:])
t.SendPacketWithOffset(head, t.SegmentOffset+skip)
}
t.SwapStreamBuffer()
}
var n int
var separatorSize int
full := t.Full(packet.Pts())
if head, _ := t.StreamBuffers[0].Data(); t.SegmentOffset == len(head) {
separatorSize = HttpFlvBlockLengthSize
//10字节描述flv包长, 前2个字节描述无效字节长度
n = HttpFlvBlockLengthSize
}
if full {
separatorSize = 2
}
t.StreamBuffers[0].Mark()
allocate := t.StreamBuffers[0].Allocate(separatorSize + flvSize)
n += t.muxer.Input(allocate[n:], packet.MediaType(), len(data), packet.Dts(), packet.Pts(), packet.KeyFrame(), false)
copy(allocate[n:], data)
_ = t.StreamBuffers[0].Fetch()
if !full {
return nil
}
head, _ := t.StreamBuffers[0].Data()
//添加长度和换行符
//每一个合并写切片开始和预留长度所需的字节数
//合并写切片末尾加上换行符
//长度是16进制字符串
t.writeSeparator(head[t.SegmentOffset:])
skip := t.computeSikCount(head[t.SegmentOffset:])
t.SendPacketWithOffset(head, t.SegmentOffset+skip)
return nil
}
func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
if err := t.TransStreamImpl.AddTrack(stream); err != nil {
return err
}
var data []byte
if utils.AVMediaTypeAudio == stream.Type() {
t.muxer.AddAudioTrack(stream.CodecId(), 0, 0, 0)
data = stream.Extra()
} else if utils.AVMediaTypeVideo == stream.Type() {
t.muxer.AddVideoTrack(stream.CodecId())
data, _ = stream.M4VCExtraData()
}
t.headerSize += t.muxer.Input(t.header[t.headerSize:], stream.Type(), len(data), 0, 0, false, true)
copy(t.header[t.headerSize:], data)
t.headerSize += len(data)
return nil
}
func (t *httpTransStream) sendBuffer(sink stream.ISink, data []byte) error {
return sink.Input(data[t.computeSikCount(data):])
}
func (t *httpTransStream) computeSikCount(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:]))
}
func (t *httpTransStream) AddSink(sink stream.ISink) error {
utils.Assert(t.headerSize > 0)
t.TransStreamImpl.AddSink(sink)
//发送sequence header
t.sendBuffer(sink, t.header[:t.headerSize])
send := func(sink stream.ISink, data []byte) {
var index int
for ; index < len(data); index += 4 {
size := binary.BigEndian.Uint32(data[index:])
t.sendBuffer(sink, data[index:index+4+int(size)])
index += int(size)
}
}
//发送当前内存池已有的合并写切片
if t.SegmentOffset > 0 {
data, _ := t.StreamBuffers[0].Data()
utils.Assert(len(data) > 0)
send(sink, data[:t.SegmentOffset])
return nil
}
//发送上一组GOP
if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() {
data, _ := t.StreamBuffers[1].Data()
utils.Assert(len(data) > 0)
send(sink, data)
return nil
}
return nil
}
func (t *httpTransStream) writeSeparator(dst []byte) {
dst[HttpFlvBlockLengthSize-2] = 0x0D
dst[HttpFlvBlockLengthSize-1] = 0x0A
flvSize := len(dst) - HttpFlvBlockLengthSize - 2
hexStr := fmt.Sprintf("%X", flvSize)
//长度+换行符
n := len(hexStr) + 2
binary.BigEndian.PutUint16(dst[4:], uint16(HttpFlvBlockLengthSize-n-6))
copy(dst[HttpFlvBlockLengthSize-n:], hexStr)
dst[HttpFlvBlockLengthSize+flvSize] = 0x0D
dst[HttpFlvBlockLengthSize+flvSize+1] = 0x0A
binary.BigEndian.PutUint32(dst, uint32(len(dst)-4))
}
func (t *httpTransStream) WriteHeader() error {
t.Init()
_ = t.muxer.WriteHeader(t.header[HttpFlvBlockLengthSize:])
t.headerSize += 2
t.writeSeparator(t.header[:t.headerSize])
return nil
}

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"github.com/yangjiechina/live-server/flv"
"github.com/yangjiechina/live-server/hls" "github.com/yangjiechina/live-server/hls"
"net" "net"
"net/http" "net/http"
@@ -27,6 +28,8 @@ func CreateTransStream(source stream.ISource, protocol stream.Protocol, streams
} }
return transStream return transStream
} else if stream.ProtocolFlv == protocol {
return flv.NewHttpTransStream()
} }
return nil return nil

View File

@@ -55,7 +55,7 @@ func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState)
sourceId := app + "_" + stream_ sourceId := app + "_" + stream_
//拉流事件Sink统一处理 //拉流事件Sink统一处理
sink := NewSink(stream.GenerateSinkId(s.conn), sourceId, s.conn) sink := NewSink(stream.GenerateSinkId(s.conn.RemoteAddr()), sourceId, s.conn)
sink.(*stream.SinkImpl).Play(sink, func() { sink.(*stream.SinkImpl).Play(sink, func() {
s.handle = sink s.handle = sink
response <- utils.HookStateOK response <- utils.HookStateOK

View File

@@ -1,7 +1,6 @@
package rtmp package rtmp
import ( import (
"fmt"
"github.com/yangjiechina/avformat/libflv" "github.com/yangjiechina/avformat/libflv"
"github.com/yangjiechina/avformat/librtmp" "github.com/yangjiechina/avformat/librtmp"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
@@ -9,41 +8,19 @@ import (
) )
type TransStream struct { type TransStream struct {
stream.TransStreamImpl stream.CacheTransStream
chunkSize int chunkSize int
//sequence header //sequence header
header []byte header []byte
headerSize int headerSize int
muxer *libflv.Muxer muxer libflv.Muxer
//只存在音频流
onlyAudio bool
audioChunk librtmp.Chunk audioChunk librtmp.Chunk
videoChunk librtmp.Chunk videoChunk librtmp.Chunk
//只需要缓存一组GOP+第2组GOP的第一个合并写切片
//当缓存到第2组GOP的第二个合并写切片时将上一个GOP缓存释放掉
//使用2块内存池分别缓存2个GOP保证内存连续一次发送
//不开启GOP缓存和只有音频包的情况下创建使用一个MemoryPool
memoryPool [2]stream.MemoryPool
//当前合并写切片的缓存时长
segmentDuration int
//当前合并写切片位于memoryPool的开始偏移量
segmentOffset int
//前一个包的时间戳
prePacketTS int64
firstVideoPacket bool
//发送未完整切片的Sinks
//当AddSink时还未缓存到一组切片有多少先发多少. 后续切片未满之前的生成的rtmp包都将直接发送给sink.
//只要满了一组切片后这些sink都不单独发包, 统一发送切片.
incompleteSinks []stream.ISink
} }
func NewTransStream(chunkSize int) stream.ITransStream { func NewTransStream(chunkSize int) stream.ITransStream {
transStream := &TransStream{chunkSize: chunkSize, TransStreamImpl: stream.TransStreamImpl{Sinks: make(map[stream.SinkId]stream.ISink, 64)}} transStream := &TransStream{chunkSize: chunkSize}
return transStream return transStream
} }
@@ -64,15 +41,6 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
chunk = &t.audioChunk chunk = &t.audioChunk
payloadSize += 2 + length payloadSize += 2 + length
} else if utils.AVMediaTypeVideo == packet.MediaType() { } else if utils.AVMediaTypeVideo == packet.MediaType() {
//首帧必须要视频关键帧
if !t.firstVideoPacket {
if !packet.KeyFrame() {
return fmt.Errorf("the first video frame must be a keyframe")
}
t.firstVideoPacket = true
}
videoPkt = true videoPkt = true
videoKey = packet.KeyFrame() videoKey = packet.KeyFrame()
data = packet.AVCCPacketData() data = packet.AVCCPacketData()
@@ -81,33 +49,16 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
payloadSize += 5 + length payloadSize += 5 + length
} }
//即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送
if !stream.AppConfig.GOPCache || t.onlyAudio {
//首帧视频帧必须要关键帧
return nil
}
if videoKey { if videoKey {
tmp := t.memoryPool[0] tmp := t.StreamBuffers[0]
head, _ := tmp.Data() head, _ := tmp.Data()
if len(head) > t.segmentOffset { t.SendPacket(head[t.SegmentOffset:])
for _, sink := range t.Sinks { t.SwapStreamBuffer()
sink.Input(head[t.segmentOffset:])
}
}
t.memoryPool[0].Clear()
//交替使用缓存
t.memoryPool[0] = t.memoryPool[1]
t.memoryPool[1] = tmp
t.segmentDuration = 0
t.segmentOffset = 0
} }
//分配内存 //分配内存
t.memoryPool[0].Mark() t.StreamBuffers[0].Mark()
allocate := t.memoryPool[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) allocate := t.StreamBuffers[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize))
//写chunk头 //写chunk头
chunk.Length = payloadSize chunk.Length = payloadSize
@@ -119,77 +70,19 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
ct := packet.Pts() - packet.Dts() ct := packet.Pts() - packet.Dts()
if videoPkt { if videoPkt {
n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false) n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false)
n += chunk.WriteData(allocate[n:], data, t.chunkSize)
} else { } else {
n += t.muxer.WriteAudioData(allocate[12:], false) n += t.muxer.WriteAudioData(allocate[12:], false)
n += chunk.WriteData(allocate[n:], data, t.chunkSize)
} }
first := true _ = t.StreamBuffers[0].Fetch()[:n]
for length > 0 { if t.Full(packet.Pts()) {
var min int
if first {
min = utils.MinInt(length, t.chunkSize-5)
first = false
} else {
min = utils.MinInt(length, t.chunkSize)
}
copy(allocate[n:], data[:min])
n += min
length -= min
data = data[min:]
//写一个ChunkType3用作分割
if length > 0 {
if videoPkt {
allocate[n] = (0x3 << 6) | byte(librtmp.ChunkStreamIdVideo)
} else {
allocate[n] = (0x3 << 6) | byte(librtmp.ChunkStreamIdAudio)
}
n++
}
}
rtmpData := t.memoryPool[0].Fetch()[:n]
t.segmentDuration += int(packet.Pts() - t.prePacketTS)
t.prePacketTS = packet.Pts()
//给不完整切片的Sink补齐包
if len(t.incompleteSinks) > 0 {
for _, sink := range t.incompleteSinks {
sink.Input(rtmpData)
}
if t.segmentDuration >= stream.AppConfig.MergeWriteLatency {
head, tail := t.memoryPool[0].Data()
utils.Assert(len(tail) == 0)
t.segmentOffset = len(head)
t.segmentDuration = 0
t.incompleteSinks = nil
}
return nil return nil
} }
if t.segmentDuration < stream.AppConfig.MergeWriteLatency { head, _ := t.StreamBuffers[0].Data()
return nil t.SendPacketWithOffset(head[:], t.SegmentOffset)
}
head, tail := t.memoryPool[0].Data()
utils.Assert(len(tail) == 0)
for _, sink := range t.Sinks {
sink.Input(head[t.segmentOffset:])
}
t.segmentOffset = len(head)
t.segmentDuration = 0
//当缓存到第2组GOP的第二个合并写切片时将上一个GOP缓存释放掉
if t.segmentOffset > len(head) && t.memoryPool[1] != nil && !t.memoryPool[1].Empty() {
t.memoryPool[1].Clear()
}
return nil return nil
} }
@@ -200,36 +93,22 @@ func (t *TransStream) AddSink(sink stream.ISink) error {
//发送sequence header //发送sequence header
sink.Input(t.header[:t.headerSize]) sink.Input(t.header[:t.headerSize])
if !stream.AppConfig.GOPCache || t.onlyAudio {
return nil
}
//发送当前内存池已有的合并写切片 //发送当前内存池已有的合并写切片
if t.segmentOffset > 0 { if t.SegmentOffset > 0 {
data, tail := t.memoryPool[0].Data() data, _ := t.StreamBuffers[0].Data()
utils.Assert(len(data) > 0) utils.Assert(len(data) > 0)
utils.Assert(len(tail) == 0) sink.Input(data[:t.SegmentOffset])
sink.Input(data[:t.segmentOffset])
return nil return nil
} }
//发送上一组GOP //发送上一组GOP
if t.memoryPool[1] != nil && !t.memoryPool[1].Empty() { if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() {
data, tail := t.memoryPool[0].Data() data, _ := t.StreamBuffers[0].Data()
utils.Assert(len(data) > 0) utils.Assert(len(data) > 0)
utils.Assert(len(tail) == 0)
sink.Input(data) sink.Input(data)
return nil return nil
} }
//不足一个合并写切片, 有多少发多少
data, tail := t.memoryPool[0].Data()
utils.Assert(len(tail) == 0)
if len(data) > 0 {
sink.Input(data)
t.incompleteSinks = append(t.incompleteSinks, sink)
}
return nil return nil
} }
@@ -237,6 +116,8 @@ func (t *TransStream) WriteHeader() error {
utils.Assert(t.Tracks != nil) utils.Assert(t.Tracks != nil)
utils.Assert(!t.TransStreamImpl.Completed) utils.Assert(!t.TransStreamImpl.Completed)
t.Init()
var audioStream utils.AVStream var audioStream utils.AVStream
var videoStream utils.AVStream var videoStream utils.AVStream
var audioCodecId utils.AVCodecID var audioCodecId utils.AVCodecID
@@ -259,16 +140,16 @@ func (t *TransStream) WriteHeader() error {
//初始化 //初始化
t.TransStreamImpl.Completed = true t.TransStreamImpl.Completed = true
t.header = make([]byte, 1024) t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) t.muxer = libflv.NewMuxer()
if utils.AVCodecIdNONE != audioCodecId {
if stream.AppConfig.GOPCache { t.muxer.AddAudioTrack(audioCodecId, 0, 0, 0)
//创建2块内存
t.memoryPool[0] = stream.NewMemoryPoolWithDirect(1024*4000, true)
t.memoryPool[1] = stream.NewMemoryPoolWithDirect(1024*4000, true)
} else {
} }
if utils.AVCodecIdNONE != videoCodecId {
t.muxer.AddVideoTrack(videoCodecId)
}
//统一生成rtmp拉流需要的数据头(chunk+sequence header)
var n int var n int
if audioStream != nil { if audioStream != nil {
n += t.muxer.WriteAudioData(t.header[12:], true) n += t.muxer.WriteAudioData(t.header[12:], true)

View File

@@ -8,23 +8,6 @@ import (
) )
type HookFunc func(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error type HookFunc func(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
type Hook interface {
DoPublish(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoPublishDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoPlay(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoPlayDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoRecord(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoIdleTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoRecvTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
}
type HookEvent int type HookEvent int
const ( const (
@@ -44,8 +27,12 @@ type eventInfo struct {
remoteAddr string //peer地址 remoteAddr string //peer地址
} }
func NewHookEventInfo(stream, protocol, remoteAddr string) eventInfo { func NewPlayHookEventInfo(stream, remoteAddr string, protocol Protocol) eventInfo {
return eventInfo{stream: stream, protocol: protocol, remoteAddr: remoteAddr} return eventInfo{stream: stream, protocol: streamTypeToStr(protocol), remoteAddr: remoteAddr}
}
func NewPublishHookEventInfo(stream, remoteAddr string, protocol SourceType) eventInfo {
return eventInfo{stream: stream, protocol: sourceTypeToStr(protocol), remoteAddr: remoteAddr}
} }
type HookSession interface { type HookSession interface {

View File

@@ -187,12 +187,11 @@ func (m *memoryPool) FreeTail() {
} }
func (m *memoryPool) Data() ([]byte, []byte) { func (m *memoryPool) Data() ([]byte, []byte) {
if m.tail <= m.head { if m.tail <= m.head && !m.blockQueue.IsEmpty() {
return m.data[m.head:m.capacity], m.data[:m.tail] return m.data[m.head:m.capacity], m.data[:m.tail]
} else { } else {
return m.data[m.head:m.tail], nil return m.data[m.head:m.tail], nil
} }
} }
func (m *memoryPool) Clear() { func (m *memoryPool) Clear() {

View File

@@ -5,7 +5,6 @@ import (
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"net" "net"
"net/http" "net/http"
"sync/atomic"
) )
type SinkId interface{} type SinkId interface{}
@@ -41,24 +40,24 @@ type ISink interface {
Close() Close()
} }
// GenerateSinkId 根据Conn生成SinkId IPV4使用一个uint64, IPV6使用String // GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
func GenerateSinkId(conn net.Conn) SinkId { func GenerateSinkId(addr net.Addr) SinkId {
network := conn.RemoteAddr().Network() network := addr.Network()
if "tcp" == network { if "tcp" == network {
id := uint64(utils.BytesToInt(conn.RemoteAddr().(*net.TCPAddr).IP.To4())) id := uint64(utils.BytesToInt(addr.(*net.TCPAddr).IP.To4()))
id <<= 32 id <<= 32
id |= uint64(conn.RemoteAddr().(*net.TCPAddr).Port << 16) id |= uint64(addr.(*net.TCPAddr).Port << 16)
return id return id
} else if "udp" == network { } else if "udp" == network {
id := uint64(utils.BytesToInt(conn.RemoteAddr().(*net.UDPAddr).IP.To4())) id := uint64(utils.BytesToInt(addr.(*net.UDPAddr).IP.To4()))
id <<= 32 id <<= 32
id |= uint64(conn.RemoteAddr().(*net.UDPAddr).Port << 16) id |= uint64(addr.(*net.UDPAddr).Port << 16)
return id return id
} }
return conn.RemoteAddr().String() return addr.String()
} }
type SinkImpl struct { type SinkImpl struct {
@@ -74,7 +73,7 @@ type SinkImpl struct {
//Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全 //Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全
//如果Sink在等待队列-Sink断开这个过程是非线程安全的 //如果Sink在等待队列-Sink断开这个过程是非线程安全的
//SetState的时候如果closed为true返回false, 调用者自行删除sink //SetState的时候如果closed为true返回false, 调用者自行删除sink
closed atomic.Bool //closed atomic.Bool
//HasSentKeyVideo 是否已经发送视频关键帧 //HasSentKeyVideo 是否已经发送视频关键帧
//未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧 //未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧
@@ -121,10 +120,10 @@ func (s *SinkImpl) State() SessionState {
} }
func (s *SinkImpl) SetState(state SessionState) bool { func (s *SinkImpl) SetState(state SessionState) bool {
load := s.closed.Load() //load := s.closed.Load()
if load { //if load {
return false // return false
} //}
if s.State_ < SessionStateClose { if s.State_ < SessionStateClose {
s.State_ = state s.State_ = state
@@ -136,7 +135,8 @@ func (s *SinkImpl) SetState(state SessionState) bool {
// //
//} //}
return !s.closed.Load() //return !s.closed.Load()
return true
} }
func (s *SinkImpl) EnableVideo() bool { func (s *SinkImpl) EnableVideo() bool {
@@ -166,7 +166,7 @@ func (s *SinkImpl) Close() {
//从等待队列中删除sink //从等待队列中删除sink
RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_) RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_)
s.State_ = SessionStateClose s.State_ = SessionStateClose
s.closed.Store(true) //s.closed.Store(true)
} }
} }
@@ -188,7 +188,7 @@ func (s *SinkImpl) Play(sink ISink, success func(), failure func(state utils.Hoo
return return
} }
err := s.Hook(HookEventPlay, NewHookEventInfo(sink.SourceId(), streamTypeToStr(sink.Protocol()), ""), func(response *http.Response) { err := s.Hook(HookEventPlay, NewPlayHookEventInfo(sink.SourceId(), "", sink.Protocol()), func(response *http.Response) {
f() f()
success() success()
}, func(response *http.Response, err error) { }, func(response *http.Response, err error) {

View File

@@ -1,10 +1,14 @@
package stream package stream
import "sync" import (
"fmt"
"sync"
)
// 等待队列所有的Sink
var waitingSinks map[string]map[SinkId]ISink var waitingSinks map[string]map[SinkId]ISink
var mutex sync.Mutex var mutex sync.RWMutex
func init() { func init() {
waitingSinks = make(map[string]map[SinkId]ISink, 1024) waitingSinks = make(map[string]map[SinkId]ISink, 1024)
@@ -56,5 +60,83 @@ func PopWaitingSinks(sourceId string) []ISink {
for _, sink := range source { for _, sink := range source {
sinks[index] = sink sinks[index] = sink
} }
delete(waitingSinks, sourceId)
return sinks return sinks
} }
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkId) bool {
mutex.RLock()
defer mutex.RUnlock()
source, ok := waitingSinks[sourceId]
if !ok {
return false
}
_, ok = source[sinkId]
return ok
}
func ExistSink(sourceId string, sinkId SinkId) bool {
if sourceId != "" {
if exist := ExistSinkInWaitingQueue(sourceId, sinkId); exist {
return true
}
}
return SinkManager.Exist(sinkId)
}
// ISinkManager 添加到TransStream的所有Sink
type ISinkManager interface {
Add(source ISink) error
Find(id SinkId) ISink
Remove(id SinkId) (ISink, error)
Exist(id SinkId) bool
}
var SinkManager ISinkManager
func init() {
SinkManager = &sinkManagerImpl{}
}
type sinkManagerImpl struct {
m sync.Map
}
func (s *sinkManagerImpl) Add(source ISink) error {
_, ok := s.m.LoadOrStore(source.Id(), source)
if ok {
return fmt.Errorf("the source %s has been exist", source.Id())
}
return nil
}
func (s *sinkManagerImpl) Find(id SinkId) ISink {
value, ok := s.m.Load(id)
if ok {
return value.(ISink)
}
return nil
}
func (s *sinkManagerImpl) Remove(id SinkId) (ISink, error) {
value, loaded := s.m.LoadAndDelete(id)
if loaded {
return value.(ISink), nil
}
return nil, fmt.Errorf("source with id %s was not find", id)
}
func (s *sinkManagerImpl) Exist(id SinkId) bool {
_, ok := s.m.Load(id)
return ok
}

View File

@@ -494,7 +494,7 @@ func (s *SourceImpl) Publish(source ISource, success func(), failure func(state
return return
} }
err := s.Hook(HookEventPublish, NewHookEventInfo(source.Id(), sourceTypeToStr(source.Type()), ""), err := s.Hook(HookEventPublish, NewPublishHookEventInfo(source.Id(), "", source.Type()),
func(response *http.Response) { func(response *http.Response) {
if err := SourceManager.Add(source); err == nil { if err := SourceManager.Add(source); err == nil {
success() success()

View File

@@ -4,9 +4,22 @@ import (
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
) )
type IStreamManager interface {
Add(stream utils.AVStream)
FindStream(id utils.AVCodecID) utils.AVStream
FindStreamWithType(mediaType utils.AVMediaType) utils.AVStream
FindStreams(id utils.AVCodecID) []utils.AVStream
FindStreamsWithType(mediaType utils.AVMediaType) []utils.AVStream
All() []utils.AVStream
}
type StreamManager struct { type StreamManager struct {
streams []utils.AVStream streams []utils.AVStream
completed bool
} }
func (s *StreamManager) Add(stream utils.AVStream) { func (s *StreamManager) Add(stream utils.AVStream) {

View File

@@ -66,6 +66,8 @@ var TransStreamFactory func(source ISource, protocol Protocol, streams []utils.A
// ITransStream 讲AVPacket封装成传输流转发给各个Sink // ITransStream 讲AVPacket封装成传输流转发给各个Sink
type ITransStream interface { type ITransStream interface {
Init()
Input(packet utils.AVPacket) error Input(packet utils.AVPacket) error
AddTrack(stream utils.AVStream) error AddTrack(stream utils.AVStream) error
@@ -81,6 +83,8 @@ type ITransStream interface {
AllSink() []ISink AllSink() []ISink
Close() error Close() error
SendPacket(data []byte) error
} }
type TransStreamImpl struct { type TransStreamImpl struct {
@@ -92,6 +96,10 @@ type TransStreamImpl struct {
ExistVideo bool ExistVideo bool
} }
func (t *TransStreamImpl) Init() {
t.Sinks = make(map[SinkId]ISink, 64)
}
func (t *TransStreamImpl) Input(packet utils.AVPacket) error { func (t *TransStreamImpl) Input(packet utils.AVPacket) error {
return nil return nil
} }
@@ -134,3 +142,68 @@ func (t *TransStreamImpl) AllSink() []ISink {
func (t *TransStreamImpl) Close() error { func (t *TransStreamImpl) Close() error {
return nil return nil
} }
func (t *TransStreamImpl) SendPacket(data []byte) error {
for _, sink := range t.Sinks {
sink.Input(data)
}
return nil
}
// CacheTransStream 针对RTMP/FLV/HLS等基于TCP传输的带缓存传输流.
type CacheTransStream struct {
TransStreamImpl
//作为封装流的内存缓存区, 即使没有开启GOP缓存也创建一个, 开启GOP缓存的情况下, 创建2个, 反复交替使用.
StreamBuffers []MemoryPool
//当前合并写切片位于memoryPool的开始偏移量
SegmentOffset int
//前一个包的时间戳
PrePacketTS int64
}
func (c *CacheTransStream) Init() {
c.TransStreamImpl.Init()
c.StreamBuffers = make([]MemoryPool, 2)
c.StreamBuffers[0] = NewMemoryPoolWithDirect(1024*4000, true)
if c.ExistVideo && AppConfig.MergeWriteLatency > 0 {
c.StreamBuffers[1] = NewMemoryPoolWithDirect(1024*4000, true)
}
c.SegmentOffset = 0
c.PrePacketTS = -1
}
func (c *CacheTransStream) Full(ts int64) bool {
if c.PrePacketTS == -1 {
c.PrePacketTS = ts
}
if ts < c.PrePacketTS {
c.PrePacketTS = ts
}
return int(ts-c.PrePacketTS) >= AppConfig.MergeWriteLatency
}
func (c *CacheTransStream) SwapStreamBuffer() {
utils.Assert(c.ExistVideo)
tmp := c.StreamBuffers[0]
c.StreamBuffers[0] = c.StreamBuffers[1]
c.StreamBuffers[1] = tmp
c.StreamBuffers[0].Clear()
c.PrePacketTS = -1
c.SegmentOffset = 0
}
func (c *CacheTransStream) SendPacketWithOffset(data []byte, offset int) error {
c.TransStreamImpl.SendPacket(data[offset:])
c.SegmentOffset = len(data)
c.PrePacketTS = -1
return nil
}