支持国标级联

This commit is contained in:
yangjiechina
2024-10-22 11:23:43 +08:00
parent e19a8708bd
commit 9090e28077
49 changed files with 1437 additions and 805 deletions

310
api.go
View File

@@ -7,7 +7,6 @@ import (
"github.com/gorilla/websocket"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/flv"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/hls"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/rtc"
@@ -17,6 +16,7 @@ import (
"net/http"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
@@ -41,10 +41,11 @@ func init() {
}
}
func withCheckParams(f func(sourceId string, w http.ResponseWriter, req *http.Request), suffix string) func(http.ResponseWriter, *http.Request) {
func filterSourceID(f func(sourceId string, w http.ResponseWriter, req *http.Request), suffix string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
source, err := stream.Path2SourceId(req.URL.Path, suffix)
if err != nil {
log.Sugar.Errorf("拉流失败 解析流id发生err: %s path: %s", err.Error(), req.URL.Path)
httpResponse(w, http.StatusBadRequest, err.Error())
return
}
@@ -53,6 +54,24 @@ func withCheckParams(f func(sourceId string, w http.ResponseWriter, req *http.Re
}
}
type IDS struct {
// 内部SinkID可能是uint64或者string类型, 但外部传参均使用string类型程序内部自行兼容ipv6.
Sink string `json:"sink"`
Source string `json:"source"`
}
func filterRequestBodyParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
if err := HttpDecodeJSONBody(w, req, params); err != nil {
log.Sugar.Errorf("处理http请求失败 err: %s path: %s", err.Error(), req.URL.Path)
httpResponse2(w, err)
return
}
f(params.(T), w, req)
}
}
func startApiServer(addr string) {
/**
http://host:port/xxx.flv
@@ -61,20 +80,27 @@ func startApiServer(addr string) {
http://host:port/xxx_0.ts
ws://host:port/xxx.flv
*/
//{source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能一层
apiServer.router.HandleFunc("/{source}.flv", withCheckParams(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}/{stream}.flv", withCheckParams(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}.m3u8", withCheckParams(apiServer.onHLS, ".m3u8"))
apiServer.router.HandleFunc("/{source}/{stream}.m3u8", withCheckParams(apiServer.onHLS, ".m3u8"))
apiServer.router.HandleFunc("/{source}.ts", withCheckParams(apiServer.onTS, ".ts"))
apiServer.router.HandleFunc("/{source}/{stream}.ts", withCheckParams(apiServer.onTS, ".ts"))
apiServer.router.HandleFunc("/{source}.rtc", withCheckParams(apiServer.onRtc, ".rtc"))
apiServer.router.HandleFunc("/{source}/{stream}.rtc", withCheckParams(apiServer.onRtc, ".rtc"))
// {source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能嵌套一层
apiServer.router.HandleFunc("/{source}.flv", filterSourceID(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}/{stream}.flv", filterSourceID(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8"))
apiServer.router.HandleFunc("/{source}/{stream}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8"))
apiServer.router.HandleFunc("/{source}.ts", filterSourceID(apiServer.onTS, ".ts"))
apiServer.router.HandleFunc("/{source}/{stream}.ts", filterSourceID(apiServer.onTS, ".ts"))
apiServer.router.HandleFunc("/{source}.rtc", filterSourceID(apiServer.onRtc, ".rtc"))
apiServer.router.HandleFunc("/{source}/{stream}.rtc", filterSourceID(apiServer.onRtc, ".rtc"))
apiServer.router.HandleFunc("/api/v1/source/list", apiServer.OnSourceList) // 查询所有推流源
apiServer.router.HandleFunc("/api/v1/source/close", filterRequestBodyParams(apiServer.OnSourceClose, &IDS{})) // 关闭推流源
apiServer.router.HandleFunc("/api/v1/sink/list", filterRequestBodyParams(apiServer.OnSinkList, &IDS{})) // 查询某个推流源下,所有的拉流端列表
apiServer.router.HandleFunc("/api/v1/sink/close", filterRequestBodyParams(apiServer.OnSinkClose, &IDS{})) // 关闭拉流端
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
apiServer.router.HandleFunc("/api/v1/gb28181/forward", filterRequestBodyParams(apiServer.OnGBSourceForward, &GBForwardParams{})) // 设置级联转发目标停止级联调用sink/close接口级联断开会走on_play_done事件通知
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", filterRequestBodyParams(apiServer.OnGBSourceCreate, &GBSourceParams{})) // 创建国标推流源
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", filterRequestBodyParams(apiServer.OnGBSourceConnect, &GBConnect{})) // 为国标TCP主动推流设置连接地址
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", apiServer.createGBSource)
//TCP主动,设置连接地址
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource)
apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource)
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
runtime.GC()
writer.WriteHeader(http.StatusOK)
@@ -100,173 +126,17 @@ func startApiServer(addr string) {
}
}
func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
//请求参数
v := &struct {
Source string `json:"source"` //SourceId
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
}{}
//返回监听的端口
response := &struct {
IP string `json:"ip"`
Port int `json:"port,omitempty"`
}{}
var err error
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
if err = HttpDecodeJSONBody(w, r, v); err != nil {
return
}
log.Sugar.Infof("gb create:%v", v)
source := stream.SourceManager.Find(v.Source)
if source != nil {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("创建GB28181 Source失败 %s 已经存在", v.Source)}
return
}
tcp := true
var active bool
if v.Setup == "passive" {
} else if v.Setup == "active" {
active = true
} else {
tcp = false
//udp收流
}
if tcp && active {
if !stream.AppConfig.GB28181.IsMultiPort() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 单端口模式下不能主动拉流"}
} else if !tcp {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, UDP不能主动拉流"}
} else if !stream.AppConfig.GB28181.IsEnableTCP() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 未开启TCP, UDP不能主动拉流"}
}
if err != nil {
return
}
}
_, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active)
if err != nil {
err = &MalformedRequest{Code: http.StatusInternalServerError, Msg: fmt.Sprintf("创建GB28181 Source失败 err:%s", err.Error())}
return
}
response.IP = stream.AppConfig.PublicIP
response.Port = port
httpResponseOk(w, response)
}
func (api *ApiServer) connectGBSource(w http.ResponseWriter, r *http.Request) {
//请求参数
v := &struct {
Source string `json:"source"` //SourceId
RemoteAddr string `json:"remote_addr"`
}{}
var err error
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
if err = HttpDecodeJSONBody(w, r, v); err != nil {
return
}
log.Sugar.Infof("gb connect:%v", v)
source := stream.SourceManager.Find(v.Source)
if source == nil {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gb28181 source 不存在"}
return
}
activeSource, ok := source.(*gb28181.ActiveSource)
if !ok {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gbsource 不能转为active source"}
return
}
addr, err := net.ResolveTCPAddr("tcp", v.RemoteAddr)
if err != nil {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "解析连接地址失败"}
return
}
err = activeSource.Connect(addr)
if err != nil {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("连接Server失败 err:%s", err.Error())}
return
}
httpResponseOk(w, nil)
}
func (api *ApiServer) closeGBSource(w http.ResponseWriter, r *http.Request) {
//请求参数
v := &struct {
Source string `json:"source"` //SourceId
}{}
var err error
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
if err = HttpDecodeJSONBody(w, r, v); err != nil {
httpResponse2(w, err)
return
}
log.Sugar.Infof("gb close:%v", v)
source := stream.SourceManager.Find(v.Source)
if source == nil {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gb28181 source 不存在"}
return
}
source.Close()
httpResponseOk(w, nil)
}
func (api *ApiServer) generateSinkId(remoteAddr string) stream.SinkId {
func (api *ApiServer) generateSinkId(remoteAddr string) stream.SinkID {
tcpAddr, err := net.ResolveTCPAddr("tcp", remoteAddr)
if err != nil {
panic(err)
}
return stream.GenerateSinkId(tcpAddr)
}
func (api *ApiServer) generateSourceId(remoteAddr string) stream.SinkId {
tcpAddr, err := net.ResolveTCPAddr("tcp", remoteAddr)
if err != nil {
panic(err)
}
return stream.GenerateSinkId(tcpAddr)
return stream.NetAddr2SinkId(tcpAddr)
}
func (api *ApiServer) onFlv(sourceId string, w http.ResponseWriter, r *http.Request) {
// 区分ws请求
ws := true
if !("upgrade" == strings.ToLower(r.Header.Get("Connection"))) {
ws = false
@@ -282,6 +152,7 @@ func (api *ApiServer) onFlv(sourceId string, w http.ResponseWriter, r *http.Requ
apiServer.onHttpFLV(sourceId, w, r)
}
}
func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Request) {
conn, err := api.upgrader.Upgrade(w, r, nil)
if err != nil {
@@ -292,11 +163,11 @@ func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Re
sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, flv.NewWSConn(conn))
sink.SetUrlValues(r.URL.Query())
log.Sugar.Infof("ws-flv 连接 sink:%s", sink.PrintInfo())
log.Sugar.Infof("ws-flv 连接 sink:%s", sink.String())
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("ws-flv 播放失败 sink:%s", sink.PrintInfo())
log.Sugar.Warnf("ws-flv 播放失败 sink:%s", sink.String())
w.WriteHeader(http.StatusForbidden)
return
}
@@ -305,7 +176,7 @@ func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Re
bytes := make([]byte, 64)
for {
if _, err := netConn.Read(bytes); err != nil {
log.Sugar.Infof("ws-flv 断开连接 sink:%s", sink.PrintInfo())
log.Sugar.Infof("ws-flv 断开连接 sink:%s", sink.String())
sink.Close()
break
}
@@ -332,11 +203,11 @@ func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.
sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, conn)
sink.SetUrlValues(r.URL.Query())
log.Sugar.Infof("http-flv 连接 sink:%s", sink.PrintInfo())
log.Sugar.Infof("http-flv 连接 sink:%s", sink.String())
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("http-flv 播放失败 sink:%s", sink.PrintInfo())
log.Sugar.Warnf("http-flv 播放失败 sink:%s", sink.String())
w.WriteHeader(http.StatusForbidden)
return
@@ -345,7 +216,7 @@ func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.
bytes := make([]byte, 64)
for {
if _, err := conn.Read(bytes); err != nil {
log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.PrintInfo())
log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.String())
sink.Close()
break
}
@@ -362,7 +233,7 @@ func (api *ApiServer) onTS(source string, w http.ResponseWriter, r *http.Request
sid := r.URL.Query().Get(hls.SessionIdKey)
var sink stream.Sink
if sid != "" {
sink = stream.SinkManager.Find(stream.SinkId(sid))
sink = stream.SinkManager.Find(stream.SinkID(sid))
}
if sink == nil {
log.Sugar.Errorf("hls session with id '%s' has expired.", sid)
@@ -377,7 +248,7 @@ func (api *ApiServer) onTS(source string, w http.ResponseWriter, r *http.Request
}
seq := source[index+1:]
tsPath := stream.AppConfig.Hls.TSPath(sink.SourceId(), seq)
tsPath := stream.AppConfig.Hls.TSPath(sink.GetSourceID(), seq)
if _, err := os.Stat(tsPath); err != nil {
w.WriteHeader(http.StatusNotFound)
return
@@ -430,7 +301,7 @@ func (api *ApiServer) onHLS(sourceId string, w http.ResponseWriter, r *http.Requ
sink.SetUrlValues(r.URL.Query())
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("m3u8 请求失败 sink:%s", sink.PrintInfo())
log.Sugar.Warnf("m3u8 请求失败 sink:%s", sink.String())
w.WriteHeader(http.StatusForbidden)
return
@@ -488,11 +359,11 @@ func (api *ApiServer) onRtc(sourceId string, w http.ResponseWriter, r *http.Requ
})
sink.SetUrlValues(r.URL.Query())
log.Sugar.Infof("rtc 请求 sink:%s sdp:%v", sink.PrintInfo(), v.SDP)
log.Sugar.Infof("rtc 请求 sink:%s sdp:%v", sink.String(), v.SDP)
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("rtc 播放失败 sink:%s", sink.PrintInfo())
log.Sugar.Warnf("rtc 播放失败 sink:%s", sink.String())
w.WriteHeader(http.StatusForbidden)
group.Done()
@@ -500,3 +371,72 @@ func (api *ApiServer) onRtc(sourceId string, w http.ResponseWriter, r *http.Requ
group.Wait()
}
func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
sources := stream.SourceManager.All()
type SourceDetails struct {
ID string `json:"id,omitempty"`
Protocol string `json:"protocol"` // 推流协议
Time time.Time `json:"time"` // 推流时间
SinkCount int `json:"sink_count"` // 播放端计数
Bitrate string `json:"bitrate"` // 码率统计
Tracks []string `json:"tracks"` // 每路流编码器ID
}
var details []SourceDetails
for _, source := range sources {
var tracks []string
streams := source.OriginStreams()
for _, avStream := range streams {
tracks = append(tracks, avStream.CodecId().String())
}
details = append(details, SourceDetails{
ID: source.GetID(),
Protocol: source.GetType().ToString(),
Time: source.CreateTime(),
SinkCount: source.SinkCount(),
Bitrate: "", // 后续开发
Tracks: tracks,
})
}
httpResponseOK(w, details)
}
func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) {
}
func (api *ApiServer) OnSourceClose(v *IDS, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("close source: %v", v)
if source := stream.SourceManager.Find(v.Source); source != nil {
source.Close()
} else {
log.Sugar.Warnf("Source with ID %s does not exist.", v.Source)
}
httpResponseOK(w, nil)
}
func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("close sink: %v", v)
var sinkId stream.SinkID
i, err := strconv.ParseUint(v.Sink, 10, 64)
if err != nil {
sinkId = stream.SinkID(v.Sink)
} else {
sinkId = stream.SinkID(i)
}
if source := stream.SourceManager.Find(v.Source); source != nil {
source.RemoveSinkWithID(sinkId)
} else {
log.Sugar.Warnf("Source with ID %s does not exist.", v.Source)
}
httpResponseOK(w, nil)
}

187
api_gb.go Normal file
View File

