IO fix: workaround for ONVIF event system

This commit is contained in:
Cedric Verstraeten
2024-08-25 20:27:46 +02:00
parent 79205abe29
commit e0c6375261
8 changed files with 172 additions and 78 deletions

View File

@@ -1,5 +1,4 @@
// Package docs GENERATED BY SWAG; DO NOT EDIT
// This file was generated by swaggo/swag
// Package docs Code generated by swaggo/swag. DO NOT EDIT
package docs
import "github.com/swaggo/swag"
@@ -388,7 +387,7 @@ const docTemplate = `{
"operationId": "snapshot-base64",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -403,7 +402,7 @@ const docTemplate = `{
"operationId": "snapshot-jpeg",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -476,7 +475,7 @@ const docTemplate = `{
"operationId": "config",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
},
@@ -500,7 +499,7 @@ const docTemplate = `{
],
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -515,7 +514,7 @@ const docTemplate = `{
"operationId": "dashboard",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -530,7 +529,7 @@ const docTemplate = `{
"operationId": "days",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -590,7 +589,7 @@ const docTemplate = `{
],
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -803,6 +802,9 @@ const docTemplate = `{
"description": "obsolete",
"type": "string"
},
"hub_encryption": {
"type": "string"
},
"hub_key": {
"type": "string"
},
@@ -839,6 +841,12 @@ const docTemplate = `{
"offline": {
"type": "string"
},
"realtimeprocessing": {
"type": "string"
},
"realtimeprocessing_topic": {
"type": "string"
},
"region": {
"$ref": "#/definitions/models.Region"
},
@@ -863,6 +871,9 @@ const docTemplate = `{
"timezone": {
"type": "string"
},
"turn_force": {
"type": "string"
},
"turn_password": {
"type": "string"
},
@@ -957,9 +968,18 @@ const docTemplate = `{
"rtsp": {
"type": "string"
},
"sub_fps": {
"type": "string"
},
"sub_height": {
"type": "integer"
},
"sub_rtsp": {
"type": "string"
},
"sub_width": {
"type": "integer"
},
"width": {
"type": "integer"
}
@@ -1166,6 +1186,8 @@ var SwaggerInfo = &swag.Spec{
Description: "This is the API for using and configure Kerberos Agent.",
InfoInstanceName: "swagger",
SwaggerTemplate: docTemplate,
LeftDelim: "{{",
RightDelim: "}}",
}
func init() {

View File

@@ -380,7 +380,7 @@
"operationId": "snapshot-base64",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -395,7 +395,7 @@
"operationId": "snapshot-jpeg",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -468,7 +468,7 @@
"operationId": "config",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
},
@@ -492,7 +492,7 @@
],
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -507,7 +507,7 @@
"operationId": "dashboard",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -522,7 +522,7 @@
"operationId": "days",
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -582,7 +582,7 @@
],
"responses": {
"200": {
"description": ""
"description": "OK"
}
}
}
@@ -795,6 +795,9 @@
"description": "obsolete",
"type": "string"
},
"hub_encryption": {
"type": "string"
},
"hub_key": {
"type": "string"
},
@@ -831,6 +834,12 @@
"offline": {
"type": "string"
},
"realtimeprocessing": {
"type": "string"
},
"realtimeprocessing_topic": {
"type": "string"
},
"region": {
"$ref": "#/definitions/models.Region"
},
@@ -855,6 +864,9 @@
"timezone": {
"type": "string"
},
"turn_force": {
"type": "string"
},
"turn_password": {
"type": "string"
},
@@ -949,9 +961,18 @@
"rtsp": {
"type": "string"
},
"sub_fps": {
"type": "string"
},
"sub_height": {
"type": "integer"
},
"sub_rtsp": {
"type": "string"
},
"sub_width": {
"type": "integer"
},
"width": {
"type": "integer"
}

View File

@@ -95,6 +95,8 @@ definitions:
heartbeaturi:
description: obsolete
type: string
hub_encryption:
type: string
hub_key:
type: string
hub_private_key:
@@ -119,6 +121,10 @@ definitions:
type: string
offline:
type: string
realtimeprocessing:
type: string
realtimeprocessing_topic:
type: string
region:
$ref: '#/definitions/models.Region'
remove_after_upload:
@@ -135,6 +141,8 @@ definitions:
type: array
timezone:
type: string
turn_force:
type: string
turn_password:
type: string
turn_username:
@@ -196,8 +204,14 @@ definitions:
type: string
rtsp:
type: string
sub_fps:
type: string
sub_height:
type: integer
sub_rtsp:
type: string
sub_width:
type: integer
width:
type: integer
type: object
@@ -564,7 +578,7 @@ paths:
operationId: snapshot-base64
responses:
"200":
description: ""
description: OK
summary: Get a snapshot from the camera in base64.
tags:
- camera
@@ -574,7 +588,7 @@ paths:
operationId: snapshot-jpeg
responses:
"200":
description: ""
description: OK
summary: Get a snapshot from the camera in jpeg format.
tags:
- camera
@@ -624,7 +638,7 @@ paths:
operationId: config
responses:
"200":
description: ""
description: OK
summary: Get the current configuration.
tags:
- config
@@ -640,7 +654,7 @@ paths:
$ref: '#/definitions/models.Config'
responses:
"200":
description: ""
description: OK
summary: Update the current configuration.
tags:
- config
@@ -650,7 +664,7 @@ paths:
operationId: dashboard
responses:
"200":
description: ""
description: OK
summary: Get all information showed on the dashboard.
tags:
- general
@@ -660,7 +674,7 @@ paths:
operationId: days
responses:
"200":
description: ""
description: OK
summary: Get all days stored in the recordings directory.
tags:
- general
@@ -698,7 +712,7 @@ paths:
$ref: '#/definitions/models.EventFilter'
responses:
"200":
description: ""
description: OK
summary: Get the latest recordings (events) from the recordings directory.
tags:
- general

View File

@@ -19,7 +19,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/profiler"
)
var VERSION = "3.2.0"
var VERSION = utils.VERSION
func main() {
// You might be interested in debugging the agent.

View File

@@ -229,17 +229,17 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
} else {
client = &http.Client{}
}
config := configuration.Config
kerberosAgentVersion := "3.1.8"
kerberosAgentVersion := utils.VERSION
// Get a pull point address
var pullPointAddress string
if config.Capture.IPCamera.ONVIFXAddr != "" {
// Create a loop pull point address, which we will use to retrieve async events
// As you'll read below camera manufactures are having different implementations of events.
var pullPointAddressLoopState string
if configuration.Config.Capture.IPCamera.ONVIFXAddr != "" {
cameraConfiguration := configuration.Config.Capture.IPCamera
device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration)
if err == nil {
pullPointAddress, err = onvif.CreatePullPointSubscription(device)
if err != nil {
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
@@ -263,6 +263,7 @@ loop:
cameraConfiguration := configuration.Config.Capture.IPCamera
device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration)
if err == nil {
// We will try to retrieve the PTZ configurations from the device.
onvifEnabled = "true"
configurations, err := onvif.GetPTZConfigurationsFromDevice(device)
if err == nil {
@@ -297,9 +298,22 @@ loop:
// We will also fetch some events, to know the status of the inputs and outputs.
// More event types might be added.
if pullPointAddress != "" {
// -- We have two differen pull point subscriptions, one for the initials events and one for the loop.
// -- Some cameras do send recurrent events, others don't.
// a. For some older Hikvision models, events are send repeatedly (if input is high) with the strong state (set to false).
// - In this scenarion we are using a polling mechanism and set a timestamp to understand if the input is still active.
// b. For some newer Hikvision models, Avigilon, events are send only once (if state is set active).
// - In this scenario we are creating a new subscription to retrieve the initial (current) state of the inputs and outputs.
events, err := onvif.GetEventMessages(device, pullPointAddress)
// Get a new pull point address, to get the initiatal state of the inputs and outputs.
pullPointAddressInitialState, err := onvif.CreatePullPointSubscription(device)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
if pullPointAddressInitialState != "" {
log.Log.Debug("cloud.HandleHeartBeat(): Fetching events from pullPointAddressInitialState")
events, err := onvif.GetEventMessages(device, pullPointAddressInitialState)
log.Log.Debug("cloud.HandleHeartBeat(): Completed fetching events from pullPointAddressInitialState")
if err == nil && len(events) > 0 {
onvifEventsList, err = json.Marshal(events)
if err != nil {
@@ -309,9 +323,28 @@ loop:
} else if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while getting events: " + err.Error())
onvifEventsList = []byte("[]")
// Try to unsubscribe and subscribe again.
onvif.UnsubscribePullPoint(device, pullPointAddress)
pullPointAddress, err = onvif.CreatePullPointSubscription(device)
} else if len(events) == 0 {
log.Log.Debug("cloud.HandleHeartBeat(): no events found.")
onvifEventsList = []byte("[]")
}
onvif.UnsubscribePullPoint(device, pullPointAddressInitialState)
}
// We do a second run an a long-living subscription to get the events asynchronously.
if pullPointAddressLoopState != "" {
log.Log.Debug("cloud.HandleHeartBeat(): Fetching events from pullPointAddressLoopState")
events, err := onvif.GetEventMessages(device, pullPointAddressLoopState)
log.Log.Debug("cloud.HandleHeartBeat(): Completed fetching events from pullPointAddressLoopState")
if err == nil && len(events) > 0 {
onvifEventsList, err = json.Marshal(events)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while marshalling events: " + err.Error())
onvifEventsList = []byte("[]")
}
} else if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while getting events: " + err.Error())
onvifEventsList = []byte("[]")
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
@@ -321,14 +354,17 @@ loop:
}
} else {
log.Log.Debug("cloud.HandleHeartBeat(): no pull point address found.")
onvifEventsList = []byte("[]")
// Try again
pullPointAddress, err = onvif.CreatePullPointSubscription(device)
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device)
if err != nil {
log.Log.Debug("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
}
// It also might be that events are not supported by the camera, in that case we will try to get the digital inputs and outputs.
// Through the `device` API, the `GetDigitalInputs` and `GetDigitalOutputs` functions are called.
// The disadvantage of this approach is that we don't have the state of the inputs and outputs (which is crazy..)
if pullPointAddressInitialState == "" && pullPointAddressLoopState == "" {
var events []onvif.ONVIFEvents
outputs, err := onvif.GetRelayOutputs(device)
if err != nil {
@@ -367,7 +403,6 @@ loop:
onvifEventsList = []byte("[]")
}
}
} else {
log.Log.Error("cloud.HandleHeartBeat(): error while connecting to ONVIF device: " + err.Error())
onvifPresetsList = []byte("[]")
@@ -612,11 +647,11 @@ loop:
}
}
if pullPointAddress != "" {
if pullPointAddressLoopState != "" {
cameraConfiguration := configuration.Config.Capture.IPCamera
device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration)
if err == nil {
onvif.UnsubscribePullPoint(device, pullPointAddress)
if err != nil {
onvif.UnsubscribePullPoint(device, pullPointAddressLoopState)
}
}

View File

@@ -87,7 +87,6 @@ func WriteFileToBackChannel(infile av.DemuxCloser) {
break
}
// Send to backchannel
fmt.Println(buffer)
infile.Write(buffer, 2, uint32(count))
count = count + 1024

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"strconv"
"strings"
@@ -809,14 +808,16 @@ func GetPresetsFromDevice(device *onvif.Device) ([]models.OnvifActionPreset, err
return presets, err
}
presetsList := ""
for _, preset := range presetsResponse.Preset {
log.Log.Debug("onvif.main.GetPresetsFromDevice(): " + string(preset.Name) + " (" + string(preset.Token) + ")")
p := models.OnvifActionPreset{
Name: string(preset.Name),
Token: string(preset.Token),
}
presetsList += string(preset.Name) + " (" + string(preset.Token) + "), "
presets = append(presets, p)
}
log.Log.Debug("onvif.main.GetPresetsFromDevice(): " + presetsList)
return presets, err
}
@@ -991,7 +992,8 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) {
Filter: &event.FilterType{
TopicExpression: &event.TopicExpressionType{
Dialect: xsd.String("http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet"),
TopicKinds: "tns1:Device/Trigger//.",
TopicKinds: "tns1:Device/Trigger//.", // -> This works for Avigilon, Hanwa, Hikvision
// TopicKinds: "//.", -> This works for Axis, but throws other errors.
},
},
})
@@ -1048,14 +1050,10 @@ func GetInputOutputs() ([]ONVIFEvents, error) {
// We have some odd behaviour for inputs: the logical state is set to false even if circuit is closed. However we do see repeated events (looks like heartbeats).
// We are assuming that if we do not receive an event for 15 seconds the input is inactive, otherwise we set to active.
for key, value := range inputOutputDeviceMap {
if value.Type == "input" {
if time.Now().Unix()-value.Timestamp > 15 {
value.Value = "false"
} else {
value.Value = "true"
}
inputOutputDeviceMap[key] = value
if time.Now().Unix()-value.Timestamp < 15 && value.Value == "false" {
value.Value = "true"
}
inputOutputDeviceMap[key] = value
eventsArray = append(eventsArray, *value)
}
for _, value := range eventsArray {
@@ -1080,7 +1078,7 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
} else {
// Pull message
pullMessage := event.PullMessages{
Timeout: xsd.Duration("PT30S"),
Timeout: xsd.Duration("PT5S"),
MessageLimit: 10,
}
requestBody, err := xml.Marshal(pullMessage)
@@ -1100,7 +1098,6 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
res.Body.Close()
if err == nil {
stringBody := string(bs)
fmt.Println(stringBody)
decodedXML, et, err := getXMLNode(stringBody, "PullMessagesResponse")
if err != nil {
log.Log.Error("onvif.main.GetEventMessages(pullMessages): " + err.Error())
@@ -1116,9 +1113,9 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
for _, message := range pullMessagesResponse.NotificationMessage {
log.Log.Debug("onvif.main.GetEventMessages(pullMessages): " + string(message.Topic.TopicKinds))
if len(message.Message.Message.Data.SimpleItem) > 0 {
log.Log.Debug("onvif.main.GetEventMessages(pullMessages): " + string(message.Message.Message.Data.SimpleItem[0].Name) + " " + string(message.Message.Message.Data.SimpleItem[0].Value))
}
//if len(message.Message.Message.Data.SimpleItem) > 0 {
// log.Log.Debug("onvif.main.GetEventMessages(pullMessages): " + string(message.Message.Message.Data.SimpleItem[0].Name) + " " + string(message.Message.Message.Data.SimpleItem[0].Value))
//}
if message.Topic.TopicKinds == "tns1:Device/Trigger/Relay" ||
message.Topic.TopicKinds == "tns1:Device/tns1:Trigger/tns1:Relay" { // This is for avigilon cameras
if len(message.Message.Message.Data.SimpleItem) > 0 {
@@ -1126,7 +1123,8 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
message.Message.Message.Data.SimpleItem[0].Name == "RelayLogicalState" { // On avigilon it's called RelayLogicalState
key := string(message.Message.Message.Source.SimpleItem[0].Value)
value := string(message.Message.Message.Data.SimpleItem[0].Value)
log.Log.Debug("onvif.main.GetEventMessages(pullMessages) output: " + key + " " + value)
propertyOperation := string(message.Message.Message.PropertyOperation)
log.Log.Debug("onvif.main.GetEventMessages(pullMessages) output: " + key + " " + value + " (" + propertyOperation + ")")
// Depending on the onvif library they might use different values for active and inactive.
if value == "active" || value == "1" {
@@ -1137,17 +1135,18 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
// Check if key exists in map
// If it does not exist we'll add it to the map otherwise we'll update the value.
if _, ok := inputOutputDeviceMap[key]; !ok {
inputOutputDeviceMap[key] = &ONVIFEvents{
Key: key,
if _, ok := inputOutputDeviceMap[key+"-output"]; !ok {
inputOutputDeviceMap[key+"-output"] = &ONVIFEvents{
Key: key + "-output",
Type: "output",
Value: value,
Timestamp: 0,
}
} else {
log.Log.Debug("onvif.main.GetEventMessages(pullMessages) output: " + key + " " + value)
inputOutputDeviceMap[key].Value = value
inputOutputDeviceMap[key].Timestamp = time.Now().Unix()
} else if propertyOperation == "Changed" {
inputOutputDeviceMap[key+"-output"].Value = value
inputOutputDeviceMap[key+"-output"].Timestamp = time.Now().Unix()
} else if propertyOperation == "Initialized" {
inputOutputDeviceMap[key+"-output"].Value = value
}
}
}
@@ -1158,7 +1157,8 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
message.Message.Message.Data.SimpleItem[0].Name == "Level" { // On avigilon it's called level
key := string(message.Message.Message.Source.SimpleItem[0].Value)
value := string(message.Message.Message.Data.SimpleItem[0].Value)
log.Log.Debug("onvif.main.GetEventMessages(pullMessages) input: " + key + " " + value)
propertyOperation := string(message.Message.Message.PropertyOperation)
log.Log.Debug("onvif.main.GetEventMessages(pullMessages) input: " + key + " " + value + " (" + propertyOperation + ")")
// Depending on the onvif library they might use different values for active and inactive.
if value == "active" || value == "1" {
@@ -1169,17 +1169,18 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
// Check if key exists in map
// If it does not exist we'll add it to the map otherwise we'll update the value.
if _, ok := inputOutputDeviceMap[key]; !ok {
inputOutputDeviceMap[key] = &ONVIFEvents{
Key: key,
if _, ok := inputOutputDeviceMap[key+"-input"]; !ok {
inputOutputDeviceMap[key+"-input"] = &ONVIFEvents{
Key: key + "-input",
Type: "input",
Value: value,
Timestamp: 0,
}
} else {
log.Log.Debug("onvif.main.GetEventMessages(pullMessages) input: " + key + " " + value)
inputOutputDeviceMap[key].Value = value
inputOutputDeviceMap[key].Timestamp = time.Now().Unix()
} else if propertyOperation == "Changed" {
inputOutputDeviceMap[key+"-input"].Value = value
inputOutputDeviceMap[key+"-input"].Timestamp = time.Now().Unix()
} else if propertyOperation == "Initialized" {
inputOutputDeviceMap[key+"-input"].Value = value
}
}
}
@@ -1268,7 +1269,7 @@ func TriggerRelayOutput(dev *onvif.Device, output string) (err error) {
// this in the future "kerberos-io/onvif" library.
if err == nil {
token := relayoutputs.RelayOutputs[0].Token
if output == string(token) {
if output == string(token+"-output") {
outputState := device.SetRelayOutputState{
RelayOutputToken: token,
LogicalState: "active",

View File

@@ -23,6 +23,8 @@ import (
"github.com/kerberos-io/agent/machinery/src/models"
)
const VERSION = "3.2.1"
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
// MaxUint8 - maximum value which can be held in an uint8