👌 IMPROVE: 导出RTSP插件

This commit is contained in:
dexter
2022-09-04 12:43:42 +08:00
parent 7d08e06922
commit 0501a84da6
3 changed files with 21 additions and 17 deletions

22
main.go
View File

@@ -40,16 +40,16 @@ func (conf *RTSPConfig) OnEvent(event any) {
s.Start() s.Start()
if conf.PullOnStart { if conf.PullOnStart {
for streamPath, url := range conf.PullList { for streamPath, url := range conf.PullList {
if err := plugin.Pull(streamPath, url, new(RTSPPuller), false); err != nil { if err := RTSPPlugin.Pull(streamPath, url, new(RTSPPuller), false); err != nil {
plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) RTSPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
} }
} }
} }
case SEpublish: case SEpublish:
for streamPath, url := range conf.PushList { for streamPath, url := range conf.PushList {
if streamPath == v.Stream.Path { if streamPath == v.Stream.Path {
if err := plugin.Push(streamPath, url, new(RTSPPusher), false); err != nil { if err := RTSPPlugin.Push(streamPath, url, new(RTSPPusher), false); err != nil {
plugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) RTSPPlugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
} }
} }
} }
@@ -57,8 +57,8 @@ func (conf *RTSPConfig) OnEvent(event any) {
if conf.PullOnSubscribe { if conf.PullOnSubscribe {
for streamPath, url := range conf.PullList { for streamPath, url := range conf.PullList {
if streamPath == v.Path { if streamPath == v.Path {
if err := plugin.Pull(streamPath, url, new(RTSPPuller), false); err != nil { if err := RTSPPlugin.Pull(streamPath, url, new(RTSPPuller), false); err != nil {
plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) RTSPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
} }
break break
} }
@@ -73,7 +73,7 @@ var rtspConfig = &RTSPConfig{
RTCPAddr: ":8001", RTCPAddr: ":8001",
ReadBufferSize: 2048, ReadBufferSize: 2048,
} }
var plugin = InstallPlugin(rtspConfig) var RTSPPlugin = InstallPlugin(rtspConfig)
func filterStreams() (ss []*Stream) { func filterStreams() (ss []*Stream) {
Streams.RLock() Streams.RLock()
@@ -92,15 +92,19 @@ func (*RTSPConfig) API_list(w http.ResponseWriter, r *http.Request) {
} }
func (*RTSPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) { func (*RTSPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
err := plugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTSPPuller), r.URL.Query().Has("save")) err := RTSPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTSPPuller), r.URL.Query().Has("save"))
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest) http.Error(rw, err.Error(), http.StatusBadRequest)
} else {
rw.Write([]byte("ok"))
} }
} }
func (*RTSPConfig) API_Push(rw http.ResponseWriter, r *http.Request) { func (*RTSPConfig) API_Push(rw http.ResponseWriter, r *http.Request) {
err := plugin.Push(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTSPPusher), r.URL.Query().Has("save")) err := RTSPPlugin.Push(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTSPPusher), r.URL.Query().Has("save"))
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest) http.Error(rw, err.Error(), http.StatusBadRequest)
} else {
rw.Write([]byte("ok"))
} }
} }

View File

@@ -126,7 +126,7 @@ func (p *RTSPPublisher) SetTracks() error {
// 复用AVCC写入逻辑解析出AAC的配置信息 // 复用AVCC写入逻辑解析出AAC的配置信息
at.WriteAVCC(0, append([]byte{0xAF, 0}, asc...)) at.WriteAVCC(0, append([]byte{0xAF, 0}, asc...))
} else { } else {
plugin.Warn("aac no config") RTSPPlugin.Warn("aac no config")
} }
default: default:
return fmt.Errorf("unsupport codec:%s", keyval[0]) return fmt.Errorf("unsupport codec:%s", keyval[0])

View File

@@ -14,22 +14,22 @@ type RTSPIO struct {
} }
func (conf *RTSPConfig) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { func (conf *RTSPConfig) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
plugin.Debug("conn opened") RTSPPlugin.Debug("conn opened")
} }
func (conf *RTSPConfig) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { func (conf *RTSPConfig) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
plugin.Debug("conn closed") RTSPPlugin.Debug("conn closed")
if p, ok := conf.LoadAndDelete(ctx.Conn); ok { if p, ok := conf.LoadAndDelete(ctx.Conn); ok {
p.(IIO).Stop() p.(IIO).Stop()
} }
} }
func (conf *RTSPConfig) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { func (conf *RTSPConfig) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
plugin.Debug("session opened") RTSPPlugin.Debug("session opened")
} }
func (conf *RTSPConfig) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { func (conf *RTSPConfig) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
plugin.Debug("session closed") RTSPPlugin.Debug("session closed")
if p, ok := conf.LoadAndDelete(ctx.Session); ok { if p, ok := conf.LoadAndDelete(ctx.Session); ok {
p.(IIO).Stop() p.(IIO).Stop()
} }
@@ -37,10 +37,10 @@ func (conf *RTSPConfig) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClos
// called after receiving a DESCRIBE request. // called after receiving a DESCRIBE request.
func (conf *RTSPConfig) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { func (conf *RTSPConfig) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
plugin.Debug("describe request") RTSPPlugin.Debug("describe request")
var suber RTSPSubscriber var suber RTSPSubscriber
suber.SetIO(ctx.Conn.NetConn()) suber.SetIO(ctx.Conn.NetConn())
if err := plugin.Subscribe(ctx.Path, &suber); err == nil { if err := RTSPPlugin.Subscribe(ctx.Path, &suber); err == nil {
conf.Store(ctx.Conn, &suber) conf.Store(ctx.Conn, &suber)
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
@@ -90,7 +90,7 @@ func (conf *RTSPConfig) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base
func (conf *RTSPConfig) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { func (conf *RTSPConfig) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
p := &RTSPPublisher{} p := &RTSPPublisher{}
p.SetIO(ctx.Conn.NetConn()) p.SetIO(ctx.Conn.NetConn())
if err := plugin.Publish(ctx.Path, p); err == nil { if err := RTSPPlugin.Publish(ctx.Path, p); err == nil {
p.tracks = ctx.Tracks p.tracks = ctx.Tracks
p.stream = gortsplib.NewServerStream(ctx.Tracks) p.stream = gortsplib.NewServerStream(ctx.Tracks)
if err = p.SetTracks(); err != nil { if err = p.SetTracks(); err != nil {