This commit is contained in:
moqsien
2022-09-22 11:26:26 +08:00
parent 4bc7e5f0ef
commit 7ba266b4f5
3 changed files with 169 additions and 93 deletions

View File

@@ -1,34 +1,91 @@
package main
import "github.com/moqsien/processes"
import (
"fmt"
"os"
"runtime"
"time"
"github.com/gogf/gf/container/gtree"
"github.com/gogf/gf/util/gutil"
"github.com/moqsien/processes"
)
/*
运行结果示例
2022-09-06 20:41:21.428 [INFO] 添加进程: test
2022-09-06 20:41:21.428 [INFO] 尝试启动程序[test]
2022-09-06 20:41:21.429 [DEBU] 进程正在运行[test]等待退出
bin
games
include
lib
lib32
lib64
libexec
libx32
local
sbin
share
src
2022-09-06 20:41:21.430 [INFO] 程序[test]已经结束运行,退出码为:exit status 0
运行结果示例
*/
var manager = processes.NewManager()
func main() {
manager := processes.NewProcManager()
process, _ := manager.NewProcess("test",
processes.ProcPath("/usr/bin/ls"),
processes.ProcArgs([]string{"/usr"}),
var ch = make(chan interface{}, 6)
var tree = gtree.NewRedBlackTree(gutil.ComparatorString, true)
func test(key interface{}, _ interface{}) bool {
name, _ := key.(string)
process, _ := manager.NewProcess(name,
processes.ProcPath(os.Args[0]),
processes.ProcArgs([]string{name}),
processes.ProcStdoutLog("/dev/stdout", ""),
)
process.StartProc(true)
return true
}
func t(key string) {
for {
fmt.Println(">>>process: ", key)
time.Sleep(time.Duration(2) * time.Second)
}
}
func test2(key interface{}, value interface{}) bool {
fmt.Println("hello", key, value)
i, _ := value.(int64)
time.Sleep(time.Duration(i) * time.Second)
name, _ := key.(string)
t(name)
return true
}
func test3(key, value interface{}) bool {
fmt.Println("+++", key, value)
ch <- key
k, _ := key.(string)
if k == "d" {
close(ch)
}
return true
}
func main() {
args := os.Args[1:]
if len(args) == 0 {
tree.Set("a", 1)
tree.Set("b", 2)
tree.Set("c", 3)
tree.Set("d", 4)
tree.IteratorAsc(test)
} else if len(args) == 1 {
fmt.Println(args)
t(args[0])
}
fmt.Println(runtime.NumGoroutine())
// tree := gtree.NewAVLTree(gutil.ComparatorInt)
// for i := 0; i < 10; i++ {
// tree.Set(i, i*10)
// }
// // 打印树形
// tree.Print()
// // 前序遍历
// fmt.Println("ASC:")
// tree.IteratorAsc(func(key, value interface{}) bool {
// fmt.Println(key, value)
// return true
// })
// // 后续遍历
// fmt.Println("DESC:")
// tree.IteratorDesc(func(key, value interface{}) bool {
// fmt.Println(key, value)
// return true
// })
}

View File

@@ -5,25 +5,32 @@ import (
"os"
"sync"
"github.com/gogf/gf/container/gmap"
"github.com/gogf/gf/errors/gerror"
"github.com/moqsien/processes/logger"
)
type ProcManager struct {
Container *gmap.StrAnyMap // 存放ProcessPlus进程对象的容器
type IProc interface {
StartProc(wait bool)
StopProc(wait bool)
GetProcessInfo() *Info
Clone() (*ProcessPlus, error)
}
func NewProcManager() *ProcManager {
return &ProcManager{
Container: &gmap.StrAnyMap{},
type Manager struct {
ProcessList map[string]IProc
*sync.RWMutex
}
func NewManager() *Manager {
return &Manager{
ProcessList: map[string]IProc{},
RWMutex: &sync.RWMutex{},
}
}
// NewProcess 创建新进程path可执行文件路径一般os.Args[0]获取当前go程序可执行文件路径name进程名称
func (that *ProcManager) NewProcess(name string, options ...Option) (p *ProcessPlus, err error) {
func (that *Manager) NewProcess(name string, options ...Option) (p *ProcessPlus, err error) {
p = NewProcess(os.Args[0], name)
if _, found := that.Container.Search(p.Name); found {
if _, found := that.Search(p.Name); found {
return nil, gerror.Newf("进程[%s]已存在", p.Name)
}
p.ProcManager = that
@@ -33,88 +40,100 @@ func (that *ProcManager) NewProcess(name string, options ...Option) (p *ProcessP
option(p)
}
}
that.Add(name, p) // 新进程加入进程管理器中
// that.Add(name, p) // 新进程加入进程管理器中
return p, nil
}
// Add 添加进程到Manager
func (that *ProcManager) Add(name string, proc *ProcessPlus) {
that.Container.Set(name, proc)
logger.Info("添加进程:", name)
}
// Remove 从Manager移除进程
func (that *ProcManager) Remove(name string) *ProcessPlus {
proc := that.Container.Remove(name)
if proc == nil {
return nil
// Search 查找进程
func (that *Manager) Search(name string) (value IProc, found bool) {
that.RLock()
defer that.RUnlock()
if that.ProcessList != nil {
value, found = that.ProcessList[name]
}
logger.Info("remove process:", name)
return proc.(*ProcessPlus)
return
}
// Clear 清空容器
func (that *ProcManager) Clear() {
that.Container.Clear()
// Add 添加进程
func (that *Manager) Add(name string, process IProc) {
that.Lock()
defer that.Unlock()
if that.ProcessList == nil {
that.ProcessList = make(map[string]IProc)
}
that.ProcessList[name] = process
}
// ForEachProcess 迭代进程列表
func (that *ProcManager) ForEachProcess(procFunc func(p *ProcessPlus)) {
that.Container.Iterator(func(_ string, v interface{}) bool {
procFunc(v.(*ProcessPlus))
return true
})
// Remove 从列表移除进程
func (that *Manager) Remove(name string) (value IProc) {
that.Lock()
defer that.Unlock()
if that.ProcessList != nil {
var ok bool
if value, ok = that.ProcessList[name]; ok {
delete(that.ProcessList, name)
}
}
return
}
// StopAllProcesses 关闭所有进程
func (that *ProcManager) StopAllProcesses() {
// Clear 清空进程列表
func (that *Manager) Clear() {
that.Lock()
that.ProcessList = make(map[string]IProc)
that.Unlock()
}
// Iterator 对进程列表进行迭代
func (that *Manager) Iterate(f func(key string, value IProc) bool) {
that.RLock()
defer that.RUnlock()
for k, v := range that.ProcessList {
if !f(k, v) {
break
}
}
}
// StopAllProcs 停止所有进程
func (that *Manager) StopAllProcs() {
var wg sync.WaitGroup
that.ForEachProcess(func(proc *ProcessPlus) {
that.Iterate(func(_ string, proc IProc) bool {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
go func(w *sync.WaitGroup) {
defer w.Done()
proc.StopProc(true)
}(&wg)
return true
})
wg.Wait()
}
// 获取所有进程列表
func (that *ProcManager) getAllProcess() []*ProcessPlus {
tmpProcList := make([]*ProcessPlus, 0)
for _, proc := range that.Container.Map() {
// GetAllProcs 获取所有进程列表
func (that *Manager) GetAllProcs() []IProc {
tmpProcList := make([]IProc, 0)
for _, proc := range that.ProcessList {
tmpProcList = append(tmpProcList, proc.(*ProcessPlus))
}
return tmpProcList
}
// Find 根据进程名查询进程
func (that *ProcManager) Find(name string) *ProcessPlus {
proc, ok := that.Container.Search(name)
if ok {
return proc.(*ProcessPlus)
}
return nil
}
// GetAllProcessInfo 获取所有进程信息
func (that *ProcManager) GetAllProcessInfo() ([]*Info, error) {
// GetAllProcsInfo 获取所有进程的信息列表
func (that *Manager) GetAllProcsInfo() ([]*Info, error) {
AllProcessInfo := make([]*Info, 0)
that.ForEachProcess(func(proc *ProcessPlus) {
that.Iterate(func(_ string, proc IProc) bool {
procInfo := proc.GetProcessInfo()
AllProcessInfo = append(AllProcessInfo, procInfo)
return true
})
return AllProcessInfo, nil
}
// GracefulReload 平滑重启进程
func (that *ProcManager) GracefulReload(name string, wait bool) (bool, error) {
// GracefulReload 平滑重启
func (that *Manager) GracefulReload(name string, wait bool) (bool, error) {
logger.Infof("平滑重启进程[%s]", name)
proc := that.Find(name)
if proc == nil {
proc, ok := that.Search(name)
if !ok {
return false, fmt.Errorf("没有找到要重启的进程[%s]", name)
}
procClone, err := proc.Clone()
@@ -123,6 +142,6 @@ func (that *ProcManager) GracefulReload(name string, wait bool) (bool, error) {
}
procClone.StartProc(wait)
proc.StopProc(wait)
that.Container.Set(name, procClone)
that.Add(name, procClone)
return true, nil
}

View File

@@ -20,14 +20,14 @@ import (
type ProcessPlus struct {
*exec.Cmd
*ProcSettings
ProcManager *ProcManager // 进程管理器
Name string // 进程名称
State ProcState // 进程的当前状态
Starting bool // 正在启动的时候该值为true
StopByUser bool // 用户主动关闭的时候该值为true
RetryTimes *int32 // 启动重试的次数
StartTime time.Time // 启动时间
StopTime time.Time // 停止时间
ProcManager *Manager // 进程管理器
Name string // 进程名称
State ProcState // 进程的当前状态
Starting bool // 正在启动的时候该值为true
StopByUser bool // 用户主动关闭的时候该值为true
RetryTimes *int32 // 启动重试的次数
StartTime time.Time // 启动时间
StopTime time.Time // 停止时间
Lock sync.RWMutex
Stdin io.WriteCloser