mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
125 lines
3.2 KiB
Go
125 lines
3.2 KiB
Go
package gb28181
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// 对于录像查询,通过 queryKey (即 deviceId + channelId + sn) 唯一区分一次请求和响应
|
|
// 并将其关联起来,以实现异步响应的目的
|
|
// 提供单例实例供调用
|
|
var RecordQueryLink = NewRecordQueryLink(time.Second * 60)
|
|
|
|
type recordQueryLink struct {
|
|
pendingResult map[string]recordQueryResult // queryKey 查询结果缓存
|
|
pendingResp map[string]recordQueryResp // queryKey 待回复的查询请求
|
|
timeout time.Duration // 查询结果的过期时间
|
|
sync.RWMutex
|
|
}
|
|
|
|
type recordQueryResult struct {
|
|
time time.Time
|
|
err error
|
|
sum int
|
|
finished bool
|
|
list []*Record
|
|
}
|
|
type recordQueryResp struct {
|
|
respChan chan<- recordQueryResult
|
|
timeout time.Duration
|
|
startTime time.Time
|
|
}
|
|
|
|
func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
|
|
c := &recordQueryLink{
|
|
timeout: resultTimeout,
|
|
pendingResult: make(map[string]recordQueryResult),
|
|
pendingResp: make(map[string]recordQueryResp),
|
|
}
|
|
go c.cleanTimeout()
|
|
return c
|
|
}
|
|
|
|
// 唯一区分一次录像查询
|
|
func recordQueryKey(deviceId, channelId string, sn int) string {
|
|
return fmt.Sprintf("%s-%s-%d", deviceId, channelId, sn)
|
|
}
|
|
|
|
// 定期清理过期的查询结果和请求
|
|
func (c *recordQueryLink) cleanTimeout() {
|
|
tick := time.NewTicker(time.Millisecond * 100)
|
|
for {
|
|
<-tick.C
|
|
for k, s := range c.pendingResp {
|
|
if time.Since(s.startTime) > s.timeout {
|
|
if r, ok := c.pendingResult[k]; ok {
|
|
c.notify(k, r)
|
|
} else {
|
|
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
|
|
}
|
|
}
|
|
}
|
|
for k, r := range c.pendingResult {
|
|
if time.Since(r.time) > c.timeout {
|
|
delete(c.pendingResult, k)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *recordQueryLink) Put(deviceId, channelId string, sn int, sum int, record []*Record) {
|
|
key, r := c.doPut(deviceId, channelId, sn, sum, record)
|
|
if r.finished {
|
|
c.notify(key, r)
|
|
}
|
|
}
|
|
|
|
func (c *recordQueryLink) doPut(deviceId, channelId string, sn, sum int, record []*Record) (key string, r recordQueryResult) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
key = recordQueryKey(deviceId, channelId, sn)
|
|
if v, ok := c.pendingResult[key]; ok {
|
|
r = v
|
|
} else {
|
|
r = recordQueryResult{time: time.Now(), sum: sum, list: make([]*Record, 0)}
|
|
}
|
|
|
|
r.list = append(r.list, record...)
|
|
if len(r.list) == sum {
|
|
r.finished = true
|
|
}
|
|
c.pendingResult[key] = r
|
|
GB28181Plugin.Logger.Debug("put record",
|
|
zap.String("key", key),
|
|
zap.Int("sum", sum),
|
|
zap.Int("count", len(r.list)))
|
|
return
|
|
}
|
|
|
|
func (c *recordQueryLink) WaitResult(
|
|
deviceId, channelId string, sn int,
|
|
timeout time.Duration) (resultCh <-chan recordQueryResult) {
|
|
|
|
key := recordQueryKey(deviceId, channelId, sn)
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
respCh := make(chan recordQueryResult, 1)
|
|
resultCh = respCh
|
|
c.pendingResp[key] = recordQueryResp{startTime: time.Now(), timeout: timeout, respChan: respCh}
|
|
return
|
|
}
|
|
|
|
func (c *recordQueryLink) notify(key string, r recordQueryResult) {
|
|
if s, ok := c.pendingResp[key]; ok {
|
|
s.respChan <- r
|
|
}
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
delete(c.pendingResp, key)
|
|
delete(c.pendingResult, key)
|
|
GB28181Plugin.Logger.Debug("record notify", zap.String("key", key))
|
|
}
|