mirror of
https://github.com/kerberos-io/agent.git
synced 2025-12-24 13:07:58 +08:00
Merge pull request #204 from kerberos-io/feature/jpeg-resolution-chunking
feature/jpeg-resolution-chunking
This commit is contained in:
@@ -706,22 +706,51 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo
|
||||
img, err := rtspClient.DecodePacket(pkt)
|
||||
if err == nil {
|
||||
bytes, _ := utils.ImageToBytes(&img)
|
||||
encoded := base64.StdEncoding.EncodeToString(bytes)
|
||||
//encoded := base64.StdEncoding.EncodeToString(bytes)
|
||||
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["image"] = encoded
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Action: "receive-sd-stream",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
},
|
||||
// Split encoded image into chunks of 1kb
|
||||
// This is to prevent the MQTT message to be too large.
|
||||
// By default, bytes are not encoded to base64 here; you are splitting the raw JPEG/PNG bytes.
|
||||
// However, in MQTT and web contexts, binary data may not be handled well, so base64 is often used.
|
||||
// To avoid base64 encoding, just send the raw []byte chunks as you do here.
|
||||
// If you want to avoid base64, make sure the receiver can handle binary payloads.
|
||||
|
||||
chunkSize := 1 * 1024 // 1KB chunks
|
||||
var chunks [][]byte
|
||||
for i := 0; i < len(bytes); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(bytes) {
|
||||
end = len(bytes)
|
||||
}
|
||||
chunk := bytes[i:end]
|
||||
chunks = append(chunks, chunk)
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
|
||||
} else {
|
||||
log.Log.Info("cloud.HandleLiveStreamSD(): something went wrong while sending acknowledge config to hub: " + string(payload))
|
||||
|
||||
log.Log.Infof("cloud.HandleLiveStreamSD(): Sending %d chunks of size %d bytes.", len(chunks), chunkSize)
|
||||
|
||||
timestamp := time.Now().Unix()
|
||||
for i, chunk := range chunks {
|
||||
valueMap := make(map[string]interface{})
|
||||
valueMap["id"] = timestamp
|
||||
valueMap["chunk"] = chunk
|
||||
valueMap["chunkIndex"] = i
|
||||
valueMap["chunkSize"] = chunkSize
|
||||
valueMap["chunkCount"] = len(chunks)
|
||||
message := models.Message{
|
||||
Payload: models.Payload{
|
||||
Version: "v1.0.0",
|
||||
Action: "receive-sd-stream",
|
||||
DeviceId: configuration.Config.Key,
|
||||
Value: valueMap,
|
||||
},
|
||||
}
|
||||
payload, err := models.PackageMQTTMessage(configuration, message)
|
||||
if err == nil {
|
||||
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
|
||||
log.Log.Infof("cloud.HandleLiveStreamSD(): sent chunk %d/%d to MQTT topic kerberos/hub/%s", i+1, len(chunks), hubKey)
|
||||
} else {
|
||||
log.Log.Info("cloud.HandleLiveStreamSD(): something went wrong while sending acknowledge config to hub: " + string(payload))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,6 +118,16 @@ func (self *Logging) Info(sentence string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Logging) Infof(format string, args ...interface{}) {
|
||||
switch self.Logger {
|
||||
case "go-logging":
|
||||
gologging.Infof(format, args...)
|
||||
case "logrus":
|
||||
logrus.Infof(format, args...)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Logging) Warning(sentence string) {
|
||||
switch self.Logger {
|
||||
case "go-logging":
|
||||
|
||||
@@ -132,6 +132,7 @@ type Message struct {
|
||||
// The payload structure which is used to send over
|
||||
// and receive messages from the MQTT broker
|
||||
type Payload struct {
|
||||
Version string `json:"version"` // Version of the message, e.g. "1.0"
|
||||
Action string `json:"action"`
|
||||
DeviceId string `json:"device_id"`
|
||||
Signature string `json:"signature"`
|
||||
|
||||
Reference in New Issue
Block a user