完善hook通知事件

This commit is contained in:
yangjiechina
2024-06-07 20:46:22 +08:00
parent 2ae2622945
commit e6d7001bdc
18 changed files with 478 additions and 389 deletions

27
api.go
View File

@@ -253,17 +253,6 @@ func (api *ApiServer) generateSourceId(remoteAddr string) stream.SinkId {
return stream.GenerateSinkId(tcpAddr)
}
func (api *ApiServer) doPlay(sink stream.Sink) utils.HookState {
ok := utils.HookStateOK
stream.HookPlaying(sink, func() {
}, func(state utils.HookState) {
ok = state
})
return ok
}
func (api *ApiServer) onFlv(sourceId string, w http.ResponseWriter, r *http.Request) {
ws := true
if !("upgrade" == strings.ToLower(r.Header.Get("Connection"))) {
@@ -291,9 +280,9 @@ func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Re
sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, flv.NewWSConn(conn))
log.Sugar.Infof("ws-flv 连接 sink:%s", sink.PrintInfo())
state := api.doPlay(sink)
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("ws-flv 播放失败 state:%d sink:%s", state, sink.PrintInfo())
log.Sugar.Warnf("ws-flv 播放失败 sink:%s", sink.PrintInfo())
w.WriteHeader(http.StatusForbidden)
return
}
@@ -330,9 +319,9 @@ func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.
sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, conn)
log.Sugar.Infof("http-flv 连接 sink:%s", sink.PrintInfo())
state := api.doPlay(sink)
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("http-flv 播放失败 state:%d sink:%s", state, sink.PrintInfo())
log.Sugar.Warnf("http-flv 播放失败 sink:%s", sink.PrintInfo())
w.WriteHeader(http.StatusForbidden)
return
@@ -398,9 +387,9 @@ func (api *ApiServer) onHLS(sourceId string, w http.ResponseWriter, r *http.Requ
done <- 0
})
state := api.doPlay(sink)
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("m3u8 请求失败 state:%d sink:%s", state, sink.PrintInfo())
log.Sugar.Warnf("m3u8 请求失败 sink:%s", sink.PrintInfo())
w.WriteHeader(http.StatusForbidden)
return
@@ -460,9 +449,9 @@ func (api *ApiServer) onRtc(sourceId string, w http.ResponseWriter, r *http.Requ
log.Sugar.Infof("rtc 请求 sink:%s sdp:%v", sink.PrintInfo(), v.SDP)
state := api.doPlay(sink)
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Warnf("rtc 播放失败 state:%d sink:%s", state, sink.PrintInfo())
log.Sugar.Warnf("rtc 播放失败 sink:%s", sink.PrintInfo())
w.WriteHeader(http.StatusForbidden)
group.Done()

View File

@@ -4,6 +4,8 @@
"probe_timeout": 2000,
"mw_latency": 350,
"public_ip": "192.168.2.148",
"idle_timeout": 60,
"receive_timeout":60,
"http": {
"addr": "0.0.0.0:8080"
@@ -49,14 +51,15 @@
},
"hook": {
"enable": true,
"timeout": 10,
"on_publish": "http://localhost:8080/api/v1/live/publish/auth",
"on_publish_done": "http://localhost:8080/api/v1/live/publishdone",
"on_play" : "http://localhost:8080/api/v1/live/play/auth",
"on_play_done" : "http://localhost:8080/api/v1/live/playdone",
"on_publish": "http://localhost:8081/api/v1/on_publish",
"on_publish_done": "http://localhost:8081/api/v1/on_publish_done",
"on_play" : "http://localhost:8081/api/v1/on_play",
"on_play_done" : "http://localhost:8081/api/on_play_done",
"on_record": "",
"on_idle_timeout": "",
"on_recv_timeout": ""
"on_record": "http://localhost:8081/api/v1/on_reocrd",
"on_idle_timeout": "http://localhost:8081/api/v1/on_idle_timeout",
"on_receive_timeout": "http://localhost:8081/api/v1/on_recv_timeout"
}
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
"net/http"
)
type TransportType int
@@ -137,10 +138,9 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1
}
source.PrepareTransDeMuxer(id, ssrc)
if err = stream.SourceManager.Add(source); err != nil {
source.Close()
return nil, 0, err
_, state := stream.PreparePublishSource(source, false)
if http.StatusOK != state {
return nil, 0, fmt.Errorf("error code %d", state)
}
source.Init(source.Input)

View File

@@ -211,11 +211,12 @@ func (s *Session) OnJtPTPPacket(data []byte) {
s.rtpPacket = &RtpPacket{}
*s.rtpPacket = packet
s.Publish(s, func() {
//response <- utils.HookStateOK
}, func(state utils.HookState) {
//response <- state
})
_, state := stream.PreparePublishSource(s, true)
if utils.HookStateOK != state {
log.Sugar.Errorf("1078推流失败 source:%s", s.phone)
}
s.Close()
}
//完整包/最后一个分包, 创建AVPacket

16
main.go
View File

@@ -12,6 +12,7 @@ import (
"go.uber.org/zap/zapcore"
"net"
"net/http"
"time"
_ "net/http/pprof"
@@ -25,6 +26,8 @@ func NewDefaultAppConfig() stream.AppConfig_ {
GOPBufferSize: 8196000,
MergeWriteLatency: 350,
PublicIP: "192.168.2.148",
IdleTimeout: int64(60 * time.Second),
ReceiveTimeout: int64(60 * time.Second),
Hls: stream.HlsConfig{
Enable: false,
@@ -74,6 +77,18 @@ func NewDefaultAppConfig() stream.AppConfig_ {
Enable: true,
Addr: "0.0.0.0:1078",
},
Hook: stream.HookConfig{
Enable: true,
Timeout: int64(60 * time.Second),
OnPublishUrl: "http://localhost:8082/api/v1/on_publish",
OnPublishDoneUrl: "http://localhost:8082/api/v1/on_publish_done",
OnPlayUrl: "http://localhost:8082/api/v1/on_play",
OnPlayDoneUrl: "http://localhost:8082/api/on_play_done",
OnRecordUrl: "http://localhost:8082/api/v1/on_reocrd",
OnIdleTimeoutUrl: "http://localhost:8082/api/v1/on_idle_timeout",
OnReceiveTimeoutUrl: "http://localhost:8082/api/v1/on_recv_timeout",
},
}
}
@@ -85,6 +100,7 @@ func init() {
stream.RegisterTransStreamFactory(stream.ProtocolRtc, rtc.TransStreamFactory)
stream.AppConfig = NewDefaultAppConfig()
stream.InitHookUrl()
//初始化日志
log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress)

View File

@@ -38,17 +38,18 @@ func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState)
s.stack.SetOnPublishHandler(source)
//推流事件Source统一处理, 是否已经存在, Hook回调....
source.Publish(source, func() {
_, state := stream.PreparePublishSource(source, true)
if utils.HookStateOK != state {
log.Sugar.Errorf("rtmp推流失败 source:%s", sourceId)
} else {
s.handle = source
s.isPublisher = true
source.Init(source.Input)
go source.LoopEvent()
}
response <- utils.HookStateOK
}, func(state utils.HookState) {
response <- state
})
}
func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) {
@@ -58,12 +59,14 @@ func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) {
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String())
stream.HookPlaying(sink, func() {
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
log.Sugar.Errorf("rtmp拉流失败 source:%s sink:%s", sourceId, sink.Id())
} else {
s.handle = sink
response <- utils.HookStateOK
}, func(state utils.HookState) {
}
response <- state
})
}
func (s *Session) Input(conn net.Conn, data []byte) error {

View File

@@ -138,14 +138,7 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
request.session.response(response, []byte(sdp))
})
code := utils.HookStateOK
stream.HookPlaying(sink_, func() {
}, func(state utils.HookState) {
code = state
})
_, code := stream.PreparePlaySink(sink_)
if utils.HookStateOK != code {
return nil, nil, fmt.Errorf("hook failed. code:%d", code)
}

