cleanup names of files (still need more cleanup)+ rework discover method + separated conditions in separate package

This commit is contained in:
Cedric Verstraeten
2023-12-12 09:15:54 +01:00
parent 9151b38e7f
commit b5f5567bcf
16 changed files with 127 additions and 220 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/components"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/onvif"
configService "github.com/kerberos-io/agent/machinery/src/config"
"github.com/kerberos-io/agent/machinery/src/routers"
@@ -74,21 +75,27 @@ func main() {
switch action {
case "version":
log.Log.Info("You are currrently running Kerberos Agent " + VERSION)
log.Log.Info("Main(): You are currrently running Kerberos Agent " + VERSION)
case "discover":
log.Log.Info(timeout)
// Convert duration to int
timeout, err := time.ParseDuration(timeout + "ms")
if err != nil {
log.Log.Fatal("Main(): could not parse timeout: " + err.Error())
return
}
onvif.Discover(timeout)
case "decrypt":
log.Log.Info("Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1))
log.Log.Info("Main(): Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1))
symmetricKey := []byte(flag.Arg(1))
if symmetricKey == nil || len(symmetricKey) == 0 {
log.Log.Fatal("Main: symmetric key should not be empty")
log.Log.Fatal("Main(): symmetric key should not be empty")
return
}
if len(symmetricKey) != 32 {
log.Log.Fatal("Main: symmetric key should be 32 bytes")
log.Log.Fatal("Main(): symmetric key should be 32 bytes")
return
}
@@ -133,9 +140,9 @@ func main() {
configuration.Config.Key = key
err := configService.StoreConfig(configDirectory, configuration.Config)
if err == nil {
log.Log.Info("Main: updated unique key for agent to: " + key)
log.Log.Info("Main(): updated unique key for agent to: " + key)
} else {
log.Log.Info("Main: something went wrong while trying to store key: " + key)
log.Log.Info("Main(): something went wrong while trying to store key: " + key)
}
}
@@ -162,6 +169,6 @@ func main() {
routers.StartWebserver(configDirectory, &configuration, &communication, &capture)
}
default:
log.Log.Error("Main: Sorry I don't understand :(")
log.Log.Error("Main(): Sorry I don't understand :(")
}
}

View File

@@ -1 +0,0 @@
package api

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/kerberos-io/agent/machinery/src/conditions"
"github.com/kerberos-io/agent/machinery/src/encryption"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
@@ -55,9 +56,7 @@ func CleanupRecordingDirectory(configDirectory string, configuration *models.Con
func HandleRecordStream(queue *packets.Queue, configDirectory string, configuration *models.Configuration, communication *models.Communication, rtspClient RTSPClient) {
config := configuration.Config
// Get the streams from the rtsp client.
//streams, _ := rtspClient.GetStreams()
loc, _ := time.LoadLocation(config.Timezone)
if config.Capture.Recording == "false" {
log.Log.Info("capture.HandleRecordStream(): disabled, we will not record anything.")
@@ -86,7 +85,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// Do not do anything!
log.Log.Info("capture.HandleRecordStream() - continuous: Start continuous recording ")
loc, _ := time.LoadLocation(config.Timezone)
now = time.Now().Unix()
timestamp = now
start := false
@@ -99,7 +97,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
fullName := ""
// Get as much packets we need.
//for pkt := range packets {
var cursorError error
var pkt packets.Packet
var nextPkt packets.Packet
@@ -145,10 +142,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// Cleanup muxer
start = false
//_, err = file.Write(cws.buf)
//if err != nil {
// log.Log.Info("capture.HandleRecordStream() - continuous: " + err.Error())
//}
file.Close()
file = nil
@@ -191,29 +184,11 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// If not yet started and a keyframe, let's make a recording
if !start && pkt.IsKeyFrame {
// Check if within time interval
nowInTimezone := time.Now().In(loc)
weekday := nowInTimezone.Weekday()
hour := nowInTimezone.Hour()
minute := nowInTimezone.Minute()
second := nowInTimezone.Second()
timeEnabled := config.Time
timeInterval := config.Timetable[int(weekday)]
if timeEnabled == "true" && timeInterval != nil {
start1 := timeInterval.Start1
end1 := timeInterval.End1
start2 := timeInterval.Start2
end2 := timeInterval.End2
currentTimeInSeconds := hour*60*60 + minute*60 + second
if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) ||
(currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) {
} else {
log.Log.Debug("capture.HandleRecordStream() - continuous: Disabled: no continuous recording at this moment. Not within specified time interval.")
time.Sleep(5 * time.Second)
continue
}
makeRecording := conditions.IsWithinTimeInterval(loc, configuration)
if !makeRecording {
log.Log.Debug("capture.HandleRecordStream() - continuous: Disabled: no continuous recording at this moment. Not within specified time interval.")
time.Sleep(5 * time.Second)
continue
}
start = true
@@ -311,10 +286,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// Cleanup muxer
start = false
//_, err = file.Write(cws.buf)
//if err != nil {
// log.Log.Info("capture.HandleRecordStream() - continuous: " + err.Error())
//}
file.Close()
file = nil
@@ -501,12 +472,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
lastDuration = pkt.Time
lastRecordingTime = time.Now().Unix()
// Cleanup muxer
//_, err := file.Write(cws.buf)
//if err != nil {
// panic(err)
//}
file.Close()
file = nil
@@ -663,7 +628,6 @@ func Base64Image(captureDevice *Capture, communication *models.Communication) st
break
}
}
return encodedImage
}

View File

@@ -198,14 +198,11 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
cameraSettings.SubRTSP != subRtspUrl ||
cameraSettings.Width != width ||
cameraSettings.Height != height {
//cameraSettings.Num != num ||
//cameraSettings.Denum != denum ||
//cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() {
// TODO: this condition is used to reset the decoder when the camera settings change.
// The main idea is that you only set the decoder once, and then reuse it on each restart (no new memory allocation).
// However the stream settings of the camera might have been changed, and so the decoder might need to be reloaded.
// ....
// .... Not used for the moment ....
if cameraSettings.RTSP != "" && cameraSettings.SubRTSP != "" && cameraSettings.Initialized {
//decoder.Close()
@@ -226,10 +223,6 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
cameraSettings.SubRTSP = subRtspUrl
cameraSettings.Width = width
cameraSettings.Height = height
//cameraSettings.Framerate = float64(num) / float64(denum)
//cameraSettings.Num = num
//cameraSettings.Denum = denum
//cameraSettings.Codec = videoStream.(av.VideoCodecData).Type()
cameraSettings.Initialized = true
} else {
log.Log.Info("components.Kerberos.RunAgent(): camera settings did not change, keeping decoder")

View File

@@ -1,25 +0,0 @@
package components
import (
"time"
"github.com/cedricve/go-onvif"
"github.com/kerberos-io/agent/machinery/src/log"
)
func Discover(timeout time.Duration) {
log.Log.Info("Discovering devices")
log.Log.Info("Waiting for " + (timeout * time.Second).String())
devices, err := onvif.StartDiscovery(timeout * time.Second)
if err != nil {
log.Log.Error(err.Error())
} else {
for _, device := range devices {
hostname, _ := device.GetHostname()
log.Log.Info(hostname.Name)
}
if len(devices) == 0 {
log.Log.Info("No devices descovered\n")
}
}
}

View File

@@ -1,82 +0,0 @@
package components
import (
"fmt"
"log"
"time"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtsp"
)
type Stream struct {
Name string
Url string
Debug bool
Codecs string
}
func CreateStream(name string, url string) *Stream {
return &Stream{
Name: name,
Url: url,
}
}
func (s Stream) Open() *rtsp.Client {
// Enable debugging
if s.Debug {
rtsp.DebugRtsp = true
}
fmt.Println("Dialing in to " + s.Url)
session, err := rtsp.Dial(s.Url)
if err != nil {
log.Println("Something went wrong dialing into stream: ", err)
time.Sleep(5 * time.Second)
}
session.RtpKeepAliveTimeout = 10 * time.Second
return session
}
func (s Stream) Close(session *rtsp.Client) {
fmt.Println("Closing RTSP session.")
err := session.Close()
if err != nil {
log.Println("Something went wrong while closing your RTSP session: ", err)
}
}
func (s Stream) GetCodecs() []av.CodecData {
session := s.Open()
codec, err := session.Streams()
log.Println("Reading codecs from stream: ", codec)
if err != nil {
log.Println("Something went wrong while reading codecs from stream: ", err)
time.Sleep(5 * time.Second)
}
s.Close(session)
return codec
}
func (s Stream) ReadPackets(packetChannel chan av.Packet) {
session := s.Open()
for {
packet, err := session.ReadPacket()
if err != nil {
break
}
if len(packetChannel) < cap(packetChannel) {
packetChannel <- packet
}
}
s.Close(session)
}
func GetSPSFromCodec(codecs []av.CodecData) ([]byte, []byte) {
sps := codecs[0].(h264parser.CodecData).SPS()
pps := codecs[0].(h264parser.CodecData).PPS()
return sps, pps
}

View File

@@ -7,6 +7,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
geo "github.com/kellydunn/golang-geo"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/conditions"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
@@ -16,6 +17,7 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf
log.Log.Debug("ProcessMotion: started")
config := configuration.Config
loc, _ := time.LoadLocation(config.Timezone)
var isPixelChangeThresholdReached = false
var changesToReturn = 0
@@ -104,7 +106,6 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf
// Start the motion detection
i := 0
loc, _ := time.LoadLocation(config.Timezone)
for cursorError == nil {
pkt, cursorError = motionCursor.ReadPacket()
@@ -121,69 +122,48 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf
// Check if within time interval
detectMotion := true
timeEnabled := config.Time
if timeEnabled != "false" {
now := time.Now().In(loc)
weekday := now.Weekday()
hour := now.Hour()
minute := now.Minute()
second := now.Second()
if config.Timetable != nil && len(config.Timetable) > 0 {
timeInterval := config.Timetable[int(weekday)]
if timeInterval != nil {
start1 := timeInterval.Start1
end1 := timeInterval.End1
start2 := timeInterval.Start2
end2 := timeInterval.End2
currentTimeInSeconds := hour*60*60 + minute*60 + second
if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) ||
(currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) {
} else {
detectMotion = false
log.Log.Info("ProcessMotion: Time interval not valid, disabling motion detection.")
}
}
}
}
detectMotion = conditions.IsWithinTimeInterval(loc, configuration)
if config.Capture.Motion != "false" {
// Remember additional information about the result of findmotion
isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold)
if detectMotion && isPixelChangeThresholdReached {
if detectMotion {
// If offline mode is disabled, send a message to the hub
if config.Offline != "true" {
if mqttClient != nil {
if hubKey != "" {
message := models.Message{
Payload: models.Payload{
Action: "motion",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"timestamp": time.Now().Unix(),
// Remember additional information about the result of findmotion
isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold)
if isPixelChangeThresholdReached {
// If offline mode is disabled, send a message to the hub
if config.Offline != "true" {
if mqttClient != nil {
if hubKey != "" {
message := models.Message{
Payload: models.Payload{
Action: "motion",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"timestamp": time.Now().Unix(),
},
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
} else {
log.Log.Info("ProcessMotion: failed to package MQTT message: " + err.Error())
}
} else {
log.Log.Info("ProcessMotion: failed to package MQTT message: " + err.Error())
mqttClient.Publish("kerberos/agent/"+deviceKey, 2, false, "motion")
}
} else {
mqttClient.Publish("kerberos/agent/"+deviceKey, 2, false, "motion")
}
}
}
if config.Capture.Recording != "false" {
dataToPass := models.MotionDataPartial{
Timestamp: time.Now().Unix(),
NumberOfChanges: changesToReturn,
if config.Capture.Recording != "false" {
dataToPass := models.MotionDataPartial{
Timestamp: time.Now().Unix(),
NumberOfChanges: changesToReturn,
}
communication.HandleMotion <- dataToPass //Save data to the channel
}
communication.HandleMotion <- dataToPass //Save data to the channel
}
}

View File

@@ -0,0 +1,38 @@
package conditions
import (
"time"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
)
func IsWithinTimeInterval(loc *time.Location, configuration *models.Configuration) bool {
config := configuration.Config
timeEnabled := config.Time
detectMotion := true
if timeEnabled != "false" {
now := time.Now().In(loc)
weekday := now.Weekday()
hour := now.Hour()
minute := now.Minute()
second := now.Second()
if config.Timetable != nil && len(config.Timetable) > 0 {
timeInterval := config.Timetable[int(weekday)]
if timeInterval != nil {
start1 := timeInterval.Start1
end1 := timeInterval.End1
start2 := timeInterval.Start2
end2 := timeInterval.End2
currentTimeInSeconds := hour*60*60 + minute*60 + second
if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) ||
(currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) {
} else {
log.Log.Info("ProcessMotion: Time interval not valid, disabling motion detection.")
}
}
}
}
return detectMotion
}

View File

@@ -0,0 +1,15 @@
package conditions
import (
"github.com/kerberos-io/agent/machinery/src/models"
)
func IsValidConditionResponse(configuration *models.Configuration) bool {
config := configuration.Config
conditionURI := config.ConditionURI
detectMotion := true
if conditionURI != "" {
}
return detectMotion
}

View File

@@ -10,6 +10,7 @@ import (
"strings"
"time"
onvifc "github.com/cedricve/go-onvif"
"github.com/gin-gonic/gin"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
@@ -20,6 +21,23 @@ import (
xsd "github.com/kerberos-io/onvif/xsd/onvif"
)
func Discover(timeout time.Duration) {
log.Log.Info("onvif.Discover(): Discovering devices")
log.Log.Info("Waiting for " + timeout.String())
devices, err := onvifc.StartDiscovery(timeout)
if err != nil {
log.Log.Error("onvif.Discover(): " + err.Error())
} else {
for _, device := range devices {
hostname, _ := device.GetHostname()
log.Log.Info("onvif.Discover(): " + hostname.Name + " (" + device.XAddr + ")")
}
if len(devices) == 0 {
log.Log.Info("onvif.Discover(): No devices descovered\n")
}
}
}
func HandleONVIFActions(configuration *models.Configuration, communication *models.Communication) {
log.Log.Debug("onvif.HandleONVIFActions(): started")