diff --git a/machinery/main.go b/machinery/main.go index 3f98456..d450533 100644 --- a/machinery/main.go +++ b/machinery/main.go @@ -10,6 +10,7 @@ import ( "github.com/kerberos-io/agent/machinery/src/components" "github.com/kerberos-io/agent/machinery/src/log" "github.com/kerberos-io/agent/machinery/src/models" + "github.com/kerberos-io/agent/machinery/src/onvif" configService "github.com/kerberos-io/agent/machinery/src/config" "github.com/kerberos-io/agent/machinery/src/routers" @@ -74,21 +75,27 @@ func main() { switch action { case "version": - log.Log.Info("You are currrently running Kerberos Agent " + VERSION) + log.Log.Info("Main(): You are currrently running Kerberos Agent " + VERSION) case "discover": - log.Log.Info(timeout) + // Convert duration to int + timeout, err := time.ParseDuration(timeout + "ms") + if err != nil { + log.Log.Fatal("Main(): could not parse timeout: " + err.Error()) + return + } + onvif.Discover(timeout) case "decrypt": - log.Log.Info("Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1)) + log.Log.Info("Main(): Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1)) symmetricKey := []byte(flag.Arg(1)) if symmetricKey == nil || len(symmetricKey) == 0 { - log.Log.Fatal("Main: symmetric key should not be empty") + log.Log.Fatal("Main(): symmetric key should not be empty") return } if len(symmetricKey) != 32 { - log.Log.Fatal("Main: symmetric key should be 32 bytes") + log.Log.Fatal("Main(): symmetric key should be 32 bytes") return } @@ -133,9 +140,9 @@ func main() { configuration.Config.Key = key err := configService.StoreConfig(configDirectory, configuration.Config) if err == nil { - log.Log.Info("Main: updated unique key for agent to: " + key) + log.Log.Info("Main(): updated unique key for agent to: " + key) } else { - log.Log.Info("Main: something went wrong while trying to store key: " + key) + log.Log.Info("Main(): something went wrong while trying to store key: " + key) } } @@ -162,6 +169,6 @@ func main() { routers.StartWebserver(configDirectory, &configuration, &communication, &capture) } default: - log.Log.Error("Main: Sorry I don't understand :(") + log.Log.Error("Main(): Sorry I don't understand :(") } } diff --git a/machinery/src/api/onvif.go b/machinery/src/api/onvif.go deleted file mode 100644 index 778f64e..0000000 --- a/machinery/src/api/onvif.go +++ /dev/null @@ -1 +0,0 @@ -package api diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index 0f1c800..0c0bef7 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/kerberos-io/agent/machinery/src/conditions" "github.com/kerberos-io/agent/machinery/src/encryption" "github.com/kerberos-io/agent/machinery/src/log" "github.com/kerberos-io/agent/machinery/src/models" @@ -55,9 +56,7 @@ func CleanupRecordingDirectory(configDirectory string, configuration *models.Con func HandleRecordStream(queue *packets.Queue, configDirectory string, configuration *models.Configuration, communication *models.Communication, rtspClient RTSPClient) { config := configuration.Config - - // Get the streams from the rtsp client. - //streams, _ := rtspClient.GetStreams() + loc, _ := time.LoadLocation(config.Timezone) if config.Capture.Recording == "false" { log.Log.Info("capture.HandleRecordStream(): disabled, we will not record anything.") @@ -86,7 +85,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat // Do not do anything! log.Log.Info("capture.HandleRecordStream() - continuous: Start continuous recording ") - loc, _ := time.LoadLocation(config.Timezone) now = time.Now().Unix() timestamp = now start := false @@ -99,7 +97,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat fullName := "" // Get as much packets we need. - //for pkt := range packets { var cursorError error var pkt packets.Packet var nextPkt packets.Packet @@ -145,10 +142,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat // Cleanup muxer start = false - //_, err = file.Write(cws.buf) - //if err != nil { - // log.Log.Info("capture.HandleRecordStream() - continuous: " + err.Error()) - //} file.Close() file = nil @@ -191,29 +184,11 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat // If not yet started and a keyframe, let's make a recording if !start && pkt.IsKeyFrame { - // Check if within time interval - nowInTimezone := time.Now().In(loc) - weekday := nowInTimezone.Weekday() - hour := nowInTimezone.Hour() - minute := nowInTimezone.Minute() - second := nowInTimezone.Second() - timeEnabled := config.Time - timeInterval := config.Timetable[int(weekday)] - - if timeEnabled == "true" && timeInterval != nil { - start1 := timeInterval.Start1 - end1 := timeInterval.End1 - start2 := timeInterval.Start2 - end2 := timeInterval.End2 - currentTimeInSeconds := hour*60*60 + minute*60 + second - if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) || - (currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) { - - } else { - log.Log.Debug("capture.HandleRecordStream() - continuous: Disabled: no continuous recording at this moment. Not within specified time interval.") - time.Sleep(5 * time.Second) - continue - } + makeRecording := conditions.IsWithinTimeInterval(loc, configuration) + if !makeRecording { + log.Log.Debug("capture.HandleRecordStream() - continuous: Disabled: no continuous recording at this moment. Not within specified time interval.") + time.Sleep(5 * time.Second) + continue } start = true @@ -311,10 +286,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat // Cleanup muxer start = false - //_, err = file.Write(cws.buf) - //if err != nil { - // log.Log.Info("capture.HandleRecordStream() - continuous: " + err.Error()) - //} file.Close() file = nil @@ -501,12 +472,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat lastDuration = pkt.Time lastRecordingTime = time.Now().Unix() - - // Cleanup muxer - //_, err := file.Write(cws.buf) - //if err != nil { - // panic(err) - //} file.Close() file = nil @@ -663,7 +628,6 @@ func Base64Image(captureDevice *Capture, communication *models.Communication) st break } } - return encodedImage } diff --git a/machinery/src/cloud/KerberosHub.go b/machinery/src/cloud/kerberos_hub.go similarity index 100% rename from machinery/src/cloud/KerberosHub.go rename to machinery/src/cloud/kerberos_hub.go diff --git a/machinery/src/cloud/KerberosVault.go b/machinery/src/cloud/kerberos_vault.go similarity index 100% rename from machinery/src/cloud/KerberosVault.go rename to machinery/src/cloud/kerberos_vault.go diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index 56723bf..57375f7 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -198,14 +198,11 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu cameraSettings.SubRTSP != subRtspUrl || cameraSettings.Width != width || cameraSettings.Height != height { - //cameraSettings.Num != num || - //cameraSettings.Denum != denum || - //cameraSettings.Codec != videoStream.(av.VideoCodecData).Type() { // TODO: this condition is used to reset the decoder when the camera settings change. // The main idea is that you only set the decoder once, and then reuse it on each restart (no new memory allocation). // However the stream settings of the camera might have been changed, and so the decoder might need to be reloaded. - // .... + // .... Not used for the moment .... if cameraSettings.RTSP != "" && cameraSettings.SubRTSP != "" && cameraSettings.Initialized { //decoder.Close() @@ -226,10 +223,6 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu cameraSettings.SubRTSP = subRtspUrl cameraSettings.Width = width cameraSettings.Height = height - //cameraSettings.Framerate = float64(num) / float64(denum) - //cameraSettings.Num = num - //cameraSettings.Denum = denum - //cameraSettings.Codec = videoStream.(av.VideoCodecData).Type() cameraSettings.Initialized = true } else { log.Log.Info("components.Kerberos.RunAgent(): camera settings did not change, keeping decoder") diff --git a/machinery/src/components/Onvif.go b/machinery/src/components/Onvif.go deleted file mode 100644 index 9e8652f..0000000 --- a/machinery/src/components/Onvif.go +++ /dev/null @@ -1,25 +0,0 @@ -package components - -import ( - "time" - - "github.com/cedricve/go-onvif" - "github.com/kerberos-io/agent/machinery/src/log" -) - -func Discover(timeout time.Duration) { - log.Log.Info("Discovering devices") - log.Log.Info("Waiting for " + (timeout * time.Second).String()) - devices, err := onvif.StartDiscovery(timeout * time.Second) - if err != nil { - log.Log.Error(err.Error()) - } else { - for _, device := range devices { - hostname, _ := device.GetHostname() - log.Log.Info(hostname.Name) - } - if len(devices) == 0 { - log.Log.Info("No devices descovered\n") - } - } -} diff --git a/machinery/src/components/Stream.go b/machinery/src/components/Stream.go deleted file mode 100644 index 435c684..0000000 --- a/machinery/src/components/Stream.go +++ /dev/null @@ -1,82 +0,0 @@ -package components - -import ( - "fmt" - "log" - "time" - - "github.com/deepch/vdk/av" - "github.com/deepch/vdk/codec/h264parser" - "github.com/deepch/vdk/format/rtsp" -) - -type Stream struct { - Name string - Url string - Debug bool - Codecs string -} - -func CreateStream(name string, url string) *Stream { - return &Stream{ - Name: name, - Url: url, - } -} - -func (s Stream) Open() *rtsp.Client { - - // Enable debugging - if s.Debug { - rtsp.DebugRtsp = true - } - - fmt.Println("Dialing in to " + s.Url) - session, err := rtsp.Dial(s.Url) - if err != nil { - log.Println("Something went wrong dialing into stream: ", err) - time.Sleep(5 * time.Second) - } - session.RtpKeepAliveTimeout = 10 * time.Second - return session -} - -func (s Stream) Close(session *rtsp.Client) { - fmt.Println("Closing RTSP session.") - err := session.Close() - if err != nil { - log.Println("Something went wrong while closing your RTSP session: ", err) - } -} - -func (s Stream) GetCodecs() []av.CodecData { - session := s.Open() - codec, err := session.Streams() - log.Println("Reading codecs from stream: ", codec) - if err != nil { - log.Println("Something went wrong while reading codecs from stream: ", err) - time.Sleep(5 * time.Second) - } - s.Close(session) - return codec -} - -func (s Stream) ReadPackets(packetChannel chan av.Packet) { - session := s.Open() - for { - packet, err := session.ReadPacket() - if err != nil { - break - } - if len(packetChannel) < cap(packetChannel) { - packetChannel <- packet - } - } - s.Close(session) -} - -func GetSPSFromCodec(codecs []av.CodecData) ([]byte, []byte) { - sps := codecs[0].(h264parser.CodecData).SPS() - pps := codecs[0].(h264parser.CodecData).PPS() - return sps, pps -} diff --git a/machinery/src/components/Audio.go b/machinery/src/components/backchannel.go similarity index 100% rename from machinery/src/components/Audio.go rename to machinery/src/components/backchannel.go diff --git a/machinery/src/computervision/main.go b/machinery/src/computervision/main.go index d54742f..e24dfa5 100644 --- a/machinery/src/computervision/main.go +++ b/machinery/src/computervision/main.go @@ -7,6 +7,7 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" geo "github.com/kellydunn/golang-geo" "github.com/kerberos-io/agent/machinery/src/capture" + "github.com/kerberos-io/agent/machinery/src/conditions" "github.com/kerberos-io/agent/machinery/src/log" "github.com/kerberos-io/agent/machinery/src/models" "github.com/kerberos-io/agent/machinery/src/packets" @@ -16,6 +17,7 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf log.Log.Debug("ProcessMotion: started") config := configuration.Config + loc, _ := time.LoadLocation(config.Timezone) var isPixelChangeThresholdReached = false var changesToReturn = 0 @@ -104,7 +106,6 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf // Start the motion detection i := 0 - loc, _ := time.LoadLocation(config.Timezone) for cursorError == nil { pkt, cursorError = motionCursor.ReadPacket() @@ -121,69 +122,48 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf // Check if within time interval detectMotion := true - timeEnabled := config.Time - if timeEnabled != "false" { - now := time.Now().In(loc) - weekday := now.Weekday() - hour := now.Hour() - minute := now.Minute() - second := now.Second() - if config.Timetable != nil && len(config.Timetable) > 0 { - timeInterval := config.Timetable[int(weekday)] - if timeInterval != nil { - start1 := timeInterval.Start1 - end1 := timeInterval.End1 - start2 := timeInterval.Start2 - end2 := timeInterval.End2 - currentTimeInSeconds := hour*60*60 + minute*60 + second - if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) || - (currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) { - - } else { - detectMotion = false - log.Log.Info("ProcessMotion: Time interval not valid, disabling motion detection.") - } - } - } - } + detectMotion = conditions.IsWithinTimeInterval(loc, configuration) if config.Capture.Motion != "false" { - // Remember additional information about the result of findmotion - isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold) - if detectMotion && isPixelChangeThresholdReached { + if detectMotion { - // If offline mode is disabled, send a message to the hub - if config.Offline != "true" { - if mqttClient != nil { - if hubKey != "" { - message := models.Message{ - Payload: models.Payload{ - Action: "motion", - DeviceId: configuration.Config.Key, - Value: map[string]interface{}{ - "timestamp": time.Now().Unix(), + // Remember additional information about the result of findmotion + isPixelChangeThresholdReached, changesToReturn = FindMotion(imageArray, coordinatesToCheck, pixelThreshold) + if isPixelChangeThresholdReached { + + // If offline mode is disabled, send a message to the hub + if config.Offline != "true" { + if mqttClient != nil { + if hubKey != "" { + message := models.Message{ + Payload: models.Payload{ + Action: "motion", + DeviceId: configuration.Config.Key, + Value: map[string]interface{}{ + "timestamp": time.Now().Unix(), + }, }, - }, - } - payload, err := models.PackageMQTTMessage(configuration, message) - if err == nil { - mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) + } else { + log.Log.Info("ProcessMotion: failed to package MQTT message: " + err.Error()) + } } else { - log.Log.Info("ProcessMotion: failed to package MQTT message: " + err.Error()) + mqttClient.Publish("kerberos/agent/"+deviceKey, 2, false, "motion") } - } else { - mqttClient.Publish("kerberos/agent/"+deviceKey, 2, false, "motion") } } - } - if config.Capture.Recording != "false" { - dataToPass := models.MotionDataPartial{ - Timestamp: time.Now().Unix(), - NumberOfChanges: changesToReturn, + if config.Capture.Recording != "false" { + dataToPass := models.MotionDataPartial{ + Timestamp: time.Now().Unix(), + NumberOfChanges: changesToReturn, + } + communication.HandleMotion <- dataToPass //Save data to the channel } - communication.HandleMotion <- dataToPass //Save data to the channel } } diff --git a/machinery/src/conditions/timewindow.go b/machinery/src/conditions/timewindow.go new file mode 100644 index 0000000..e45a3dc --- /dev/null +++ b/machinery/src/conditions/timewindow.go @@ -0,0 +1,38 @@ +package conditions + +import ( + "time" + + "github.com/kerberos-io/agent/machinery/src/log" + "github.com/kerberos-io/agent/machinery/src/models" +) + +func IsWithinTimeInterval(loc *time.Location, configuration *models.Configuration) bool { + config := configuration.Config + timeEnabled := config.Time + detectMotion := true + if timeEnabled != "false" { + now := time.Now().In(loc) + weekday := now.Weekday() + hour := now.Hour() + minute := now.Minute() + second := now.Second() + if config.Timetable != nil && len(config.Timetable) > 0 { + timeInterval := config.Timetable[int(weekday)] + if timeInterval != nil { + start1 := timeInterval.Start1 + end1 := timeInterval.End1 + start2 := timeInterval.Start2 + end2 := timeInterval.End2 + currentTimeInSeconds := hour*60*60 + minute*60 + second + if (currentTimeInSeconds >= start1 && currentTimeInSeconds <= end1) || + (currentTimeInSeconds >= start2 && currentTimeInSeconds <= end2) { + + } else { + log.Log.Info("ProcessMotion: Time interval not valid, disabling motion detection.") + } + } + } + } + return detectMotion +} diff --git a/machinery/src/conditions/uri.go b/machinery/src/conditions/uri.go new file mode 100644 index 0000000..a53baa6 --- /dev/null +++ b/machinery/src/conditions/uri.go @@ -0,0 +1,15 @@ +package conditions + +import ( + "github.com/kerberos-io/agent/machinery/src/models" +) + +func IsValidConditionResponse(configuration *models.Configuration) bool { + config := configuration.Config + conditionURI := config.ConditionURI + detectMotion := true + if conditionURI != "" { + + } + return detectMotion +} diff --git a/machinery/src/models/ApiResponse.go b/machinery/src/models/api_response.go similarity index 100% rename from machinery/src/models/ApiResponse.go rename to machinery/src/models/api_response.go diff --git a/machinery/src/models/MotionData.go b/machinery/src/models/motion_data.go similarity index 100% rename from machinery/src/models/MotionData.go rename to machinery/src/models/motion_data.go diff --git a/machinery/src/onvif/main.go b/machinery/src/onvif/main.go index 2c381a1..cc19bb7 100644 --- a/machinery/src/onvif/main.go +++ b/machinery/src/onvif/main.go @@ -10,6 +10,7 @@ import ( "strings" "time" + onvifc "github.com/cedricve/go-onvif" "github.com/gin-gonic/gin" "github.com/kerberos-io/agent/machinery/src/log" "github.com/kerberos-io/agent/machinery/src/models" @@ -20,6 +21,23 @@ import ( xsd "github.com/kerberos-io/onvif/xsd/onvif" ) +func Discover(timeout time.Duration) { + log.Log.Info("onvif.Discover(): Discovering devices") + log.Log.Info("Waiting for " + timeout.String()) + devices, err := onvifc.StartDiscovery(timeout) + if err != nil { + log.Log.Error("onvif.Discover(): " + err.Error()) + } else { + for _, device := range devices { + hostname, _ := device.GetHostname() + log.Log.Info("onvif.Discover(): " + hostname.Name + " (" + device.XAddr + ")") + } + if len(devices) == 0 { + log.Log.Info("onvif.Discover(): No devices descovered\n") + } + } +} + func HandleONVIFActions(configuration *models.Configuration, communication *models.Communication) { log.Log.Debug("onvif.HandleONVIFActions(): started") diff --git a/machinery/src/routers/http/JWTMiddleware.go b/machinery/src/routers/http/jwt_middleware.go similarity index 100% rename from machinery/src/routers/http/JWTMiddleware.go rename to machinery/src/routers/http/jwt_middleware.go