View File

@@ -1,6 +1,8 @@
package stream
import "strings"
import (
"strings"
)
const (
DefaultMergeWriteLatency = 350
@@ -8,7 +10,7 @@ const (
type TransportConfig struct {
Transport string //"UDP|TCP"
Port [2]uint16 //单端口模式[0]=port/多端口模式[0]=start port, [0]=end port.
Port [2]uint16 //单端口-1个元素/多端口-2个元素
}
type RtmpConfig struct {
@@ -21,7 +23,7 @@ type RtspConfig struct {
Addr string
Enable bool `json:"enable"`
Password string
Password string `json:"password"`
}
type RecordConfig struct {
@@ -30,10 +32,10 @@ type RecordConfig struct {
}
type HlsConfig struct {
Enable bool
Dir string
Duration int
PlaylistLength int
Enable bool `json:"enable"`
Dir string `json:"dir"`
Duration int `json:"duration"`
PlaylistLength int `json:"playlist_length"`
}
type LogConfig struct {
@@ -46,18 +48,18 @@ type LogConfig struct {
}
type HttpConfig struct {
Enable bool
Addr string
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
type GB28181Config struct {
TransportConfig
Addr string
Addr string `json:"addr"`
}
type JT1078Config struct {
Enable bool
Addr string
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
func (g TransportConfig) EnableTCP() bool {
@@ -91,43 +93,43 @@ func (c HlsConfig) TSFormat(sourceId string, tsSeq string) string {
}
type HookConfig struct {
Time int
Enable bool `json:"enable"`
OnPublish string `json:"on_publish"` //推流回调
OnPublishDone string `json:"on_publish_done"` //推流结束回调
OnPlay string `json:"on_play"` //拉流回调
OnPlayDone string `json:"on_play_done"` //拉流结束回调
OnRecord string `json:"on_record"` //录制流回调
OnIdleTimeout string `json:"on_idle_timeout"` //多久没有sink拉流回调
OnRecvTimeout string `json:"on_recv_timeout"` //多久没有流回调
Timeout int64 `json:"timeout"`
OnPublishUrl string `json:"on_publish"` //推流回调
OnPublishDoneUrl string `json:"on_publish_done"` //推流结束回调
OnPlayUrl string `json:"on_play"` //拉流回调
OnPlayDoneUrl string `json:"on_play_done"` //拉流结束回调
OnRecordUrl string `json:"on_record"` //录制流回调
OnIdleTimeoutUrl string `json:"on_idle_timeout"` //多久没有sink拉流回调
OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //多久没有推流回调
}
func (hook *HookConfig) EnableOnPublish() bool {
return hook.OnPublish != ""
func (hook *HookConfig) EnablePublishEvent() bool {
return hook.Enable && hook.OnPublishUrl != ""
}
func (hook *HookConfig) EnableOnPublishDone() bool {
return hook.OnPublishDone != ""
return hook.Enable && hook.OnPublishDoneUrl != ""
}
func (hook *HookConfig) EnableOnPlay() bool {
return hook.OnPlay != ""
return hook.Enable && hook.OnPlayUrl != ""
}
func (hook *HookConfig) EnableOnPlayDone() bool {
return hook.OnPlayDone != ""
return hook.Enable && hook.OnPlayDoneUrl != ""
}
func (hook *HookConfig) EnableOnRecord() bool {
return hook.OnRecord != ""
return hook.Enable && hook.OnRecordUrl != ""
}
func (hook *HookConfig) EnableOnIdleTimeout() bool {
return hook.OnIdleTimeout != ""
return hook.Enable && hook.OnIdleTimeoutUrl != ""
}
func (hook *HookConfig) EnableOnRecvTimeout() bool {
return hook.OnRecvTimeout != ""
func (hook *HookConfig) EnableOnReceiveTimeout() bool {
return hook.Enable && hook.OnReceiveTimeoutUrl != ""
}
var AppConfig AppConfig_
@@ -142,23 +144,19 @@ type AppConfig_ struct {
GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"`
PublicIP string `json:"public_ip"`
IdleTimeout int64 `json:"idle_timeout"` //多长时间没有sink拉流, 单位秒
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.
MergeWriteLatency int `json:"mw_latency"`
Rtmp RtmpConfig
Rtsp RtspConfig
Hook HookConfig
Record RecordConfig
Hls HlsConfig
Log LogConfig
Http HttpConfig
GB28181 GB28181Config
JT1078 JT1078Config
}

View File

@@ -4,24 +4,10 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/yangjiechina/avformat/utils"
"net/http"
"time"
)
type HookFunc func(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
type HookEvent int
const (
HookEventPublish = HookEvent(0x1)
HookEventPublishDone = HookEvent(0x2)
HookEventPlay = HookEvent(0x3)
HookEventPlayDone = HookEvent(0x4)
HookEventRecord = HookEvent(0x5)
HookEventIdleTimeout = HookEvent(0x6)
HookEventRecvTimeout = HookEvent(0x7)
)
// 每个通知的时间都需要携带的字段
type eventInfo struct {
stream string //stream id
@@ -29,107 +15,41 @@ type eventInfo struct {
remoteAddr string //peer地址
}
func NewPlayHookEventInfo(stream, remoteAddr string, protocol Protocol) eventInfo {
return eventInfo{stream: stream, protocol: protocol.ToString(), remoteAddr: remoteAddr}
func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{stream: sink.SourceId(), protocol: sink.Protocol().ToString(), remoteAddr: sink.PrintInfo()}
}
func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{stream: source.Id(), protocol: source.Type().ToString(), remoteAddr: source.RemoteAddr()}
}
func NewPublishHookEventInfo(stream, remoteAddr string, protocol SourceType) eventInfo {
return eventInfo{stream: stream, protocol: protocol.ToString(), remoteAddr: remoteAddr}
}
type HookHandler interface {
Play(success func(), failure func(state utils.HookState))
PlayDone(success func(), failure func(state utils.HookState))
}
type HookSession interface {
send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
}
var hookUrls map[HookEvent]string
func init() {
hookUrls = map[HookEvent]string{
HookEventPublish: "",
HookEventPublishDone: "",
HookEventPlay: "",
HookEventPlayDone: "",
HookEventRecord: "",
HookEventIdleTimeout: "",
HookEventRecvTimeout: "",
}
}
func sendHookEvent(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
func sendHookEvent(url string, body interface{}) (*http.Response, error) {
marshal, err := json.Marshal(body)
if err != nil {
return err
return nil, err
}
client := &http.Client{
Timeout: time.Second * time.Duration(AppConfig.Hook.Time),
Timeout: time.Duration(AppConfig.Hook.Timeout),
}
request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal))
if err != nil {
return err
return nil, err
}
request.Header.Set("Content-Type", "application/json")
response, err := client.Do(request)
if err != nil {
failure(response, err)
} else if response.StatusCode != http.StatusOK {
failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status))
} else {
success(response)
}
return nil
return client.Do(request)
}
func hookEvent(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
url := hookUrls[event]
if url == "" {
success(nil)
return nil
func Hook(event HookEvent, body interface{}) (*http.Response, error) {
url, ok := hookUrls[event]
if url == "" || !ok {
return nil, fmt.Errorf("the url for this %s event does not exist", event.ToString())
}
return sendHookEvent(url, body, success, failure)
}
type hookSession struct {
}
func (h *hookSession) send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
marshal, err := json.Marshal(body)
if err != nil {
return err
}
client := &http.Client{
Timeout: time.Second * time.Duration(AppConfig.Hook.Time),
}
request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal))
if err != nil {
return err
}
request.Header.Set("Content-Type", "application/json")
response, err := client.Do(request)
if err != nil {
failure(response, err)
} else if response.StatusCode != http.StatusOK {
failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status))
} else {
success(response)
}
return sendHookEvent(url, body, success, failure)
}
func (h *hookSession) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hookEvent(event, body, success, failure)
response, err := sendHookEvent(url, body)
if err != nil && http.StatusOK != response.StatusCode {
return response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status)
}
return response, err
}

51
stream/hook_event.go Normal file
View File

@@ -0,0 +1,51 @@
package stream
import "fmt"
type HookEvent int
const (
HookEventPublish = HookEvent(0x1)
HookEventPublishDone = HookEvent(0x2)
HookEventPlay = HookEvent(0x3)
HookEventPlayDone = HookEvent(0x4)
HookEventRecord = HookEvent(0x5)
HookEventIdleTimeout = HookEvent(0x6)
HookEventReceiveTimeout = HookEvent(0x7)
)
var (
hookUrls map[HookEvent]string
)
func InitHookUrl() {
hookUrls = map[HookEvent]string{
HookEventPublish: AppConfig.Hook.OnPublishUrl,
HookEventPublishDone: AppConfig.Hook.OnPublishDoneUrl,
HookEventPlay: AppConfig.Hook.OnPlayUrl,
HookEventPlayDone: AppConfig.Hook.OnPlayDoneUrl,
HookEventRecord: AppConfig.Hook.OnRecordUrl,
HookEventIdleTimeout: AppConfig.Hook.OnIdleTimeoutUrl,
HookEventReceiveTimeout: AppConfig.Hook.OnReceiveTimeoutUrl,
}
}
func (h *HookEvent) ToString() string {
if HookEventPublish == *h {
return "publish"
} else if HookEventPublishDone == *h {
return "publish done"
} else if HookEventPlay == *h {
return "play"
} else if HookEventPlayDone == *h {
return "play done"
} else if HookEventRecord == *h {
return "record"
} else if HookEventIdleTimeout == *h {
return "idle timeout"
} else if HookEventReceiveTimeout == *h {
return "receive timeout"
}
panic(fmt.Sprintf("unknow hook type %d", h))
}

60
stream/hook_sink.go Normal file
View File

@@ -0,0 +1,60 @@
package stream
import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"net/http"
)
func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnableOnPlay() {
hook, err := Hook(HookEventPlay, NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())
return hook, utils.HookStateFailure
}
response = hook
}
source := SourceManager.Find(sink.SourceId())
if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.Protocol().ToString(), sink.Id(), sink.SourceId())
{
sink.Lock()
defer sink.UnLock()
if SessionStateClose == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.Id())
return response, utils.HookStateFailure
} else {
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(sink.SourceId(), sink)
}
}
} else {
source.AddEvent(SourceEventPlay, sink)
}
return response, utils.HookStateOK
}
func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
var response *http.Response
if AppConfig.Hook.EnableOnPlayDone() {
hook, err := Hook(HookEventPlay, NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())
return hook, false
}
response = hook
}
return response, true
}

