diff --git a/api.go b/api.go index 033e974..f6cfdb9 100644 --- a/api.go +++ b/api.go @@ -14,15 +14,14 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/lkmio/avformat/utils" - "io" "math" "net" "net/http" - "net/http/httputil" "net/url" "os" "strconv" "strings" + "sync" "time" ) @@ -74,11 +73,6 @@ type SeekParams struct { Seconds int `json:"seconds"` } -type PlatformChannel struct { - ServerAddr string `json:"server_addr"` - Channels [][2]string `json:"channels"` //二维数组, 索引0-设备ID/索引1-通道ID -} - type BroadcastParams struct { DeviceID string `json:"device_id"` ChannelID string `json:"channel_id"` @@ -92,7 +86,9 @@ type RecordParams struct { } type StreamIDParams struct { - StreamID string `json:"streamid"` + StreamID common.StreamID `json:"streamid"` + Command string `json:"command"` + Scale int `json:"scale"` } type PageQuery struct { @@ -103,12 +99,6 @@ type PageQuery struct { Data interface{} `json:"data"` } -type PageQueryChannel struct { - PageQuery - DeviceID string `json:"device_id"` - GroupID string `json:"group_id"` -} - type SetMediaTransportReq struct { DeviceID string `json:"serial"` MediaTransport string `json:"media_transport"` @@ -223,15 +213,13 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", common.WithJsonParams(apiServer.OnIdleTimeout, &StreamParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", common.WithJsonParams(apiServer.OnReceiveTimeout, &StreamParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_record", common.WithJsonParams(apiServer.OnRecord, &RecordParams{})) - apiServer.router.HandleFunc("/api/v1/hook/on_started", apiServer.OnStarted) - // 统一处理live/playback/download请求 - //apiServer.router.HandleFunc("/api/v1/{action}/start", withVerify(common.WithFormDataParams(apiServer.OnInvite, InviteParams{}))) - - apiServer.router.HandleFunc("/api/v1/stream/start", withVerify(common.WithFormDataParams(apiServer.OnStreamStart, InviteParams{}))) // 实时预览 - apiServer.router.HandleFunc("/api/v1/stream/stop", withVerify(common.WithFormDataParams(apiServer.OnCloseStream, InviteParams{}))) - apiServer.router.HandleFunc("/api/v1/playback/start", withVerify(common.WithFormDataParams(apiServer.OnPlaybackStart, InviteParams{}))) // 回放/下载 + apiServer.router.HandleFunc("/api/v1/stream/start", withVerify(common.WithFormDataParams(apiServer.OnStreamStart, InviteParams{}))) // 实时预览 + apiServer.router.HandleFunc("/api/v1/stream/stop", withVerify(common.WithFormDataParams(apiServer.OnCloseLiveStream, InviteParams{}))) // 关闭实时预览 + apiServer.router.HandleFunc("/api/v1/playback/start", withVerify(common.WithFormDataParams(apiServer.OnPlaybackStart, InviteParams{}))) // 回放/下载 + apiServer.router.HandleFunc("/api/v1/playback/stop", withVerify(common.WithFormDataParams(apiServer.OnCloseStream, StreamIDParams{}))) // 关闭回放/下载 + apiServer.router.HandleFunc("/api/v1/playback/control", withVerify(common.WithFormDataParams(apiServer.OnPlaybackControl, StreamIDParams{}))) // 回放控制 apiServer.router.HandleFunc("/api/v1/device/list", withVerify(common.WithQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表 apiServer.router.HandleFunc("/api/v1/device/channeltree", withVerify(common.WithQueryStringParams(apiServer.OnDeviceTree, QueryDeviceChannel{}))) // 设备树 @@ -242,6 +230,7 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(common.WithQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表 apiServer.router.HandleFunc("/api/v1/stream/info", withVerify(apiServer.OnStreamInfo)) + apiServer.router.HandleFunc("/api/v1/playback/streaminfo", withVerify(apiServer.OnStreamInfo)) apiServer.router.HandleFunc("/api/v1/device/session/list", withVerify(common.WithQueryStringParams(apiServer.OnSessionList, QueryDeviceChannel{}))) // 推流列表 apiServer.router.HandleFunc("/api/v1/device/session/stop", withVerify(common.WithFormDataParams(apiServer.OnSessionStop, StreamIDParams{}))) // 关闭流 apiServer.router.HandleFunc("/api/v1/device/setchannelid", withVerify(common.WithFormDataParams(apiServer.OnCustomChannelSet, CustomChannel{}))) // 关闭流 @@ -272,7 +261,7 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/broadcast/invite", common.WithJsonResponse(apiServer.OnBroadcast, &BroadcastParams{Setup: &common.DefaultSetupType})) // 发起语音广播 apiServer.router.HandleFunc("/api/v1/broadcast/hangup", common.WithJsonResponse(apiServer.OnHangup, &BroadcastParams{})) // 挂断广播会话 - apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲 + apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", withVerify(apiServer.OnTalk)) // 语音对讲 apiServer.router.HandleFunc("/api/v1/jt/device/add", common.WithJsonResponse(apiServer.OnVirtualDeviceAdd, &dao.JTDeviceModel{})) apiServer.router.HandleFunc("/api/v1/jt/device/edit", common.WithJsonResponse(apiServer.OnVirtualDeviceEdit, &dao.JTDeviceModel{})) @@ -540,7 +529,7 @@ func (api *ApiServer) OnPlaybackStart(v *InviteParams, w http.ResponseWriter, r } } -func (api *ApiServer) DoStreamStart(v *InviteParams, _ http.ResponseWriter, _ *http.Request, action string) (interface{}, error) { +func (api *ApiServer) DoStreamStart(v *InviteParams, w http.ResponseWriter, r *http.Request, action string) (interface{}, error) { var code int var stream *dao.StreamModel var err error @@ -559,6 +548,17 @@ func (api *ApiServer) DoStreamStart(v *InviteParams, _ http.ResponseWriter, _ *h return nil, err } + // 录像下载, 转发到streaminfo接口 + if "download" == action { + if r.URL.RawQuery == "" { + r.URL.RawQuery = "streamid=" + string(v.streamId) + } else if r.URL.RawQuery != "" { + r.URL.RawQuery += "&streamid=" + string(v.streamId) + } + common.HttpForwardTo("/api/v1/stream/info", w, r) + return nil, nil + } + var urls map[string]string urls = make(map[string]string, 10) for _, url := range stream.Urls { @@ -671,7 +671,9 @@ func (api *ApiServer) DoInvite(inviteType common.InviteType, params *InviteParam // 解析回放或下载速度参数 speed, _ := strconv.Atoi(params.Speed) - speed = int(math.Min(4, float64(speed))) + if speed < 1 { + speed = 4 + } d := stack.Device{device} stream, err := d.StartStream(inviteType, params.streamId, params.ChannelID, startTimeSeconds, endTimeSeconds, params.Setup, speed, sync) if err != nil { @@ -681,9 +683,14 @@ func (api *ApiServer) DoInvite(inviteType common.InviteType, params *InviteParam return http.StatusOK, stream, nil } -func (api *ApiServer) OnCloseStream(v *InviteParams, w http.ResponseWriter, _ *http.Request) (interface{}, error) { - streamID := common.GenerateStreamID(common.InviteTypePlay, v.DeviceID, v.ChannelID, "", "") - stack.CloseStream(streamID, true) +func (api *ApiServer) OnCloseStream(v *StreamIDParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { + stack.CloseStream(v.StreamID, true) + return "OK", nil +} + +func (api *ApiServer) OnCloseLiveStream(v *InviteParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { + id := common.GenerateStreamID(common.InviteTypePlay, v.DeviceID, v.ChannelID, "", "") + stack.CloseStream(id, true) return "OK", nil } @@ -1077,8 +1084,73 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, _ http.ResponseWriter, r * return nil, nil } -func (api *ApiServer) OnTalk(_ http.ResponseWriter, _ *http.Request) { +func (api *ApiServer) OnTalk(w http.ResponseWriter, r *http.Request) { + //vars := mux.Vars(r) + //device := vars["device"] + //channel := vars["channel"] + format := r.URL.Query().Get("format") + // 升级HTTP连接到WebSocket + conn, err := api.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Sugar.Errorf("WebSocket升级失败: %v", err) + return + } + defer conn.Close() + + parse, err := url.Parse(common.Config.MediaServer) + if err != nil { + return + } + + // 目标WebSocket服务地址 + targetURL := fmt.Sprintf("ws://%s%s?format=%s", parse.Host, r.URL.Path, format) + + // 连接到目标WebSocket服务 + targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil) + if err != nil { + log.Sugar.Errorf("连接目标WebSocket失败: %v", err) + return + } + defer targetConn.Close() + + group := sync.WaitGroup{} + group.Add(2) + + // 启动两个goroutine双向转发数据 + // 从客户端转发到目标服务 + go func() { + defer group.Done() + for { + messageType, p, err := conn.ReadMessage() + if err != nil { + log.Sugar.Debugf("读取客户端消息错误: %v", err) + return + } + if err := targetConn.WriteMessage(messageType, p); err != nil { + log.Sugar.Debugf("写入目标服务消息错误: %v", err) + return + } + } + }() + + // 从目标服务转发到客户端 + go func() { + defer group.Done() + for { + messageType, p, err := targetConn.ReadMessage() + if err != nil { + log.Sugar.Debugf("读取目标服务消息错误: %v", err) + return + } + if err := conn.WriteMessage(messageType, p); err != nil { + log.Sugar.Debugf("写入客户端消息错误: %v", err) + return + } + } + }() + + group.Wait() } func (api *ApiServer) OnStarted(_ http.ResponseWriter, _ *http.Request) { @@ -1316,28 +1388,7 @@ func (api *ApiServer) OnCatalogQuery(params *QueryDeviceChannel, _ http.Response } func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { - response, err := stack.MSQueryStreamInfo(r.Header, r.URL.RawQuery) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - _, _ = w.Write([]byte(err.Error())) - return - } - - defer response.Body.Close() - - // 复制响应头 - for name, values := range response.Header { - for _, value := range values { - w.Header().Add(name, value) - } - } - - // 设置状态码并转发响应体 - w.WriteHeader(response.StatusCode) - _, err = io.Copy(w, response.Body) - if err != nil { - log.Sugar.Errorf("Failed to copy response body: %v", err) - } + common.HttpForwardTo("/api/v1/stream/info", w, r) } func (api *ApiServer) OnSessionList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) { @@ -1406,7 +1457,7 @@ func (api *ApiServer) OnSessionList(q *QueryDeviceChannel, _ http.ResponseWriter } func (api *ApiServer) OnSessionStop(params *StreamIDParams, w http.ResponseWriter, req *http.Request) (interface{}, error) { - err := stack.MSCloseSource(params.StreamID) + err := stack.MSCloseSource(string(params.StreamID)) if err != nil { return nil, err } @@ -1622,27 +1673,13 @@ func (api *ApiServer) OnCatalogPush(q *SetEnable, w http.ResponseWriter, req *ht } func (api *ApiServer) OnRecordStart(writer http.ResponseWriter, request *http.Request) { - target, _ := url.Parse(fmt.Sprintf("%s/api/v1/record/start", common.Config.MediaServer)) - proxy := &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL = target - req.Host = target.Host - req.Header.Set("X-Forwarded-Host", req.Header.Get("Host")) - }, - } - - proxy.ServeHTTP(writer, request) + common.HttpForwardTo("/api/v1/record/start", writer, request) } func (api *ApiServer) OnRecordStop(writer http.ResponseWriter, request *http.Request) { - target, _ := url.Parse(fmt.Sprintf("%s/api/v1/record/stop", common.Config.MediaServer)) - proxy := &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL = target - req.Host = target.Host - req.Header.Set("X-Forwarded-Host", req.Header.Get("Host")) - }, - } - - proxy.ServeHTTP(writer, request) + common.HttpForwardTo("/api/v1/record/stop", writer, request) +} + +func (api *ApiServer) OnPlaybackControl(params *StreamParams, w http.ResponseWriter, req *http.Request) (interface{}, error) { + return "OK", nil } diff --git a/common/http_proxy.go b/common/http_proxy.go new file mode 100644 index 0000000..e63ff57 --- /dev/null +++ b/common/http_proxy.go @@ -0,0 +1,54 @@ +package common + +import ( + "bytes" + "fmt" + "io" + "net/http" + "net/http/httputil" + "net/url" +) + +func HttpForwardTo(path string, w http.ResponseWriter, r *http.Request) { + target, _ := url.Parse(fmt.Sprintf("%s%s", Config.MediaServer, path)) + proxy := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL = target + req.Host = target.Host + req.Header.Set("X-Forwarded-Host", req.Header.Get("Host")) + + // 复制所有原始请求头 + for name, values := range r.Header { + for _, value := range values { + req.Header.Add(name, value) + } + } + + // 保留原始查询参数 + req.URL.RawQuery = r.URL.RawQuery + + // 复制请求体(如果有) + if r.Body != nil { + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Error reading request body", http.StatusInternalServerError) + return + } + r.Body.Close() + req.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + req.ContentLength = int64(len(bodyBytes)) + req.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(bodyBytes)), nil + } + } + + // 复制其他请求属性 + req.Method = r.Method + req.Proto = r.Proto + req.ProtoMajor = r.ProtoMajor + req.ProtoMinor = r.ProtoMinor + }, + } + + proxy.ServeHTTP(w, r) +} diff --git a/common/stream_id.go b/common/stream_id.go index a16e031..d9c8152 100644 --- a/common/stream_id.go +++ b/common/stream_id.go @@ -2,7 +2,9 @@ package common import ( "github.com/lkmio/avformat/utils" + "strconv" "strings" + "time" ) type StreamID string // 目前目涉及转码,多路流, 与SourceID相同 @@ -24,6 +26,19 @@ func GenerateStreamID(inviteType InviteType, deviceId, channelId string, startTi } streamId = append(streamId, channelId) + + // 转换时间戳 + if startTime != "" { + if t, err := time.Parse("2006-01-02T15:04:05", startTime); err == nil { + startTime = strconv.FormatInt(t.Unix(), 10) + } + } + if endTime != "" { + if t, err := time.Parse("2006-01-02T15:04:05", endTime); err == nil { + endTime = strconv.FormatInt(t.Unix(), 10) + } + } + if InviteTypePlayback == inviteType { return StreamID(strings.Join(streamId, "/") + ".playback" + "." + startTime + "." + endTime) } else if InviteTypeDownload == inviteType { diff --git a/stack/live.go b/stack/live.go index 7be7d81..b837109 100644 --- a/stack/live.go +++ b/stack/live.go @@ -9,8 +9,8 @@ import ( "gb-cms/sdp" "github.com/ghettovoice/gosip" "github.com/ghettovoice/gosip/sip" - "math" "net/http" + "strconv" "time" ) @@ -88,7 +88,7 @@ func (d *Device) Invite(inviteType common.InviteType, streamId common.StreamID, }() // 告知流媒体服务创建国标源, 返回收流地址信息 - ip, port, urls, ssrc, msErr := MSCreateGBSource(string(streamId), setup, "", string(inviteType)) + ip, port, urls, ssrc, msErr := MSCreateGBSource(string(streamId), setup, "", string(inviteType), speed) if msErr != nil { log.Sugar.Errorf("创建GBSource失败 err: %s", msErr.Error()) return nil, nil, msErr @@ -99,7 +99,6 @@ func (d *Device) Invite(inviteType common.InviteType, streamId common.StreamID, if common.InviteTypePlayback == inviteType { inviteRequest, err = d.BuildPlaybackRequest(channelId, ip, port, startTime, stopTime, setup, ssrc) } else if common.InviteTypeDownload == inviteType { - speed = int(math.Min(4, float64(speed))) inviteRequest, err = d.BuildDownloadRequest(channelId, ip, port, startTime, stopTime, setup, speed, ssrc) } else { inviteRequest, err = d.BuildLiveRequest(channelId, ip, port, setup, ssrc) @@ -129,7 +128,7 @@ func (d *Device) Invite(inviteType common.InviteType, streamId common.StreamID, recipient.SetPort(&sipPort) log.Sugar.Infof("send ack %s", ackRequest.String()) - + // 发送ack err = common.SipStack.Send(ackRequest) if err != nil { cancel() @@ -149,7 +148,7 @@ func (d *Device) Invite(inviteType common.InviteType, streamId common.StreamID, } else if dialogRequest == nil { // invite 没有收到任何应答 return nil, nil, fmt.Errorf("invite request timeout") - } else if "active" == setup { + } else if "active" == setup || common.InviteTypeDownload == inviteType { // 如果是TCP主动拉流, 还需要将拉流地址告知给流媒体服务 var answer *sdp.SDP answer, err = sdp.Parse(body) @@ -157,8 +156,20 @@ func (d *Device) Invite(inviteType common.InviteType, streamId common.StreamID, return nil, nil, err } + // 解析下载的文件大小 + var fileSize int + if common.InviteTypeDownload == inviteType { + for _, attr := range answer.Attrs { + if "filesize" != attr[0] { + continue + } + + fileSize, _ = strconv.Atoi(attr[1]) + } + } + addr := fmt.Sprintf("%s:%d", answer.Addr, answer.Video.Port) - if err = MSConnectGBSource(string(streamId), addr); err != nil { + if err = MSConnectGBSource(string(streamId), addr, fileSize); err != nil { log.Sugar.Errorf("设置GB28181连接地址失败 err: %s addr: %s", err.Error(), addr) return nil, nil, err } diff --git a/stack/media_server.go b/stack/media_server.go index bc91aff..f308816 100644 --- a/stack/media_server.go +++ b/stack/media_server.go @@ -54,6 +54,10 @@ type SDP struct { SSRC string `json:"ssrc,omitempty"` Setup string `json:"setup,omitempty"` // active/passive Transport string `json:"transport,omitempty"` // tcp/udp + Speed int `json:"speed"` + StartTime int `json:"start_time,omitempty"` + EndTime int `json:"end_time,omitempty"` + FileSize int `json:"file_size,omitempty"` } type SourceSDP struct { @@ -99,13 +103,14 @@ func SendWithUrlParams(path string, body interface{}, values url.Values) (*http. return client.Do(request) } -func MSCreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) { +func MSCreateGBSource(id, setup string, ssrc string, sessionName string, speed int) (string, uint16, []string, string, error) { v := &SourceSDP{ Source: id, SDP: SDP{ Setup: setup, SSRC: ssrc, SessionName: sessionName, + Speed: speed, }, } @@ -134,11 +139,12 @@ func MSCreateGBSource(id, setup string, ssrc string, sessionName string) (string return host, uint16(port), data.Data.Urls, data.Data.SSRC, err } -func MSConnectGBSource(id, addr string) error { +func MSConnectGBSource(id, addr string, fileSize int) error { v := &SourceSDP{ Source: id, SDP: SDP{ - Addr: addr, + Addr: addr, + FileSize: fileSize, }, } diff --git a/stack/sip_server.go b/stack/sip_server.go index 627fe8c..9323c3e 100644 --- a/stack/sip_server.go +++ b/stack/sip_server.go @@ -37,6 +37,7 @@ const ( CmdMobilePosition = "MobilePosition" CmdKeepalive = "Keepalive" CmdBroadcast = "Broadcast" + CmdMediaStatus = "MediaStatus" ) type sipServer struct { @@ -293,6 +294,11 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) { if CmdKeepalive == cmd { // 下级设备心跳通知 ok = s.handler.OnKeepAlive(deviceId, wrapper.req.Source()) + } else if CmdMediaStatus == cmd { + // 回放/下载结束 + ok = true + id, _ := wrapper.req.CallID() + CloseStreamByCallID(id.Value()) } break @@ -429,6 +435,7 @@ func StartSipServer(id, listenIP, publicIP string, listenPort int) (common.SipSe fmt.Sprintf("%s.%s", XmlNameNotify, CmdKeepalive): reflect.TypeOf(BaseMessage{}), fmt.Sprintf("%s.%s", XmlNameNotify, CmdMobilePosition): reflect.TypeOf(BaseMessage{}), fmt.Sprintf("%s.%s", XmlNameResponse, CmdBroadcast): reflect.TypeOf(BaseMessage{}), + fmt.Sprintf("%s.%s", XmlNameNotify, CmdMediaStatus): reflect.TypeOf(BaseMessage{}), }} utils.Assert(ua.OnRequest(sip.REGISTER, filterRequest(server.OnRegister)) == nil) diff --git a/stack/stream.go b/stack/stream.go index 0da3a98..89eaeca 100644 --- a/stack/stream.go +++ b/stack/stream.go @@ -107,6 +107,13 @@ func CloseStream(streamId common.StreamID, ms bool) { } } +func CloseStreamByCallID(callId string) { + deleteStream, err := dao.Stream.DeleteStreamByCallID(callId) + if err == nil { + (&Stream{deleteStream}).Close(true, true) + } +} + // CloseStreamSinks 关闭某个流的所有sink func CloseStreamSinks(StreamID common.StreamID, bye, ms bool) []*dao.SinkModel { sinks, _ := dao.Sink.DeleteForwardSinksByStreamID(StreamID)