mirror of
https://github.com/lkmio/gb-cms.git
synced 2025-09-27 03:56:08 +08:00
422 lines
12 KiB
Go
422 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"gb-cms/sdp"
|
|
"github.com/ghettovoice/gosip"
|
|
"github.com/ghettovoice/gosip/sip"
|
|
"github.com/gorilla/mux"
|
|
"math"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type ApiServer struct {
|
|
router *mux.Router
|
|
}
|
|
|
|
var apiServer *ApiServer
|
|
|
|
func init() {
|
|
apiServer = &ApiServer{
|
|
router: mux.NewRouter(),
|
|
}
|
|
}
|
|
|
|
func withCheckParams(f func(streamId, protocol string, w http.ResponseWriter, req *http.Request)) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
if "" != req.URL.RawQuery {
|
|
logger.Infof("on request %s?%s", req.URL.Path, req.URL.RawQuery)
|
|
}
|
|
|
|
v := struct {
|
|
Stream string `json:"stream"` //Stream id
|
|
Protocol string `json:"protocol"` //推拉流协议
|
|
RemoteAddr string `json:"remote_addr"` //peer地址
|
|
}{}
|
|
|
|
err := HttpDecodeJSONBody(w, req, &v)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
f(v.Stream, v.Protocol, w, req)
|
|
}
|
|
}
|
|
|
|
func startApiServer(addr string) {
|
|
apiServer.router.HandleFunc("/api/v1/hook/on_play", withCheckParams(apiServer.OnPlay))
|
|
apiServer.router.HandleFunc("/api/v1/hook/on_play_done", withCheckParams(apiServer.OnPlayDone))
|
|
apiServer.router.HandleFunc("/api/v1/hook/on_publish", withCheckParams(apiServer.OnPublish))
|
|
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", withCheckParams(apiServer.OnPublishDone))
|
|
apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", withCheckParams(apiServer.OnIdleTimeout))
|
|
apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", withCheckParams(apiServer.OnReceiveTimeout))
|
|
|
|
apiServer.router.HandleFunc("/api/v1/device/list", apiServer.OnDeviceList) //查询在线设备
|
|
apiServer.router.HandleFunc("/api/v1/record/list", apiServer.OnRecordList) //查询录像列表
|
|
apiServer.router.HandleFunc("/api/v1/position/sub", apiServer.OnSubscribePosition) //订阅移动位置
|
|
apiServer.router.HandleFunc("/api/v1/playback/seek", apiServer.OnSeekPlayback) //回放seek
|
|
|
|
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) //云台控制
|
|
apiServer.router.HandleFunc("/api/v1/broadcast", apiServer.OnBroadcast) //语音广播
|
|
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) //语音对讲
|
|
http.Handle("/", apiServer.router)
|
|
|
|
srv := &http.Server{
|
|
Handler: apiServer.router,
|
|
Addr: addr,
|
|
// Good practice: enforce timeouts for servers you create!
|
|
WriteTimeout: 30 * time.Second,
|
|
ReadTimeout: 30 * time.Second,
|
|
}
|
|
|
|
err := srv.ListenAndServe()
|
|
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r *http.Request) {
|
|
Sugar.Infof("play. protocol:%s stream id:%s", protocol, streamId)
|
|
|
|
//[注意]: windows上使用cmd/power shell推拉流如果要携带多个参数, 请用双引号将与号引起来("&")
|
|
//session_id是为了同一个录像文件, 允许同时点播多个.当然如果实时流支持多路预览, 也是可以的.
|
|
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001
|
|
//ffplay -i http://127.0.0.1:8080/34020000001320000001/34020000001310000001.flv?setup=passive
|
|
//ffplay -i http://127.0.0.1:8080/34020000001320000001/34020000001310000001.m3u8?setup=passive
|
|
//ffplay -i rtsp://test:123456@127.0.0.1/34020000001320000001/34020000001310000001?setup=passive
|
|
|
|
//回放示例
|
|
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive"&"stream_type=playback"&"start_time=2024-06-18T15:20:56"&"end_time=2024-06-18T15:25:56
|
|
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive&stream_type=playback&start_time=2024-06-18T15:20:56&end_time=2024-06-18T15:25:56
|
|
|
|
stream := StreamManager.Find(streamId)
|
|
if stream != nil {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
split := strings.Split(streamId, "/")
|
|
if len(split) != 2 {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
//跳过非国标拉流
|
|
if len(split[0]) != 20 || len(split[1]) < 20 {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
deviceId := split[0] //deviceId
|
|
channelId := split[1] //channelId
|
|
device := DeviceManager.Find(deviceId)
|
|
|
|
if len(channelId) > 20 {
|
|
channelId = channelId[:20]
|
|
}
|
|
|
|
if device == nil {
|
|
Sugar.Warnf("设备离线 id:%s", deviceId)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
stream = &Stream{Id: streamId, Protocol: "28181", ByeRequest: nil}
|
|
if err := StreamManager.Add(stream); err != nil {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
var inviteOk bool
|
|
defer func() {
|
|
if !inviteOk {
|
|
api.CloseStream(streamId)
|
|
go CloseGBSource(streamId)
|
|
}
|
|
}()
|
|
|
|
query := r.URL.Query()
|
|
setup := strings.ToLower(query.Get("setup"))
|
|
streamType := strings.ToLower(query.Get("stream_type"))
|
|
startTimeStr := strings.ToLower(query.Get("start_time"))
|
|
endTimeStr := strings.ToLower(query.Get("end_time"))
|
|
speedStr := strings.ToLower(query.Get("speed"))
|
|
|
|
var startTimeSeconds string
|
|
var endTimeSeconds string
|
|
var err error
|
|
var ssrc uint32
|
|
if "playback" == streamType || "download" == streamType {
|
|
startTime, err := time.ParseInLocation("2006-01-02t15:04:05", startTimeStr, time.Local)
|
|
if err != nil {
|
|
logger.Errorf("解析开始时间失败 err:%s start_time:%s", err.Error(), startTimeStr)
|
|
return
|
|
}
|
|
endTime, err := time.ParseInLocation("2006-01-02t15:04:05", endTimeStr, time.Local)
|
|
if err != nil {
|
|
logger.Errorf("解析开始时间失败 err:%s start_time:%s", err.Error(), startTimeStr)
|
|
return
|
|
}
|
|
|
|
startTimeSeconds = strconv.FormatInt(startTime.Unix(), 10)
|
|
endTimeSeconds = strconv.FormatInt(endTime.Unix(), 10)
|
|
|
|
ssrc = GetVodSSRC()
|
|
} else {
|
|
ssrc = GetLiveSSRC()
|
|
}
|
|
|
|
ip, port, err := CreateGBSource(streamId, setup, ssrc)
|
|
if err != nil {
|
|
Sugar.Errorf("创建GBSource失败 err:%s", err.Error())
|
|
return
|
|
}
|
|
|
|
var inviteRequest sip.Request
|
|
if "playback" == streamType {
|
|
inviteRequest, err = device.BuildPlaybackRequest(channelId, ip, port, startTimeSeconds, endTimeSeconds, setup, ssrc)
|
|
} else if "download" == streamType {
|
|
speed, _ := strconv.Atoi(speedStr)
|
|
speed = int(math.Min(4, float64(speed)))
|
|
inviteRequest, err = device.BuildDownloadRequest(channelId, ip, port, startTimeSeconds, endTimeSeconds, setup, speed, ssrc)
|
|
} else {
|
|
inviteRequest, err = device.BuildLiveRequest(channelId, ip, port, setup, ssrc)
|
|
}
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var bye sip.Request
|
|
var answer string
|
|
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
SipUA.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) {
|
|
if res.StatusCode() < 200 {
|
|
|
|
} else if res.StatusCode() == 200 {
|
|
answer = res.Body()
|
|
ackRequest := sip.NewAckRequest("", inviteRequest, res, "", nil)
|
|
ackRequest.AppendHeader(globalContactAddress.AsContactHeader())
|
|
//手动替换ack请求目标地址, answer的contact可能不对.
|
|
recipient := ackRequest.Recipient()
|
|
recipient.SetHost(Config.PublicIP)
|
|
recipient.SetPort(&Config.SipPort)
|
|
|
|
Sugar.Infof("send ack %s", ackRequest.String())
|
|
|
|
err := SipUA.Send(ackRequest)
|
|
if err != nil {
|
|
cancel()
|
|
Sugar.Errorf("send ack error %s %s", err.Error(), ackRequest.String())
|
|
} else {
|
|
inviteOk = true
|
|
bye = ackRequest.Clone().(sip.Request)
|
|
bye.SetMethod(sip.BYE)
|
|
bye.RemoveHeader("Via")
|
|
if seq, ok := bye.CSeq(); ok {
|
|
seq.SeqNo++
|
|
seq.MethodName = sip.BYE
|
|
}
|
|
}
|
|
} else if res.StatusCode() > 299 {
|
|
cancel()
|
|
}
|
|
}))
|
|
|
|
if !inviteOk {
|
|
return
|
|
}
|
|
|
|
if "active" == setup {
|
|
parse, err := sdp.Parse(answer)
|
|
if err != nil {
|
|
inviteOk = false
|
|
logger.Errorf("解析应答sdp失败 err:%s sdp:%s", err.Error(), answer)
|
|
return
|
|
}
|
|
if parse.Video == nil || parse.Video.Port == 0 {
|
|
inviteOk = false
|
|
logger.Errorf("应答不没有视频连接地址 sdp:%s", answer)
|
|
return
|
|
}
|
|
|
|
addr := fmt.Sprintf("%s:%d", parse.Addr, parse.Video.Port)
|
|
if err = ConnectGBSource(streamId, addr); err != nil {
|
|
inviteOk = false
|
|
logger.Errorf("设置GB28181连接地址失败 err:%s addr:%s", err.Error(), addr)
|
|
}
|
|
}
|
|
|
|
if stream.waitPublishStream() {
|
|
stream.ByeRequest = bye
|
|
w.WriteHeader(http.StatusOK)
|
|
} else {
|
|
SipUA.SendRequest(bye)
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) CloseStream(streamId string) {
|
|
stream, _ := StreamManager.Remove(streamId)
|
|
if stream != nil && stream.ByeRequest != nil {
|
|
SipUA.SendRequest(stream.ByeRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) OnPlayDone(streamId, protocol string, w http.ResponseWriter, r *http.Request) {
|
|
Sugar.Infof("play done. protocol:%s stream id:%s", protocol, streamId)
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
func (api *ApiServer) OnPublish(streamId, protocol string, w http.ResponseWriter, r *http.Request) {
|
|
Sugar.Infof("publish. protocol:%s stream id:%s", protocol, streamId)
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
stream := StreamManager.Find(streamId)
|
|
if stream != nil {
|
|
stream.publishEvent <- 0
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) OnPublishDone(streamId, protocol string, w http.ResponseWriter, r *http.Request) {
|
|
Sugar.Infof("publish done. protocol:%s stream id:%s", protocol, streamId)
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
api.CloseStream(streamId)
|
|
}
|
|
|
|
func (api *ApiServer) OnIdleTimeout(streamId string, protocol string, w http.ResponseWriter, req *http.Request) {
|
|
Sugar.Infof("publish timeout. protocol:%s stream id:%s", protocol, streamId)
|
|
|
|
if protocol != "rtmp" {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
api.CloseStream(streamId)
|
|
} else {
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) OnReceiveTimeout(streamId string, protocol string, w http.ResponseWriter, req *http.Request) {
|
|
Sugar.Infof("receive timeout. protocol:%s stream id:%s", protocol, streamId)
|
|
|
|
if protocol != "rtmp" {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
api.CloseStream(streamId)
|
|
} else {
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) OnDeviceList(w http.ResponseWriter, r *http.Request) {
|
|
devices := DeviceManager.AllDevices()
|
|
httpResponseOK(w, devices)
|
|
}
|
|
|
|
func (api *ApiServer) OnRecordList(w http.ResponseWriter, r *http.Request) {
|
|
v := struct {
|
|
DeviceId string `json:"device_id"`
|
|
ChannelId string `json:"channel_id"`
|
|
Timeout int `json:"timeout"`
|
|
StartTime string `json:"start_time"`
|
|
EndTime string `json:"end_time"`
|
|
Type_ string `json:"type"`
|
|
}{}
|
|
|
|
err := HttpDecodeJSONBody(w, r, &v)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
device := DeviceManager.Find(v.DeviceId)
|
|
if device == nil {
|
|
httpResponseOK(w, "设备离线")
|
|
return
|
|
}
|
|
|
|
sn := GetSN()
|
|
err = device.DoRecordList(v.ChannelId, v.StartTime, v.EndTime, sn, v.Type_)
|
|
if err != nil {
|
|
httpResponseOK(w, fmt.Sprintf("发送查询录像记录失败 err:%s", err.Error()))
|
|
return
|
|
}
|
|
|
|
var recordList []RecordInfo
|
|
timeout := int(math.Max(math.Min(5, float64(v.Timeout)), 60))
|
|
//设置查询超时时长
|
|
withTimeout, cancelFunc := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
|
|
|
|
SNManager.AddEvent(sn, func(data interface{}) {
|
|
response := data.(*QueryRecordInfoResponse)
|
|
|
|
if len(response.DeviceList.Devices) > 0 {
|
|
recordList = append(recordList, response.DeviceList.Devices...)
|
|
}
|
|
|
|
//查询完成
|
|
if len(recordList) >= response.SumNum {
|
|
cancelFunc()
|
|
}
|
|
})
|
|
|
|
select {
|
|
case _ = <-withTimeout.Done():
|
|
break
|
|
}
|
|
|
|
httpResponseOK(w, recordList)
|
|
}
|
|
|
|
func (api *ApiServer) OnSubscribePosition(w http.ResponseWriter, r *http.Request) {
|
|
v := struct {
|
|
DeviceID string `json:"device_id"`
|
|
ChannelID string `json:"channel_id"`
|
|
}{}
|
|
|
|
if err := HttpDecodeJSONBody(w, r, &v); err != nil {
|
|
httpResponse2(w, err)
|
|
return
|
|
}
|
|
|
|
device := DeviceManager.Find(v.DeviceID)
|
|
if device == nil {
|
|
return
|
|
}
|
|
|
|
if err := device.DoSubscribePosition(v.ChannelID); err != nil {
|
|
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
func (api *ApiServer) OnSeekPlayback(w http.ResponseWriter, r *http.Request) {
|
|
devices := DeviceManager.AllDevices()
|
|
httpResponse2(w, devices)
|
|
}
|
|
|
|
func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) {
|
|
devices := DeviceManager.AllDevices()
|
|
httpResponse2(w, devices)
|
|
}
|
|
|
|
func (api *ApiServer) OnBroadcast(w http.ResponseWriter, r *http.Request) {
|
|
//v := struct {
|
|
// DeviceID string `json:"device_id"`
|
|
// ChannelID string `json:"channel_id"`
|
|
// RoomID string `json:"room_id"` //如果要实现群呼功能, 除第一次广播外, 后续请求都携带该参数
|
|
//}{}
|
|
|
|
}
|
|
|
|
func (api *ApiServer) OnTalk(w http.ResponseWriter, r *http.Request) {
|
|
devices := DeviceManager.AllDevices()
|
|
httpResponse2(w, devices)
|
|
}
|