78
stream/hook_source.go Normal file
View File

@@ -0,0 +1,78 @@
package stream
import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"net/http"
)
func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState) {
var response *http.Response
if hook && AppConfig.Hook.EnablePublishEvent() {
rep, state := HookPublishEvent(source)
if utils.HookStateOK != state {
return rep, state
}
response = rep
}
if err := SourceManager.Add(source); err != nil {
log.Sugar.Errorf("添加源失败 source:%s err:%s", source.Id(), err.Error())
return nil, utils.HookStateOccupy
}
if AppConfig.ReceiveTimeout > 0 {
source.StartReceiveDataTimer()
}
if AppConfig.IdleTimeout > 0 {
source.StartIdleTimer()
}
return response, utils.HookStateOK
}
func HookPublishEvent(source Source) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnablePublishEvent() {
hook, err := Hook(HookEventPublish, NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知推流事件失败 source:%s err:%s", source.Id(), err.Error())
return hook, utils.HookStateFailure
}
response = hook
}
return response, utils.HookStateOK
}
func HookPublishDoneEvent(source Source) {
if AppConfig.Hook.EnablePublishEvent() {
_, err := Hook(HookEventPublishDone, NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知推流结束事件失败 source:%s err:%s", source.Id(), err.Error())
}
}
}
func HookReceiveTimeoutEvent(source Source) {
if AppConfig.Hook.EnableOnReceiveTimeout() {
_, err := Hook(HookEventReceiveTimeout, NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error())
}
}
}
func HookIdleTimeoutEvent(source Source) {
if AppConfig.Hook.EnableOnIdleTimeout() {
_, err := Hook(HookEventIdleTimeout, NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error())
}
}
}

