added health request stream messages

This commit is contained in:
harshabose
2025-05-07 00:56:57 +05:30
parent 5d89fe91d6
commit 420cce494f
6 changed files with 389 additions and 10 deletions

126
cmd/test/main.go Normal file
View File

@@ -0,0 +1,126 @@
package main
import (
"fmt"
"log"
"time"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/disk"
"github.com/shirou/gopsutil/v4/mem"
)
func main() {
fmt.Println("=== CPU Usage ===")
CPU()
fmt.Println("\n=== Memory Usage ===")
Memory()
fmt.Println("\n=== Disk Usage ===")
Disk()
}
func CPU() {
// Get CPU usage percentage for all cores combined (false parameter)
// Wait 100ms for measurement interval
percentages, err := cpu.Percent(100*time.Millisecond, false)
if err != nil {
log.Fatalf("Error getting CPU percentages: %v", err)
}
// Print the result
// Since we passed false as the second parameter, we get a single value
// representing the average usage across all CPU cores
fmt.Printf("CPU Usage (all cores): %.2f%%\n", percentages[0])
// Now let's get per-core CPU usage
perCorePercentages, err := cpu.Percent(100*time.Millisecond, true)
if err != nil {
log.Fatalf("Error getting per-core CPU percentages: %v", err)
}
// Print per-core results
for i, percentage := range perCorePercentages {
fmt.Printf("CPU Core #%d Usage: %.2f%%\n", i, percentage)
}
// Demonstrate continuous monitoring
fmt.Println("\nContinuous CPU monitoring (5 seconds):")
for i := 0; i < 5; i++ {
percentages, err := cpu.Percent(1000*time.Millisecond, false)
if err != nil {
log.Printf("Error getting CPU percentages: %v", err)
continue
}
fmt.Printf("CPU Usage at %s: %.2f%%\n", time.Now().Format("15:04:05"), percentages[0])
}
}
func Memory() {
// Get virtual memory statistics
vmStat, err := mem.VirtualMemory()
if err != nil {
log.Fatalf("Error getting virtual memory statistics: %v", err)
}
// Print memory usage information
fmt.Printf("Total memory: %.2f GB\n", float64(vmStat.Total)/(1024*1024*1024))
fmt.Printf("Available memory: %.2f GB\n", float64(vmStat.Available)/(1024*1024*1024))
fmt.Printf("Used memory: %.2f GB\n", float64(vmStat.Used)/(1024*1024*1024))
fmt.Printf("Memory usage percentage: %.2f%%\n", vmStat.UsedPercent)
// Get swap memory statistics
swapStat, err := mem.SwapMemory()
if err != nil {
log.Printf("Error getting swap memory statistics: %v", err)
} else {
fmt.Printf("\nSwap memory total: %.2f GB\n", float64(swapStat.Total)/(1024*1024*1024))
fmt.Printf("Swap memory used: %.2f GB\n", float64(swapStat.Used)/(1024*1024*1024))
fmt.Printf("Swap memory usage percentage: %.2f%%\n", swapStat.UsedPercent)
}
}
func Disk() {
// Get disk partitions
partitions, err := disk.Partitions(false) // false means physical partitions only
if err != nil {
log.Fatalf("Error getting disk partitions: %v", err)
}
fmt.Println("Disk partitions and usage:")
for _, partition := range partitions {
fmt.Printf("\nDevice: %s\n", partition.Device)
fmt.Printf("Mount point: %s\n", partition.Mountpoint)
fmt.Printf("File system type: %s\n", partition.Fstype)
// Get usage statistics for this partition
usageStat, err := disk.Usage(partition.Mountpoint)
if err != nil {
log.Printf("Error getting usage statistics for %s: %v", partition.Mountpoint, err)
continue
}
fmt.Printf("Total space: %.2f GB\n", float64(usageStat.Total)/(1024*1024*1024))
fmt.Printf("Free space: %.2f GB\n", float64(usageStat.Free)/(1024*1024*1024))
fmt.Printf("Used space: %.2f GB\n", float64(usageStat.Used)/(1024*1024*1024))
fmt.Printf("Usage percentage: %.2f%%\n", usageStat.UsedPercent)
}
// Get IO counters
ioCounters, err := disk.IOCounters()
if err != nil {
log.Printf("Error getting disk IO counters: %v", err)
} else {
fmt.Println("\nDisk IO statistics:")
for deviceName, ioStat := range ioCounters {
fmt.Printf("\nDevice: %s\n", deviceName)
fmt.Printf("Read count: %d\n", ioStat.ReadCount)
fmt.Printf("Write count: %d\n", ioStat.WriteCount)
fmt.Printf("Read bytes: %.2f MB\n", float64(ioStat.ReadBytes)/(1024*1024))
fmt.Printf("Write bytes: %.2f MB\n", float64(ioStat.WriteBytes)/(1024*1024))
fmt.Printf("Read time: %d ms\n", ioStat.ReadTime)
fmt.Printf("Write time: %d ms\n", ioStat.WriteTime)
}
}
}

