diff --git a/cmd/test/main.go b/cmd/test/main.go new file mode 100644 index 0000000..bd36854 --- /dev/null +++ b/cmd/test/main.go @@ -0,0 +1,126 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/disk" + "github.com/shirou/gopsutil/v4/mem" +) + +func main() { + fmt.Println("=== CPU Usage ===") + CPU() + + fmt.Println("\n=== Memory Usage ===") + Memory() + + fmt.Println("\n=== Disk Usage ===") + Disk() +} + +func CPU() { + // Get CPU usage percentage for all cores combined (false parameter) + // Wait 100ms for measurement interval + percentages, err := cpu.Percent(100*time.Millisecond, false) + if err != nil { + log.Fatalf("Error getting CPU percentages: %v", err) + } + + // Print the result + // Since we passed false as the second parameter, we get a single value + // representing the average usage across all CPU cores + fmt.Printf("CPU Usage (all cores): %.2f%%\n", percentages[0]) + + // Now let's get per-core CPU usage + perCorePercentages, err := cpu.Percent(100*time.Millisecond, true) + if err != nil { + log.Fatalf("Error getting per-core CPU percentages: %v", err) + } + + // Print per-core results + for i, percentage := range perCorePercentages { + fmt.Printf("CPU Core #%d Usage: %.2f%%\n", i, percentage) + } + + // Demonstrate continuous monitoring + fmt.Println("\nContinuous CPU monitoring (5 seconds):") + for i := 0; i < 5; i++ { + percentages, err := cpu.Percent(1000*time.Millisecond, false) + if err != nil { + log.Printf("Error getting CPU percentages: %v", err) + continue + } + fmt.Printf("CPU Usage at %s: %.2f%%\n", time.Now().Format("15:04:05"), percentages[0]) + } +} + +func Memory() { + // Get virtual memory statistics + vmStat, err := mem.VirtualMemory() + if err != nil { + log.Fatalf("Error getting virtual memory statistics: %v", err) + } + + // Print memory usage information + fmt.Printf("Total memory: %.2f GB\n", float64(vmStat.Total)/(1024*1024*1024)) + fmt.Printf("Available memory: %.2f GB\n", float64(vmStat.Available)/(1024*1024*1024)) + fmt.Printf("Used memory: %.2f GB\n", float64(vmStat.Used)/(1024*1024*1024)) + fmt.Printf("Memory usage percentage: %.2f%%\n", vmStat.UsedPercent) + + // Get swap memory statistics + swapStat, err := mem.SwapMemory() + if err != nil { + log.Printf("Error getting swap memory statistics: %v", err) + } else { + fmt.Printf("\nSwap memory total: %.2f GB\n", float64(swapStat.Total)/(1024*1024*1024)) + fmt.Printf("Swap memory used: %.2f GB\n", float64(swapStat.Used)/(1024*1024*1024)) + fmt.Printf("Swap memory usage percentage: %.2f%%\n", swapStat.UsedPercent) + } +} + +func Disk() { + // Get disk partitions + partitions, err := disk.Partitions(false) // false means physical partitions only + if err != nil { + log.Fatalf("Error getting disk partitions: %v", err) + } + + fmt.Println("Disk partitions and usage:") + for _, partition := range partitions { + fmt.Printf("\nDevice: %s\n", partition.Device) + fmt.Printf("Mount point: %s\n", partition.Mountpoint) + fmt.Printf("File system type: %s\n", partition.Fstype) + + // Get usage statistics for this partition + usageStat, err := disk.Usage(partition.Mountpoint) + if err != nil { + log.Printf("Error getting usage statistics for %s: %v", partition.Mountpoint, err) + continue + } + + fmt.Printf("Total space: %.2f GB\n", float64(usageStat.Total)/(1024*1024*1024)) + fmt.Printf("Free space: %.2f GB\n", float64(usageStat.Free)/(1024*1024*1024)) + fmt.Printf("Used space: %.2f GB\n", float64(usageStat.Used)/(1024*1024*1024)) + fmt.Printf("Usage percentage: %.2f%%\n", usageStat.UsedPercent) + } + + // Get IO counters + ioCounters, err := disk.IOCounters() + if err != nil { + log.Printf("Error getting disk IO counters: %v", err) + } else { + fmt.Println("\nDisk IO statistics:") + for deviceName, ioStat := range ioCounters { + fmt.Printf("\nDevice: %s\n", deviceName) + fmt.Printf("Read count: %d\n", ioStat.ReadCount) + fmt.Printf("Write count: %d\n", ioStat.WriteCount) + fmt.Printf("Read bytes: %.2f MB\n", float64(ioStat.ReadBytes)/(1024*1024)) + fmt.Printf("Write bytes: %.2f MB\n", float64(ioStat.WriteBytes)/(1024*1024)) + fmt.Printf("Read time: %d ms\n", ioStat.ReadTime) + fmt.Printf("Write time: %d ms\n", ioStat.WriteTime) + } + } +} diff --git a/go.mod b/go.mod index 77ac182..742a372 100644 --- a/go.mod +++ b/go.mod @@ -6,3 +6,15 @@ require ( github.com/google/uuid v1.6.0 golang.org/x/crypto v0.37.0 ) + +require ( + github.com/ebitengine/purego v0.8.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/shirou/gopsutil/v4 v4.25.4 + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + golang.org/x/sys v0.32.0 // indirect +) diff --git a/go.sum b/go.sum index 5d1dae3..5929769 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,38 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/ebitengine/purego v0.8.2 h1:jPPGWs2sZ1UgOSgD2bClL0MJIqu58nOmIcBuXr62z1I= +github.com/ebitengine/purego v0.8.2/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/shirou/gopsutil/v4 v4.25.4 h1:cdtFO363VEOOFrUCjZRh4XVJkb548lyF0q0uTeMqYPw= +github.com/shirou/gopsutil/v4 v4.25.4/go.mod h1:xbuxyoZj+UsgnZrENu3lQivsngRR5BdjbJwf2fv4szA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/middleware/chat/messages/request_health_stream.go b/pkg/middleware/chat/messages/request_health_stream.go index f93c355..9633e33 100644 --- a/pkg/middleware/chat/messages/request_health_stream.go +++ b/pkg/middleware/chat/messages/request_health_stream.go @@ -2,6 +2,11 @@ package messages import ( "fmt" + "math" + "time" + + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/mem" "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" @@ -17,17 +22,28 @@ var ( type RequestHealth struct { interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` + RoomID types.RoomID `json:"room_id"` + Timestamp int64 `json:"timestamp"` // in nanoseconds + ConnectionStartTime int64 `json:"connection_start_time"` } -func NewRequestHealth(id types.RoomID) *RequestHealth { - return &RequestHealth{ - RoomID: id, +func NewRequestHealth(id types.RoomID) (*RequestHealth, error) { + msg := &RequestHealth{ + RoomID: id, + Timestamp: time.Now().UnixNano(), } + + bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg) + if err != nil { + panic(err) + } + msg.BaseMessage = bmsg + + return msg, nil } -func NewRequestHealthFactory(id types.RoomID) func() message.Message { - return func() message.Message { +func NewRequestHealthFactory(id types.RoomID) func() (message.Message, error) { + return func() (message.Message, error) { return NewRequestHealth(id) } } @@ -37,7 +53,31 @@ func (m *RequestHealth) GetProtocol() message.Protocol { } func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return errors.ErrInterfaceMisMatch + } + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + msg, err := NewHealthResponse(m) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + w, ok := ss.(interfaces.CanWriteMessage) + if !ok { + return errors.ErrInterfaceMisMatch + } + + if err := w.Write(msg); err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + return nil } func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { @@ -62,7 +102,144 @@ func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection inte return nil } +// NOTE: BASIC HEALTH RESPONSE FOR ROOM MANAGEMENT, OTHER METRICS WILL BE DEALT WITH LATER + type HealthResponse struct { interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` + RequestTimeStamp int64 `json:"-"` // in nanoseconds + RoomID types.RoomID `json:"room_id"` + ConnectionStatus types.ConnectionState `json:"connection_status"` + ConnectionUptime types.ConnectionUptime `json:"connection_uptime"` + CPUUsage types.CPUUsage `json:"cpu_usage"` + MemoryUsage types.MemoryUsage `json:"memory_usage"` + NetworkUsage types.NetworkUsage `json:"network_usage"` + Latency types.LatencyMs `json:"latency"` +} + +func NewHealthResponse(request *RequestHealth) (*HealthResponse, error) { + response := &HealthResponse{} + + response.RequestTimeStamp = request.Timestamp + response.RoomID = request.RoomID + + response.setConnectionStatus(request.ConnectionStartTime) + + if err := response.setCPUUsage(); err != nil { + return nil, err + } + + if err := response.setMemoryUsage(); err != nil { + return nil, err + } + + if err := response.setLatency(); err != nil { + return nil, err + } + + bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, response) + if err != nil { + return nil, err + } + response.BaseMessage = bmsg + + return response, nil +} + +func (m *HealthResponse) GetProtocol() message.Protocol { + return HealthResponseProtocol +} + +func (m *HealthResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + // TODO: IMPLEMENT HEALTH MANAGEMENT + + return nil +} + +func (m *HealthResponse) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return errors.ErrInterfaceMisMatch + } + + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + id, err := ss.GetClientID() + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + m.SetSender(message.Sender(_i.ID())) + m.SetReceiver(message.Receiver(id)) + + return nil +} + +func (m *HealthResponse) setConnectionStatus(startTime int64) { + m.ConnectionStatus = types.ConnectionStateUp + m.ConnectionUptime = m.getConnectionUptime(startTime) +} + +func (m *HealthResponse) getConnectionUptime(startTime int64) types.ConnectionUptime { + return types.ConnectionUptime(time.Now().Unix() - startTime) +} + +func (m *HealthResponse) setCPUUsage() error { + perCorePercentages, err := cpu.Percent(100*time.Millisecond, false) + if err != nil { + return fmt.Errorf("error while reading CPU usage; err: %s", err.Error()) + } + + numCores := len(perCorePercentages) + if numCores == 0 { + return fmt.Errorf("error while reading CPU usage; err: 'no cores detected'") + } + + if numCores > math.MaxUint8 { + return fmt.Errorf("error while reading CPU usage; err: 'too many cores detected'") + } + + m.CPUUsage = types.CPUUsage{ + NumCores: uint8(numCores), + Percent: perCorePercentages, + } + + return nil +} + +func (m *HealthResponse) setMemoryUsage() error { + vmStats, err := mem.VirtualMemory() + if err != nil { + return fmt.Errorf("error while reading memory usage; err: %s", err.Error()) + } + + total := float32(vmStats.Total) / (1024.0 * 1024.0 * 1024.0) + used := float32(vmStats.Used) / (1024.0 * 1024.0 * 1024.0) + available := float32(vmStats.Available) / (1024.0 * 1024.0 * 1024.0) + + m.MemoryUsage = types.MemoryUsage{ + Total: total, + Used: used, + UsedRatio: used / total, + Available: available, + AvailableRatio: available / total, + } + + return nil +} + +func (m *HealthResponse) setLatency() error { + latencyNano := time.Now().UnixNano() - m.RequestTimeStamp + + latencyMs := latencyNano / int64(time.Millisecond) + + if latencyMs < 0 { + return fmt.Errorf("error while reading latency; err: 'latency is negative'") + } + + m.Latency = types.LatencyMs(latencyMs) + + return nil } diff --git a/pkg/middleware/chat/process/send_message_room.go b/pkg/middleware/chat/process/send_message_room.go index d742012..96c6782 100644 --- a/pkg/middleware/chat/process/send_message_room.go +++ b/pkg/middleware/chat/process/send_message_room.go @@ -14,7 +14,7 @@ import ( ) type SendMessageRoom struct { - msgFactory func() message.Message + msgFactory func() (message.Message, error) roomid types.RoomID interval time.Duration err error @@ -24,7 +24,7 @@ type SendMessageRoom struct { cancel context.CancelFunc } -func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFactory func() message.Message, roomid types.RoomID, interval time.Duration) *SendMessageRoom { +func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFactory func() (message.Message, error), roomid types.RoomID, interval time.Duration) *SendMessageRoom { return &SendMessageRoom{ ctx: ctx, cancel: cancel, @@ -94,7 +94,13 @@ func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) e merr := util.NewMultiError() for _, participant := range participants { - if err := w.WriteRoomMessage(p.roomid, p.msgFactory(), "", participant); err != nil { + msg, err := p.msgFactory() + if err != nil { + merr.Add(err) + continue + } + + if err := w.WriteRoomMessage(p.roomid, msg, "", participant); err != nil { merr.Add(err) } } diff --git a/pkg/middleware/chat/types/health.go b/pkg/middleware/chat/types/health.go new file mode 100644 index 0000000..12e642c --- /dev/null +++ b/pkg/middleware/chat/types/health.go @@ -0,0 +1,24 @@ +package types + +type ( + ConnectionState string + ConnectionUptime int + CPUUsage struct { + NumCores uint8 `json:"num_cores"` + Percent []float64 `json:"percent"` + } + MemoryUsage struct { + Total float32 `json:"total"` + Used float32 `json:"used"` + UsedRatio float32 `json:"used_ratio"` + Available float32 `json:"available"` + AvailableRatio float32 `json:"available_ratio"` + } + NetworkUsage float64 + LatencyMs int64 +) + +const ( + ConnectionStateUp ConnectionState = "up" + ConnectionStateDown ConnectionState = "down" +)