mirror of
https://github.com/datarhei/core.git
synced 2025-12-24 13:07:56 +08:00
Fix logging, add json tags, fix create directories before copy/rename a file
This commit is contained in:
@@ -371,9 +371,9 @@ func (fs *diskFilesystem) WriteFile(path string, data []byte) (int64, bool, erro
|
||||
|
||||
func (fs *diskFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) {
|
||||
path = fs.cleanPath(path)
|
||||
dir, filename := filepath.Split(path)
|
||||
_, filename := filepath.Split(path)
|
||||
|
||||
tmpfile, err := os.CreateTemp(dir, filename)
|
||||
tmpfile, err := os.CreateTemp("", filename)
|
||||
if err != nil {
|
||||
return -1, false, err
|
||||
}
|
||||
@@ -415,6 +415,11 @@ func (fs *diskFilesystem) rename(src, dst string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
dir, _ := filepath.Split(dst)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create destination directory: %s: %w", dir, err)
|
||||
}
|
||||
|
||||
// First try to rename the file
|
||||
if err := os.Rename(src, dst); err == nil {
|
||||
return nil
|
||||
@@ -447,6 +452,12 @@ func (fs *diskFilesystem) copy(src, dst string) error {
|
||||
return fmt.Errorf("failed to open source file: %w", err)
|
||||
}
|
||||
|
||||
dir, _ := filepath.Split(dst)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
source.Close()
|
||||
return fmt.Errorf("failed to create destination directory: %s: %w", dir, err)
|
||||
}
|
||||
|
||||
destination, err := os.Create(dst)
|
||||
if err != nil {
|
||||
source.Close()
|
||||
|
||||
@@ -15,20 +15,20 @@ import (
|
||||
|
||||
// Session represents an active session
|
||||
type Session struct {
|
||||
Collector string
|
||||
ID string
|
||||
Reference string
|
||||
CreatedAt time.Time
|
||||
ClosedAt time.Time
|
||||
Location string
|
||||
Peer string
|
||||
Extra map[string]interface{}
|
||||
RxBytes uint64
|
||||
RxBitrate float64 // bit/s
|
||||
TopRxBitrate float64 // bit/s
|
||||
TxBytes uint64
|
||||
TxBitrate float64 // bit/s
|
||||
TopTxBitrate float64 // bit/s
|
||||
Collector string `json:"collector"`
|
||||
ID string `json:"id"`
|
||||
Reference string `json:"reference"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
ClosedAt time.Time `json:"closed_at"`
|
||||
Location string `json:"local"`
|
||||
Peer string `json:"remote"`
|
||||
Extra map[string]interface{} `json:"extra"`
|
||||
RxBytes uint64 `json:"rx_bytes"`
|
||||
RxBitrate float64 `json:"rx_bitrate"` // bit/s
|
||||
TopRxBitrate float64 `json:"rx_top_bitrate"` // bit/s
|
||||
TxBytes uint64 `json:"tx_bytes"`
|
||||
TxBitrate float64 `json:"tx_bitrate"` // bit/s
|
||||
TopTxBitrate float64 `json:"tx_top_bitrate"` // bit/s
|
||||
}
|
||||
|
||||
// Summary is a summary over all current and past sessions.
|
||||
|
||||
@@ -198,6 +198,12 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t
|
||||
buffer := &bytes.Buffer{}
|
||||
path := pattern.FormatString(time.Now())
|
||||
|
||||
file := r.persist.fs.Open(path)
|
||||
if file != nil {
|
||||
buffer.ReadFrom(file)
|
||||
file.Close()
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(buffer)
|
||||
|
||||
ticker := time.NewTicker(bufferDuration)
|
||||
@@ -214,9 +220,15 @@ loop:
|
||||
if currentPath != path {
|
||||
if buffer.Len() > 0 {
|
||||
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
if err != nil {
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
}
|
||||
}
|
||||
buffer.Reset()
|
||||
r.logger.Info().WithFields(log.Fields{
|
||||
"previous": path,
|
||||
"current": currentPath,
|
||||
}).Log("Creating new session log file")
|
||||
path = currentPath
|
||||
}
|
||||
|
||||
@@ -224,11 +236,19 @@ loop:
|
||||
case t := <-ticker.C:
|
||||
if buffer.Len() > 0 {
|
||||
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
if err != nil {
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
} else {
|
||||
r.logger.Debug().WithField("path", path).Log("Persisted session log")
|
||||
}
|
||||
}
|
||||
currentPath := pattern.FormatString(t)
|
||||
if currentPath != path {
|
||||
buffer.Reset()
|
||||
r.logger.Info().WithFields(log.Fields{
|
||||
"previous": path,
|
||||
"current": currentPath,
|
||||
}).Log("Creating new session log file")
|
||||
path = currentPath
|
||||
}
|
||||
}
|
||||
@@ -236,7 +256,12 @@ loop:
|
||||
|
||||
if buffer.Len() > 0 {
|
||||
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
if err != nil {
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
} else {
|
||||
r.logger.Debug().WithField("path", path).Log("Persisted session log")
|
||||
}
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
buffer = nil
|
||||
|
||||
Reference in New Issue
Block a user