@@ -0,0 +1,187 @@
package main
import (
"fmt"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"net"
"net/http"
"strings"
)
type GBForwardParams struct {
Source string `json:"source"` //GetSourceID
Addr string `json:"addr"`
SSRC uint32 `json:"ssrc"`
Setup string `json:"setup"`
}
type GBSourceParams struct {
Source string `json:"source"` //GetSourceID
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
}
type GBConnect struct {
Source string `json:"source"` //GetSourceID
RemoteAddr string `json:"remote_addr"`
}
func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建国标源: %v", v)
// 返回收流地址
response := &struct {
IP string `json:"ip"`
Port int `json:"port,omitempty"`
}{}
var err error
// 响应错误消息
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
source := stream.SourceManager.Find(v.Source)
if source != nil {
log.Sugar.Errorf("创建国标源失败, %s已经存在", v.Source)
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("创建国标源失败, %s已经存在", v.Source)}
return
}
tcp := true
var active bool
if v.Setup == "passive" {
} else if v.Setup == "active" {
active = true
} else {
tcp = false
//udp收流
}
if tcp && active {
if !stream.AppConfig.GB28181.IsMultiPort() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, 单端口模式下不能主动拉流"}
} else if !tcp {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, UDP不能主动拉流"}
} else if !stream.AppConfig.GB28181.IsEnableTCP() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, 未开启TCP, UDP不能主动拉流"}
}
if err != nil {
return
}
}
_, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active)
if err != nil {
err = &MalformedRequest{Code: http.StatusInternalServerError, Msg: fmt.Sprintf("创建国标源失败 err:%s", err.Error())}
return
}
response.IP = stream.AppConfig.PublicIP
response.Port = port
httpResponseOK(w, response)
}
func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v)
var err error
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
source := stream.SourceManager.Find(v.Source)
if source == nil {
log.Sugar.Errorf("设置主动拉流失败, %s源不存在", v.Source)
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gb28181 source 不存在"}
return
}
activeSource, ok := source.(*gb28181.ActiveSource)
if !ok {
log.Sugar.Errorf("设置主动拉流失败, %s源不是Active拉流类型", v.Source)
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gbsource 不能转为active source"}
return
}
addr, err := net.ResolveTCPAddr("tcp", v.RemoteAddr)
if err != nil {
log.Sugar.Errorf("设置主动拉流失败, err: %s", err.Error())
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "解析连接地址失败"}
return
}
err = activeSource.Connect(addr)
if err != nil {
log.Sugar.Errorf("设置主动拉流失败, err: %s", err.Error())
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("连接Server失败 err:%s", err.Error())}
return
}
httpResponseOK(w, nil)
}
func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标级联转发: %v", v)
source := stream.SourceManager.Find(v.Source)
if source == nil {
log.Sugar.Infof("设置国标级联转发失败 %s源不存在", v.Source)
w.WriteHeader(http.StatusNotFound)
} else if source.GetType() != stream.SourceType28181 {
log.Sugar.Infof("设置国标级联转发失败 %s源不是国标推流类型", v.Source)
w.WriteHeader(http.StatusBadRequest)
return
}
var setup gb28181.SetupType
switch strings.ToLower(v.Setup) {
case "active":
setup = gb28181.SetupActive
break
case "passive":
setup = gb28181.SetupPassive
break
default:
setup = gb28181.SetupUDP
break
}
addr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
sinkId := stream.NetAddr2SinkId(addr)
// 添加随机数
if ipv4, ok := sinkId.(uint64); ok {
random := uint64(utils.RandomIntInRange(0x1000, 0xFFFF0000))
sinkId = (ipv4 & 0xFFFFFFFF00000000) | (random << 16) | (ipv4 & 0xFFFF)
}
sink, port, err := gb28181.NewForwardSink(v.SSRC, v.Addr, setup, sinkId, v.Source)
if err != nil {
log.Sugar.Errorf("设置国标级联转发 err: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
source.AddSink(sink)
log.Sugar.Infof("设置国标级联转发成功 ID: %s", sink.GetID())
response := struct {
ID string `json:"id"` //sink id
IP string `json:"ip"`
Port int `json:"port"`
}{ID: stream.SinkId2String(sinkId), IP: stream.AppConfig.PublicIP, Port: port}
httpResponse2(w, &response)
}

View File

@@ -6,6 +6,6 @@ import (
"net"
)
func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: transport.NewConn(conn)}
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn)}
}

View File

@@ -216,6 +216,6 @@ func NewHttpTransStream() stream.TransStream {
}
}
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewHttpTransStream(), nil
}

View File

@@ -1,5 +1,6 @@
package gb28181
// Filter 关联Source
type Filter interface {
AddSource(ssrc uint32, source GBSource) bool

View File

@@ -4,10 +4,6 @@ type singleFilter struct {
source GBSource
}
func NewSingleFilter(source GBSource) Filter {
return &singleFilter{source: source}
}
func (s *singleFilter) AddSource(ssrc uint32, source GBSource) bool {
panic("implement me")
}
@@ -19,3 +15,7 @@ func (s *singleFilter) RemoveSource(ssrc uint32) {
func (s *singleFilter) FindSource(ssrc uint32) GBSource {
return s.source
}
func NewSingleFilter(source GBSource) Filter {
return &singleFilter{source: source}
}

View File

@@ -9,10 +9,6 @@ type ssrcFilter struct {
mute sync.RWMutex
}
func NewSharedFilter(guestCount int) Filter {
return &ssrcFilter{sources: make(map[uint32]GBSource, guestCount)}
}
func (r *ssrcFilter) AddSource(ssrc uint32, source GBSource) bool {
r.mute.Lock()
defer r.mute.Unlock()
@@ -36,3 +32,7 @@ func (r *ssrcFilter) FindSource(ssrc uint32) GBSource {
defer r.mute.RUnlock()
return r.sources[ssrc]
}
func NewSSRCFilter(guestCount int) Filter {
return &ssrcFilter{sources: make(map[uint32]GBSource, guestCount)}
}

148
gb28181/forward_sink.go Normal file
View File

@@ -0,0 +1,148 @@
package gb28181
import (
"encoding/binary"
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"net"
)
const (
TcpStreamForwardBufferBlockSize = 1024
RTPOverTCPPacketSize = 1600
)
type ForwardSink struct {
stream.BaseSink
setup SetupType
socket transport.ITransport
ssrc uint32
buffer *stream.ReceiveBuffer //发送缓冲区
}
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("级联连接 conn: %s", conn.RemoteAddr())
f.Conn = conn
f.Conn.(*transport.Conn).EnableAsyncWriteMode(TcpStreamForwardBufferBlockSize - 2)
return nil
}
func (f *ForwardSink) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("级联断开连接 conn: %s", conn.RemoteAddr())
f.Close()
}
func (f *ForwardSink) Input(data []byte) error {
if SetupUDP != f.setup && f.Conn == nil {
return nil
}
if len(data)+2 > RTPOverTCPPacketSize {
log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(data), RTPOverTCPPacketSize)
return nil
}
// 修改为与上级协商的SSRC
librtp.ModifySSRC(data, f.ssrc)
if SetupUDP == f.setup {
// UDP转发, 不拷贝直接发送
f.socket.(*transport.UDPClient).Write(data)
} else {
// TCP转发, 拷贝一次再发送
block := f.buffer.GetBlock()
copy(block[2:], data)
binary.BigEndian.PutUint16(block, uint16(len(data)))
if _, err := f.Conn.Write(block[:2+len(data)]); err == nil {
return nil
} else if _, ok := err.(*transport.ZeroWindowSizeError); ok {
log.Sugar.Errorf("发送缓冲区阻塞")
f.Conn.Close()
f.Conn = nil
}
}
return nil
}
// Close 关闭国标转发流
func (f *ForwardSink) Close() {
f.BaseSink.Close()
if f.socket != nil {
f.socket.Close()
}
}
// NewForwardSink 创建国标级联转发流Sink
// 返回监听的端口和Sink
func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stream.SinkID, sourceId string) (stream.Sink, int, error) {
sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup}
if SetupUDP == setup {
remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr)
if err != nil {
return nil, 0, err
}
client, err := TransportManger.NewUDPClient(stream.AppConfig.ListenIP, remoteAddr)
if err != nil {
return nil, 0, err
}
sink.socket = client
} else if SetupActive == setup {
server, err := TransportManger.NewTCPServer(stream.AppConfig.ListenIP)
if err != nil {
return nil, 0, err
}
sink.socket = server
} else if SetupPassive == setup {
client := transport.TCPClient{}
err := TransportManger.AllocPort(true, func(port uint16) error {
localAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(int(port)))
if err != nil {
return err
}
remoteAddr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return err
}
client.SetHandler(sink)
conn, err := client.Connect(localAddr, remoteAddr)
if err != nil {
return err
}
sink.Conn = conn
return nil
})
if err != nil {
return nil, 0, err
}
sink.socket = &client
} else {
utils.Assert(false)
}
if SetupUDP != setup {
sink.buffer = stream.NewReceiveBuffer(RTPOverTCPPacketSize, TcpStreamForwardBufferBlockSize)
}
return sink, sink.socket.ListenPort(), nil
}

23
gb28181/forward_stream.go Normal file
View File

@@ -0,0 +1,23 @@
package gb28181
import (
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
)
// ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么.
type ForwardStream struct {
stream.BaseTransStream
}
func (f *ForwardStream) WriteHeader() error {
return nil
}
func NewTransStream() (stream.TransStream, error) {
return &ForwardStream{BaseTransStream: stream.BaseTransStream{Protocol_: stream.TransStreamGBStreamForward}}, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream()
}

181
gb28181/forward_test.go Normal file
View File

@@ -0,0 +1,181 @@
package gb28181
import (
"encoding/json"
"fmt"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/stream"
"net"
"net/http"
"os"
"strconv"
"testing"
"time"
)
func callForward(source, setup, addr string) string {
v := &struct {
Source string `json:"source"` //GetSourceID
Addr string `json:"addr"`
SSRC uint32 `json:"ssrc"`
Setup string `json:"setup"`
}{
Source: source,
Addr: addr,
SSRC: 0x100,
Setup: setup,
}
body, err := json.Marshal(&v)
if err != nil {
panic(err)
}
response, err := stream.SendHookEvent("http://localhost:8080/api/v1/gb28181/forward", body)
if err != nil {
panic(err)
}
if response == nil || response.StatusCode != http.StatusOK {
println("设置级联转发失败")
return ""
}
resp := &struct {
ID string `json:"id"` //sink id
IP string `json:"ip"`
Port int `json:"port"`
}{}
bytes := make([]byte, 1024)
n, err := response.Body.Read(bytes)
if err != nil && n < 1 {
panic(err)
}
err = json.Unmarshal(bytes[:n], resp)
if err != nil {
panic(err)
}
return resp.ID
}
func closeForwardSink(source, sink string) {
v := &struct {
Sink string `json:"sink"`
Source string `json:"source"`
}{
Source: source,
Sink: sink,
}
body, err := json.Marshal(&v)
if err != nil {
panic(err)
}
_, err = stream.SendHookEvent("http://localhost:8080/api/v1/sink/close", body)
if err != nil {
panic(err)
}
}
func createTransport(setup string) (transport.ITransport, *os.File) {
var socket transport.ITransport
name := fmt.Sprintf("./gb_forward_ps_%s_%d.raw", setup, time.Now().UnixMilli())
file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
panic(err)
}
handler := func(conn net.Conn, data []byte) []byte {
if setup != "udp" {
file.Write(data[12:])
} else {
file.Write(data[14:])
}
return nil
}
if "udp" == setup {
client := transport.UDPClient{}
err := client.Connect(nil, nil)
if err != nil {
panic(err)
}
client.SetHandler2(nil, handler, nil)
go client.Receive()
socket = &client
} else if "active" == setup {
tcpClient := transport.TCPClient{}
tcpClient.SetHandler2(nil, handler, nil)
go tcpClient.Receive()
socket = &tcpClient
} else if "passive" == setup {
tcpServer := transport.TCPServer{}
tcpServer.SetHandler2(nil, handler, nil)
tcpServer.Bind(nil)
go tcpServer.Accept()
socket = &tcpServer
}
port := socket.ListenPort()
fmt.Printf("收流端口:%d\r\n", port)
return socket, file
}
func TestForwardSink(t *testing.T) {
source := "34020000001110000001/34020000001310000001"
for {
var ids []string
var transports []transport.ITransport
var files []*os.File
// 三种推流方式都测试
for i := 1; i < 2; i++ {
var setup string
if i == 0 {
setup = "udp"
} else if i == 1 {
setup = "passive"
} else {
setup = "active"
}
// 监听收流端口
client, out := createTransport(setup)
// 调用api设置为转发目标
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(client.ListenPort()))
id := callForward(source, setup, addr)
ids = append(ids, id)
transports = append(transports, client)
files = append(files, out)
}
time.Sleep(20 * time.Second)
for i := 0; i < len(ids); i++ {
if transports[i] != nil {
transports[i].Close()
}
if ids[i] != "" {
closeForwardSink(source, ids[i])
}
if files[i] != nil {
files[i].Close()
}
}
}
}

View File

