mirror of
https://github.com/lkmio/gb-cms.git
synced 2025-09-26 19:51:22 +08:00
支持视频回放
This commit is contained in:
179
api.go
179
api.go
@@ -2,13 +2,14 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gb-cms/sdp"
|
||||
"github.com/ghettovoice/gosip"
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"github.com/gorilla/mux"
|
||||
"math"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
@@ -31,16 +32,12 @@ type eventInfo struct {
|
||||
RemoteAddr string `json:"remote_addr"` //peer地址
|
||||
}
|
||||
|
||||
func httpResponse2(w http.ResponseWriter, payload interface{}) {
|
||||
body, _ := json.Marshal(payload)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT")
|
||||
w.Write(body)
|
||||
}
|
||||
|
||||
func withCheckParams(f func(streamId, protocol string, w http.ResponseWriter, req *http.Request)) func(http.ResponseWriter, *http.Request) {
|
||||
return func(w http.ResponseWriter, req *http.Request) {
|
||||
if "" != req.URL.RawQuery {
|
||||
logger.Infof("on request %s?%s", req.URL.Path, req.URL.RawQuery)
|
||||
}
|
||||
|
||||
info := eventInfo{}
|
||||
err := HttpDecodeJSONBody(w, req, &info)
|
||||
if err != nil {
|
||||
@@ -59,7 +56,15 @@ func startApiServer(addr string) {
|
||||
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", withCheckParams(apiServer.OnPublishDone))
|
||||
apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", withCheckParams(apiServer.OnIdleTimeout))
|
||||
apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", withCheckParams(apiServer.OnReceiveTimeout))
|
||||
apiServer.router.HandleFunc("/api/v1/device/list", apiServer.OnDeviceList)
|
||||
|
||||
apiServer.router.HandleFunc("/api/v1/device/list", apiServer.OnDeviceList) //查询在线设备
|
||||
apiServer.router.HandleFunc("/api/v1/record/list", apiServer.OnRecordList) //查询录像列表
|
||||
apiServer.router.HandleFunc("/api/v1/position/sub", apiServer.OnSubscribePosition) //订阅移动位置
|
||||
apiServer.router.HandleFunc("/api/v1/playback/seek", apiServer.OnSeekPlayback) //回放seek
|
||||
|
||||
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) //云台控制
|
||||
apiServer.router.HandleFunc("/api/v1/broadcast", apiServer.OnBroadcast) //语音广播
|
||||
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) //语音对讲
|
||||
http.Handle("/", apiServer.router)
|
||||
|
||||
srv := &http.Server{
|
||||
@@ -80,6 +85,17 @@ func startApiServer(addr string) {
|
||||
func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r *http.Request) {
|
||||
Sugar.Infof("play. protocol:%s stream id:%s", protocol, streamId)
|
||||
|
||||
//[注意]: windows上使用cmd/power shell推拉流如果要携带多个参数, 请用双引号将与号引起来("&")
|
||||
//session_id是为了同一个录像文件, 允许同时点播多个.当然如果实时流支持多路预览, 也是可以的.
|
||||
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001
|
||||
//ffplay -i http://127.0.0.1:8080/34020000001320000001/34020000001310000001.flv?setup=passive
|
||||
//ffplay -i http://127.0.0.1:8080/34020000001320000001/34020000001310000001.m3u8?setup=passive
|
||||
//ffplay -i rtsp://test:123456@127.0.0.1/34020000001320000001/34020000001310000001?setup=passive
|
||||
|
||||
//回放示例
|
||||
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive"&"stream_type=playback"&"start_time=2024-06-18T15:20:56"&"end_time=2024-06-18T15:25:56
|
||||
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive&stream_type=playback&start_time=2024-06-18T15:20:56&end_time=2024-06-18T15:25:56
|
||||
|
||||
stream := StreamManager.Find(streamId)
|
||||
if stream != nil {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
@@ -93,13 +109,18 @@ func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
//跳过非国标拉流
|
||||
if len(split[0]) != 20 || len(split[1]) < 20 {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
deviceId := split[0] //deviceId
|
||||
channelId := split[1] //channelId
|
||||
device := DeviceManager.Find(deviceId)
|
||||
|
||||
if len(deviceId) != 20 || len(channelId) != 20 {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
if len(channelId) > 20 {
|
||||
channelId = channelId[:20]
|
||||
}
|
||||
|
||||
if device == nil {
|
||||
@@ -122,16 +143,54 @@ func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r
|
||||
}
|
||||
}()
|
||||
|
||||
ssrc := GetLiveSSRC()
|
||||
query := r.URL.Query()
|
||||
setup := strings.ToLower(query.Get("setup"))
|
||||
streamType := strings.ToLower(query.Get("stream_type"))
|
||||
startTimeStr := strings.ToLower(query.Get("start_time"))
|
||||
endTimeStr := strings.ToLower(query.Get("end_time"))
|
||||
speedStr := strings.ToLower(query.Get("speed"))
|
||||
|
||||
var startTimeSeconds string
|
||||
var endTimeSeconds string
|
||||
var err error
|
||||
var ssrc uint32
|
||||
if "playback" == streamType || "download" == streamType {
|
||||
startTime, err := time.ParseInLocation("2006-01-02t15:04:05", startTimeStr, time.Local)
|
||||
if err != nil {
|
||||
logger.Errorf("解析开始时间失败 err:%s start_time:%s", err.Error(), startTimeStr)
|
||||
return
|
||||
}
|
||||
endTime, err := time.ParseInLocation("2006-01-02t15:04:05", endTimeStr, time.Local)
|
||||
if err != nil {
|
||||
logger.Errorf("解析开始时间失败 err:%s start_time:%s", err.Error(), startTimeStr)
|
||||
return
|
||||
}
|
||||
|
||||
startTimeSeconds = strconv.FormatInt(startTime.Unix(), 10)
|
||||
endTimeSeconds = strconv.FormatInt(endTime.Unix(), 10)
|
||||
|
||||
ssrc = GetVodSSRC()
|
||||
} else {
|
||||
ssrc = GetLiveSSRC()
|
||||
}
|
||||
|
||||
ip, port, err := CreateGBSource(streamId, setup, ssrc)
|
||||
if err != nil {
|
||||
Sugar.Errorf("创建GBSource失败 err:%s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
inviteRequest, err := device.DoLive(channelId, ip, port, setup, ssrc)
|
||||
var inviteRequest sip.Request
|
||||
if "playback" == streamType {
|
||||
inviteRequest, err = device.BuildPlaybackRequest(channelId, ip, port, startTimeSeconds, endTimeSeconds, setup, ssrc)
|
||||
} else if "download" == streamType {
|
||||
speed, _ := strconv.Atoi(speedStr)
|
||||
speed = int(math.Min(4, float64(speed)))
|
||||
inviteRequest, err = device.BuildDownloadRequest(channelId, ip, port, startTimeSeconds, endTimeSeconds, setup, speed, ssrc)
|
||||
} else {
|
||||
inviteRequest, err = device.BuildLiveRequest(channelId, ip, port, setup, ssrc)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -234,11 +293,6 @@ func (api *ApiServer) OnPublishDone(streamId, protocol string, w http.ResponseWr
|
||||
api.CloseStream(streamId)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnDeviceList(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponse2(w, devices)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnIdleTimeout(streamId string, protocol string, w http.ResponseWriter, req *http.Request) {
|
||||
Sugar.Infof("publish timeout. protocol:%s stream id:%s", protocol, streamId)
|
||||
|
||||
@@ -260,3 +314,88 @@ func (api *ApiServer) OnReceiveTimeout(streamId string, protocol string, w http.
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnDeviceList(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponseOK(w, devices)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnRecordList(w http.ResponseWriter, r *http.Request) {
|
||||
v := struct {
|
||||
DeviceId string `json:"device_id"`
|
||||
ChannelId string `json:"channel_id"`
|
||||
Timeout int `json:"timeout"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime string `json:"end_time"`
|
||||
Type_ string `json:"type"`
|
||||
}{}
|
||||
|
||||
err := HttpDecodeJSONBody(w, r, &v)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
device := DeviceManager.Find(v.DeviceId)
|
||||
if device == nil {
|
||||
httpResponseOK(w, "设备离线")
|
||||
return
|
||||
}
|
||||
|
||||
sn := GetSN()
|
||||
err = device.DoRecordList(v.ChannelId, v.StartTime, v.EndTime, sn, v.Type_)
|
||||
if err != nil {
|
||||
httpResponseOK(w, fmt.Sprintf("发送查询录像记录失败 err:%s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
var recordList []RecordInfo
|
||||
timeout := int(math.Max(math.Min(5, float64(v.Timeout)), 60))
|
||||
//设置查询超时时长
|
||||
withTimeout, cancelFunc := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
|
||||
|
||||
SNManager.AddEvent(sn, func(data interface{}) {
|
||||
response := data.(*QueryRecordInfoResponse)
|
||||
|
||||
if len(response.DeviceList.Devices) > 0 {
|
||||
recordList = append(recordList, response.DeviceList.Devices...)
|
||||
}
|
||||
|
||||
//查询完成
|
||||
if len(recordList) >= response.SumNum {
|
||||
cancelFunc()
|
||||
}
|
||||
})
|
||||
|
||||
select {
|
||||
case _ = <-withTimeout.Done():
|
||||
break
|
||||
}
|
||||
|
||||
httpResponseOK(w, recordList)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnSubscribePosition(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponse2(w, devices)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnSeekPlayback(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponse2(w, devices)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponse2(w, devices)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnBroadcast(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponse2(w, devices)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnTalk(w http.ResponseWriter, r *http.Request) {
|
||||
devices := DeviceManager.AllDevices()
|
||||
httpResponse2(w, devices)
|
||||
}
|
||||
|
100
device.go
100
device.go
@@ -19,28 +19,6 @@ const (
|
||||
"%s" +
|
||||
"</DeviceID>\r\n" +
|
||||
"</Query>\r\n"
|
||||
|
||||
InviteFormat = "v=0\r\n" +
|
||||
"o=%s 0 0 IN IP4 %s\r\n" +
|
||||
"s=Play\r\n" +
|
||||
"c=IN IP4 %s\r\n" +
|
||||
"t=0 0\r\n" +
|
||||
"m=video %d RTP/AVP 96\r\n" +
|
||||
"a=%s\r\n" +
|
||||
"a=rtpmap:96 PS/90000\r\n" +
|
||||
"y=%s"
|
||||
|
||||
InviteTCPFormat = "v=0\r\n" +
|
||||
"o=%s 0 0 IN IP4 %s\r\n" +
|
||||
"s=Play\r\n" +
|
||||
"c=IN IP4 %s\r\n" +
|
||||
"t=0 0\r\n" +
|
||||
"m=video %d TCP/RTP/AVP 96\r\n" +
|
||||
"a=%s\r\n" +
|
||||
"a=rtpmap:96 PS/90000\r\n" +
|
||||
"a=setup:%s\r\n" +
|
||||
"a=connection:new\r\n" +
|
||||
"y=%s"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -97,13 +75,15 @@ type QueryCatalogResponse struct {
|
||||
DeviceList DeviceList `xml:"DeviceList"`
|
||||
}
|
||||
|
||||
func (d *DBDevice) DoCatalog() (sip.Request, error) {
|
||||
builder := d.NewRequestBuilder(sip.MESSAGE, Config.SipId, Config.SipRealm, d.Id)
|
||||
|
||||
func (d *DBDevice) BuildCatalogRequest() (sip.Request, error) {
|
||||
body := fmt.Sprintf(CatalogFormat, "1", d.Id)
|
||||
return d.BuildMessageRequest(d.Id, body)
|
||||
}
|
||||
|
||||
func (d *DBDevice) BuildMessageRequest(to, body string) (sip.Request, error) {
|
||||
builder := d.NewRequestBuilder(sip.MESSAGE, Config.SipId, Config.SipRealm, to)
|
||||
builder.SetContentType(&XmlMessageType)
|
||||
builder.SetBody(body)
|
||||
|
||||
return builder.Build()
|
||||
}
|
||||
|
||||
@@ -140,32 +120,62 @@ func (d *DBDevice) NewRequestBuilder(method sip.RequestMethod, from, realm, to s
|
||||
return builder
|
||||
}
|
||||
|
||||
func (d *DBDevice) DoLive(channelId, ip string, port uint16, setup string, ssrc uint32) (sip.Request, error) {
|
||||
func (d *DBDevice) BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc uint32) string {
|
||||
format := "v=0\r\n" +
|
||||
"o=%s 0 0 IN IP4 %s\r\n" +
|
||||
"s=%s\r\n" +
|
||||
"c=IN IP4 %s\r\n" +
|
||||
"t=%s %s\r\n" +
|
||||
"m=video %d %s 96\r\n" +
|
||||
"a=%s\r\n" +
|
||||
"a=rtpmap:96 PS/90000\r\n"
|
||||
|
||||
tcpFormat := "a=setup:%s\r\n" +
|
||||
"a=connection:new\r\n"
|
||||
|
||||
var tcp bool
|
||||
var mediaProtocol string
|
||||
if "active" == setup || "passive" == setup {
|
||||
mediaProtocol = "TCP/RTP/AVP"
|
||||
tcp = true
|
||||
} else {
|
||||
mediaProtocol = "RTP/AVP"
|
||||
}
|
||||
|
||||
sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, port, mediaProtocol, "recvonly")
|
||||
if tcp {
|
||||
sdp += fmt.Sprintf(tcpFormat, setup)
|
||||
}
|
||||
|
||||
if speed > 0 {
|
||||
sdp += fmt.Sprintf("a=downloadspeed:%d\r\n", speed)
|
||||
}
|
||||
|
||||
sdp += fmt.Sprintf("y=%s", fmt.Sprintf("%0*d", 10, ssrc))
|
||||
return sdp
|
||||
}
|
||||
|
||||
func (d *DBDevice) BuildInviteRequest(sessionName, channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc uint32) (sip.Request, error) {
|
||||
builder := d.NewRequestBuilder(sip.INVITE, Config.SipId, Config.SipRealm, channelId)
|
||||
|
||||
tcp := true
|
||||
//var active bool
|
||||
var sdp string
|
||||
if "passive" == setup {
|
||||
} else if "active" == setup {
|
||||
// active = true
|
||||
} else {
|
||||
tcp = false
|
||||
}
|
||||
|
||||
if !tcp {
|
||||
sdp = fmt.Sprintf(InviteFormat, Config.SipId, ip, ip, port, "recvonly", fmt.Sprintf("%0*d", 10, ssrc))
|
||||
} else {
|
||||
sdp = fmt.Sprintf(InviteTCPFormat, Config.SipId, ip, ip, port, "recvonly", setup, fmt.Sprintf("%0*d", 10, ssrc))
|
||||
}
|
||||
|
||||
sdp := d.BuildSDP(Config.SipId, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc)
|
||||
builder.SetContentType(&SDPMessageType)
|
||||
builder.SetContact(globalContactAddress)
|
||||
builder.SetBody(sdp)
|
||||
|
||||
return builder.Build()
|
||||
}
|
||||
|
||||
func (d *DBDevice) BuildLiveRequest(channelId, ip string, port uint16, setup string, ssrc uint32) (sip.Request, error) {
|
||||
return d.BuildInviteRequest("Play", channelId, ip, port, "0", "0", setup, 0, ssrc)
|
||||
}
|
||||
|
||||
func (d *DBDevice) BuildPlaybackRequest(channelId, ip string, port uint16, startTime, stopTime, setup string, ssrc uint32) (sip.Request, error) {
|
||||
return d.BuildInviteRequest("Playback", channelId, ip, port, startTime, stopTime, setup, 0, ssrc)
|
||||
}
|
||||
|
||||
func (d *DBDevice) BuildDownloadRequest(channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc uint32) (sip.Request, error) {
|
||||
return d.BuildInviteRequest("Download", channelId, ip, port, startTime, stopTime, setup, speed, ssrc)
|
||||
}
|
||||
|
||||
func (d *DBDevice) OnCatalog(response *QueryCatalogResponse) {
|
||||
if d.Channels == nil {
|
||||
d.Channels = make(map[string]Channel, 5)
|
||||
|
@@ -27,6 +27,18 @@ func (mr *MalformedRequest) Error() string {
|
||||
return mr.Msg
|
||||
}
|
||||
|
||||
func httpResponse2(w http.ResponseWriter, payload interface{}) {
|
||||
body, _ := json.Marshal(payload)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT")
|
||||
w.Write(body)
|
||||
}
|
||||
|
||||
func httpResponseOK(w http.ResponseWriter, payload interface{}) {
|
||||
httpResponse2(w, MalformedRequest{200, "ok", payload})
|
||||
}
|
||||
|
||||
func DecodeJSONBody(body io.ReadCloser, dst interface{}) error {
|
||||
dec := json.NewDecoder(body)
|
||||
//dec.DisallowUnknownFields()
|
||||
|
@@ -17,7 +17,7 @@ func Send(path string, body interface{}) (*http.Response, error) {
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: time.Duration(10 * time.Second),
|
||||
Timeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal))
|
||||
@@ -40,7 +40,7 @@ func CreateGBSource(id, setup string, ssrc uint32) (string, uint16, error) {
|
||||
SSRC: ssrc,
|
||||
}
|
||||
|
||||
response, err := Send("v1/gb28181/source/create", v)
|
||||
response, err := Send("api/v1/gb28181/source/create", v)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
@@ -71,7 +71,7 @@ func ConnectGBSource(id, addr string) error {
|
||||
RemoteAddr: addr,
|
||||
}
|
||||
|
||||
_, err := Send("v1/gb28181/source/connect", v)
|
||||
_, err := Send("api/v1/gb28181/source/connect", v)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -82,6 +82,6 @@ func CloseGBSource(id string) error {
|
||||
Source: id,
|
||||
}
|
||||
|
||||
_, err := Send("v1/gb28181/source/close", v)
|
||||
_, err := Send("api/v1/gb28181/source/close", v)
|
||||
return err
|
||||
}
|
||||
|
75
record.go
Normal file
75
record.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
const (
|
||||
QueryRecordFormat = "<?xml version=\"1.0\"?>\r\n" +
|
||||
"<Query>\r\n" +
|
||||
"<CmdType>RecordInfo</CmdType>\r\n" +
|
||||
"<SN>%d</SN>\r\n" +
|
||||
"<DeviceID>%s</DeviceID>\r\n" +
|
||||
"<StartTime>%s</StartTime>\r\n" +
|
||||
"<EndTime>%s</EndTime>\r\n" +
|
||||
"<Type>%s</Type>\r\n" +
|
||||
"</Query>\r\n"
|
||||
)
|
||||
|
||||
type QueryRecordInfoResponse struct {
|
||||
XMLName xml.Name `xml:"Response"`
|
||||
CmdType string `xml:"CmdType"`
|
||||
SN int `xml:"SN"`
|
||||
DeviceID string `xml:"DeviceID"`
|
||||
SumNum int `xml:"SumNum"`
|
||||
DeviceList RecordList `xml:"RecordList"`
|
||||
}
|
||||
|
||||
type RecordList struct {
|
||||
Num int `xml:"Num,attr"`
|
||||
Devices []RecordInfo `xml:"Item"`
|
||||
cancelFunction *context.CancelFunc
|
||||
}
|
||||
|
||||
type RecordInfo struct {
|
||||
FileSize uint64 `xml:"FileSize" json:"fileSize"`
|
||||
StartTime string `xml:"StartTime" json:"startTime"`
|
||||
EndTime string `xml:"EndTime" json:"endTime"`
|
||||
FilePath string `xml:"FilePath" json:"filePath"`
|
||||
ResourceType string `xml:"ResourceType" json:"type"`
|
||||
ResourceId string `xml:"ResourceId" json:"resourceId"`
|
||||
RecorderId string `xml:"RecorderId" json:"recorderId"`
|
||||
UserId string `xml:"UserId" json:"userId"`
|
||||
UserName string `xml:"UserName" json:"userName"`
|
||||
ResourceName string `xml:"ResourceName" json:"resourceName"`
|
||||
ResourceLength string `xml:"ResourceLength" json:"resourceLength"`
|
||||
ImportTime string `xml:"ImportTime" json:"importTime"`
|
||||
ResourceUrl string `xml:"ResourceUrl" json:"resourceUrl"`
|
||||
Remark string `xml:"Remark" json:"remark"`
|
||||
Level string `xml:"Level" json:"level"`
|
||||
BootTime string `xml:"BootTime" json:"bootTime"`
|
||||
ShutdownTime string `xml:"ShutdownTime" json:"shutdownTime"`
|
||||
}
|
||||
|
||||
func (d *DBDevice) DoRecordList(channelId, startTime, endTime string, sn int, type_ string) error {
|
||||
body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_)
|
||||
msg, err := d.BuildMessageRequest(channelId, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
SipUA.SendRequest(msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DBDevice) OnRecord(response *QueryRecordInfoResponse) {
|
||||
event := SNManager.FindEvent(response.SN)
|
||||
if event == nil {
|
||||
Sugar.Errorf("处理录像查询响应失败 SN:%d", response.SN)
|
||||
return
|
||||
}
|
||||
|
||||
event(response)
|
||||
}
|
@@ -81,7 +81,7 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction) {
|
||||
sendResponse(tx, response)
|
||||
|
||||
if device != nil {
|
||||
catalog, err := device.DoCatalog()
|
||||
catalog, err := device.BuildCatalogRequest()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -160,18 +160,38 @@ func StartSipServer(config *Config_, db DeviceDB) (SipServer, error) {
|
||||
}
|
||||
|
||||
cmd := strings.ToLower(body[startIndex+len(CmdTagStart) : endIndex])
|
||||
var message interface{}
|
||||
if "keepalive" == cmd {
|
||||
return
|
||||
} else if "catalog" == cmd {
|
||||
message := &QueryCatalogResponse{}
|
||||
if err := DecodeXML([]byte(body), message); err != nil {
|
||||
Sugar.Errorf("解析xml异常 >>> %s %s", err.Error(), body)
|
||||
return
|
||||
}
|
||||
message = &QueryCatalogResponse{}
|
||||
} else if "recordinfo" == cmd {
|
||||
message = &QueryRecordInfoResponse{}
|
||||
} else if "mediastatus" == cmd {
|
||||
return
|
||||
}
|
||||
|
||||
if device := DeviceManager.Find(message.DeviceID); device != nil {
|
||||
device.OnCatalog(message)
|
||||
if err := DecodeXML([]byte(body), message); err != nil {
|
||||
Sugar.Errorf("解析xml异常 >>> %s %s", err.Error(), body)
|
||||
return
|
||||
}
|
||||
|
||||
switch cmd {
|
||||
case "catalog":
|
||||
{
|
||||
if device := DeviceManager.Find(message.(*QueryCatalogResponse).DeviceID); device != nil {
|
||||
device.OnCatalog(message.(*QueryCatalogResponse))
|
||||
}
|
||||
}
|
||||
break
|
||||
|
||||
case "recordinfo":
|
||||
{
|
||||
if device := DeviceManager.Find(message.(*QueryRecordInfoResponse).DeviceID); device != nil {
|
||||
device.OnRecord(message.(*QueryRecordInfoResponse))
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
})
|
||||
|
||||
|
54
sn_manager.go
Normal file
54
sn_manager.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package main
|
||||
|
||||
import "sync"
|
||||
|
||||
var (
|
||||
snValue int
|
||||
SNManager snManager
|
||||
)
|
||||
|
||||
type EventCb func(data interface{})
|
||||
|
||||
func init() {
|
||||
SNManager.events = make(map[int]EventCb, 1024)
|
||||
}
|
||||
|
||||
func GetSN() int {
|
||||
for snValue < 0xFFFFFF {
|
||||
snValue = (snValue + 1) % 0xFFFFFF
|
||||
if SNManager.FindEvent(snValue) == nil {
|
||||
return snValue
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
type snManager struct {
|
||||
events map[int]EventCb
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func (s *snManager) AddEvent(sn int, cb EventCb) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
s.events[sn] = cb
|
||||
}
|
||||
|
||||
func (s *snManager) FindEvent(sn int) EventCb {
|
||||
s.lock.RLock()
|
||||
cb, ok := s.events[sn]
|
||||
s.lock.RUnlock()
|
||||
if ok {
|
||||
return cb
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snManager) RemoveEvent(sn int) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
delete(s.events, sn)
|
||||
}
|
@@ -16,6 +16,10 @@ var (
|
||||
func GetLiveSSRC() uint32 {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
ssrc = ssrc + 1%SsrcMaxValue
|
||||
ssrc = (ssrc + 1) % SsrcMaxValue
|
||||
return ssrc
|
||||
}
|
||||
|
||||
func GetVodSSRC() uint32 {
|
||||
return 1000000000 + GetLiveSSRC()
|
||||
}
|
||||
|
@@ -12,21 +12,37 @@ func init() {
|
||||
}
|
||||
|
||||
type streamManager struct {
|
||||
m sync.Map
|
||||
streams sync.Map
|
||||
callIds sync.Map
|
||||
}
|
||||
|
||||
func (s *streamManager) Add(device *Stream) error {
|
||||
_, ok := s.m.LoadOrStore(device.Id, device)
|
||||
if ok {
|
||||
if _, ok := s.streams.LoadOrStore(device.Id, device); ok {
|
||||
return fmt.Errorf("the stream %s has been exist", device.Id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamManager) AddWithCallId(device *Stream) error {
|
||||
id, _ := device.ByeRequest.CallID()
|
||||
if _, ok := s.streams.LoadOrStore(id.Value(), device); ok {
|
||||
return fmt.Errorf("the stream %s has been exist", id.Value())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamManager) Find(id string) *Stream {
|
||||
value, ok := s.m.Load(id)
|
||||
if ok {
|
||||
if value, ok := s.streams.Load(id); ok {
|
||||
return value.(*Stream)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamManager) FindWithCallId(id string) *Stream {
|
||||
if value, ok := s.callIds.Load(id); ok {
|
||||
return value.(*Stream)
|
||||
}
|
||||
|
||||
@@ -34,8 +50,20 @@ func (s *streamManager) Find(id string) *Stream {
|
||||
}
|
||||
|
||||
func (s *streamManager) Remove(id string) (*Stream, error) {
|
||||
value, loaded := s.m.LoadAndDelete(id)
|
||||
if loaded {
|
||||
value, loaded := s.streams.LoadAndDelete(id)
|
||||
if loaded && value.(*Stream).ByeRequest != nil {
|
||||
id, _ := value.(*Stream).ByeRequest.CallID()
|
||||
s.callIds.Delete(id.Value())
|
||||
return value.(*Stream), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("stream with id %s was not find", id)
|
||||
}
|
||||
|
||||
func (s *streamManager) RemoveWithCallId(id string) (*Stream, error) {
|
||||
value, loaded := s.callIds.LoadAndDelete(id)
|
||||
if loaded {
|
||||
s.streams.Delete(value.(*Stream).Id)
|
||||
return value.(*Stream), nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user