View File

@@ -1,21 +1,42 @@
package stream
import (
"fmt"
"net/http"
"testing"
"time"
)
func TestHookServer(t *testing.T) {
http.HandleFunc("/api/v1/live/publish/auth", func(writer http.ResponseWriter, request *http.Request) {
if true {
//模拟各种多个情况对推拉流的影响
random := false
i := 1
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
println(fmt.Sprintf("接收到请求 path:%s", request.URL.Path))
if !random {
writer.WriteHeader(http.StatusOK)
} else {
writer.WriteHeader(http.StatusNonAuthoritativeInfo)
return
}
switch i {
case 1:
writer.WriteHeader(http.StatusOK)
break
case 2:
writer.WriteHeader(http.StatusNonAuthoritativeInfo)
break
case 3:
time.Sleep(5 * time.Second)
break
case 4:
time.Sleep(20 * time.Second)
break
}
i = i%5 + 1
})
err := http.ListenAndServe(":8080", nil)
err := http.ListenAndServe(":8082", nil)
if err != nil {
panic(err)
}

View File

@@ -146,13 +146,13 @@ func (m *memoryPool) Reset() {
func (m *memoryPool) FreeHead() {
utils.Assert(!m.marked)
utils.Assert(!m.blockQueue.IsEmpty())
if m.discardBlockCount > 1 {
if m.discardBlockCount > 0 {
m.discardBlockCount--
return
}
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.Pop().(int)
m.head += size
@@ -162,22 +162,30 @@ func (m *memoryPool) FreeHead() {
} else if m.head >= m.capacity {
m.head = 0
}
if m.blockQueue.IsEmpty() {
m.markIndex = 0
}
}
func (m *memoryPool) FreeTail() {
utils.Assert(!m.marked)
utils.Assert(!m.blockQueue.IsEmpty())
if m.discardBlockCount > 1 {
if m.discardBlockCount > 0 {
m.discardBlockCount--
return
}
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.PopBack().(int)
m.tail -= size
if m.tail == 0 && !m.blockQueue.IsEmpty() {
m.tail = m.capacity
}
if m.blockQueue.IsEmpty() {
m.markIndex = 0
}
}
func (m *memoryPool) Data() ([]byte, []byte) {

View File

@@ -1,17 +0,0 @@
package stream
import (
"github.com/yangjiechina/avformat/utils"
)
type SourceHook interface {
Publish(source Source, success func(), failure func(state utils.HookState))
PublishDone(source Source, success func(), failure func(state utils.HookState))
}
type SinkHook interface {
Play(sink Sink, success func(), failure func(state utils.HookState))
PlayDone(source Sink, success func(), failure func(state utils.HookState))
}

View File

@@ -47,6 +47,8 @@ type Sink interface {
PrintInfo() string
RemoteAddr() string
// Lock Sink请求拉流->Source推流->Sink断开整个阶段, 是无锁线程安全
//如果Sink在等待队列-Sink断开, 这个过程是非线程安全的
//所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护.
@@ -76,8 +78,6 @@ func GenerateSinkId(addr net.Addr) SinkId {
}
type BaseSink struct {
hookSession
Id_ SinkId
SourceId_ string
Protocol_ Protocol
@@ -202,10 +202,17 @@ func (s *BaseSink) Close() {
} else if s.State_ == SessionStateWait {
RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_)
//拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送.
HookPlayingDone(s, nil, nil)
HookPlayDoneEvent(s)
}
}
func (s *BaseSink) PrintInfo() string {
return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_)
}
func (s *BaseSink) RemoteAddr() string {
if s.Conn == nil {
return ""
}
return s.Conn.RemoteAddr().String()
}

View File

@@ -1,94 +0,0 @@
package stream
import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"net/http"
)
func HookPlaying(s Sink, success func(), failure func(state utils.HookState)) {
f := func() {
source := SourceManager.Find(s.SourceId())
if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.Protocol().ToString(), s.Id(), s.SourceId())
{
s.Lock()
defer s.UnLock()
if SessionStateClose == s.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", s.PrintInfo())
} else {
s.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.SourceId(), s)
}
}
} else {
log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.Protocol().ToString(), s.Id(), s.SourceId())
source.AddEvent(SourceEventPlay, s)
}
}
if !AppConfig.Hook.EnableOnPlay() {
f()
if success != nil {
success()
}
return
}
err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) {
f()
if success != nil {
success()
}
}, func(response *http.Response, err error) {
log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure)
}
})
if err != nil {
log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure)
}
return
}
}
func HookPlayingDone(s Sink, success func(), failure func(state utils.HookState)) {
if !AppConfig.Hook.EnableOnPlayDone() {
if success != nil {
success()
}
return
}
err := hookEvent(HookEventPlayDone, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) {
if success != nil {
success()
}
}, func(response *http.Response, err error) {
log.Sugar.Errorf("Hook播放结束事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure)
}
})
if err != nil {
log.Sugar.Errorf("Hook播放结束事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure)
}
return
}
}