12
go.mod
View File

@@ -6,3 +6,15 @@ require (
github.com/google/uuid v1.6.0
golang.org/x/crypto v0.37.0
)
require (
github.com/ebitengine/purego v0.8.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shirou/gopsutil/v4 v4.25.4
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/sys v0.32.0 // indirect
)

34
go.sum
View File

@@ -1,4 +1,38 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ebitengine/purego v0.8.2 h1:jPPGWs2sZ1UgOSgD2bClL0MJIqu58nOmIcBuXr62z1I=
github.com/ebitengine/purego v0.8.2/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/shirou/gopsutil/v4 v4.25.4 h1:cdtFO363VEOOFrUCjZRh4XVJkb548lyF0q0uTeMqYPw=
github.com/shirou/gopsutil/v4 v4.25.4/go.mod h1:xbuxyoZj+UsgnZrENu3lQivsngRR5BdjbJwf2fv4szA=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -2,6 +2,11 @@ package messages
import (
"fmt"
"math"
"time"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
@@ -17,17 +22,28 @@ var (
type RequestHealth struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
RoomID types.RoomID `json:"room_id"`
Timestamp int64 `json:"timestamp"` // in nanoseconds
ConnectionStartTime int64 `json:"connection_start_time"`
}
func NewRequestHealth(id types.RoomID) *RequestHealth {
return &RequestHealth{
RoomID: id,
func NewRequestHealth(id types.RoomID) (*RequestHealth, error) {
msg := &RequestHealth{
RoomID: id,
Timestamp: time.Now().UnixNano(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
panic(err)
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewRequestHealthFactory(id types.RoomID) func() message.Message {
return func() message.Message {
func NewRequestHealthFactory(id types.RoomID) func() (message.Message, error) {
return func() (message.Message, error) {
return NewRequestHealth(id)
}
}
@@ -37,7 +53,31 @@ func (m *RequestHealth) GetProtocol() message.Protocol {
}
func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
msg, err := NewHealthResponse(m)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
w, ok := ss.(interfaces.CanWriteMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
if err := w.Write(msg); err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
return nil
}
func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
@@ -62,7 +102,144 @@ func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection inte
return nil
}
// NOTE: BASIC HEALTH RESPONSE FOR ROOM MANAGEMENT, OTHER METRICS WILL BE DEALT WITH LATER
type HealthResponse struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
RequestTimeStamp int64 `json:"-"` // in nanoseconds
RoomID types.RoomID `json:"room_id"`
ConnectionStatus types.ConnectionState `json:"connection_status"`
ConnectionUptime types.ConnectionUptime `json:"connection_uptime"`
CPUUsage types.CPUUsage `json:"cpu_usage"`
MemoryUsage types.MemoryUsage `json:"memory_usage"`
NetworkUsage types.NetworkUsage `json:"network_usage"`
Latency types.LatencyMs `json:"latency"`
}
func NewHealthResponse(request *RequestHealth) (*HealthResponse, error) {
response := &HealthResponse{}
response.RequestTimeStamp = request.Timestamp
response.RoomID = request.RoomID
response.setConnectionStatus(request.ConnectionStartTime)
if err := response.setCPUUsage(); err != nil {
return nil, err
}
if err := response.setMemoryUsage(); err != nil {
return nil, err
}
if err := response.setLatency(); err != nil {
return nil, err
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, response)
if err != nil {
return nil, err
}
response.BaseMessage = bmsg
return response, nil
}
func (m *HealthResponse) GetProtocol() message.Protocol {
return HealthResponseProtocol
}
func (m *HealthResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
// TODO: IMPLEMENT HEALTH MANAGEMENT
return nil
}
func (m *HealthResponse) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
id, err := ss.GetClientID()
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
m.SetSender(message.Sender(_i.ID()))
m.SetReceiver(message.Receiver(id))
return nil
}
func (m *HealthResponse) setConnectionStatus(startTime int64) {
m.ConnectionStatus = types.ConnectionStateUp
m.ConnectionUptime = m.getConnectionUptime(startTime)
}
func (m *HealthResponse) getConnectionUptime(startTime int64) types.ConnectionUptime {
return types.ConnectionUptime(time.Now().Unix() - startTime)
}
func (m *HealthResponse) setCPUUsage() error {
perCorePercentages, err := cpu.Percent(100*time.Millisecond, false)
if err != nil {
return fmt.Errorf("error while reading CPU usage; err: %s", err.Error())
}
numCores := len(perCorePercentages)
if numCores == 0 {
return fmt.Errorf("error while reading CPU usage; err: 'no cores detected'")
}
if numCores > math.MaxUint8 {
return fmt.Errorf("error while reading CPU usage; err: 'too many cores detected'")
}
m.CPUUsage = types.CPUUsage{
NumCores: uint8(numCores),
Percent: perCorePercentages,
}
return nil
}
func (m *HealthResponse) setMemoryUsage() error {
vmStats, err := mem.VirtualMemory()
if err != nil {
return fmt.Errorf("error while reading memory usage; err: %s", err.Error())
}
total := float32(vmStats.Total) / (1024.0 * 1024.0 * 1024.0)
used := float32(vmStats.Used) / (1024.0 * 1024.0 * 1024.0)
available := float32(vmStats.Available) / (1024.0 * 1024.0 * 1024.0)
m.MemoryUsage = types.MemoryUsage{
Total: total,
Used: used,
UsedRatio: used / total,
Available: available,
AvailableRatio: available / total,
}
return nil
}
func (m *HealthResponse) setLatency() error {
latencyNano := time.Now().UnixNano() - m.RequestTimeStamp
latencyMs := latencyNano / int64(time.Millisecond)
if latencyMs < 0 {
return fmt.Errorf("error while reading latency; err: 'latency is negative'")
}
m.Latency = types.LatencyMs(latencyMs)
return nil
}

View File

@@ -14,7 +14,7 @@ import (
)
type SendMessageRoom struct {
msgFactory func() message.Message
msgFactory func() (message.Message, error)
roomid types.RoomID
interval time.Duration
err error
@@ -24,7 +24,7 @@ type SendMessageRoom struct {
cancel context.CancelFunc
}
func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFactory func() message.Message, roomid types.RoomID, interval time.Duration) *SendMessageRoom {
func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFactory func() (message.Message, error), roomid types.RoomID, interval time.Duration) *SendMessageRoom {
return &SendMessageRoom{
ctx: ctx,
cancel: cancel,
@@ -94,7 +94,13 @@ func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) e
merr := util.NewMultiError()
for _, participant := range participants {
if err := w.WriteRoomMessage(p.roomid, p.msgFactory(), "", participant); err != nil {
msg, err := p.msgFactory()
if err != nil {
merr.Add(err)
continue
}
if err := w.WriteRoomMessage(p.roomid, msg, "", participant); err != nil {
merr.Add(err)
}
}

View File

@@ -0,0 +1,24 @@
package types
type (
ConnectionState string
ConnectionUptime int
CPUUsage struct {
NumCores uint8 `json:"num_cores"`
Percent []float64 `json:"percent"`
}
MemoryUsage struct {
Total float32 `json:"total"`
Used float32 `json:"used"`
UsedRatio float32 `json:"used_ratio"`
Available float32 `json:"available"`
AvailableRatio float32 `json:"available_ratio"`
}
NetworkUsage float64
LatencyMs int64
)
const (
ConnectionStateUp ConnectionState = "up"
ConnectionStateDown ConnectionState = "down"
)