@@ -15,7 +15,7 @@ import (
)
// 输入rtp负载的ps流文件路径, 根据ssrc解析, rtp头不要带扩展
func readRtp(path string, ssrc uint32, tcp bool, cb func([]byte)) {
func readRtpRaw(path string, ssrc uint32, tcp bool, cb func([]byte)) {
file, err := os.ReadFile(path)
if err != nil {
panic(err)
@@ -51,7 +51,7 @@ func readRtp(path string, ssrc uint32, tcp bool, cb func([]byte)) {
func connectSource(source string, addr string) {
v := &struct {
Source string `json:"source"` //SourceId
Source string `json:"source"` //GetSourceID
RemoteAddr string `json:"remote_addr"`
}{
Source: source,
@@ -82,7 +82,7 @@ func connectSource(source string, addr string) {
func createSource(source, setup string, ssrc uint32) (string, uint16) {
v := struct {
Source string `json:"source"` //SourceId
Source string `json:"source"` //GetSourceID
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
}{
@@ -105,8 +105,7 @@ func createSource(source, setup string, ssrc uint32) (string, uint16) {
response, err := client.Do(request)
if err != nil {
panic(err)
}
if response.StatusCode != http.StatusOK {
} else if response.StatusCode != http.StatusOK {
panic("")
}
@@ -132,15 +131,29 @@ func createSource(source, setup string, ssrc uint32) (string, uint16) {
return connectInfo.Data.IP, connectInfo.Data.Port
}
func rtp2overTcp(path string, ssrc uint32) {
file, err := os.OpenFile("./rtp.raw", os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
panic(err)
}
readRtpRaw(path, ssrc, true, func(data []byte) {
file.Write(data)
})
file.Close()
}
// 使用wireshark直接导出udp流
// 根据ssrc来查找每个rtp包, rtp不要带扩展字段
func TestUDPRecv(t *testing.T) {
path := "D:\\GOProjects\\avformat\\gb28181_h265.rtp"
ssrc := 0xBEBC202
path := "D:\\GOProjects\\avformat\\gb28181_h264.rtp"
ssrc := 0xBEBC201
localAddr := "0.0.0.0:20001"
setup := "udp" //udp/passive/active
id := "hls_mystream"
rtp2overTcp(path, uint32(ssrc))
ip, port := createSource(id, setup, uint32(ssrc))
if setup == "udp" {
@@ -153,7 +166,7 @@ func TestUDPRecv(t *testing.T) {
panic(err)
}
readRtp(path, uint32(ssrc), false, func(data []byte) {
readRtpRaw(path, uint32(ssrc), false, func(data []byte) {
client.Write(data)
time.Sleep(1 * time.Millisecond)
})
@@ -162,13 +175,13 @@ func TestUDPRecv(t *testing.T) {
remoteAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
client := transport.TCPClient{}
err := client.Connect(addr, remoteAddr)
_, err := client.Connect(addr, remoteAddr)
if err != nil {
panic(err)
}
readRtp(path, uint32(ssrc), true, func(data []byte) {
readRtpRaw(path, uint32(ssrc), true, func(data []byte) {
client.Write(data)
time.Sleep(1 * time.Millisecond)
})
@@ -177,7 +190,7 @@ func TestUDPRecv(t *testing.T) {
server := transport.TCPServer{}
server.SetHandler2(func(conn net.Conn) []byte {
readRtp(path, uint32(ssrc), true, func(data []byte) {
readRtpRaw(path, uint32(ssrc), true, func(data []byte) {
conn.Write(data)
time.Sleep(1 * time.Millisecond)
})

View File

@@ -12,12 +12,12 @@ import (
"net"
)
type TransportType int
type SetupType int
const (
TransportTypeUDP = TransportType(0)
TransportTypeTCPPassive = TransportType(1)
TransportTypeTCPActive = TransportType(2)
SetupUDP = SetupType(0)
SetupPassive = SetupType(1)
SetupActive = SetupType(2)
PsProbeBufferSize = 1024 * 1024 * 2
JitterBufferSize = 1024 * 1024
@@ -29,15 +29,13 @@ var (
SharedTCPServer *TCPServer
)
// GBSource GB28181推流Source, 接收PS流解析生成AVStream和AVPacket, 后续全权交给父类Source处理.
// udp/passive/active 都继承本接口, filter负责解析rtp包, 根据ssrc匹配对应的Source.
// GBSource GB28181推流Source, 统一解析PS流、级联转发.
type GBSource interface {
stream.Source
InputRtp(pkt *rtp.Packet) error
TransportType() TransportType
SetupType() SetupType
// PreparePublish 收到流时, 做一些初始化工作.
PreparePublish(conn net.Conn, ssrc uint32, source GBSource)
SetConn(conn net.Conn)
@@ -61,34 +59,37 @@ type BaseGBSource struct {
videoTimestamp int64
audioPacketCreatedTime int64
videoPacketCreatedTime int64
isSystemClock bool //推流时间戳不正确, 是否使用系统时间.
isSystemClock bool // 推流时间戳不正确, 是否使用系统时间.
}
func (source *BaseGBSource) InputRtp(pkt *rtp.Packet) error {
panic("implement me")
}
func (source *BaseGBSource) Transport() TransportType {
panic("implement me")
}
func (source *BaseGBSource) Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int) {
func (source *BaseGBSource) Init(receiveQueueSize int) {
source.deMuxerCtx = libmpeg.NewPSDeMuxerContext(make([]byte, PsProbeBufferSize))
source.deMuxerCtx.SetHandler(source)
source.SetType(stream.SourceType28181)
source.PublishSource.Init(inputCB, closeCB, receiveQueueSize)
source.PublishSource.Init(receiveQueueSize)
}
// Input 解析PS流, 确保在loop event协程调用此函数
// Input 输入rtp包, 处理PS流, 负责解析->封装->推流. 所有GBSource, 均到此处处理, 在event协程调用此函数
func (source *BaseGBSource) Input(data []byte) error {
return source.deMuxerCtx.Input(data)
// 国标级联转发
for _, transStream := range source.TransStreams {
if transStream.Protocol() != stream.TransStreamGBStreamForward {
continue
}
transStream.(*ForwardStream).SendPacket(data)
}
packet := rtp.Packet{}
_ = packet.Unmarshal(data)
return source.deMuxerCtx.Input(packet.Payload)
}
// OnPartPacket 部分es流回调
func (source *BaseGBSource) OnPartPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, data []byte, first bool) {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
//第一个es包, 标记内存起始位置
// 第一个es包, 标记内存起始位置
if first {
buffer.Mark()
}
@@ -122,7 +123,7 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
if source.IsCompleted() && source.NotTrackAdded(index) {
if !source.IsTimeoutTrack(index) {
source.SetTimeoutTrack(index)
log.Sugar.Errorf("添加track超时 source:%s", source.Id())
log.Sugar.Errorf("添加track超时 source:%s", source.GetID())
}
return nil
@@ -164,8 +165,9 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
return nil
}
// 纠正国标推流的时间戳
func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int64) {
//dts和pts保持一致
// dts和pts保持一致
pts = int64(math.Max(float64(dts), float64(pts)))
dts = pts
packet.SetPts(pts)
@@ -181,7 +183,7 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
lastCreatedTime = source.videoPacketCreatedTime
}
//计算duration
// 计算duration
var duration int64
if !source.isSystemClock && lastTimestamp != -1 {
if pts < lastTimestamp {
@@ -206,7 +208,7 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
}
}
//纠正时间戳
// 纠正时间戳
if source.isSystemClock && lastTimestamp != -1 {
duration = (packet.CreatedTime() - lastCreatedTime) * 90
packet.SetDts(lastTimestamp + duration)
@@ -224,7 +226,7 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
}
func (source *BaseGBSource) Close() {
log.Sugar.Infof("GB28181推流结束 ssrc:%d %s", source.ssrc, source.PublishSource.PrintInfo())
log.Sugar.Infof("GB28181推流结束 ssrc:%d %s", source.ssrc, source.PublishSource.String())
//释放收流端口
if source.transport != nil {
@@ -269,19 +271,19 @@ func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ G
if stream.AppConfig.Hooks.IsEnablePublishEvent() {
go func() {
_, state := stream.HookPublishEvent(source_)
if utils.HookStateOK != state {
log.Sugar.Errorf("GB28181 推流失败 source:%s", source.Id())
if _, state := stream.HookPublishEvent(source_); utils.HookStateOK == state {
return
}
if conn != nil {
conn.Close()
}
log.Sugar.Errorf("GB28181 推流失败 source:%s", source.GetID())
if conn != nil {
conn.Close()
}
}()
}
}
// NewGBSource 创建gb源,返回监听的收流端口
// NewGBSource 创建国标推流源, 返回监听的收流端口
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) {
if tcp {
utils.Assert(stream.AppConfig.GB28181.IsEnableTCP())
@@ -309,7 +311,7 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int,
return nil, 0, err
}
//单端口模式绑定ssrc
// 单端口模式绑定ssrc
if !stream.AppConfig.GB28181.IsMultiPort() {
var success bool
if tcp {
@@ -324,6 +326,7 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int,
port = stream.AppConfig.GB28181.Port[0]
} else if !active {
// 多端口模式, 创建收流Server
if tcp {
tcpServer, err := NewTCPServer(NewSingleFilter(source))
if err != nil {
@@ -350,13 +353,13 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int,
bufferBlockCount = stream.ReceiveBufferUdpBlockCount
}
source.SetId(id)
source.SetID(id)
source.SetSSRC(ssrc)
source.Init(source.Input, source.Close, bufferBlockCount)
source.Init(bufferBlockCount)
if _, state := stream.PreparePublishSource(source, false); utils.HookStateOK != state {
return nil, 0, fmt.Errorf("error code %d", state)
}
go source.LoopEvent()
go stream.LoopEvent(source)
return source, port, err
}

View File

@@ -32,6 +32,6 @@ func (a ActiveSource) Connect(remoteAddr *net.TCPAddr) error {
return nil
}
func (a ActiveSource) TransportType() TransportType {
return TransportTypeTCPActive
func (a ActiveSource) SetupType() SetupType {
return SetupActive
}

View File

@@ -12,12 +12,10 @@ func NewPassiveSource() *PassiveSource {
return &PassiveSource{}
}
func (t PassiveSource) TransportType() TransportType {
return TransportTypeTCPPassive
func (t PassiveSource) SetupType() SetupType {
return SetupPassive
}
// InputRtp tcp收流,直接解析ps流.
func (t PassiveSource) InputRtp(pkt *rtp.Packet) error {
t.Input(pkt.Payload)
return nil
func (t PassiveSource) InputRtpPacket(pkt *rtp.Packet) error {
return t.Input(pkt.Payload)
}

View File

@@ -5,7 +5,7 @@ import (
"github.com/pion/rtp"
)
// UDPSource GB28181 UDP推流源
// UDPSource 国标UDP推流源
type UDPSource struct {
BaseGBSource
@@ -22,20 +22,22 @@ func NewUDPSource() *UDPSource {
return u
}
func (u *UDPSource) TransportType() TransportType {
return TransportTypeUDP
func (u *UDPSource) SetupType() SetupType {
return SetupUDP
}
// OnOrderedRtp 有序RTP包回调
func (u *UDPSource) OnOrderedRtp(packet interface{}) {
u.PublishSource.Input(packet.(*rtp.Packet).Payload)
// 此时还在网络收流携程, 交给Source的主协程处理
u.PublishSource.Input(packet.(*rtp.Packet).Raw)
}
// InputRtp udp收流会先拷贝rtp包,交给jitter buffer处理后再发给source
func (u *UDPSource) InputRtp(pkt *rtp.Packet) error {
// InputRtpPacket 将RTP包排序后交给Source的主协程处理
func (u *UDPSource) InputRtpPacket(pkt *rtp.Packet) error {
block := u.receiveBuffer.GetBlock()
copy(block, pkt.Raw)
copy(block, pkt.Payload)
pkt.Payload = block[:len(pkt.Payload)]
pkt.Raw = block[:len(pkt.Raw)]
u.jitterBuffer.Push(pkt.SequenceNumber, pkt)
return nil
}

View File

@@ -23,6 +23,6 @@ func NewTCPClient(listenPort int, remoteAddr *net.TCPAddr, source GBSource) (*TC
return client, err
}
err = tcp.Connect(addr, remoteAddr)
_, err = tcp.Connect(addr, remoteAddr)
return client, err
}

View File

@@ -10,7 +10,6 @@ import (
// TCPServer GB28181TCP被动收流
type TCPServer struct {
stream.StreamServer[*TCPSession]
tcp *transport.TCPServer
filter Filter
}
@@ -35,7 +34,7 @@ func (T *TCPServer) OnCloseSession(session *TCPSession) {
func (T *TCPServer) OnConnected(conn net.Conn) []byte {
T.StreamServer.OnConnected(conn)
//TCP使用ReceiveBuffer区别在于,多端口模式从第一包就使用ReceiveBuffer, 单端口模式先解析出ssrc, 找到source. 后续再使用ReceiveBuffer.
//TCP单端口收流, Session已经绑定Source, 使用ReceiveBuffer读取网络包
if conn.(*transport.Conn).Data.(*TCPSession).source != nil {
return conn.(*transport.Conn).Data.(*TCPSession).receiveBuffer.GetBlock()
}
@@ -47,17 +46,20 @@ func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte {
T.StreamServer.OnPacket(conn, data)
session := conn.(*transport.Conn).Data.(*TCPSession)
//单端口收流
// 在Session未绑定到Source时(单端口收流), 先解析出SSRC找到Source.
if session.source == nil {
//直接传给解码器, 先根据ssrc找到source. 后续还是会直接传给source
session.Input(data)
} else {
// 将流交给Source的主协程处理主协程最终会调用TCPSession.Input函数处理
session.source.(*PassiveSource).PublishSource.Input(data)
}
// 绑定Source后, 使用ReceiveBuffer读取网络包, 减少拷贝
if session.source != nil {
return session.receiveBuffer.GetBlock()
}
return nil
}

View File

@@ -9,6 +9,7 @@ import (
"net"
)
// TCPSession 国标TCP主被动推流Session, 统一处理TCP粘包.
type TCPSession struct {
conn net.Conn
source GBSource
@@ -16,9 +17,10 @@ type TCPSession struct {
receiveBuffer *stream.ReceiveBuffer
}
// Input 处理source收到的流
// Input 解析携带包长的粘包数据
func (t *TCPSession) Input(data []byte) error {
if err := t.decoder.Input(data); err != nil {
log.Sugar.Errorf("解析粘包数据失败 err:%s", err)
t.conn.Close()
}
@@ -28,8 +30,7 @@ func (t *TCPSession) Input(data []byte) error {
func (t *TCPSession) Init(source GBSource) {
t.source = source
t.source.SetConn(t.conn)
//重新设置收流回调
t.source.SetInputCb(t.Input)
// 创建收流缓冲区
t.receiveBuffer = stream.NewTCPReceiveBuffer()
}
@@ -51,27 +52,26 @@ func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
conn: conn,
}
// 多端口收流, Source已知, 直接初始化Session
if stream.AppConfig.GB28181.IsMultiPort() {
session.Init(filter.(*singleFilter).source)
}
// 创建粘包解码器, 并设置解粘包处理回调
session.decoder = transport.NewLengthFieldFrameDecoder(0xFFFF, 2, func(bytes []byte) {
packet := rtp.Packet{}
err := packet.Unmarshal(bytes)
if err != nil {
if err := packet.Unmarshal(bytes); err != nil {
log.Sugar.Errorf("解析rtp失败 err:%s conn:%s data:%s", err.Error(), conn.RemoteAddr().String(), hex.EncodeToString(bytes))
conn.Close()
return
}
//单端口模式,ssrc匹配source
// 单端口模式,ssrc匹配source
if session.source == nil {
//匹配不到直接关闭连接
source := filter.FindSource(packet.SSRC)
if source == nil {
// 匹配不到Source, 直接关闭连接
log.Sugar.Errorf("gb28181推流失败 ssrc:%x配置不到source conn:%s data:%s", packet.SSRC, session.conn.RemoteAddr().String(), hex.EncodeToString(bytes))
conn.Close()
return
}
@@ -83,7 +83,8 @@ func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
session.source.PreparePublish(conn, packet.SSRC, session.source)
}
session.source.InputRtp(&packet)
// 已经在主协程, 直接由BaseGBSource.Input处理
session.source.Input(bytes)
})
return session

View File

@@ -51,7 +51,8 @@ func (U *UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
source.PreparePublish(conn, packet.SSRC, source)
}
source.InputRtp(&packet)
packet.Raw = data
source.(*UDPSource).InputRtpPacket(&packet)
return nil
}

View File

@@ -14,7 +14,7 @@ const (
type M3U8Sink struct {
stream.BaseSink
cb func(m3u8 []byte) //生成m3u8文件的发送回调
cb func(m3u8 []byte) // 生成m3u8文件的发送回调
sessionId string
playtime time.Time
playTimer *time.Timer
@@ -36,7 +36,7 @@ func (s *M3U8Sink) Start() {
s.playTimer = time.AfterFunc(timeout, func() {
sub := time.Now().Sub(s.playtime)
if sub > timeout {
log.Sugar.Errorf("长时间没有拉取TS切片 sink:%d 超时", s.Id_)
log.Sugar.Errorf("长时间没有拉取TS切片 sink:%d 超时", s.ID)
s.Close()
return
}
@@ -61,13 +61,13 @@ func (s *M3U8Sink) Close() {
s.playTimer = nil
}
stream.SinkManager.Remove(s.Id_)
stream.SinkManager.Remove(s.ID)
s.BaseSink.Close()
}
func NewM3U8Sink(id stream.SinkId, sourceId string, cb func(m3u8 []byte), sessionId string) stream.Sink {
func NewM3U8Sink(id stream.SinkID, sourceId string, cb func(m3u8 []byte), sessionId string) stream.Sink {
return &M3U8Sink{
BaseSink: stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls},
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamHls},
cb: cb,
sessionId: sessionId,
}

View File

@@ -12,13 +12,13 @@ import (
)
type tsContext struct {
segmentSeq int //切片序号
writeBuffer []byte //ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互以及磁盘IO频率
writeBufferSize int //已缓存TS流大小
segmentSeq int // 切片序号
writeBuffer []byte // ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互以及磁盘IO频率
writeBufferSize int // 已缓存TS流大小
url string //@See transStream.tsUrl
path string //ts切片位于磁盘中的绝对路径
file *os.File //ts切片文件句柄
url string // @See transStream.tsUrl
path string // ts切片位于磁盘中的绝对路径
file *os.File // ts切片文件句柄
}
type transStream struct {
@@ -27,16 +27,16 @@ type transStream struct {
context *tsContext
m3u8 M3U8Writer
m3u8Name string //m3u8文件名
m3u8File *os.File //m3u8文件句柄
dir string //m3u8文件父目录
tsUrl string //m3u8中每个url的前缀, 默认为空, 为了支持绝对路径访问:http://xxx/xxx/xxx.ts
tsFormat string //ts文件名格式
duration int //切片时长, 单位秒
playlistLength int //最大切片文件个数
m3u8Name string // m3u8文件名
m3u8File *os.File // m3u8文件句柄
dir string // m3u8文件父目录
tsUrl string // m3u8中每个url的前缀, 默认为空, 为了支持绝对路径访问:http://xxx/xxx/xxx.ts
tsFormat string // ts文件名格式
duration int // 切片时长, 单位秒
playlistLength int // 最大切片文件个数
m3u8Sinks map[stream.SinkId]*M3U8Sink //等待响应m3u8文件的sink
m3u8StringFormat string //一个协程写, 多个协程读, 不用加锁保护
m3u8Sinks map[stream.SinkID]*M3U8Sink // 等待响应m3u8文件的sink
m3u8StringFormat string // 一个协程写, 多个协程读, 不用加锁保护
}
func (t *transStream) Input(packet utils.AVPacket) error {
@@ -44,10 +44,10 @@ func (t *transStream) Input(packet utils.AVPacket) error {
return fmt.Errorf("track not available")
}
//创建一下个切片
//已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
// 创建一下个切片
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
//保存当前切片文件
// 保存当前切片文件
if t.context.file != nil {
err := t.flushSegment(false)
if err != nil {
@@ -55,7 +55,7 @@ func (t *transStream) Input(packet utils.AVPacket) error {
}
}
//创建新的切片
// 创建新的切片
if err := t.createSegment(); err != nil {
return err
}
@@ -96,7 +96,7 @@ func (t *transStream) AddSink(sink stream.Sink) error {
return sink.(*M3U8Sink).SendM3U8Data(&t.m3u8StringFormat)
}
t.m3u8Sinks[sink.Id()] = sink.(*M3U8Sink)
t.m3u8Sinks[sink.GetID()] = sink.(*M3U8Sink)
return nil
}
@@ -119,7 +119,7 @@ func (t *transStream) flushSegment(end bool) error {
t.context.segmentSeq++
}()
//将剩余数据写入缓冲区
// 将剩余数据写入缓冲区
if t.context.writeBufferSize > 0 {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
t.context.writeBufferSize = 0
@@ -129,12 +129,12 @@ func (t *transStream) flushSegment(end bool) error {
return err
}
//删除多余的ts切片文件
// 删除多余的ts切片文件
if t.m3u8.Size() >= t.playlistLength {
_ = os.Remove(t.m3u8.Head().path)
}
//更新m3u8
// 更新m3u8
duration := float32(t.muxer.Duration()) / 90000
t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path)
@@ -152,14 +152,14 @@ func (t *transStream) flushSegment(end bool) error {
return err
}
//通知等待m3u8的sink
//缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿.
// 通知等待m3u8的sink
// 缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿.
if len(t.m3u8Sinks) > 0 && t.m3u8.Size() > 1 {
for _, sink := range t.m3u8Sinks {
sink.SendM3U8Data(&t.m3u8StringFormat)
}
t.m3u8Sinks = make(map[stream.SinkId]*M3U8Sink, 0)
t.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 0)
}
return nil
}
@@ -171,9 +171,9 @@ func (t *transStream) createSegment() error {
var tsFile *os.File
for {
tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq)
//ts文件
// ts文件
t.context.path = fmt.Sprintf("%s/%s", t.dir, tsName)
//m3u8列表中切片的url
// m3u8列表中切片的url
t.context.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
file, err := os.OpenFile(t.context.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
@@ -187,7 +187,7 @@ func (t *transStream) createSegment() error {
return err
}
//继续创建, 认为是文件名冲突, 并且文件已经被打开.
// 继续创建, 认为是文件名冲突, 并且文件已经被打开.
t.context.segmentSeq++
}
@@ -242,14 +242,14 @@ func DeleteOldSegments(id string) {
// @Params segmentDuration 单个切片时长
// @Params playlistLength 缓存多少个切片
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int) (stream.TransStream, error) {
//创建文件夹
// 创建文件夹
m3u8Path := fmt.Sprintf("%s/%s", dir, m3u8Name)
if err := os.MkdirAll(filepath.Dir(m3u8Path), 0666); err != nil {
log.Sugar.Errorf("创建目录失败 err:%s path:%s", err.Error(), m3u8Path)
return nil, err
}
//创建m3u8文件
// 创建m3u8文件
file, err := os.OpenFile(m3u8Path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
log.Sugar.Errorf("创建m3u8文件失败 err:%s path:%s", err.Error(), m3u8Path)
@@ -265,12 +265,12 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
playlistLength: playlistLength,
}
//创建TS封装器
// 创建TS封装器
muxer := libmpeg.NewTSMuxer()
muxer.SetWriteHandler(stream_.onTSWrite)
muxer.SetAllocHandler(stream_.onTSAlloc)
//ts封装上下文对象
// ts封装上下文对象
stream_.context = &tsContext{
segmentSeq: 0,
writeBuffer: make([]byte, 1024*1024),
@@ -281,15 +281,15 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
stream_.m3u8 = NewM3U8Writer(playlistLength)
stream_.m3u8File = file
stream_.m3u8Sinks = make(map[stream.SinkId]*M3U8Sink, 24)
stream_.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 24)
return stream_, nil
}
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
id := source.Id()
//先删除旧的m3u8文件
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
id := source.GetID()
// 先删除旧的m3u8文件
_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
//删除旧的切片文件
// 删除旧的切片文件
go DeleteOldSegments(id)
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength)
}

View File

@@ -12,7 +12,7 @@ func httpResponse(w http.ResponseWriter, code int, msg string) {
})
}
func httpResponseOk(w http.ResponseWriter, data interface{}) {
func httpResponseOK(w http.ResponseWriter, data interface{}) {
httpResponse2(w, MalformedRequest{
Code: http.StatusOK,
Msg: "ok",

View File

@@ -71,7 +71,7 @@ func (s *Session) OnJtPTPPacket(data []byte) {
//首包处理, hook通知
if s.rtpPacket == nil {
s.Id_ = packet.simNumber
s.SetID(packet.simNumber)
s.rtpPacket = &RtpPacket{}
*s.rtpPacket = packet
@@ -87,8 +87,8 @@ func (s *Session) OnJtPTPPacket(data []byte) {
}()
}
//完整包/最后一个分包, 创建AVPacket
//参考时间戳, 遇到不同的时间戳, 处理前一包. 分包标记可能不靠谱
// 完整包/最后一个分包, 创建AVPacket
// 参考时间戳, 遇到不同的时间戳, 处理前一包. 分包标记可能不靠谱
if s.rtpPacket.ts != packet.ts || s.rtpPacket.pt != packet.pt {
if s.rtpPacket.packetType == AudioFrameMark && s.audioBuffer != nil {
if err := s.processAudioPacket(s.rtpPacket.pt, s.rtpPacket.packetType, s.rtpPacket.ts, s.audioBuffer.Fetch(), s.audioIndex); err != nil {
@@ -153,7 +153,7 @@ func (s *Session) Input(data []byte) error {
}
func (s *Session) Close() {
log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo())
log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.String())
if s.audioBuffer != nil {
s.audioBuffer.Clear()
@@ -301,15 +301,15 @@ func read1078RTPPacket(data []byte) (RtpPacket, error) {
func NewSession(conn net.Conn) *Session {
session := Session{
PublishSource: stream.PublishSource{
Conn: conn,
Type_: stream.SourceType1078,
Conn: conn,
Type: stream.SourceType1078,
},
}
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
session.decoder = transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:], session.OnJtPTPPacket)
session.receiveBuffer = stream.NewTCPReceiveBuffer()
session.Init(session.Input, session.Close, stream.ReceiveBufferTCPBlockCount)
go session.LoopEvent()
session.Init(stream.ReceiveBufferTCPBlockCount)
go stream.LoopEvent(&session)
return &session
}

20
main.go
View File

@@ -22,11 +22,12 @@ import (
)
func init() {
stream.RegisterTransStreamFactory(stream.ProtocolRtmp, rtmp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.ProtocolHls, hls.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.ProtocolFlv, flv.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.ProtocolRtsp, rtsp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.ProtocolRtc, rtc.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBStreamForward, gb28181.TransStreamFactory)
stream.SetRecordStreamFactory(record.NewFLVFileSink)
config, err := stream.LoadConfigFile("./config.json")
@@ -36,10 +37,11 @@ func init() {
stream.SetDefaultConfig(config)
stream.AppConfig = *config
stream.InitHookUrl()
stream.InitHookUrls()
// 设置公网IP和端口
rtc.InitConfig()
//初始化日志
// 初始化日志
log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress)
if stream.AppConfig.GB28181.IsMultiPort() {
@@ -93,7 +95,7 @@ func main() {
//多端口模式下, 创建GBSource时才创建收流端口
if !stream.AppConfig.GB28181.IsMultiPort() {
if stream.AppConfig.GB28181.IsEnableUDP() {
server, err := gb28181.NewUDPServer(gb28181.NewSharedFilter(128))
server, err := gb28181.NewUDPServer(gb28181.NewSSRCFilter(128))
if err != nil {
panic(err)
}
@@ -103,7 +105,7 @@ func main() {
}
if stream.AppConfig.GB28181.IsEnableTCP() {
server, err := gb28181.NewTCPServer(gb28181.NewSharedFilter(128))
server, err := gb28181.NewTCPServer(gb28181.NewSSRCFilter(128))
if err != nil {
panic(err)
}

View File

@@ -63,7 +63,7 @@ func NewFLVFileSink(sourceId string) (stream.Sink, string, error) {
}
return &FLVFileSink{
BaseSink: stream.BaseSink{Id_: "record-sink-flv", SourceId_: sourceId, Protocol_: stream.ProtocolFlv},
BaseSink: stream.BaseSink{ID: "record-sink-flv", SourceID: sourceId, Protocol: stream.TransStreamFlv},
file: file,
}, path, nil
}

View File

@@ -20,8 +20,8 @@ type sink struct {
cb func(sdp string)
}
func NewSink(id stream.SinkId, sourceId string, offer string, cb func(sdp string)) stream.Sink {
return &sink{stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink {
return &sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}
func (s *sink) setTrackCount(count int) {

View File

@@ -109,10 +109,10 @@ func (t *transStream) AddSink(sink_ stream.Sink) error {
<-complete
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
rtcSink.state = state
log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), rtcSink.Id_, rtcSink.SourceId_)
log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), rtcSink.GetID(), rtcSink.SourceID)
if state > webrtc.ICEConnectionStateDisconnected {
log.Sugar.Errorf("webrtc peer断开连接 sink:%v source:%s", rtcSink.Id_, rtcSink.SourceId_)
log.Sugar.Errorf("webrtc peer断开连接 sink:%v source:%s", rtcSink.GetID(), rtcSink.SourceID)
rtcSink.Close()
}
})
@@ -163,6 +163,6 @@ func InitConfig() {
webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(setting))
}
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream(), nil
}

View File

@@ -18,7 +18,7 @@ type Publisher struct {
func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) *Publisher {
deMuxer := libflv.NewDeMuxer()
publisher_ := &Publisher{PublishSource: stream.PublishSource{Id_: sourceId, Type_: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack}
publisher_ := &Publisher{PublishSource: stream.PublishSource{ID: sourceId, Type: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack}
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.PublishSource
deMuxer.SetHandler(publisher_)
//为推流方分配足够多的缓冲区

View File

@@ -8,8 +8,8 @@ import (
"testing"
)
func CreateTransStream(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) stream.TransStream {
if stream.ProtocolRtmp == protocol {
func CreateTransStream(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) stream.TransStream {
if stream.TransStreamRtmp == protocol {
return NewTransStream(librtmp.ChunkSize)
}

View File

@@ -39,7 +39,7 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState {
s.stack.SetOnPublishHandler(source)
//初始化放在add source前面, 以防add后再init, 空窗期拉流队列空指针.
source.Init(source.Input, source.Close, stream.ReceiveBufferTCPBlockCount)
source.Init(stream.ReceiveBufferTCPBlockCount)
source.SetUrlValues(values)
//统一处理source推流事件, source是否已经存在, hook回调....
@@ -51,7 +51,7 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState {
s.isPublisher = true
s.receiveBuffer = stream.NewTCPReceiveBuffer()
go source.LoopEvent()
go stream.LoopEvent(source)
}
return state
@@ -61,14 +61,14 @@ func (s *Session) OnPlay(app, stream_ string) utils.HookState {
streamName, values := stream.ParseUrl(stream_)
sourceId := s.generateSourceId(app, streamName)
sink := NewSink(stream.GenerateSinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack)
sink := NewSink(stream.NetAddr2SinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack)
sink.SetUrlValues(values)
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String())
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String())
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Errorf("rtmp拉流失败 source:%s sink:%s", sourceId, sink.Id())
log.Sugar.Errorf("rtmp拉流失败 source:%s sink:%s", sourceId, sink.GetID())
} else {
s.handle = sink
}
@@ -104,7 +104,7 @@ func (s *Session) Close() {
publisher, ok := s.handle.(*Publisher)
if ok {
log.Sugar.Infof("rtmp推流结束 %s", publisher.PrintInfo())
log.Sugar.Infof("rtmp推流结束 %s", publisher.String())
if s.isPublisher {
s.handle.(*Publisher).Close()
@@ -112,7 +112,7 @@ func (s *Session) Close() {
}
} else {
sink := s.handle.(*Sink)
log.Sugar.Infof("rtmp拉流结束 %s", sink.PrintInfo())
log.Sugar.Infof("rtmp拉流结束 %s", sink.String())
sink.Close()
}
}

View File

@@ -25,9 +25,9 @@ func (s *Sink) Close() {
s.BaseSink.Close()
}
func NewSink(id stream.SinkId, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink {
return &Sink{
BaseSink: stream.BaseSink{Id_: id, SourceId_: sourceId, State_: stream.SessionStateCreate, Protocol_: stream.ProtocolRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE},
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreate, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE},
stack: stack,
}
}

View File

@@ -191,6 +191,6 @@ func NewTransStream(chunkSize int) stream.TransStream {
return &transStream{chunkSize: chunkSize}
}
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream(librtmp.ChunkSize), nil
}

View File

@@ -124,7 +124,7 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
}
}
sinkId := stream.GenerateSinkId(request.session.conn.RemoteAddr())
sinkId := stream.NetAddr2SinkId(request.session.conn.RemoteAddr())
sink_ := NewSink(sinkId, request.sourceId, request.session.conn, func(sdp string) {
response = NewOKResponse(request.headers.Get("Cseq"))
response.Header.Set("Content-Type", "application/sdp")

View File

@@ -28,9 +28,9 @@ type sink struct {
playing bool //是否已经收到play请求
}
func NewSink(id stream.SinkId, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
return &sink{
stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtsp, Conn: conn},
stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn},
nil,
cb,
false,

View File

@@ -275,7 +275,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string) stream.TransStream {
return t
}
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
trackFormat := "?track=%d"
return NewTransStream(net.IPAddr{
IP: net.ParseIP(stream.AppConfig.PublicIP),

View File

@@ -12,7 +12,7 @@ import (
// 每个通知事件都需要携带的字段
type eventInfo struct {
Stream string `json:"stream"` //stream id
Stream string `json:"stream"` //stream GetID
Protocol string `json:"protocol"` //推拉流协议
RemoteAddr string `json:"remote_addr"` //peer地址
}
@@ -27,7 +27,7 @@ func responseBodyToString(resp *http.Response) string {
return string(bodyBytes)
}
func sendHookEvent(url string, body []byte) (*http.Response, error) {
func SendHookEvent(url string, body []byte) (*http.Response, error) {
client := &http.Client{
Timeout: time.Duration(AppConfig.Hooks.Timeout),
}
@@ -56,7 +56,7 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err
}
log.Sugar.Infof("sent a hook event for %s. url: %s body: %s", event.ToString(), url, bytes)
response, err := sendHookEvent(url, bytes)
response, err := SendHookEvent(url, bytes)
if err != nil {
log.Sugar.Errorf("failed to %s the hook event. err: %s", event.ToString(), err.Error())
} else {
@@ -71,11 +71,11 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err
}
func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{Stream: sink.SourceId(), Protocol: sink.Protocol().ToString(), RemoteAddr: sink.PrintInfo()}
return eventInfo{Stream: sink.GetSourceID(), Protocol: sink.GetProtocol().ToString(), RemoteAddr: sink.String()}
}
func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{Stream: source.Id(), Protocol: source.Type().ToString(), RemoteAddr: source.RemoteAddr()}
return eventInfo{Stream: source.GetID(), Protocol: source.GetType().ToString(), RemoteAddr: source.RemoteAddr()}
}
func NewRecordEventInfo(source Source, path string) interface{} {

View File

@@ -19,7 +19,7 @@ var (
hookUrls map[HookEvent]string
)
func InitHookUrl() {
func InitHookUrls() {
hookUrls = map[HookEvent]string{
HookEventPublish: AppConfig.Hooks.OnPublishUrl,
HookEventPublishDone: AppConfig.Hooks.OnPublishDoneUrl,

View File

@@ -12,7 +12,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
if AppConfig.Hooks.IsEnableOnPlay() {
hook, err := Hook(HookEventPlay, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())
log.Sugar.Errorf("播放事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID())
return hook, utils.HookStateFailure
}
@@ -20,24 +20,24 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
response = hook
}
source := SourceManager.Find(sink.SourceId())
source := SourceManager.Find(sink.GetSourceID())
if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.Protocol().ToString(), sink.Id(), sink.SourceId())
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID())
{
sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.Id())
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.GetID())
return response, utils.HookStateFailure
} else {
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(sink.SourceId(), sink)
AddSinkToWaitingQueue(sink.GetSourceID(), sink)
}
}
} else {
source.AddEvent(SourceEventPlay, sink)
source.AddSink(sink)
}
return response, utils.HookStateOK
@@ -49,7 +49,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
if AppConfig.Hooks.IsEnableOnPlayDone() {
hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())
log.Sugar.Errorf("播放结束事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID())
return hook, false
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"net/http"
"time"
)
func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState) {
@@ -23,17 +24,20 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS
return nil, utils.HookStateOccupy
}
if AppConfig.ReceiveTimeout > 0 {
source.StartReceiveDataTimer()
if AppConfig.Hooks.IsEnableOnReceiveTimeout() && AppConfig.ReceiveTimeout > 0 {
StartReceiveDataTimer(source)
}
if AppConfig.IdleTimeout > 0 {
source.StartIdleTimer()
if AppConfig.Hooks.IsEnableOnIdleTimeout() && AppConfig.IdleTimeout > 0 {
StartIdleTimer(source)
}
urls := GetStreamPlayUrls(source.Id())
source.SetCreateTime(time.Now())
urls := GetStreamPlayUrls(source.GetID())
indent, _ := json.MarshalIndent(urls, "", "\t")
log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.Type().ToString(), source.Id(), indent)
log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().ToString(), source.GetID(), indent)
return response, utils.HookStateOK
}

View File

@@ -1,7 +1,6 @@
package stream
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/utils"
"net"
@@ -9,32 +8,33 @@ import (
"sync"
)
type SinkId interface{}
// Sink 对拉流端的封装
type Sink interface {
Id() SinkId
GetID() SinkID
SetID(sink SinkID)
GetSourceID() string
Input(data []byte) error
SendHeader(data []byte) error
SourceId() string
GetTransStreamID() TransStreamID
TransStreamId() TransStreamId
SetTransStreamID(id TransStreamID)
SetTransStreamId(id TransStreamId)
GetProtocol() TransStreamProtocol
Protocol() Protocol
// State 获取Sink状态, 调用前外部必须手动加锁
State() SessionState
// GetState 获取Sink状态, 调用前外部必须手动加锁
GetState() SessionState
// SetState 设置Sink状态, 调用前外部必须手动加锁
SetState(state SessionState)
EnableVideo() bool
// SetEnableVideo 允许客户端只拉取音频流
// SetEnableVideo 设置是否拉取视频流, 允许客户端只拉取音频流
SetEnableVideo(enable bool)
// DesiredAudioCodecId 允许客户端拉取指定的音频流
@@ -46,13 +46,13 @@ type Sink interface {
// Close 关闭释放Sink, 从传输流或等待队列中删除sink
Close()
PrintInfo() string
String() string
RemoteAddr() string
// Lock Sink请求拉流->Source推流->Sink断开整个阶段, 是无锁线程安全
//如果Sink在等待队列-Sink断开, 这个过程是非线程安全的
//所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护.
// 如果Sink在等待队列-Sink断开, 这个过程是非线程安全的
// 所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护.
Lock()
UnLock()
@@ -68,53 +68,30 @@ type Sink interface {
GetConn() net.Conn
}
// GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
func GenerateSinkId(addr net.Addr) SinkId {
network := addr.Network()
if "tcp" == network {
to4 := addr.(*net.TCPAddr).IP.To4()
if to4 == nil {
to4 = make([]byte, 4)
}
id := uint64(binary.BigEndian.Uint32(to4))
id <<= 32
id |= uint64(addr.(*net.TCPAddr).Port << 16)
return id
} else if "udp" == network {
id := uint64(binary.BigEndian.Uint32(addr.(*net.UDPAddr).IP.To4()))
id <<= 32
id |= uint64(addr.(*net.UDPAddr).Port << 16)
return id
}
return addr.String()
}
type BaseSink struct {
Id_ SinkId
SourceId_ string
Protocol_ Protocol
State_ SessionState
TransStreamId_ TransStreamId
disableVideo bool
ID SinkID
SourceID string
Protocol TransStreamProtocol
State SessionState
TransStreamID TransStreamID
disableVideo bool
lock sync.RWMutex
//HasSentKeyVideo 是否已经发送视频关键帧
//未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧
HasSentKeyVideo bool
lock sync.RWMutex
HasSentKeyVideo bool // 是否已经发送视频关键帧未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧
DesiredAudioCodecId_ utils.AVCodecID
DesiredVideoCodecId_ utils.AVCodecID
Conn net.Conn
urlValues url.Values
urlValues url.Values // 拉流时携带的Url参数
}
func (s *BaseSink) Id() SinkId {
return s.Id_
func (s *BaseSink) GetID() SinkID {
return s.ID
}
func (s *BaseSink) SetID(id SinkID) {
s.ID = id
}
func (s *BaseSink) Input(data []byte) error {
@@ -131,20 +108,20 @@ func (s *BaseSink) SendHeader(data []byte) error {
return s.Input(data)
}
func (s *BaseSink) SourceId() string {
return s.SourceId_
func (s *BaseSink) GetSourceID() string {
return s.SourceID
}
func (s *BaseSink) TransStreamId() TransStreamId {
return s.TransStreamId_
func (s *BaseSink) GetTransStreamID() TransStreamID {
return s.TransStreamID
}
func (s *BaseSink) SetTransStreamId(id TransStreamId) {
s.TransStreamId_ = id
func (s *BaseSink) SetTransStreamID(id TransStreamID) {
s.TransStreamID = id
}
func (s *BaseSink) Protocol() Protocol {
return s.Protocol_
func (s *BaseSink) GetProtocol() TransStreamProtocol {
return s.Protocol
}
func (s *BaseSink) Lock() {
@@ -155,16 +132,16 @@ func (s *BaseSink) UnLock() {
s.lock.Unlock()
}
func (s *BaseSink) State() SessionState {
func (s *BaseSink) GetState() SessionState {
utils.Assert(!s.lock.TryLock())
return s.State_
return s.State
}
func (s *BaseSink) SetState(state SessionState) {
utils.Assert(!s.lock.TryLock())
s.State_ = state
s.State = state
}
func (s *BaseSink) EnableVideo() bool {
@@ -190,7 +167,7 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
// 拉流断开连接,不需要考虑线程安全
// 踢流走source管道删除,并且关闭Conn
func (s *BaseSink) Close() {
if SessionStateClosed == s.State_ {
if SessionStateClosed == s.State {
return
}
@@ -199,35 +176,37 @@ func (s *BaseSink) Close() {
s.Conn = nil
}
//还没有添加到任何队列, 不做任何处理
if s.State_ < SessionStateWait {
// Sink未添加到任何队列, 不做处理
if s.State < SessionStateWait {
return
}
// 更新Sink状态
var state SessionState
{
s.Lock()
defer s.UnLock()
if s.State_ == SessionStateClosed {
if s.State == SessionStateClosed {
return
}
state = s.State_
s.State_ = SessionStateClosed
state = s.State
s.State = SessionStateClosed
}
if state == SessionStateTransferring {
source := SourceManager.Find(s.SourceId_)
source.AddEvent(SourceEventPlayDone, s)
// 从Source中删除Sink
source := SourceManager.Find(s.SourceID)
source.RemoveSink(s)
} else if state == SessionStateWait {
RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_)
//拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送.
// 从等待队列中删除Sink
RemoveSinkFromWaitingQueue(s.SourceID, s.ID)
go HookPlayDoneEvent(s)
}
}
func (s *BaseSink) PrintInfo() string {
return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_)
func (s *BaseSink) String() string {
return fmt.Sprintf("%s-%v source:%s", s.GetProtocol().ToString(), s.ID, s.SourceID)
}
func (s *BaseSink) RemoteAddr() string {
@@ -251,6 +230,7 @@ func (s *BaseSink) Start() {
}
func (s *BaseSink) Flush() {
}
func (s *BaseSink) GetConn() net.Conn {

View File

@@ -5,27 +5,27 @@ import (
"sync"
)
// SinkManager 目前只用于保存HLS拉流Sink
var SinkManager *sinkManager
func init() {
SinkManager = &sinkManager{}
}
// ISinkManager 添加到TransStream的所有Sink
type sinkManager struct {
m sync.Map
}
func (s *sinkManager) Add(sink Sink) error {
_, ok := s.m.LoadOrStore(sink.Id(), sink)
_, ok := s.m.LoadOrStore(sink.GetID(), sink)
if ok {
return fmt.Errorf("the sink %s has been exist", sink.Id())
return fmt.Errorf("the sink %s has been exist", sink.GetID())
}
return nil
}
func (s *sinkManager) Find(id SinkId) Sink {
func (s *sinkManager) Find(id SinkID) Sink {
value, ok := s.m.Load(id)
if ok {
return value.(Sink)
@@ -34,16 +34,16 @@ func (s *sinkManager) Find(id SinkId) Sink {
return nil
}
func (s *sinkManager) Remove(id SinkId) (Sink, error) {
func (s *sinkManager) Remove(id SinkID) (Sink, error) {
value, loaded := s.m.LoadAndDelete(id)
if loaded {
return value.(Sink), nil
}
return nil, fmt.Errorf("source with id %s was not find", id)
return nil, fmt.Errorf("source with GetID %s was not find", id)
}
func (s *sinkManager) Exist(id SinkId) bool {
func (s *sinkManager) Exist(id SinkID) bool {
_, ok := s.m.Load(id)
return ok
}

50
stream/sink_utils.go Normal file
View File

@@ -0,0 +1,50 @@
package stream
import (
"encoding/binary"
"net"
"strconv"
)
// SinkID IPV4使用uint64、IPV6使用string作为ID类型
type SinkID interface{}
type IPV4SinkID uint64
type IPV6SinkID string
func ipv4Addr2UInt64(ip uint32, port int) uint64 {
return (uint64(ip) << 32) | uint64(port)
}
// NetAddr2SinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
func NetAddr2SinkId(addr net.Addr) SinkID {
network := addr.Network()
if "tcp" == network {
to4 := addr.(*net.TCPAddr).IP.To4()
var intIP uint32
if to4 != nil {
intIP = binary.BigEndian.Uint32(to4)
}
return ipv4Addr2UInt64(intIP, addr.(*net.TCPAddr).Port)
} else if "udp" == network {
to4 := addr.(*net.UDPAddr).IP.To4()
var intIP uint32
if to4 != nil {
intIP = binary.BigEndian.Uint32(to4)
}
return ipv4Addr2UInt64(intIP, addr.(*net.UDPAddr).Port)
}
return addr.String()
}
func SinkId2String(id SinkID) string {
if i, ok := id.(uint64); ok {
return strconv.FormatUint(i, 10)
}
return id.(string)
}

View File

@@ -3,12 +3,12 @@ package stream
import "sync"
// 等待队列所有的Sink
var waitingSinks map[string]map[SinkId]Sink
var waitingSinks map[string]map[SinkID]Sink
var mutex sync.RWMutex
func init() {
waitingSinks = make(map[string]map[SinkId]Sink, 1024)
waitingSinks = make(map[string]map[SinkID]Sink, 1024)
}
func AddSinkToWaitingQueue(streamId string, sink Sink) {
@@ -18,15 +18,15 @@ func AddSinkToWaitingQueue(streamId string, sink Sink) {
m, ok := waitingSinks[streamId]
if !ok {
if m, ok = waitingSinks[streamId]; !ok {
m = make(map[SinkId]Sink, 64)
m = make(map[SinkID]Sink, 64)
waitingSinks[streamId] = m
}
}
m[sink.Id()] = sink
m[sink.GetID()] = sink
}
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkId) (Sink, bool) {
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkID) (Sink, bool) {
mutex.Lock()
defer mutex.Unlock()
@@ -63,7 +63,7 @@ func PopWaitingSinks(sourceId string) []Sink {
return sinks
}
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkId) bool {
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool {
mutex.RLock()
defer mutex.RUnlock()
@@ -76,7 +76,7 @@ func ExistSinkInWaitingQueue(sourceId string, sinkId SinkId) bool {
return ok
}
func ExistSink(sourceId string, sinkId SinkId) bool {
func ExistSink(sourceId string, sinkId SinkID) bool {
if sourceId != "" {
if exist := ExistSinkInWaitingQueue(sourceId, sinkId); exist {
return true

View File

@@ -2,7 +2,6 @@ package stream
import (
"fmt"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/collections"
"github.com/lkmio/lkm/log"
"net"
@@ -14,58 +13,18 @@ import (
"github.com/lkmio/lkm/transcode"
)
// SourceType 推流类型
type SourceType byte
// Protocol 输出的流协议
type Protocol uint32
type SourceEvent byte
// SessionState 推拉流Session的状态
// 包含握手和Hook授权阶段
type SessionState uint32
const (
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
SourceType1078 = SourceType(3)
ProtocolRtmp = Protocol(1)
ProtocolFlv = Protocol(2)
ProtocolRtsp = Protocol(3)
ProtocolHls = Protocol(4)
ProtocolRtc = Protocol(5)
SourceEventPlay = SourceEvent(1)
SourceEventPlayDone = SourceEvent(2)
SourceEventInput = SourceEvent(3)
SourceEventClose = SourceEvent(4)
)
const (
SessionStateCreate = SessionState(1) //新建状态
SessionStateHandshaking = SessionState(2) //握手中
SessionStateHandshakeFailure = SessionState(3) //握手失败
SessionStateHandshakeDone = SessionState(4) //握手完成
SessionStateWait = SessionState(5) //位于等待队列中
SessionStateTransferring = SessionState(6) //推拉流中
SessionStateClosed = SessionState(7) //关闭状态
)
// Source 父类Source负责, 除解析流以外的所有事情
// Source 对推流源的封装, 处理除解析流以外的所有事情
type Source interface {
// Id Source的唯一ID/**
Id() string
// GetID 返回SourceID
GetID() string
SetId(id string)
SetID(id string)
// Input 输入推流数据
//@Return bool fatal error.释放Source
Input(data []byte) error
// Type 推流类型
Type() SourceType
// GetType 返回推流类型
GetType() SourceType
SetType(sourceType SourceType)
@@ -76,21 +35,26 @@ type Source interface {
TranscodeStreams() []utils.AVStream
// AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader先将Sink添加到等待队列.
// 匹配拉流的编码器, 创建TransStream或向存在TransStream添加Sink
AddSink(sink Sink) bool
// 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink
AddSink(sink Sink)
// RemoveSink 删除Sink/**
RemoveSink(sink Sink) bool
// RemoveSink 删除Sink
RemoveSink(sink Sink)
AddEvent(event SourceEvent, data interface{})
RemoveSinkWithID(id SinkID)
SetState(state SessionState)
// Close 关闭Source
// 停止一切封装和转发流以及转码工作
// 关闭推流网络链路, 停止一切封装和转发流以及转码工作
// 将Sink添加到等待队列
Close()
DoClose()
// IsCompleted 所有推流track是否解析完毕
IsCompleted() bool
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool
@@ -100,10 +64,7 @@ type Source interface {
// OnDeMuxStream 解析出AVStream回调
OnDeMuxStream(stream utils.AVStream)
// IsCompleted 是否已经WireHeader
IsCompleted() bool
// OnDeMuxStreamDone 所有track解析完毕, 后续的OnDeMuxStream回调不再处理
// OnDeMuxStreamDone 所有track解析完毕回调, 后续的OnDeMuxStream回调不再处理
OnDeMuxStreamDone()
// OnDeMuxPacket 解析出AvPacket回调
@@ -112,127 +73,166 @@ type Source interface {
// OnDeMuxDone 所有流解析完毕回调
OnDeMuxDone()
Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int)
LoopEvent()
Init(receiveQueueSize int)
RemoteAddr() string
PrintInfo() string
// StartReceiveDataTimer 启动收流超时计时器
StartReceiveDataTimer()
// StartIdleTimer 启动拉流空闲计时器
StartIdleTimer()
String() string
State() SessionState
SetInputCb(func(data []byte) error)
// UrlValues 返回推流url参数
UrlValues() url.Values
// SetUrlValues 设置推流url参数
SetUrlValues(values url.Values)
// PostEvent 切换到主协程执行当前函数
PostEvent(cb func())
LastPacketTime() time.Time
SetLastPacketTime(time2 time.Time)
SinkCount() int
LastStreamEndTime() time.Time
SetReceiveDataTimer(timer *time.Timer)
SetIdleTimer(timer *time.Timer)
IsClosed() bool
StreamPipe() chan []byte
MainContextEvents() chan func()
CreateTime() time.Time
SetCreateTime(time time.Time)
}
type PublishSource struct {
Id_ string
Type_ SourceType
ID string
Type SourceType
state SessionState
Conn net.Conn
TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
recordSink Sink //每个Source的录制流
recordFilePath string //录制流文件路径
hlsStream TransStream //HLS传输流, 如果开启, 在@seee writeHeader 直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟.
audioTranscoders []transcode.Transcoder //音频解码器
videoTranscoders []transcode.Transcoder //视频解码器
originStreams StreamManager //推流的音视频Streams
allStreams StreamManager //推流Streams+转码器获得的Stream
pktBuffers [8]collections.MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
TransDeMuxer stream.DeMuxer // 负责从推流协议中解析出AVStream和AVPacket
recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径
hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟.
audioTranscoders []transcode.Transcoder // 音频解码器
videoTranscoders []transcode.Transcoder // 视频解码器
originStreams StreamManager // 推流的音视频Streams
allStreams StreamManager // 推流Streams+转码器获得的Stream
pktBuffers [8]collections.MemoryPool // 推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
existVideo bool //是否存在视频
completed bool
probeTimer *time.Timer
closed bool // source是否已经关闭
completed bool // 所有推流track是否解析完毕, @see writeHeader 函数中赋值为true
existVideo bool // 是否存在视频
inputCB func(data []byte) error //子类Input回调
closeCB func() //子类Close回调
probeTimer *time.Timer // track解析超时计时器, 触发时执行@see writeHeader
receiveDataTimer *time.Timer // 收流超时计时器
idleTimer *time.Timer // 拉流空闲计时器
transStreams map[TransStreamId]TransStream //所有的输出流, 持有Sink
TransStreams map[TransStreamID]TransStream //所有的输出流, 持有Sink
//sink的拉流和断开拉流事件都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
inputDataEvent chan []byte
closedEvent chan byte //发送关闭事件
closedConsumedEvent chan byte //关闭事件已经被消费
playingEventQueue chan Sink
playingDoneEventQueue chan Sink
probeTimoutEvent chan bool
streamPipe chan []byte // 推流数据管道
mainContextEvents chan func() // 切换到主协程执行函数的事件管道
lastPacketTime time.Time
removeSinkTime time.Time
receiveDataTimer *time.Timer
idleTimer *time.Timer
sinkCount int //拉流计数
closed bool //是否已经被关闭
urlValues url.Values
timeoutTracks []int
lastPacketTime time.Time // 最近收到推流包的时间
lastStreamEndTime time.Time // 最近拉流端结束拉流的时间
sinkCount int // 拉流端计数
urlValues url.Values // 推流url携带的参数
timeoutTracks []int
createTime time.Time // source创建时间
}
func (s *PublishSource) Id() string {
return s.Id_
func (s *PublishSource) SetLastPacketTime(time2 time.Time) {
s.lastPacketTime = time2
}
func (s *PublishSource) SetId(id string) {
s.Id_ = id
func (s *PublishSource) IsClosed() bool {
return s.closed
}
func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int) {
s.inputCB = inputCB
s.closeCB = closeCB
func (s *PublishSource) StreamPipe() chan []byte {
return s.streamPipe
}
func (s *PublishSource) MainContextEvents() chan func() {
return s.mainContextEvents
}
func (s *PublishSource) SetReceiveDataTimer(timer *time.Timer) {
s.receiveDataTimer = timer
}
func (s *PublishSource) SetIdleTimer(timer *time.Timer) {
s.idleTimer = timer
}
func (s *PublishSource) LastStreamEndTime() time.Time {
return s.lastStreamEndTime
}
func (s *PublishSource) LastPacketTime() time.Time {
return s.lastPacketTime
}
func (s *PublishSource) SinkCount() int {
return s.sinkCount
}
func (s *PublishSource) GetID() string {
return s.ID
}
func (s *PublishSource) SetID(id string) {
s.ID = id
}
func (s *PublishSource) Init(receiveQueueSize int) {
s.SetState(SessionStateHandshakeDone)
//初始化事件接收缓冲区
//收流和网络断开的chan都阻塞执行
//-2是为了保证从管道取到流, 到处理完流.整个过程安全的, 不会被覆盖
s.inputDataEvent = make(chan []byte, receiveQueueSize-2)
s.closedEvent = make(chan byte)
s.closedConsumedEvent = make(chan byte)
s.playingEventQueue = make(chan Sink, 128)
s.playingDoneEventQueue = make(chan Sink, 128)
s.probeTimoutEvent = make(chan bool)
// 初始化事件接收管道
// -2是为了保证从管道取到流, 到处理完流整个过程安全的, 不会被覆盖
s.streamPipe = make(chan []byte, receiveQueueSize-2)
s.mainContextEvents = make(chan func(), 128)
}
func (s *PublishSource) CreateDefaultOutStreams() {
if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]TransStream, 10)
if s.TransStreams == nil {
s.TransStreams = make(map[TransStreamID]TransStream, 10)
}
//创建录制流
// 创建录制流
if AppConfig.Record.Enable {
sink, path, err := CreateRecordStream(s.Id_)
sink, path, err := CreateRecordStream(s.ID)
if err != nil {
log.Sugar.Errorf("创建录制sink失败 source:%s err:%s", s.Id_, err.Error())
log.Sugar.Errorf("创建录制sink失败 source:%s err:%s", s.ID, err.Error())
} else {
s.recordSink = sink
s.recordFilePath = path
}
}
//创建HLS输出流
// 创建HLS输出流
if AppConfig.Hls.Enable {
streams := s.OriginStreams()
utils.Assert(len(streams) > 0)
hlsStream, err := s.CreateTransStream(ProtocolHls, streams)
hlsStream, err := s.CreateTransStream(TransStreamHls, streams)
if err != nil {
panic(err)
}
s.dispatchGOPBuffer(hlsStream)
s.hlsStream = hlsStream
s.transStreams[GenerateTransStreamId(ProtocolHls, streams...)] = s.hlsStream
s.TransStreams[GenerateTransStreamID(TransStreamHls, streams...)] = s.hlsStream
}
}
@@ -246,11 +246,11 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe
if utils.AVMediaTypeAudio == mediaType {
s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 12)
} else if AppConfig.GOPCache {
//开启GOP缓存
// 开启GOP缓存
s.pktBuffers[index] = collections.NewRbMemoryPool(AppConfig.GOPBufferSize)
} else {
//未开启GOP缓存
//1M缓存大小, 单帧绰绰有余
// 未开启GOP缓存
// 1M缓存大小, 单帧绰绰有余
s.pktBuffers[index] = collections.NewRbMemoryPool(1024 * 1000)
}
}
@@ -258,48 +258,8 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe
return s.pktBuffers[index]
}
func (s *PublishSource) LoopEvent() {
for {
select {
case data := <-s.inputDataEvent:
if s.closed {
break
}
if AppConfig.ReceiveTimeout > 0 {
s.lastPacketTime = time.Now()
}
if err := s.inputCB(data); err != nil {
log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error())
s.doClose()
}
break
case sink := <-s.playingEventQueue:
if !s.completed {
AddSinkToWaitingQueue(sink.SourceId(), sink)
} else {
if !s.AddSink(sink) {
sink.Close()
}
}
break
case sink := <-s.playingDoneEventQueue:
s.RemoveSink(sink)
break
case _ = <-s.closedEvent:
s.doClose()
s.closedConsumedEvent <- 1
return
case _ = <-s.probeTimoutEvent:
s.writeHeader()
break
}
}
}
func (s *PublishSource) Input(data []byte) error {
s.AddEvent(SourceEventInput, data)
s.streamPipe <- data
return nil
}
@@ -311,20 +271,20 @@ func (s *PublishSource) TranscodeStreams() []utils.AVStream {
return s.allStreams.All()
}
func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID) bool {
if ProtocolRtmp == protocol || ProtocolFlv == protocol {
func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool {
if TransStreamRtmp == protocol || TransStreamFlv == protocol {
}
return true
}
func (s *PublishSource) CreateTransStream(protocol Protocol, streams []utils.AVStream) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source:%s", protocol.ToString(), s.Id_)
func (s *PublishSource) CreateTransStream(protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source:%s", protocol.ToString(), s.ID)
transStream, err := CreateTransStream(s, protocol, streams)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_)
log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.ID)
return nil, err
}
@@ -344,7 +304,7 @@ func (s *PublishSource) dispatchGOPBuffer(transStream TransStream) {
})
}
func (s *PublishSource) AddSink(sink Sink) bool {
func (s *PublishSource) doAddSink(sink Sink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
audioStream := s.originStreams.FindStreamWithType(utils.AVMediaTypeAudio)
@@ -356,8 +316,8 @@ func (s *PublishSource) AddSink(sink Sink) bool {
return false
}
//不支持对期望编码的流封装. 降级
if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.Protocol(), audioCodecId, videoCodecId) {
// 不支持对期望编码的流封装. 降级
if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.GetProtocol(), audioCodecId, videoCodecId) {
audioCodecId = utils.AVCodecIdNONE
videoCodecId = utils.AVCodecIdNONE
}
@@ -369,12 +329,12 @@ func (s *PublishSource) AddSink(sink Sink) bool {
videoCodecId = videoStream.CodecId()
}
//创建音频转码器
// 创建音频转码器
if !disableAudio && audioCodecId != audioStream.CodecId() {
utils.Assert(false)
}
//创建视频转码器
// 创建视频转码器
if !disableVideo && videoCodecId != videoStream.CodecId() {
utils.Assert(false)
}
@@ -391,31 +351,31 @@ func (s *PublishSource) AddSink(sink Sink) bool {
size++
}
transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:size]...)
transStream, ok := s.transStreams[transStreamId]
transStreamId := GenerateTransStreamID(sink.GetProtocol(), streams[:size]...)
transStream, ok := s.TransStreams[transStreamId]
if !ok {
if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]TransStream, 10)
if s.TransStreams == nil {
s.TransStreams = make(map[TransStreamID]TransStream, 10)
}
var err error
transStream, err = s.CreateTransStream(sink.Protocol(), streams[:size])
transStream, err = s.CreateTransStream(sink.GetProtocol(), streams[:size])
if err != nil {
log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_)
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return false
}
s.transStreams[transStreamId] = transStream
s.TransStreams[transStreamId] = transStream
}
sink.SetTransStreamId(transStreamId)
sink.SetTransStreamID(transStreamId)
{
sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.State() {
log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.PrintInfo())
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String())
} else {
transStream.AddSink(sink)
}
@@ -424,10 +384,10 @@ func (s *PublishSource) AddSink(sink Sink) bool {
if s.recordSink != sink {
s.sinkCount++
log.Sugar.Infof("sink count:%d source:%s", s.sinkCount, s.Id_)
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
}
//新的传输流,发送缓存的音视频帧
// 新建传输流,发送缓存的音视频帧
if !ok && AppConfig.GOPCache && s.existVideo {
s.dispatchGOPBuffer(transStream)
}
@@ -435,59 +395,84 @@ func (s *PublishSource) AddSink(sink Sink) bool {
return true
}
func (s *PublishSource) RemoveSink(sink Sink) bool {
id := sink.TransStreamId()
func (s *PublishSource) AddSink(sink Sink) {
s.PostEvent(func() {
if !s.completed {
AddSinkToWaitingQueue(sink.GetSourceID(), sink)
} else {
if !s.doAddSink(sink) {
sink.Close()
}
}
})
}
func (s *PublishSource) RemoveSink(sink Sink) {
s.PostEvent(func() {
s.doRemoveSink(sink)
})
}
func (s *PublishSource) RemoveSinkWithID(id SinkID) {
s.PostEvent(func() {
for _, transStream := range s.TransStreams {
if sink, _ := transStream.RemoveSink(id); sink != nil {
s.doRemoveSink(sink)
break
}
}
})
}
func (s *PublishSource) doRemoveSink(sink Sink) bool {
id := sink.GetTransStreamID()
if id > 0 {
transStream := s.transStreams[id]
//如果从传输流没能删除sink, 再从等待队列删除
_, b := transStream.RemoveSink(sink.Id())
transStream := s.TransStreams[id]
// 从输出流中删除Sink
_, b := transStream.RemoveSink(sink.GetID())
if b {
s.sinkCount--
s.removeSinkTime = time.Now()
s.lastStreamEndTime = time.Now()
HookPlayDoneEvent(sink)
log.Sugar.Infof("sink count:%d source:%s", s.sinkCount, s.Id_)
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
return true
}
}
_, b := RemoveSinkFromWaitingQueue(sink.SourceId(), sink.Id())
return b
}
// 从等待队列中删除Sink
_, b := RemoveSinkFromWaitingQueue(sink.GetSourceID(), sink.GetID())
func (s *PublishSource) AddEvent(event SourceEvent, data interface{}) {
if SourceEventInput == event {
s.inputDataEvent <- data.([]byte)
} else if SourceEventPlay == event {
s.playingEventQueue <- data.(Sink)
} else if SourceEventPlayDone == event {
s.playingDoneEventQueue <- data.(Sink)
} else if SourceEventClose == event {
s.closedEvent <- 0
}
// 从HLS拉流队列删除
SinkManager.Remove(sink.GetID())
return b
}
func (s *PublishSource) SetState(state SessionState) {
s.state = state
}
func (s *PublishSource) doClose() {
func (s *PublishSource) DoClose() {
if s.closed {
return
}
log.Sugar.Infof("关闭推流源 id: %s", s.ID)
if s.TransDeMuxer != nil {
s.TransDeMuxer.Close()
s.TransDeMuxer = nil
}
//清空未写完的buffer
// 清空未写完的buffer
for _, buffer := range s.pktBuffers {
if buffer != nil {
buffer.Reset()
}
}
//释放GOP缓存
// 释放GOP缓存
if s.gopBuffer != nil {
s.gopBuffer.Clear()
s.gopBuffer.Close()
@@ -510,19 +495,20 @@ func (s *PublishSource) doClose() {
s.recordSink.Close()
}
//释放解复用器
//释放转码器
//释放每路转协议流, 将所有sink添加到等待队列
_, err := SourceManager.Remove(s.Id_)
// 释放解复用器
// 释放转码器
// 释放每路转协议流, 将所有sink添加到等待队列
_, err := SourceManager.Remove(s.ID)
if err != nil {
log.Sugar.Errorf("删除源失败 source:%s err:%s", s.Id_, err.Error())
log.Sugar.Errorf("删除源失败 source:%s err:%s", s.ID, err.Error())
}
for _, transStream := range s.transStreams {
// 将所有Sink添加到等待队列
for _, transStream := range s.TransStreams {
transStream.Close()
transStream.PopAllSink(func(sink Sink) {
sink.SetTransStreamId(0)
sink.SetTransStreamID(0)
if s.recordSink == sink {
return
}
@@ -531,24 +517,24 @@ func (s *PublishSource) doClose() {
sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.PrintInfo())
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String())
} else {
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.Id_, sink)
AddSinkToWaitingQueue(s.ID, sink)
}
}
if SessionStateClosed != sink.State() {
if SessionStateClosed != sink.GetState() {
sink.Flush()
}
})
}
s.closed = true
s.transStreams = nil
s.TransStreams = nil
go func() {
if s.Conn != nil && s.Conn.(*transport.Conn).IsActive() {
if s.Conn != nil {
s.Conn.Close()
s.Conn = nil
}
@@ -562,8 +548,9 @@ func (s *PublishSource) doClose() {
}
func (s *PublishSource) Close() {
s.AddEvent(SourceEventClose, nil)
<-s.closedConsumedEvent
s.PostEvent(func() {
s.DoClose()
})
}
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
@@ -572,17 +559,19 @@ func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
if s.completed {
log.Sugar.Warnf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
log.Sugar.Warnf("添加Stream失败 Source: %s已经WriteHeader", s.ID)
return
}
s.originStreams.Add(stream)
s.allStreams.Add(stream)
//启动探测超时计时器
// 启动track解析超时计时器
if len(s.originStreams.All()) == 1 {
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, func() {
s.probeTimoutEvent <- true
s.PostEvent(func() {
s.writeHeader()
})
})
}
@@ -590,7 +579,7 @@ func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
s.existVideo = true
}
//创建GOPBuffer
// 创建GOPBuffer
if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil {
s.gopBuffer = NewStreamBuffer()
//设置GOP缓存溢出回调
@@ -598,10 +587,10 @@ func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
}
}
// 从DeMuxer解析完Stream后, 处理等待Sinks
// 解析完所有track后, 做一些初始化工作
func (s *PublishSource) writeHeader() {
if s.completed {
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.ID)
return
}
@@ -611,21 +600,22 @@ func (s *PublishSource) writeHeader() {
}
if len(s.originStreams.All()) == 0 {
log.Sugar.Errorf("没有一路流, 删除source:%s", s.Id_)
s.doClose()
log.Sugar.Errorf("没有一路流, 删除source:%s", s.ID)
s.DoClose()
return
}
//创建录制流和HLS
// 创建录制流和HLS
s.CreateDefaultOutStreams()
sinks := PopWaitingSinks(s.Id_)
// 将等待队列的Sink添加到输出流队列
sinks := PopWaitingSinks(s.ID)
if s.recordSink != nil {
sinks = append(sinks, s.recordSink)
}
for _, sink := range sinks {
if !s.AddSink(sink) {
if !s.doAddSink(sink) {
sink.Close()
}
}
@@ -668,12 +658,12 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
s.gopBuffer.AddPacket(packet)
}
//分发给各个传输流
for _, stream_ := range s.transStreams {
// 分发给各个传输流
for _, stream_ := range s.TransStreams {
stream_.Input(packet)
}
//未开启GOP缓存或只存在音频流, 释放掉内存
// 未开启GOP缓存或只存在音频流, 释放掉内存
if !AppConfig.GOPCache || !s.existVideo {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
}
@@ -683,12 +673,12 @@ func (s *PublishSource) OnDeMuxDone() {
}
func (s *PublishSource) Type() SourceType {
return s.Type_
func (s *PublishSource) GetType() SourceType {
return s.Type
}
func (s *PublishSource) SetType(sourceType SourceType) {
s.Type_ = sourceType
s.Type = sourceType
}
func (s *PublishSource) RemoteAddr() string {
@@ -699,67 +689,30 @@ func (s *PublishSource) RemoteAddr() string {
return s.Conn.RemoteAddr().String()
}
func (s *PublishSource) PrintInfo() string {
return fmt.Sprintf("id:%s type:%s conn:%s ", s.Id_, s.Type_.ToString(), s.RemoteAddr())
}
func (s *PublishSource) StartReceiveDataTimer() {
utils.Assert(s.receiveDataTimer == nil)
utils.Assert(AppConfig.ReceiveTimeout > 0)
s.lastPacketTime = time.Now()
s.receiveDataTimer = time.AfterFunc(time.Duration(AppConfig.ReceiveTimeout), func() {
dis := time.Now().Sub(s.lastPacketTime)
//如果开启Hook通知, 根据响应决定是否关闭Source
//如果通知失败, 或者非200应答, 释放Source
//如果没有开启Hook通知, 直接删除
if dis >= time.Duration(AppConfig.ReceiveTimeout) {
log.Sugar.Errorf("收流超时 source:%s", s.Id_)
response, state := HookReceiveTimeoutEvent(s)
if utils.HookStateOK != state || response == nil {
s.closeCB()
return
}
}
//对精度没要求
s.receiveDataTimer.Reset(time.Duration(AppConfig.ReceiveTimeout))
})
}
func (s *PublishSource) StartIdleTimer() {
utils.Assert(s.idleTimer == nil)
utils.Assert(AppConfig.IdleTimeout > 0)
s.removeSinkTime = time.Now()
s.idleTimer = time.AfterFunc(time.Duration(AppConfig.IdleTimeout), func() {
dis := time.Now().Sub(s.removeSinkTime)
if s.sinkCount < 1 && dis >= time.Duration(AppConfig.IdleTimeout) {
log.Sugar.Errorf("空闲超时 source:%s", s.Id_)
response, state := HookIdleTimeoutEvent(s)
if utils.HookStateOK != state || response == nil {
s.closeCB()
return
}
}
s.idleTimer.Reset(time.Duration(AppConfig.IdleTimeout))
})
func (s *PublishSource) String() string {
return fmt.Sprintf("source: %s type: %s conn: %s ", s.ID, s.Type.ToString(), s.RemoteAddr())
}
func (s *PublishSource) State() SessionState {
return s.state
}
func (s *PublishSource) SetInputCb(cb func(data []byte) error) {
s.inputCB = cb
}
func (s *PublishSource) UrlValues() url.Values {
return s.urlValues
}
func (s *PublishSource) SetUrlValues(values url.Values) {
s.urlValues = values
}
func (s *PublishSource) PostEvent(cb func()) {
s.mainContextEvents <- cb
}
func (s *PublishSource) CreateTime() time.Time {
return s.createTime
}
func (s *PublishSource) SetCreateTime(time time.Time) {
s.createTime = time
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
)
// SourceManager 全局管理所有推流源
var SourceManager *sourceManger
func init() {
@@ -16,9 +17,9 @@ type sourceManger struct {
}
func (s *sourceManger) Add(source Source) error {
_, ok := s.m.LoadOrStore(source.Id(), source)
_, ok := s.m.LoadOrStore(source.GetID(), source)
if ok {
return fmt.Errorf("the source %s has been exist", source.Id())
return fmt.Errorf("the source %s has been exist", source.GetID())
}
return nil
@@ -39,5 +40,16 @@ func (s *sourceManger) Remove(id string) (Source, error) {
return value.(Source), nil
}
return nil, fmt.Errorf("source with id %s was not find", id)
return nil, fmt.Errorf("source with GetID %s was not find", id)
}
func (s *sourceManger) All() []Source {
var all []Source
s.m.Range(func(key, value any) bool {
all = append(all, value.(Source))
return true
})
return all
}

View File

@@ -9,6 +9,40 @@ import (
"github.com/lkmio/lkm/log"
"net/url"
"strings"
"time"
)
// SourceType 推流类型
type SourceType byte
// TransStreamProtocol 输出的流协议
type TransStreamProtocol uint32
// SessionState 推拉流Session的状态
// 包含握手和Hook授权阶段
type SessionState uint32
const (
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
SourceType1078 = SourceType(3)
TransStreamRtmp = TransStreamProtocol(1)
TransStreamFlv = TransStreamProtocol(2)
TransStreamRtsp = TransStreamProtocol(3)
TransStreamHls = TransStreamProtocol(4)
TransStreamRtc = TransStreamProtocol(5)
TransStreamGBStreamForward = TransStreamProtocol(6) // 国标级联转发
)
const (
SessionStateCreate = SessionState(1) //新建状态
SessionStateHandshaking = SessionState(2) //握手中
SessionStateHandshakeFailure = SessionState(3) //握手失败
SessionStateHandshakeDone = SessionState(4) //握手完成
SessionStateWait = SessionState(5) //位于等待队列中
SessionStateTransferring = SessionState(6) //推拉流中
SessionStateClosed = SessionState(7) //关闭状态
)
func (s SourceType) ToString() string {
@@ -23,17 +57,19 @@ func (s SourceType) ToString() string {
panic(fmt.Sprintf("unknown source type %d", s))
}
func (p Protocol) ToString() string {
if ProtocolRtmp == p {
func (p TransStreamProtocol) ToString() string {
if TransStreamRtmp == p {
return "rtmp"
} else if ProtocolFlv == p {
} else if TransStreamFlv == p {
return "flv"
} else if ProtocolRtsp == p {
} else if TransStreamRtsp == p {
return "rtsp"
} else if ProtocolHls == p {
} else if TransStreamHls == p {
return "hls"
} else if ProtocolRtc == p {
} else if TransStreamRtc == p {
return "rtc"
} else if TransStreamGBStreamForward == p {
return "gb_stream_forward"
}
panic(fmt.Sprintf("unknown stream protocol %d", p))
@@ -163,3 +199,91 @@ func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte,
return stream, packet, nil
}
// StartReceiveDataTimer 启动收流超时计时器
func StartReceiveDataTimer(source Source) {
utils.Assert(AppConfig.ReceiveTimeout > 0)
var receiveDataTimer *time.Timer
receiveDataTimer = time.AfterFunc(time.Duration(AppConfig.ReceiveTimeout), func() {
dis := time.Now().Sub(source.LastPacketTime())
// 如果开启Hook通知, 根据响应决定是否关闭Source
// 如果通知失败, 或者非200应答, 释放Source
// 如果没有开启Hook通知, 直接删除
if dis >= time.Duration(AppConfig.ReceiveTimeout) {
log.Sugar.Errorf("收流超时 source: %s", source.GetID())
response, state := HookReceiveTimeoutEvent(source)
if utils.HookStateOK != state || response == nil {
source.Close()
return
}
}
// 对精度没要求
receiveDataTimer.Reset(time.Duration(AppConfig.ReceiveTimeout))
})
}
// StartIdleTimer 启动拉流空闲计时器
func StartIdleTimer(source Source) {
utils.Assert(AppConfig.IdleTimeout > 0)
var idleTimer *time.Timer
idleTimer = time.AfterFunc(time.Duration(AppConfig.IdleTimeout), func() {
dis := time.Now().Sub(source.LastStreamEndTime())
if source.SinkCount() < 1 && dis >= time.Duration(AppConfig.IdleTimeout) {
log.Sugar.Errorf("拉流空闲超时 source: %s", source.GetID())
response, state := HookIdleTimeoutEvent(source)
if utils.HookStateOK != state || response == nil {
source.Close()
return
}
}
idleTimer.Reset(time.Duration(AppConfig.IdleTimeout))
})
}
// LoopEvent 循环读取事件
func LoopEvent(source Source) {
for {
select {
case data := <-source.StreamPipe():
if source.IsClosed() {
return
}
if AppConfig.ReceiveTimeout > 0 {
source.SetLastPacketTime(time.Now())
}
if err := source.Input(data); err != nil {
log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", source.GetID(), err.Error())
source.DoClose()
return
}
if source.IsClosed() {
return
}
break
case event := <-source.MainContextEvents():
if source.IsClosed() {
return
}
event()
if source.IsClosed() {
return
}
break
}
}
}

View File

@@ -5,20 +5,20 @@ import (
"github.com/lkmio/avformat/utils"
)
type TransStreamFactory func(source Source, protocol Protocol, streams []utils.AVStream) (TransStream, error)
type TransStreamFactory func(source Source, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error)
type RecordStreamFactory func(source string) (Sink, string, error)
var (
transStreamFactories map[Protocol]TransStreamFactory
transStreamFactories map[TransStreamProtocol]TransStreamFactory
recordStreamFactory RecordStreamFactory
)
func init() {
transStreamFactories = make(map[Protocol]TransStreamFactory, 8)
transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8)
}
func RegisterTransStreamFactory(protocol Protocol, streamFunc TransStreamFactory) {
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) {
_, ok := transStreamFactories[protocol]
if ok {
panic(fmt.Sprintf("%s has been registered", protocol.ToString()))
@@ -27,7 +27,7 @@ func RegisterTransStreamFactory(protocol Protocol, streamFunc TransStreamFactory
transStreamFactories[protocol] = streamFunc
}
func FindTransStreamFactory(protocol Protocol) (TransStreamFactory, error) {
func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, error) {
f, ok := transStreamFactories[protocol]
if !ok {
return nil, fmt.Errorf("unknown protocol %s", protocol.ToString())
@@ -36,7 +36,7 @@ func FindTransStreamFactory(protocol Protocol) (TransStreamFactory, error) {
return f, nil
}
func CreateTransStream(source Source, protocol Protocol, streams []utils.AVStream) (TransStream, error) {
func CreateTransStream(source Source, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
factory, err := FindTransStreamFactory(protocol)
if err != nil {
return nil, err

View File

@@ -18,9 +18,9 @@ type TransStream interface {
AddSink(sink Sink) error
ExistSink(id SinkId) bool
ExistSink(id SinkID) bool
RemoveSink(id SinkId) (Sink, bool)
RemoveSink(id SinkID) (Sink, bool)
PopAllSink(handler func(sink Sink))
@@ -29,18 +29,21 @@ type TransStream interface {
Close() error
SendPacket(data []byte) error
Protocol() TransStreamProtocol
}
type BaseTransStream struct {
Sinks map[SinkId]Sink
Sinks map[SinkID]Sink
//muxer stream.Muxer
Tracks []utils.AVStream
Completed bool
ExistVideo bool
Protocol_ TransStreamProtocol
}
func (t *BaseTransStream) Init() {
t.Sinks = make(map[SinkId]Sink, 64)
t.Sinks = make(map[SinkID]Sink, 64)
}
func (t *BaseTransStream) Input(packet utils.AVPacket) error {
@@ -56,17 +59,17 @@ func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
}
func (t *BaseTransStream) AddSink(sink Sink) error {
t.Sinks[sink.Id()] = sink
t.Sinks[sink.GetID()] = sink
sink.Start()
return nil
}
func (t *BaseTransStream) ExistSink(id SinkId) bool {
func (t *BaseTransStream) ExistSink(id SinkID) bool {
_, ok := t.Sinks[id]
return ok
}
func (t *BaseTransStream) RemoveSink(id SinkId) (Sink, bool) {
func (t *BaseTransStream) RemoveSink(id SinkID) (Sink, bool) {
sink, ok := t.Sinks[id]
if ok {
delete(t.Sinks, id)
@@ -100,6 +103,10 @@ func (t *BaseTransStream) SendPacket(data []byte) error {
return nil
}
func (t *BaseTransStream) Protocol() TransStreamProtocol {
return t.Protocol_
}
type TCPTransStream struct {
BaseTransStream
}
@@ -123,7 +130,7 @@ func (t *TCPTransStream) SendPacket(data []byte) error {
}
if _, ok := err.(*transport.ZeroWindowSizeError); ok {
log.Sugar.Errorf("发送超时, 强制断开连接 sink:%s", sink.PrintInfo())
log.Sugar.Errorf("发送超时, 强制断开连接 sink:%s", sink.String())
sink.GetConn().Close()
}
}

View File

@@ -2,10 +2,10 @@ package stream
import "github.com/lkmio/avformat/utils"
// TransStreamId 每个传输流的唯一Id根据输出流协议ID+流包含的音视频编码器ID生成
// TransStreamID 每个传输流的唯一Id根据输出流协议ID+流包含的音视频编码器ID生成
// 输出流协议ID占用高8位
// 每个音视频编译器ID占用8位. 意味着每个输出流至多7路流.
type TransStreamId uint64
type TransStreamID uint64
var (
// AVCodecID转为byte的对应关系
@@ -29,27 +29,27 @@ func init() {
}
}
// GenerateTransStreamId 根据传入的推拉流协议和编码器ID生成StreamId
// GenerateTransStreamID 根据传入的推拉流协议和编码器ID生成StreamId
// 请确保ids根据值升序排序传参
/*func GenerateTransStreamId(protocol Protocol, ids ...utils.AVCodecID) TransStreamId {
/*func GenerateTransStreamID(protocol GetProtocol, ids ...utils.AVCodecID) GetTransStreamID {
len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id)]
for i, GetID := range ids {
bId, ok := narrowCodecIds[int(GetID)]
utils.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
return GetTransStreamID(streamId)
}*/
// GenerateTransStreamId 根据输出流协议和输出流包含的音视频编码器ID生成流ID
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
// GenerateTransStreamID 根据输出流协议和输出流包含的音视频编码器ID生成流ID
func GenerateTransStreamID(protocol TransStreamProtocol, ids ...utils.AVStream) TransStreamID {
len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8)
@@ -63,5 +63,5 @@ func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStream
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
return TransStreamID(streamId)
}