View File

@@ -3,8 +3,8 @@ package stream
import (
"fmt"
"github.com/yangjiechina/lkm/log"
"math"
"net"
"net/http"
"time"
"github.com/yangjiechina/avformat/stream"
@@ -107,14 +107,20 @@ type Source interface {
// OnDeMuxDone 所有流解析完毕回调
OnDeMuxDone()
Init(input func(data []byte) error)
LoopEvent()
Init(input func(data []byte) error)
RemoteAddr() string
// StartReceiveDataTimer 启动收流超时计时器
StartReceiveDataTimer()
// StartIdleTimer 启动拉流空闲计时器
StartIdleTimer()
}
type PublishSource struct {
hookSession
Id_ string
Type_ SourceType
state SessionState
@@ -141,12 +147,20 @@ type PublishSource struct {
//sink的拉流和断开拉流事件都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
inputEvent chan []byte
responseEvent chan byte //解析完input的数据后才能继续从网络io中读取流
closeEvent chan byte
inputDataEvent chan []byte
dataConsumedEvent chan byte //解析完input的数据后才能继续从网络io中读取流
closedEvent chan byte
playingEventQueue chan Sink
playingDoneEventQueue chan Sink
probeTimoutEvent chan bool
receiveDataTimeoutEvent chan byte
idleTimeoutEvent chan byte
lastPacketTime time.Time
removeSinkTime time.Time
receiveDataTimer *time.Timer
idleTimer *time.Timer
sinkCount int
}
func (s *PublishSource) Id() string {
@@ -160,13 +174,20 @@ func (s *PublishSource) Init(input func(data []byte) error) {
s.SetState(SessionStateTransferring)
//收流和网络断开的chan都阻塞执行
s.inputEvent = make(chan []byte)
s.responseEvent = make(chan byte)
s.closeEvent = make(chan byte)
s.inputDataEvent = make(chan []byte)
s.dataConsumedEvent = make(chan byte)
s.closedEvent = make(chan byte)
s.playingEventQueue = make(chan Sink, 128)
s.playingDoneEventQueue = make(chan Sink, 128)
s.probeTimoutEvent = make(chan bool)
if AppConfig.ReceiveTimeout > 0 {
s.receiveDataTimeoutEvent = make(chan byte)
}
if AppConfig.IdleTimeout > 0 {
s.idleTimeoutEvent = make(chan byte)
}
if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]TransStream, 10)
}
@@ -199,11 +220,10 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe
s.pktBuffers[index] = NewRbMemoryPool(48000 * 64)
} else if AppConfig.GOPCache {
//开启GOP缓存
//以每秒钟4M码率大小创建视频内存池
s.pktBuffers[index] = NewRbMemoryPool(AppConfig.GOPBufferSize)
} else {
//未开启GOP缓存
//以每秒钟4M的1/8码率大小创建视频内存池
//1M缓存大小, 单帧绰绰有余
s.pktBuffers[index] = NewRbMemoryPool(1024 * 1000)
}
}
@@ -214,13 +234,17 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe
func (s *PublishSource) LoopEvent() {
for {
select {
case data := <-s.inputEvent:
case data := <-s.inputDataEvent:
if AppConfig.ReceiveTimeout > 0 {
s.lastPacketTime = time.Now()
}
if err := s.Input_(data); err != nil {
log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error())
s.Close()
}
s.responseEvent <- 0
s.dataConsumedEvent <- 0
break
case sink := <-s.playingEventQueue:
if !s.completed {
@@ -234,12 +258,22 @@ func (s *PublishSource) LoopEvent() {
case sink := <-s.playingDoneEventQueue:
s.RemoveSink(sink)
break
case _ = <-s.closeEvent:
case _ = <-s.closedEvent:
s.Close()
return
case _ = <-s.probeTimoutEvent:
s.writeHeader()
break
case _ = <-s.receiveDataTimeoutEvent:
log.Sugar.Errorf("收流超时 source:%s", s.Id_)
s.Close()
HookReceiveTimeoutEvent(s)
break
case _ = <-s.idleTimeoutEvent:
log.Sugar.Errorf("空闲超时 source:%s", s.Id_)
s.Close()
HookIdleTimeoutEvent(s)
break
}
}
}
@@ -350,6 +384,8 @@ func (s *PublishSource) AddSink(sink Sink) bool {
sink.SetState(SessionStateTransferring)
}
s.sinkCount++
//新的传输流,发送缓存的音视频帧
if !ok && AppConfig.GOPCache && s.existVideo {
s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
@@ -367,7 +403,9 @@ func (s *PublishSource) RemoveSink(sink Sink) bool {
//如果从传输流没能删除sink, 再从等待队列删除
_, b := transStream.RemoveSink(sink.Id())
if b {
HookPlayingDone(sink, nil, nil)
s.sinkCount--
s.removeSinkTime = time.Now()
HookPlayDoneEvent(sink)
return true
}
}
@@ -378,14 +416,14 @@ func (s *PublishSource) RemoveSink(sink Sink) bool {
func (s *PublishSource) AddEvent(event SourceEvent, data interface{}) {
if SourceEventInput == event {
s.inputEvent <- data.([]byte)
<-s.responseEvent
s.inputDataEvent <- data.([]byte)
<-s.dataConsumedEvent
} else if SourceEventPlay == event {
s.playingEventQueue <- data.(Sink)
} else if SourceEventPlayDone == event {
s.playingDoneEventQueue <- data.(Sink)
} else if SourceEventClose == event {
s.closeEvent <- 0
s.closedEvent <- 0
}
}
@@ -398,11 +436,24 @@ func (s *PublishSource) Close() {
if s.gopBuffer != nil {
s.gopBuffer.Clear()
}
if s.probeTimer != nil {
s.probeTimer.Stop()
}
if s.receiveDataTimer != nil {
s.receiveDataTimer.Stop()
}
if s.idleTimer != nil {
s.idleTimer.Stop()
}
//释放解复用器
//释放转码器
//释放每路转协议流, 将所有sink添加到等待队列
_, _ = SourceManager.Remove(s.Id_)
_, err := SourceManager.Remove(s.Id_)
if err != nil {
log.Sugar.Errorf("删除源失败 source:%s err:%s", s.Id_, err.Error())
}
for _, transStream := range s.transStreams {
transStream.Close()
@@ -421,6 +472,7 @@ func (s *PublishSource) Close() {
})
}
HookPublishDoneEvent(s)
s.transStreams = nil
}
@@ -517,46 +569,46 @@ func (s *PublishSource) OnDeMuxDone() {
}
func (s *PublishSource) Publish(source Source, success func(), failure func(state utils.HookState)) {
//streamId 已经被占用
if source_ := SourceManager.Find(source.Id()); source_ != nil {
fmt.Printf("推流已经占用 Source:%s", source.Id())
failure(utils.HookStateOccupy)
}
if !AppConfig.Hook.EnableOnPublish() {
if err := SourceManager.Add(source); err == nil {
success()
} else {
fmt.Printf("添加失败 Source:%s", source.Id())
failure(utils.HookStateOccupy)
}
return
}
err := s.Hook(HookEventPublish, NewPublishHookEventInfo(source.Id(), "", source.Type()),
func(response *http.Response) {
if err := SourceManager.Add(source); err == nil {
success()
} else {
failure(utils.HookStateOccupy)
}
}, func(response *http.Response, err error) {
failure(utils.HookStateFailure)
})
//hook地址连接失败
if err != nil {
failure(utils.HookStateFailure)
return
}
}
func (s *PublishSource) PublishDone(source Source, success func(), failure func(state utils.HookState)) {
}
func (s *PublishSource) Type() SourceType {
return s.Type_
}
func (s *PublishSource) RemoteAddr() string {
if s.Conn == nil {
return ""
}
return s.Conn.RemoteAddr().String()
}
func (s *PublishSource) StartReceiveDataTimer() {
utils.Assert(s.receiveDataTimer == nil)
utils.Assert(AppConfig.ReceiveTimeout > 0)
s.lastPacketTime = time.Now()
s.receiveDataTimer = time.AfterFunc(time.Duration(AppConfig.ReceiveTimeout), func() {
dis := time.Now().Sub(s.lastPacketTime)
if dis >= time.Duration(AppConfig.ReceiveTimeout) {
s.receiveDataTimeoutEvent <- 0
} else {
s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis))))
}
})
}
func (s *PublishSource) StartIdleTimer() {
utils.Assert(s.idleTimer == nil)
utils.Assert(AppConfig.IdleTimeout > 0)
s.removeSinkTime = time.Now()
s.idleTimer = time.AfterFunc(time.Duration(AppConfig.IdleTimeout), func() {
dis := time.Now().Sub(s.removeSinkTime)
if s.sinkCount < 1 && dis >= time.Duration(AppConfig.IdleTimeout) {
s.idleTimeoutEvent <- 0
} else {
s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis)))))
}
})
}