规范http响应的数据结构

This commit is contained in:
yangjiechina
2024-11-09 10:22:37 +08:00
parent 5beafb5117
commit 9f21399d3a
10 changed files with 484 additions and 411 deletions

639
api.go
View File

@@ -31,6 +31,63 @@ type InviteParams struct {
streamId StreamID
}
type StreamParams struct {
Stream StreamID `json:"stream"` // Source
Protocol string `json:"protocol"` // 推拉流协议
RemoteAddr string `json:"remote_addr"` // peer地址
}
type PlayDoneParams struct {
StreamParams
Sink string `json:"sink"`
}
type QueryRecordParams struct {
DeviceId string `json:"device_id"`
ChannelId string `json:"channel_id"`
Timeout int `json:"timeout"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
Type_ string `json:"type"`
}
type DeviceChannelID struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
}
type SeekParams struct {
StreamId StreamID `json:"stream_id"`
Seconds int `json:"seconds"`
}
type PlatformChannel struct {
ServerID string `json:"server_id"`
Channels [][2]string `json:"channels"` //二维数组, 索引0-设备ID/索引1-通道ID
}
type BroadcastParams struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
RoomID string `json:"room_id"`
Type int `json:"type"`
}
type HangupParams struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
RoomID string `json:"room_id"`
}
type RecordParams struct {
StreamParams
Path string `json:"path"`
}
type StreamIDParams struct {
StreamID StreamID `json:"stream_id"`
}
var apiServer *ApiServer
func init() {
@@ -45,58 +102,50 @@ func init() {
}
}
func withHookParams(f func(streamId StreamID, protocol string, w http.ResponseWriter, req *http.Request)) func(http.ResponseWriter, *http.Request) {
func filterRequestBodyParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
if "" != req.URL.RawQuery {
Sugar.Infof("on request %s?%s", req.URL.Path, req.URL.RawQuery)
}
v := struct {
Stream StreamID `json:"stream"` //Stream id
Protocol string `json:"protocol"` //推拉流协议
RemoteAddr string `json:"remote_addr"` //peer地址
}{}
err := HttpDecodeJSONBody(w, req, &v)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := HttpDecodeJSONBody(w, req, params); err != nil {
Sugar.Errorf("处理http请求失败 err: %s path: %s", err.Error(), req.URL.Path)
httpResponseError(w, err.Error())
return
}
f(v.Stream, v.Protocol, w, req)
f(params.(T), w, req)
}
}
func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/hook/on_play", withHookParams(apiServer.OnPlay))
apiServer.router.HandleFunc("/api/v1/hook/on_play_done", withHookParams(apiServer.OnPlayDone))
apiServer.router.HandleFunc("/api/v1/hook/on_publish", withHookParams(apiServer.OnPublish))
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", withHookParams(apiServer.OnPublishDone))
apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", withHookParams(apiServer.OnIdleTimeout))
apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", withHookParams(apiServer.OnReceiveTimeout))
apiServer.router.HandleFunc("/api/v1/hook/on_record", withHookParams(apiServer.OnReceiveTimeout))
apiServer.router.HandleFunc("/api/v1/hook/on_play", filterRequestBodyParams(apiServer.OnPlay, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_play_done", filterRequestBodyParams(apiServer.OnPlayDone, &PlayDoneParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish", filterRequestBodyParams(apiServer.OnPublish, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", filterRequestBodyParams(apiServer.OnPublishDone, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", filterRequestBodyParams(apiServer.OnIdleTimeout, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", filterRequestBodyParams(apiServer.OnReceiveTimeout, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_record", filterRequestBodyParams(apiServer.OnRecord, &RecordParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_started", apiServer.OnStarted)
// 统一处理live/playback/download请求
apiServer.router.HandleFunc("/api/v1/{action}/start", apiServer.OnInvite)
apiServer.router.HandleFunc("/api/v1/stream/close", apiServer.OnCloseStream) // 释放流(实时/回放/下载), 实际以拉流计数为准, 如果没有客户端拉流, 不等流媒体服务通知空闲超时,立即释放流,否则(还有客户端拉流)不会释放。
apiServer.router.HandleFunc("/api/v1/{action}/start", filterRequestBodyParams(apiServer.OnInvite, &InviteParams{}))
// 关闭国标流. 如果是实时流, 等收流或空闲超时自行删除. 回放或下载流立即删除.
apiServer.router.HandleFunc("/api/v1/stream/close", filterRequestBodyParams(apiServer.OnCloseStream, &StreamIDParams{}))
apiServer.router.HandleFunc("/api/v1/device/list", apiServer.OnDeviceList) // 查询在线设备
apiServer.router.HandleFunc("/api/v1/record/list", apiServer.OnRecordList) // 查询录像列表
apiServer.router.HandleFunc("/api/v1/position/sub", apiServer.OnSubscribePosition) // 订阅移动位置
apiServer.router.HandleFunc("/api/v1/playback/seek", apiServer.OnSeekPlayback) // 回放seek
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制
apiServer.router.HandleFunc("/api/v1/device/list", apiServer.OnDeviceList) // 查询在线设备
apiServer.router.HandleFunc("/api/v1/record/list", filterRequestBodyParams(apiServer.OnRecordList, &QueryRecordParams{})) // 查询录像列表
apiServer.router.HandleFunc("/api/v1/position/sub", filterRequestBodyParams(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置
apiServer.router.HandleFunc("/api/v1/playback/seek", filterRequestBodyParams(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制
apiServer.router.HandleFunc("/api/v1/platform/add", apiServer.OnPlatformAdd) // 添加上级平台
apiServer.router.HandleFunc("/api/v1/platform/remove", apiServer.OnPlatformRemove) // 删除上级平台
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 上级平台列表
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", apiServer.OnPlatformChannelBind) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", apiServer.OnPlatformChannelUnbind) // 级联取消绑定通道
apiServer.router.HandleFunc("/api/v1/platform/add", filterRequestBodyParams(apiServer.OnPlatformAdd, &GBPlatformRecord{})) // 添加上级平台
apiServer.router.HandleFunc("/api/v1/platform/remove", filterRequestBodyParams(apiServer.OnPlatformRemove, &GBPlatformRecord{})) // 删除上级平台
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 上级平台列表
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", filterRequestBodyParams(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", filterRequestBodyParams(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联取消绑定通道
apiServer.router.HandleFunc("/ws/v1/talk", apiServer.OnWSTalk) // 语音广播/对讲, 主讲音频传输链路
apiServer.router.HandleFunc("/api/v1/broadcast/invite", apiServer.OnBroadcast) // 发起语音广播
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", apiServer.OnHangup) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
apiServer.router.HandleFunc("/ws/v1/talk", apiServer.OnWSTalk) // 语音广播/对讲, 主讲音频传输链路
apiServer.router.HandleFunc("/api/v1/broadcast/invite", filterRequestBodyParams(apiServer.OnBroadcast, &BroadcastParams{Type: int(BroadcastTypeTCP)})) // 发起语音广播
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", filterRequestBodyParams(apiServer.OnHangup, &HangupParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
apiServer.router.HandleFunc("/broadcast.html", func(writer http.ResponseWriter, request *http.Request) {
http.ServeFile(writer, request, "./broadcast.html")
})
@@ -121,8 +170,8 @@ func startApiServer(addr string) {
}
}
func (api *ApiServer) OnPlay(streamId StreamID, protocol string, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("play. protocol:%s stream id:%s", protocol, streamId)
func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("播放事件. protocol: %s stream : %s", params.Protocol, params.Stream)
// [注意]: windows上使用cmd/power shell推拉流如果要携带多个参数, 请用双引号将与号引起来("&")
// session_id是为了同一个录像文件, 允许同时点播多个.当然如果实时流支持多路预览, 也是可以的.
@@ -136,98 +185,179 @@ func (api *ApiServer) OnPlay(streamId StreamID, protocol string, w http.Response
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive&stream_type=playback&start_time=2024-06-18T15:20:56&end_time=2024-06-18T15:25:56
// 跳过非国标拉流
split := strings.Split(string(streamId), "/")
split := strings.Split(string(params.Stream), "/")
if len(split) != 2 || len(split[0]) != 20 || len(split[1]) < 20 {
w.WriteHeader(http.StatusOK)
Sugar.Infof("跳过非国标流的播放事件 stream: %s", params.Stream)
return
}
// 已经存在,累加计数
if stream := StreamManager.Find(streamId); stream != nil {
stream.IncreaseSinkCount()
w.WriteHeader(http.StatusOK)
if stream := StreamManager.Find(params.Stream); stream != nil {
count := stream.IncreaseSinkCount()
Sugar.Infof("拉流计数: %d stream: %s ", count, params.Stream)
return
}
deviceId := split[0] //deviceId
channelId := split[1] //channelId
deviceId := split[0]
channelId := split[1]
if len(channelId) > 20 {
channelId = channelId[:20]
}
query := r.URL.Query()
params := InviteParams{
inviteParams := &InviteParams{
DeviceID: deviceId,
ChannelID: channelId,
StartTime: query.Get("start_time"),
EndTime: query.Get("end_time"),
Setup: strings.ToLower(query.Get("setup")),
Speed: query.Get("speed"),
streamId: streamId,
streamId: params.Stream,
}
streamType := strings.ToLower(query.Get("stream_type"))
var code int
var stream *Stream
var ok bool
var err error
streamType := strings.ToLower(query.Get("stream_type"))
if "playback" == streamType {
stream, ok = api.DoInvite(InviteTypeLive, params, false, w, r)
code, stream, err = api.DoInvite(InviteTypeLive, inviteParams, false, w, r)
} else if "download" == streamType {
stream, ok = api.DoInvite(InviteTypeDownload, params, false, w, r)
code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r)
} else {
stream, ok = api.DoInvite(InviteTypeLive, params, false, w, r)
code, stream, err = api.DoInvite(InviteTypeLive, inviteParams, false, w, r)
}
if ok {
stream.IncreaseSinkCount()
if err != nil {
Sugar.Errorf("请求流失败 err: %s", err.Error())
}
if http.StatusOK == code {
count := stream.IncreaseSinkCount()
Sugar.Infof("拉流计数: %d stream: %s ", count, params.Stream)
}
w.WriteHeader(code)
}
func (api *ApiServer) OnInvite(w http.ResponseWriter, r *http.Request) {
v := InviteParams{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
w.WriteHeader(http.StatusBadRequest)
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
stream := StreamManager.Find(params.Stream)
if stream == nil {
Sugar.Errorf("处理播放结束事件失败, stream不存在. id: %s", params.Stream)
return
}
count := stream.DecreaseSinkCount()
Sugar.Infof("拉流计数: %d stream: %s ", count, params.Stream)
// 媒体链路与上级断开连接, 向上级发送Bye请求
if params.Protocol == "gb_stream_forward" {
sink := stream.RemoveForwardSink(params.Sink)
if sink == nil || sink.dialog == nil {
return
}
if platform := PlatformManager.FindPlatform(sink.platformID); platform != nil {
callID, _ := sink.dialog.CallID()
platform.CloseStream(callID.String(), true, false)
}
}
}
func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream)
stream := StreamManager.Find(params.Stream)
if stream != nil {
stream.publishEvent <- 0
}
}
func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("推流结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
CloseStream(params.Stream)
}
func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) {
Sugar.Infof("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// 非rtmp空闲超时, 返回非200应答, 删除会话
if params.Protocol != "rtmp" {
w.WriteHeader(http.StatusForbidden)
CloseStream(params.Stream)
}
}
func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) {
Sugar.Infof("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// 非rtmp推流超时, 返回非200应答, 删除会话
if params.Protocol != "rtmp" {
w.WriteHeader(http.StatusForbidden)
CloseStream(params.Stream)
}
}
func (api *ApiServer) OnRecord(params *RecordParams, w http.ResponseWriter, req *http.Request) {
Sugar.Infof("录制事件. protocol: %s stream: %s path:%s ", params.Protocol, params.Stream, params.Path)
}
func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
action := strings.ToLower(vars["action"])
var code int
var stream *Stream
var err error
if "playback" == action {
apiServer.DoInvite(InviteTypePlayback, v, true, w, r)
code, stream, err = apiServer.DoInvite(InviteTypePlayback, v, true, w, r)
} else if "download" == action {
apiServer.DoInvite(InviteTypeDownload, v, true, w, r)
code, stream, err = apiServer.DoInvite(InviteTypeDownload, v, true, w, r)
} else if "live" == action {
apiServer.DoInvite(InviteTypeLive, v, true, w, r)
code, stream, err = apiServer.DoInvite(InviteTypeLive, v, true, w, r)
} else {
w.WriteHeader(http.StatusNotFound)
return
}
if http.StatusOK != code {
Sugar.Errorf("请求流失败 err: %s", err.Error())
httpResponseError(w, err.Error())
} else {
// 返回stream id和拉流地址
response := struct {
Stream string `json:"stream_id"`
Urls []string `json:"urls"`
}{
string(stream.ID),
stream.urls,
}
httpResponseOK(w, response)
}
}
// DoInvite 处理Invite请求
// @params sync 是否异步等待流媒体的publish事件(确认收到流), 目前请求流分两种方式流媒体hook和http接口, hook方式同步等待确认收到流再应答, http接口直接应答成功。
func (api *ApiServer) DoInvite(inviteType InviteType, params InviteParams, sync bool, w http.ResponseWriter, r *http.Request) (*Stream, bool) {
func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync bool, w http.ResponseWriter, r *http.Request) (int, *Stream, error) {
device := DeviceManager.Find(params.DeviceID)
if device == nil {
Sugar.Warnf("设备离线 id:%s", params.DeviceID)
w.WriteHeader(http.StatusNotFound)
return nil, false
return http.StatusNotFound, nil, fmt.Errorf("设备离线 id: %s", params.DeviceID)
}
// 解析时间范围参数
// 解析回放或下载的时间范围参数
var startTimeSeconds string
var endTimeSeconds string
if InviteTypeLive != inviteType {
startTime, err := time.ParseInLocation("2006-01-02t15:04:05", params.StartTime, time.Local)
if err != nil {
Sugar.Errorf("解析开始时间失败 err:%s start_time:%s", err.Error(), params.StartTime)
w.WriteHeader(http.StatusBadRequest)
return nil, false
return http.StatusBadRequest, nil, err
}
endTime, err := time.ParseInLocation("2006-01-02t15:04:05", params.EndTime, time.Local)
if err != nil {
Sugar.Errorf("解析开始时间失败 err:%s start_time:%s", err.Error(), params.EndTime)
w.WriteHeader(http.StatusBadRequest)
return nil, false
return http.StatusBadRequest, nil, err
}
startTimeSeconds = strconv.FormatInt(startTime.Unix(), 10)
@@ -242,40 +372,23 @@ func (api *ApiServer) DoInvite(inviteType InviteType, params InviteParams, sync
// 解析回放或下载速度参数
speed, _ := strconv.Atoi(params.Speed)
speed = int(math.Min(4, float64(speed)))
stream, ok := device.(*Device).StartStream(inviteType, streamId, params.ChannelID, startTimeSeconds, endTimeSeconds, params.Setup, speed, sync)
if !ok {
w.WriteHeader(http.StatusInternalServerError)
return nil, false
stream, err := device.(*Device).StartStream(inviteType, streamId, params.ChannelID, startTimeSeconds, endTimeSeconds, params.Setup, speed, sync)
if err != nil {
return http.StatusInternalServerError, nil, err
}
// 返回stream id
response := map[string]string{"stream_id": string(streamId)}
httpResponseOK(w, response)
return stream, true
return http.StatusOK, stream, nil
}
func (api *ApiServer) OnCloseStream(w http.ResponseWriter, r *http.Request) {
v := struct {
StreamID StreamID `json:"stream_id"`
}{}
err := HttpDecodeJSONBody(w, r, &v)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, r *http.Request) {
stream := StreamManager.Find(v.StreamID)
if stream == nil {
w.WriteHeader(http.StatusNotFound)
return
// 等空闲或收流超时会自动关闭
if stream != nil && stream.SinkCount() < 1 {
CloseStream(v.StreamID)
}
if stream.SinkCount() > 0 {
return
}
CloseStream(v.StreamID)
httpResponseOK(w, nil)
}
func CloseStream(streamId StreamID) {
@@ -285,102 +398,31 @@ func CloseStream(streamId StreamID) {
}
}
func (api *ApiServer) OnPlayDone(streamId StreamID, protocol string, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("play done. protocol:%s stream id:%s", protocol, streamId)
if stream := StreamManager.Find(streamId); stream != nil {
stream.DecreaseSinkCount()
}
// 与上级级联断开连接
if protocol == "gb_stream_forward" {
}
w.WriteHeader(http.StatusOK)
}
func (api *ApiServer) OnPublish(streamId StreamID, protocol string, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("publish. protocol:%s stream id:%s", protocol, streamId)
w.WriteHeader(http.StatusOK)
stream := StreamManager.Find(streamId)
if stream != nil {
stream.publishEvent <- 0
}
}
func (api *ApiServer) OnPublishDone(streamId StreamID, protocol string, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("publish done. protocol:%s stream id:%s", protocol, streamId)
w.WriteHeader(http.StatusOK)
CloseStream(streamId)
}
func (api *ApiServer) OnIdleTimeout(streamId StreamID, protocol string, w http.ResponseWriter, req *http.Request) {
Sugar.Infof("publish timeout. protocol:%s stream id:%s", protocol, streamId)
if protocol != "rtmp" {
w.WriteHeader(http.StatusForbidden)
CloseStream(streamId)
} else {
w.WriteHeader(http.StatusOK)
}
}
func (api *ApiServer) OnReceiveTimeout(streamId StreamID, protocol string, w http.ResponseWriter, req *http.Request) {
Sugar.Infof("receive timeout. protocol:%s stream id:%s", protocol, streamId)
if protocol != "rtmp" {
w.WriteHeader(http.StatusForbidden)
CloseStream(streamId)
} else {
w.WriteHeader(http.StatusOK)
}
}
func (api *ApiServer) OnRecord(streamId string, protocol string, w http.ResponseWriter, req *http.Request) {
Sugar.Infof("receive onrecord. protocol:%s stream id:%s", protocol, streamId)
w.WriteHeader(http.StatusOK)
}
func (api *ApiServer) OnDeviceList(w http.ResponseWriter, r *http.Request) {
devices := DeviceManager.AllDevices()
httpResponseOK(w, devices)
}
func (api *ApiServer) OnRecordList(w http.ResponseWriter, r *http.Request) {
v := struct {
DeviceId string `json:"device_id"`
ChannelId string `json:"channel_id"`
Timeout int `json:"timeout"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
Type_ string `json:"type"`
}{}
err := HttpDecodeJSONBody(w, r, &v)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, r *http.Request) {
device := DeviceManager.Find(v.DeviceId)
if device == nil {
httpResponseOK(w, "设备离线")
httpResponseError(w, "设备离线")
return
}
sn := GetSN()
err = device.QueryRecord(v.ChannelId, v.StartTime, v.EndTime, sn, v.Type_)
err := device.QueryRecord(v.ChannelId, v.StartTime, v.EndTime, sn, v.Type_)
if err != nil {
httpResponseOK(w, fmt.Sprintf("发送查询录像记录失败 err:%s", err.Error()))
logger.Error("发送查询录像请求失败 err: %s", err.Error())
httpResponseError(w, err.Error())
return
}
var recordList []RecordInfo
// 设置查询超时时长
timeout := int(math.Max(math.Min(5, float64(v.Timeout)), 60))
//设置查询超时时长
withTimeout, cancelFunc := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
var recordList []RecordInfo
SNManager.AddEvent(sn, func(data interface{}) {
response := data.(*QueryRecordInfoResponse)
@@ -388,7 +430,7 @@ func (api *ApiServer) OnRecordList(w http.ResponseWriter, r *http.Request) {
recordList = append(recordList, response.DeviceList.Devices...)
}
//查询完成
// 所有记录响应完毕
if len(recordList) >= response.SumNum {
cancelFunc()
}
@@ -402,42 +444,26 @@ func (api *ApiServer) OnRecordList(w http.ResponseWriter, r *http.Request) {
httpResponseOK(w, recordList)
}
func (api *ApiServer) OnSubscribePosition(w http.ResponseWriter, r *http.Request) {
v := struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
}{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
return
}
func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, w http.ResponseWriter, r *http.Request) {
device := DeviceManager.Find(v.DeviceID)
if device == nil {
httpResponseError(w, "设备离线")
return
}
if err := device.SubscribePosition(v.ChannelID); err != nil {
}
w.WriteHeader(http.StatusOK)
}
func (api *ApiServer) OnSeekPlayback(w http.ResponseWriter, r *http.Request) {
v := struct {
StreamId StreamID `json:"stream_id"`
Seconds int `json:"seconds"`
}{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
logger.Error("发送订阅位置请求失败 err: %s", err.Error())
httpResponseError(w, err.Error())
return
}
httpResponseOK(w, nil)
}
func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *http.Request) {
stream := StreamManager.Find(v.StreamId)
if stream == nil || stream.DialogRequest == nil {
httpResponseError(w, "会话不存在")
return
}
@@ -449,7 +475,7 @@ func (api *ApiServer) OnSeekPlayback(w http.ResponseWriter, r *http.Request) {
seekRequest.AppendHeader(&RtspMessageType)
SipUA.SendRequest(seekRequest)
w.WriteHeader(http.StatusOK)
httpResponseOK(w, nil)
}
func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) {
@@ -459,7 +485,7 @@ func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) {
func (api *ApiServer) OnWSTalk(w http.ResponseWriter, r *http.Request) {
conn, err := api.upgrader.Upgrade(w, r, nil)
if err != nil {
Sugar.Errorf("websocket头检查失败 err:%s", err.Error())
Sugar.Errorf("websocket头检查失败 err: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
@@ -474,13 +500,6 @@ func (api *ApiServer) OnWSTalk(w http.ResponseWriter, r *http.Request) {
rtp := make([]byte, 1500)
muxer := librtp.NewMuxer(8, 0, 0xFFFFFFFF)
muxer.SetAllocHandler(func(params interface{}) []byte {
return rtp[2:]
})
muxer.SetWriteHandler(func(data []byte, timestamp uint32, params interface{}) {
binary.BigEndian.PutUint16(rtp, uint16(len(data)))
room.DispatchRtpPacket(rtp[:2+len(data)])
})
for {
_, bytes, err := conn.ReadMessage()
@@ -496,13 +515,18 @@ func (api *ApiServer) OnWSTalk(w http.ResponseWriter, r *http.Request) {
for i := 0; i < count; i++ {
offset := i * 320
min := int(math.Min(float64(n), 320))
muxer.Input(bytes[offset:offset+min], uint32(min))
muxer.Input(bytes[offset:offset+min], uint32(min), func() []byte {
return rtp[2:]
}, func(data []byte) {
binary.BigEndian.PutUint16(rtp, uint16(len(data)))
room.DispatchRtpPacket(rtp[:2+len(data)])
})
n -= min
}
}
Sugar.Infof("主讲websocket断开连接 roomid:%s", roomId)
muxer.Close()
Sugar.Infof("主讲websocket断开连接 room: %s", roomId)
sessions := BroadcastManager.RemoveRoom(roomId)
for _, session := range sessions {
@@ -510,18 +534,7 @@ func (api *ApiServer) OnWSTalk(w http.ResponseWriter, r *http.Request) {
}
}
func (api *ApiServer) OnHangup(w http.ResponseWriter, r *http.Request) {
v := struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
RoomID string `json:"room_id"`
}{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
return
}
func (api *ApiServer) OnHangup(v *HangupParams, w http.ResponseWriter, r *http.Request) {
if session := BroadcastManager.Remove(GenerateSessionId(v.DeviceID, v.ChannelID)); session != nil {
session.Close(true)
}
@@ -529,40 +542,39 @@ func (api *ApiServer) OnHangup(w http.ResponseWriter, r *http.Request) {
httpResponseOK(w, nil)
}
func (api *ApiServer) OnBroadcast(w http.ResponseWriter, r *http.Request) {
v := struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
RoomID string `json:"room_id"`
Type int `json:"type"`
}{
Type: int(BroadcastTypeTCP),
}
func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("语音广播 %v", *v)
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
var err error
// 响应错误消息
defer func() {
if err != nil {
Sugar.Errorf("广播失败 err: %s", err.Error())
httpResponseError(w, err.Error())
}
}()
device := DeviceManager.Find(v.DeviceID)
if device == nil {
err = fmt.Errorf("设备离线")
return
}
broadcastRoom := BroadcastManager.FindRoom(v.RoomID)
if broadcastRoom == nil {
w.WriteHeader(http.StatusNotFound)
//err := fmt.Errorf("the room with id '%s' is not found", v.RoomID)
err = fmt.Errorf("广播房间找不到. room: %s", v.RoomID)
return
}
//全局唯一ID
// 每个设备的广播唯一ID
sessionId := GenerateSessionId(v.DeviceID, v.ChannelID)
if BroadcastManager.Find(sessionId) != nil {
w.WriteHeader(http.StatusForbidden)
return
}
device := DeviceManager.Find(v.DeviceID)
if device == nil {
w.WriteHeader(http.StatusNotFound)
err = fmt.Errorf("设备正在广播中. session: %s", sessionId)
return
}
// 生成让下级应答时携带的ID
sourceId := v.RoomID + utils.RandStringBytes(10)
session := &BroadcastSession{
SourceID: sourceId,
@@ -572,25 +584,52 @@ func (api *ApiServer) OnBroadcast(w http.ResponseWriter, r *http.Request) {
Type: BroadcastType(v.Type),
}
if BroadcastManager.AddSession(v.RoomID, session) {
device.Broadcast(sourceId, v.ChannelID)
if !BroadcastManager.AddSession(v.RoomID, session) {
err = fmt.Errorf("设备正在广播中. session: %s", sessionId)
return
}
cancel := r.Context()
transaction := device.Broadcast(sourceId, v.ChannelID)
responses := transaction.Responses()
var ok bool
select {
case response := <-responses:
if response == nil {
err = fmt.Errorf("信令超时")
break
}
if response.StatusCode() != http.StatusOK {
err = fmt.Errorf("answer has a bad status code: %d response: %s", response.StatusCode(), response.String())
break
}
// 不等下级的广播请求, 直接等Invite
timeout, _ := context.WithTimeout(r.Context(), 10*time.Second)
select {
case <-timeout.Done():
err = fmt.Errorf("invite超时. session: %s", session.Id())
break
case code := <-session.Answer:
if http.StatusOK != code {
err = fmt.Errorf("bad status code %d", code)
} else {
ok = true
}
break
}
break
case <-cancel.Done():
// 取消http请求
Sugar.Warnf("广播失败, 取消http请求. session: %s", session.Id())
break
}
if ok {
httpResponseOK(w, nil)
} else {
w.WriteHeader(http.StatusForbidden)
}
select {
case <-session.Answer:
break
case <-r.Context().Done():
break
}
if !session.Successful {
Sugar.Errorf("广播失败 session:%s", sessionId)
BroadcastManager.Remove(sessionId)
} else {
Sugar.Infof("广播成功 session:%s", sessionId)
}
}
@@ -606,38 +645,47 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) {
}
}
func (api *ApiServer) OnPlatformAdd(w http.ResponseWriter, r *http.Request) {
v := GBPlatformRecord{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
func (api *ApiServer) OnPlatformAdd(v *GBPlatformRecord, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("添加级联 %v", *v)
var err error
// 响应错误消息
defer func() {
if err != nil {
Sugar.Errorf("添加级联失败 err: %s", err.Error())
httpResponseError(w, err.Error())
}
}()
if PlatformManager.ExistPlatform(v.SeverID) {
err = fmt.Errorf("id冲突")
return
} else if PlatformManager.ExistPlatformWithServerAddr(v.ServerAddr) {
err = fmt.Errorf("地址冲突")
return
}
if PlatformManager.ExistPlatform(v.SeverID) || PlatformManager.ExistPlatformWithServerAddr(v.ServerAddr) {
return
}
platform, err := NewGBPlatform(&v, SipUA)
platform, err := NewGBPlatform(v, SipUA)
if err != nil {
return
} else if !PlatformManager.AddPlatform(platform) {
err = fmt.Errorf("已经存在")
return
}
platform.Start()
httpResponseOK(w, nil)
}
func (api *ApiServer) OnPlatformRemove(w http.ResponseWriter, r *http.Request) {
v := GBPlatformRecord{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
return
}
func (api *ApiServer) OnPlatformRemove(v *GBPlatformRecord, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("删除级联 %v", *v)
platform := PlatformManager.RemovePlatform(v.SeverID)
if platform != nil {
platform.Stop()
}
httpResponseOK(w, nil)
}
func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
@@ -645,19 +693,11 @@ func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
httpResponseOK(w, platforms)
}
func (api *ApiServer) OnPlatformChannelBind(w http.ResponseWriter, r *http.Request) {
v := struct {
ServerID string `json:"server_id"`
Channels [][2]string `json:"channels"` //二维数组, 索引0-设备ID/索引1-通道ID
}{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
return
}
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) {
platform := PlatformManager.FindPlatform(v.ServerID)
if platform == nil {
Sugar.Errorf("绑定通道失败, id: %s", v.ServerID)
httpResponseError(w, "级联设备不存在")
return
}
@@ -677,25 +717,20 @@ func (api *ApiServer) OnPlatformChannelBind(w http.ResponseWriter, r *http.Reque
}
platform.AddChannels(channels)
httpResponseOK(w, nil)
}
func (api *ApiServer) OnPlatformChannelUnbind(w http.ResponseWriter, r *http.Request) {
v := struct {
ServerID string `json:"server_id"`
Channels [][2]string `json:"channels"` //二维数组, 索引0-设备ID/索引1-通道ID
}{}
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
httpResponse2(w, err)
return
}
func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) {
platform := PlatformManager.FindPlatform(v.ServerID)
if platform == nil {
Sugar.Errorf("取消绑定通道失败, id: %s", v.ServerID)
httpResponseError(w, "级联设备不存在")
return
}
for _, pair := range v.Channels {
platform.RemoveChannel(pair[1])
}
httpResponseOK(w, nil)
}

View File

@@ -38,6 +38,7 @@ func (d *Device) DoBroadcast(sourceId, channelId string) error {
return nil
}
// OnInvite 邀请语音广播
func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
session := FindBroadcastSessionWithSourceID(user)
if session == nil {
@@ -45,46 +46,46 @@ func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
}
body := request.Body()
if body == "" {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
sdp, err := sdp.Parse(body)
offer, err := sdp.Parse(body)
if err != nil {
Sugar.Infof("解析sdp失败 err:%s sdp:%s", err.Error(), body)
Sugar.Infof("解析sdp失败. session: %s err: %s sdp: %s", session.Id(), err.Error(), body)
session.Answer <- http.StatusBadRequest
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
if sdp.Audio == nil {
Sugar.Infof("处理sdp失败 缺少audio字段 sdp:%s", body)
} else if offer.Audio == nil {
Sugar.Infof("offer中缺少audio字段. session: %s sdp: %s", session.Id(), body)
session.Answer <- http.StatusBadRequest
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
var answerSDP string
isTcp := strings.Contains(sdp.Audio.Proto, "TCP")
isTcp := strings.Contains(offer.Audio.Proto, "TCP")
// UDP广播
if !isTcp && BroadcastTypeUDP == session.Type {
var client *transport.UDPClient
err := TransportManager.AllocPort(false, func(port uint16) error {
client = &transport.UDPClient{}
localAddr, _ := net.ResolveUDPAddr("udp", net.JoinHostPort(Config.ListenIP, strconv.Itoa(int(port))))
remoteAddr, _ := net.ResolveUDPAddr("udp", net.JoinHostPort(sdp.Addr, strconv.Itoa(int(sdp.Audio.Port))))
remoteAddr, _ := net.ResolveUDPAddr("udp", net.JoinHostPort(offer.Addr, strconv.Itoa(int(offer.Audio.Port))))
return client.Connect(localAddr, remoteAddr)
})
if err == nil {
Sugar.Errorf("创建UDP广播端口失败 err:%s", err.Error())
session.Answer <- http.StatusInternalServerError
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
session.RemoteIP = sdp.Addr
session.RemotePort = int(sdp.Audio.Port)
session.RemoteIP = offer.Addr
session.RemotePort = int(offer.Audio.Port)
session.Transport = client
session.Transport.SetHandler(session)
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipId, Config.PublicIP, Config.PublicIP, client.ListenPort(), "RTP/AVP")
} else {
// TCP广播
server, err := TransportManager.NewTCPServer(Config.ListenIP)
if err != nil {
Sugar.Errorf("创建TCP广播端口失败 err:%s", err.Error())
Sugar.Errorf("创建TCP广播端口失败 session: %s err:%s", session.Id(), err.Error())
session.Answer <- http.StatusInternalServerError
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
@@ -95,7 +96,6 @@ func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
}
response := CreateResponseWithStatusCode(request, http.StatusOK)
setToTag(response)
session.Successful = true
@@ -108,6 +108,6 @@ func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
response.AppendHeader(&SDPMessageType)
response.AppendHeader(GlobalContactAddress.AsContactHeader())
session.Answer <- 0
session.Answer <- http.StatusOK
return response
}

View File

@@ -10,13 +10,13 @@ import (
type BroadcastType int
const (
BroadcastTypeUDP = BroadcastType(0) //server主动向client的udp地址发包
BroadcastTypeTCP = BroadcastType(1) //等待client连接tcpserver, 用此链接发包
BroadcastTypeTCPStream = BroadcastType(2) //@See BroadcastTypeTCP, 包头不含2字节包长
BroadcastTypeUDP = BroadcastType(0) // server主动向client的udp地址发包
BroadcastTypeTCP = BroadcastType(1) // 等待client连接tcpserver, 用此链接发包
BroadcastTypeTCPStream = BroadcastType(2) // @See BroadcastTypeTCP, 包头不含2字节包长
)
type BroadcastSession struct {
SourceID string //发送广播消息时, 让设备invite请求携带的Id
SourceID string // 发送广播消息时, 让设备invite请求携带的Id
DeviceID string
ChannelID string
RoomId string
@@ -24,10 +24,10 @@ type BroadcastSession struct {
Type BroadcastType
RemotePort int
RemoteIP string //udp广播时, 对方的连接地址
Successful bool //对讲成功
Answer chan byte //处理invite后, 通知http接口
conn net.Conn //tcp广播时, client的链路
RemoteIP string // udp广播时, 对方的连接地址
Successful bool // 对讲成功
Answer chan int // 处理invite后, 通知http接口
conn net.Conn // tcp广播时, client的链路
ByeRequest sip.Request
}

View File

@@ -61,7 +61,7 @@ type GBDevice interface {
//
//SubscribeAlarm()
Broadcast(sourceId, channelId string) error
Broadcast(sourceId, channelId string) sip.ClientTransaction
OnKeepalive()
@@ -182,11 +182,10 @@ func (d *Device) SubscribePosition(channelId string) error {
return nil
}
func (d *Device) Broadcast(sourceId, channelId string) error {
func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
SipUA.SendRequest(request)
return nil
return SipUA.SendRequest(request)
}
func (d *Device) OnKeepalive() {

View File

@@ -27,18 +27,6 @@ func (mr *MalformedRequest) Error() string {
return mr.Msg
}
func httpResponse2(w http.ResponseWriter, payload interface{}) {
body, _ := json.Marshal(payload)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT")
w.Write(body)
}
func httpResponseOK(w http.ResponseWriter, payload interface{}) {
httpResponse2(w, MalformedRequest{200, "ok", payload})
}
func DecodeJSONBody(body io.ReadCloser, dst interface{}) error {
dec := json.NewDecoder(body)
//dec.DisallowUnknownFields()

43
http_response.go Normal file
View File

@@ -0,0 +1,43 @@
package main
import (
"encoding/json"
"net/http"
)
type Response[T any] struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data T `json:"data"`
}
func httpResponse(w http.ResponseWriter, code int, msg string) {
httpResponseJson(w, MalformedRequest{
Code: code,
Msg: msg,
})
}
func httpResponseJson(w http.ResponseWriter, payload interface{}) {
body, _ := json.Marshal(payload)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT")
w.Write(body)
}
func httpResponseOK(w http.ResponseWriter, data interface{}) {
httpResponseJson(w, MalformedRequest{
Code: http.StatusOK,
Msg: "ok",
Data: data,
})
}
func httpResponseError(w http.ResponseWriter, msg string) {
httpResponseJson(w, MalformedRequest{
Code: -1,
Msg: msg,
Data: nil,
})
}

90
live.go
View File

@@ -35,7 +35,7 @@ func (i *InviteType) SessionName2Type(name string) {
}
}
func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, bool) {
func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) {
stream := &Stream{
ID: streamId,
forwardSinks: map[string]*Sink{},
@@ -43,19 +43,20 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId
// 先添加占位置, 防止重复请求
if oldStream, b := StreamManager.Add(stream); !b {
return oldStream, true
return oldStream, nil
}
if dialog, ok := d.Invite(inviteType, streamId, channelId, startTime, stopTime, setup, speed); ok {
stream.DialogRequest = dialog
callID, _ := dialog.CallID()
StreamManager.AddWithCallId(callID.Value(), stream)
} else {
dialog, urls, err := d.Invite(inviteType, streamId, channelId, startTime, stopTime, setup, speed)
if err != nil {
StreamManager.Remove(streamId)
return nil, false
return nil, err
}
//开启收流超时
stream.DialogRequest = dialog
callID, _ := dialog.CallID()
StreamManager.AddWithCallId(callID.Value(), stream)
// 等待流媒体服务发送推流通知
wait := func() bool {
ok := stream.WaitForPublishEvent(10)
if !ok {
@@ -68,35 +69,40 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId
if sync {
go wait()
} else if !sync && !wait() {
return nil, false
return nil, fmt.Errorf("receiving stream timed out")
}
return stream, true
stream.urls = urls
return stream, nil
}
func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int) (sip.Request, bool) {
var ok bool
func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int) (sip.Request, []string, error) {
var err error
var ssrc string
defer func() {
if !ok {
// 如果失败, 告知流媒体服务释放国标源
if err != nil {
go CloseGBSource(string(streamId))
}
}()
// 生成下发的ssrc
if InviteTypeLive != inviteType {
ssrc = GetVodSSRC()
} else {
ssrc = GetLiveSSRC()
}
// 告知流媒体服务创建国标源, 返回收流地址信息
ssrcValue, _ := strconv.Atoi(ssrc)
ip, port, err := CreateGBSource(string(streamId), setup, uint32(ssrcValue))
if err != nil {
Sugar.Errorf("创建GBSource失败 err:%s", err.Error())
return nil, false
ip, port, urls, msErr := CreateGBSource(string(streamId), setup, uint32(ssrcValue))
if msErr != nil {
Sugar.Errorf("创建GBSource失败 err: %s", msErr.Error())
return nil, nil, msErr
}
// 创建invite请求
var inviteRequest sip.Request
if InviteTypePlayback == inviteType {
inviteRequest, err = d.BuildPlaybackRequest(channelId, ip, port, startTime, stopTime, setup, ssrc)
@@ -108,21 +114,23 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
}
if err != nil {
Sugar.Errorf("创建invite失败 err:%s", err.Error())
return nil, false
Sugar.Errorf("创建invite失败 err: %s", err.Error())
return nil, nil, err
}
var dialogRequest sip.Request
var answer string
var body string
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// invite信令交互
SipUA.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) {
if res.StatusCode() < 200 {
} else if res.StatusCode() == 200 {
answer = res.Body()
body = res.Body()
ackRequest := sip.NewAckRequest("", inviteRequest, res, "", nil)
ackRequest.AppendHeader(GlobalContactAddress.AsContactHeader())
//手动替换ack请求目标地址, answer的contact可能不对.
// 手动替换ack请求目标地址, answer的contact可能不对.
recipient := ackRequest.Recipient()
remoteIP, remotePortStr, _ := net.SplitHostPort(d.RemoteAddr)
remotePort, _ := strconv.Atoi(remotePortStr)
@@ -132,51 +140,49 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
Sugar.Infof("send ack %s", ackRequest.String())
err := SipUA.Send(ackRequest)
err = SipUA.Send(ackRequest)
if err != nil {
cancel()
Sugar.Errorf("send ack error %s %s", err.Error(), ackRequest.String())
} else {
ok = true
dialogRequest = d.CreateDialogRequestFromAnswer(res, false)
}
} else if res.StatusCode() > 299 {
Sugar.Errorf("invite应答失败 code:%d", res.StatusCode())
err = fmt.Errorf("answer has a bad status code: %d", res.StatusCode())
Sugar.Errorf("%s response: %s", err.Error(), res.String())
cancel()
}
}))
if !ok {
return nil, false
}
if "active" == setup {
parse, err := sdp.Parse(answer)
ok = err == nil && parse.Video != nil && parse.Video.Port != 0
if !ok {
Sugar.Errorf("解析应答sdp失败 err:%v sdp:%s", err, answer)
return nil, false
if err != nil {
return nil, nil, err
} else if "active" == setup {
// 如果是TCP主动拉流, 还需要将拉流地址告知给流媒体服务
var answer *sdp.SDP
answer, err = sdp.Parse(body)
if err != nil {
return nil, nil, err
}
addr := fmt.Sprintf("%s:%d", parse.Addr, parse.Video.Port)
addr := fmt.Sprintf("%s:%d", answer.Addr, answer.Video.Port)
if err = ConnectGBSource(string(streamId), addr); err != nil {
ok = false
Sugar.Errorf("设置GB28181连接地址失败 err:%s addr:%s", err.Error(), addr)
return nil, nil, err
}
}
return dialogRequest, ok
return dialogRequest, urls, nil
}
func (d *Device) Live(streamId StreamID, channelId, setup string) (sip.Request, bool) {
func (d *Device) Live(streamId StreamID, channelId, setup string) (sip.Request, []string, error) {
return d.Invite(InviteTypeLive, streamId, channelId, "", "", setup, 0)
}
func (d *Device) Playback(streamId StreamID, channelId, startTime, stopTime, setup string) (sip.Request, bool) {
func (d *Device) Playback(streamId StreamID, channelId, startTime, stopTime, setup string) (sip.Request, []string, error) {
return d.Invite(InviteTypePlayback, streamId, channelId, startTime, stopTime, setup, 0)
}
func (d *Device) Download(streamId StreamID, channelId, startTime, stopTime, setup string, speed int) (sip.Request, bool) {
func (d *Device) Download(streamId StreamID, channelId, startTime, stopTime, setup string, speed int) (sip.Request, []string, error) {
return d.Invite(InviteTypePlayback, streamId, channelId, startTime, stopTime, setup, speed)
}

View File

@@ -29,7 +29,7 @@ func Send(path string, body interface{}) (*http.Response, error) {
return client.Do(request)
}
func CreateGBSource(id, setup string, ssrc uint32) (string, uint16, error) {
func CreateGBSource(id, setup string, ssrc uint32) (string, uint16, []string, error) {
v := &struct {
Source string `json:"source"`
Setup string `json:"setup"`
@@ -42,24 +42,22 @@ func CreateGBSource(id, setup string, ssrc uint32) (string, uint16, error) {
response, err := Send("api/v1/gb28181/source/create", v)
if err != nil {
return "", 0, err
return "", 0, nil, err
}
connectInfo := &struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
IP string `json:"ip"`
Port uint16 `json:"port,omitempty"`
}
}{}
data := &Response[struct {
IP string `json:"ip"`
Port uint16 `json:"port,omitempty"`
Urls []string `json:"urls"`
}]{}
err = DecodeJSONBody(response.Body, connectInfo)
if err != nil {
return "", 0, err
if err = DecodeJSONBody(response.Body, data); err != nil {
return "", 0, nil, err
} else if http.StatusOK != data.Code {
return "", 0, nil, fmt.Errorf(data.Msg)
}
return connectInfo.Data.IP, connectInfo.Data.Port, nil
return data.Data.IP, data.Data.Port, data.Data.Urls, nil
}
func ConnectGBSource(id, addr string) error {
@@ -104,17 +102,19 @@ func AddForwardStreamSink(id, serverAddr, setup string, ssrc uint32) (ip string,
return "", 0, "", err
}
r := struct {
ID string `json:"id"` //sink id
data := &Response[struct {
Sink string `json:"sink"`
IP string `json:"ip"`
Port uint16 `json:"port"`
}{}
}]{}
if err = DecodeJSONBody(response.Body, &r); err != nil {
if err = DecodeJSONBody(response.Body, data); err != nil {
return "", 0, "", err
} else if http.StatusOK != data.Code {
return "", 0, "", fmt.Errorf(data.Msg)
}
return r.IP, r.Port, r.ID, nil
return data.Data.IP, data.Data.Port, data.Data.Sink, nil
}
func CloseSink(sourceId string, sinkId string) {

View File

@@ -97,13 +97,12 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
break
}
var ok bool
stream := StreamManager.Find(streamId)
addr := fmt.Sprintf("%s:%d", parse.Addr, media.Port)
if stream == nil {
stream, ok = device.(*Device).StartStream(inviteType, streamId, user, time[0], time[1], offerSetup, 0, true)
if !ok {
Sugar.Errorf("级联转发失败 预览流失败 StreamID: %s", streamId)
stream, err = device.(*Device).StartStream(inviteType, streamId, user, time[0], time[1], offerSetup, 0, true)
if err != nil {
Sugar.Errorf("级联转发失败 err: %s stream: %s", err.Error(), streamId)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
}
@@ -132,7 +131,7 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
// 添加级联转发流
callID, _ := request.CallID()
stream.AddForwardSink(callID.Value(), &Sink{sinkID, g.ID, g.CreateDialogRequestFromAnswer(response, true)})
stream.AddForwardSink(callID.Value(), &Sink{sinkID, g.ID, g.CreateDialogRequestFromAnswer(response, true), g.Username})
// 保存与上级的会话
g.streams.AddWithCallId(callID.Value(), stream)

View File

@@ -13,6 +13,8 @@ type Sink struct {
id string
deviceID string
dialog sip.Request
platformID string // 级联上级ID
}
// Stream 国标推流源
@@ -26,6 +28,7 @@ type Stream struct {
forwardSinks map[string]*Sink // 级联转发Sink, Key为与上级的CallID
lock sync.RWMutex
urls []string // 拉流地址
}
func (s *Stream) AddForwardSink(id string, sink *Sink) {
@@ -46,7 +49,7 @@ func (s *Stream) RemoveForwardSink(id string) *Sink {
return sink
}
func (s *Stream) AllForwardSink() []*Sink {
func (s *Stream) ForwardSinks() []*Sink {
s.lock.Lock()
defer s.lock.Unlock()
@@ -98,7 +101,7 @@ func (s *Stream) Close(sendBye bool) {
go CloseGBSource(string(s.ID))
// 关闭所有级联会话
sinks := s.AllForwardSink()
sinks := s.ForwardSinks()
for _, sink := range sinks {
platform := PlatformManager.FindPlatform(sink.deviceID)
id, _ := sink.dialog.CallID()