From e72f05f1d9dfc250f6d1be9bc91867961350b34e Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Sat, 9 Nov 2024 10:19:16 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=84=E8=8C=83http=E5=93=8D=E5=BA=94?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 20 ++++++++------- api_gb.go | 65 +++++++++++++++++++++++------------------------- http_response.go | 28 +++++++++++++-------- stream/sink.go | 4 +-- 4 files changed, 62 insertions(+), 55 deletions(-) diff --git a/api.go b/api.go index 387b096..145168d 100644 --- a/api.go +++ b/api.go @@ -64,7 +64,7 @@ func filterRequestBodyParams[T any](f func(params T, w http.ResponseWriter, req return func(w http.ResponseWriter, req *http.Request) { if err := HttpDecodeJSONBody(w, req, params); err != nil { log.Sugar.Errorf("处理http请求失败 err: %s path: %s", err.Error(), req.URL.Path) - httpResponse2(w, err) + httpResponseError(w, err.Error()) return } @@ -103,9 +103,11 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流 - apiServer.router.HandleFunc("/api/v1/gb28181/forward", filterRequestBodyParams(apiServer.OnGBSourceForward, &GBForwardParams{})) // 设置级联转发目标,停止级联调用sink/close接口,级联断开会走on_play_done事件通知 - apiServer.router.HandleFunc("/api/v1/gb28181/source/create", filterRequestBodyParams(apiServer.OnGBSourceCreate, &GBSourceParams{})) // 创建国标推流源 - apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", filterRequestBodyParams(apiServer.OnGBSourceConnect, &GBConnect{})) // 为国标TCP主动推流,设置连接地址 + if stream.AppConfig.GB28181.Enable { + apiServer.router.HandleFunc("/api/v1/gb28181/forward", filterRequestBodyParams(apiServer.OnGBSourceForward, &GBForwardParams{})) // 设置级联转发目标,停止级联调用sink/close接口,级联断开会走on_play_done事件通知 + apiServer.router.HandleFunc("/api/v1/gb28181/source/create", filterRequestBodyParams(apiServer.OnGBSourceCreate, &GBSourceParams{})) // 创建国标推流源 + apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", filterRequestBodyParams(apiServer.OnGBSourceConnect, &GBConnect{})) // 为国标TCP主动推流,设置连接地址 + } apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) { runtime.GC() @@ -132,7 +134,7 @@ func startApiServer(addr string) { } } -func (api *ApiServer) generateSinkId(remoteAddr string) stream.SinkID { +func (api *ApiServer) generateSinkID(remoteAddr string) stream.SinkID { tcpAddr, err := net.ResolveTCPAddr("tcp", remoteAddr) if err != nil { panic(err) @@ -167,7 +169,7 @@ func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Re return } - sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, flv.NewWSConn(conn)) + sink := flv.NewFLVSink(api.generateSinkID(r.RemoteAddr), sourceId, flv.NewWSConn(conn)) sink.SetUrlValues(r.URL.Query()) log.Sugar.Infof("ws-flv 连接 sink:%s", sink.String()) @@ -207,7 +209,7 @@ func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http. return } - sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, conn) + sink := flv.NewFLVSink(api.generateSinkID(r.RemoteAddr), sourceId, conn) sink.SetUrlValues(r.URL.Query()) log.Sugar.Infof("http-flv 连接 sink:%s", sink.String()) @@ -344,7 +346,7 @@ func (api *ApiServer) onRtc(sourceId string, w http.ResponseWriter, r *http.Requ group := sync.WaitGroup{} group.Add(1) - sink := rtc.NewSink(api.generateSinkId(r.RemoteAddr), sourceId, v.SDP, func(sdp string) { + sink := rtc.NewSink(api.generateSinkID(r.RemoteAddr), sourceId, v.SDP, func(sdp string) { response := struct { Type string `json:"type"` SDP string `json:"sdp"` @@ -442,7 +444,7 @@ func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) } func (api *ApiServer) OnSourceClose(v *IDS, w http.ResponseWriter, r *http.Request) { - log.Sugar.Infof("close source: %v", v) + log.Sugar.Infof("close source: %v", v.Source) if source := stream.SourceManager.Find(v.Source); source != nil { source.Close() diff --git a/api_gb.go b/api_gb.go index 7ea31dc..a92aae5 100644 --- a/api_gb.go +++ b/api_gb.go @@ -34,23 +34,23 @@ func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, // 返回收流地址 response := &struct { - IP string `json:"ip"` - Port int `json:"port,omitempty"` + IP string `json:"ip"` + Port int `json:"port,omitempty"` + Urls []string `json:"urls"` }{} var err error // 响应错误消息 defer func() { if err != nil { - log.Sugar.Errorf(err.Error()) - httpResponse2(w, err) + log.Sugar.Errorf("创建国标源失败 err: %s", err.Error()) + httpResponseError(w, err.Error()) } }() source := stream.SourceManager.Find(v.Source) if source != nil { - log.Sugar.Errorf("创建国标源失败, %s已经存在", v.Source) - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("创建国标源失败, %s已经存在", v.Source)} + err = fmt.Errorf("%s 源已经存在", v.Source) return } @@ -66,11 +66,11 @@ func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, if tcp && active { if !stream.AppConfig.GB28181.IsMultiPort() { - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, 单端口模式下不能主动拉流"} + err = fmt.Errorf("单端口模式下不能主动拉流") } else if !tcp { - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, UDP不能主动拉流"} + err = fmt.Errorf("UDP不能主动拉流") } else if !stream.AppConfig.GB28181.IsEnableTCP() { - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, 未开启TCP, UDP不能主动拉流"} + err = fmt.Errorf("未开启TCP收流服务,UDP不能主动拉流") } if err != nil { @@ -80,12 +80,12 @@ func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, _, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active) if err != nil { - err = &MalformedRequest{Code: http.StatusInternalServerError, Msg: fmt.Sprintf("创建国标源失败 err:%s", err.Error())} return } response.IP = stream.AppConfig.PublicIP response.Port = port + response.Urls = stream.GetStreamPlayUrls(v.Source) httpResponseOK(w, response) } @@ -93,54 +93,53 @@ func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r * log.Sugar.Infof("设置国标主动拉流连接地址: %v", v) var err error + // 响应错误消息 defer func() { if err != nil { - log.Sugar.Errorf(err.Error()) - httpResponse2(w, err) + log.Sugar.Errorf("设置国标主动拉流失败 err: %s", err.Error()) + httpResponseError(w, err.Error()) } }() source := stream.SourceManager.Find(v.Source) if source == nil { - log.Sugar.Errorf("设置主动拉流失败, %s源不存在", v.Source) - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gb28181 source 不存在"} + err = fmt.Errorf("%s 源不存在", v.Source) return } activeSource, ok := source.(*gb28181.ActiveSource) if !ok { - log.Sugar.Errorf("设置主动拉流失败, %s源不是Active拉流类型", v.Source) - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gbsource 不能转为active source"} + err = fmt.Errorf("%s 源不是Active拉流类型", v.Source) return } addr, err := net.ResolveTCPAddr("tcp", v.RemoteAddr) if err != nil { - log.Sugar.Errorf("设置主动拉流失败, err: %s", err.Error()) - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "解析连接地址失败"} return } - err = activeSource.Connect(addr) - if err != nil { - log.Sugar.Errorf("设置主动拉流失败, err: %s", err.Error()) - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("连接Server失败 err:%s", err.Error())} - return + if err = activeSource.Connect(addr); err == nil { + httpResponseOK(w, nil) } - - httpResponseOK(w, nil) } func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWriter, r *http.Request) { log.Sugar.Infof("设置国标级联转发: %v", v) + var err error + // 响应错误消息 + defer func() { + if err != nil { + log.Sugar.Errorf("设置级联转发失败 err: %s", err.Error()) + httpResponseError(w, err.Error()) + } + }() + source := stream.SourceManager.Find(v.Source) if source == nil { - log.Sugar.Infof("设置国标级联转发失败 %s源不存在", v.Source) - w.WriteHeader(http.StatusNotFound) + err = fmt.Errorf("%s 源不存在", v.Source) } else if source.GetType() != stream.SourceType28181 { - log.Sugar.Infof("设置国标级联转发失败 %s源不是国标推流类型", v.Source) - w.WriteHeader(http.StatusBadRequest) + log.Sugar.Infof("%s 源不是国标推流类型", v.Source) return } @@ -168,8 +167,6 @@ func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWrite sink, port, err := gb28181.NewForwardSink(v.SSRC, v.Addr, setup, sinkId, v.Source) if err != nil { - log.Sugar.Errorf("设置国标级联转发 err: %s", err.Error()) - w.WriteHeader(http.StatusInternalServerError) return } @@ -178,10 +175,10 @@ func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWrite log.Sugar.Infof("设置国标级联转发成功 ID: %s", sink.GetID()) response := struct { - ID string `json:"id"` //sink id + Sink string `json:"sink"` //sink id IP string `json:"ip"` Port int `json:"port"` - }{ID: stream.SinkId2String(sinkId), IP: stream.AppConfig.PublicIP, Port: port} + }{Sink: stream.SinkId2String(sinkId), IP: stream.AppConfig.PublicIP, Port: port} - httpResponse2(w, &response) + httpResponseOK(w, &response) } diff --git a/http_response.go b/http_response.go index d7af5ac..9640b74 100644 --- a/http_response.go +++ b/http_response.go @@ -6,24 +6,32 @@ import ( ) func httpResponse(w http.ResponseWriter, code int, msg string) { - httpResponse2(w, MalformedRequest{ + httpResponseJson(w, MalformedRequest{ Code: code, Msg: msg, }) } -func httpResponseOK(w http.ResponseWriter, data interface{}) { - httpResponse2(w, MalformedRequest{ - Code: http.StatusOK, - Msg: "ok", - Data: data, - }) -} - -func httpResponse2(w http.ResponseWriter, payload interface{}) { +func httpResponseJson(w http.ResponseWriter, payload interface{}) { body, _ := json.Marshal(payload) w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT") w.Write(body) } + +func httpResponseOK(w http.ResponseWriter, data interface{}) { + httpResponseJson(w, MalformedRequest{ + Code: http.StatusOK, + Msg: "ok", + Data: data, + }) +} + +func httpResponseError(w http.ResponseWriter, msg string) { + httpResponseJson(w, MalformedRequest{ + Code: -1, + Msg: msg, + Data: nil, + }) +} diff --git a/stream/sink.go b/stream/sink.go index 6b01e77..c2f9c47 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -103,8 +103,8 @@ type BaseSink struct { TCPStreaming bool // 是否是TCP流式拉流 urlValues url.Values // 拉流时携带的Url参数 - SentPacketCount int // 发包计数 - Ready bool + SentPacketCount int // 发包计数 + Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流. createTime time.Time }