feat: crontab support record with plan like nvr

This commit is contained in:
pggiroro
2025-06-11 22:17:59 +08:00
parent 23f2ed39a1
commit e0752242b2
9 changed files with 2126 additions and 98 deletions

View File

@@ -2,7 +2,10 @@ package plugin_crontab
import (
"context"
"fmt"
"sort"
"strings"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
cronpb "m7s.live/v5/plugin/crontab/pb"
@@ -17,30 +20,25 @@ func (ct *CrontabPlugin) List(ctx context.Context, req *cronpb.ReqPlanList) (*cr
req.PageSize = 10
}
var total int64
var plans []pkg.RecordPlan
// 从内存中获取所有计划
plans := ct.recordPlans.Items
total := len(plans)
query := ct.DB.Model(&pkg.RecordPlan{})
result := query.Count(&total)
if result.Error != nil {
return &cronpb.PlanResponseList{
Code: 500,
Message: result.Error.Error(),
}, nil
// 计算分页
start := int(req.PageNum-1) * int(req.PageSize)
end := start + int(req.PageSize)
if start >= total {
start = total
}
if end > total {
end = total
}
offset := (req.PageNum - 1) * req.PageSize
result = query.Order("id desc").Offset(int(offset)).Limit(int(req.PageSize)).Find(&plans)
if result.Error != nil {
return &cronpb.PlanResponseList{
Code: 500,
Message: result.Error.Error(),
}, nil
}
// 获取当前页的数据
pagePlans := plans[start:end]
data := make([]*cronpb.Plan, 0, len(plans))
for _, plan := range plans {
data := make([]*cronpb.Plan, 0, len(pagePlans))
for _, plan := range pagePlans {
data = append(data, &cronpb.Plan{
Id: uint32(plan.ID),
Name: plan.Name,
@@ -106,6 +104,9 @@ func (ct *CrontabPlugin) Add(ctx context.Context, req *cronpb.Plan) (*cronpb.Res
}, nil
}
// 添加到内存中
ct.recordPlans.Add(plan)
return &cronpb.Response{
Code: 0,
Message: "success",
@@ -160,10 +161,14 @@ func (ct *CrontabPlugin) Update(ctx context.Context, req *cronpb.Plan) (*cronpb.
}, nil
}
// 处理 enable 状态变更
enableChanged := existingPlan.Enable != req.Enable
// 更新记录
updates := map[string]interface{}{
"name": req.Name,
"plan": req.Plan,
"enabled": req.Enable,
"name": req.Name,
"plan": req.Plan,
"enable": req.Enable,
}
if err := ct.DB.Model(&existingPlan).Updates(updates).Error; err != nil {
@@ -173,6 +178,45 @@ func (ct *CrontabPlugin) Update(ctx context.Context, req *cronpb.Plan) (*cronpb.
}, nil
}
// 更新内存中的记录
existingPlan.Name = req.Name
existingPlan.Plan = req.Plan
existingPlan.Enable = req.Enable
ct.recordPlans.Set(&existingPlan)
// 处理 enable 状态变更后的操作
if enableChanged {
if req.Enable {
// 从 false 变为 true需要创建并启动新的定时任务
var streams []pkg.RecordPlanStream
model := &pkg.RecordPlanStream{PlanID: existingPlan.ID}
if err := ct.DB.Model(model).Where(model).Find(&streams).Error; err != nil {
ct.Error("query record plan streams error: %v", err)
} else {
// 为每个流创建定时任务
for _, stream := range streams {
crontab := &Crontab{
ctp: ct,
RecordPlan: &existingPlan,
RecordPlanStream: &stream,
}
crontab.OnStart(func() {
ct.crontabs.Set(crontab)
})
ct.AddTask(crontab)
}
}
} else {
// 从 true 变为 false需要停止相关的定时任务
ct.crontabs.Range(func(crontab *Crontab) bool {
if crontab.RecordPlan.ID == existingPlan.ID {
crontab.Stop(nil)
}
return true
})
}
}
return &cronpb.Response{
Code: 0,
Message: "success",
@@ -196,6 +240,14 @@ func (ct *CrontabPlugin) Remove(ctx context.Context, req *cronpb.DeleteRequest)
}, nil
}
// 先停止所有相关的定时任务
ct.crontabs.Range(func(crontab *Crontab) bool {
if crontab.RecordPlan.ID == existingPlan.ID {
crontab.Stop(nil)
}
return true
})
// 执行软删除
if err := ct.DB.Delete(&existingPlan).Error; err != nil {
return &cronpb.Response{
@@ -204,6 +256,9 @@ func (ct *CrontabPlugin) Remove(ctx context.Context, req *cronpb.DeleteRequest)
}, nil
}
// 从内存中移除
ct.recordPlans.RemoveByKey(existingPlan.ID)
return &cronpb.Response{
Code: 0,
Message: "success",
@@ -286,9 +341,9 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
}, nil
}
// 检查录制计划是否存在
var plan pkg.RecordPlan
if err := ct.DB.First(&plan, req.PlanId).Error; err != nil {
// 从内存中获取录制计划
plan, ok := ct.recordPlans.Get(uint(req.PlanId))
if !ok {
return &cronpb.Response{
Code: 404,
Message: "record plan not found",
@@ -320,6 +375,8 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
StreamPath: req.StreamPath,
Fragment: req.Fragment,
FilePath: req.FilePath,
Enable: req.Enable,
RecordType: req.RecordType,
}
if err := ct.DB.Create(stream).Error; err != nil {
@@ -329,6 +386,19 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
}, nil
}
// 如果计划是启用状态,创建并启动定时任务
if plan.Enable {
crontab := &Crontab{
ctp: ct,
RecordPlan: plan,
RecordPlanStream: stream,
}
crontab.OnStart(func() {
ct.crontabs.Set(crontab)
})
ct.AddTask(crontab)
}
return &cronpb.Response{
Code: 0,
Message: "success",
@@ -366,9 +436,8 @@ func (ct *CrontabPlugin) UpdateRecordPlanStream(ctx context.Context, req *cronpb
// 更新记录
existingStream.Fragment = req.Fragment
existingStream.FilePath = req.FilePath
if req.Enable != existingStream.Enable {
existingStream.Enable = req.Enable
}
existingStream.Enable = req.Enable
existingStream.RecordType = req.RecordType
if err := ct.DB.Save(&existingStream).Error; err != nil {
return &cronpb.Response{
@@ -377,6 +446,47 @@ func (ct *CrontabPlugin) UpdateRecordPlanStream(ctx context.Context, req *cronpb
}, nil
}
// 停止当前流相关的所有任务
ct.crontabs.Range(func(crontab *Crontab) bool {
if crontab.RecordPlanStream.StreamPath == req.StreamPath {
crontab.Stop(nil)
}
return true
})
// 查询所有关联此流的记录
var streams []pkg.RecordPlanStream
if err := ct.DB.Where("stream_path = ?", req.StreamPath).Find(&streams).Error; err != nil {
ct.Error("query record plan streams error: %v", err)
return &cronpb.Response{
Code: 500,
Message: err.Error(),
}, nil
}
// 为每个启用的计划创建新的定时任务
for _, stream := range streams {
// 从内存中获取对应的计划
plan, ok := ct.recordPlans.Get(stream.PlanID)
if !ok {
ct.Error("record plan not found in memory: %d", stream.PlanID)
continue
}
// 如果计划是启用状态,创建并启动定时任务
if plan.Enable && stream.Enable {
crontab := &Crontab{
ctp: ct,
RecordPlan: plan,
RecordPlanStream: &stream,
}
crontab.OnStart(func() {
ct.crontabs.Set(crontab)
})
ct.AddTask(crontab)
}
}
return &cronpb.Response{
Code: 0,
Message: "success",
@@ -411,6 +521,14 @@ func (ct *CrontabPlugin) RemoveRecordPlanStream(ctx context.Context, req *cronpb
}, nil
}
// 停止所有相关的定时任务
ct.crontabs.Range(func(crontab *Crontab) bool {
if crontab.RecordPlanStream.StreamPath == req.StreamPath && crontab.RecordPlan.ID == uint(req.PlanId) {
crontab.Stop(nil)
}
return true
})
// 执行删除
if err := ct.DB.Delete(&existingStream).Error; err != nil {
return &cronpb.Response{
@@ -424,3 +542,449 @@ func (ct *CrontabPlugin) RemoveRecordPlanStream(ctx context.Context, req *cronpb
Message: "success",
}, nil
}
// 获取周几的名称0=周日1=周一,...6=周六)
func getWeekdayName(weekday int) string {
weekdays := []string{"周日", "周一", "周二", "周三", "周四", "周五", "周六"}
return weekdays[weekday]
}
// 获取周几的索引0=周日1=周一,...6=周六)
func getWeekdayIndex(weekdayName string) int {
weekdays := map[string]int{
"周日": 0, "周一": 1, "周二": 2, "周三": 3, "周四": 4, "周五": 5, "周六": 6,
}
return weekdays[weekdayName]
}
// 获取下一个指定周几的日期
func getNextDateForWeekday(now time.Time, targetWeekday int, location *time.Location) time.Time {
nowWeekday := int(now.Weekday())
daysToAdd := 0
if targetWeekday >= nowWeekday {
daysToAdd = targetWeekday - nowWeekday
} else {
daysToAdd = 7 - (nowWeekday - targetWeekday)
}
// 如果是同一天但当前时间已经过了最后的时间段,则推到下一周
if daysToAdd == 0 {
// 这里简化处理直接加7天到下周同一天
daysToAdd = 7
}
return now.AddDate(0, 0, daysToAdd)
}
// 计算计划中的所有时间段
func calculateTimeSlots(plan string, now time.Time, location *time.Location) ([]*cronpb.TimeSlotInfo, error) {
if len(plan) != 168 {
return nil, fmt.Errorf("invalid plan format: length should be 168")
}
var slots []*cronpb.TimeSlotInfo
// 按周几遍历0=周日1=周一,...6=周六)
for weekday := 0; weekday < 7; weekday++ {
dayOffset := weekday * 24
var startHour int = -1
// 遍历这一天的每个小时
for hour := 0; hour <= 24; hour++ {
// 如果到了一天的结尾或者当前小时状态为0
isEndOfDay := hour == 24
isHourOff := !isEndOfDay && plan[dayOffset+hour] == '0'
if isEndOfDay || isHourOff {
// 如果之前有开始的时间段,现在结束了
if startHour != -1 {
// 计算下一个该周几的日期
targetDate := getNextDateForWeekday(now, weekday, location)
// 创建时间段
startTime := time.Date(targetDate.Year(), targetDate.Month(), targetDate.Day(), startHour, 0, 0, 0, location)
endTime := time.Date(targetDate.Year(), targetDate.Month(), targetDate.Day(), hour, 0, 0, 0, location)
// 转换为 UTC 时间
startTs := timestamppb.New(startTime.UTC())
endTs := timestamppb.New(endTime.UTC())
slots = append(slots, &cronpb.TimeSlotInfo{
Start: startTs,
End: endTs,
Weekday: getWeekdayName(weekday),
TimeRange: fmt.Sprintf("%02d:00-%02d:00", startHour, hour),
})
startHour = -1
}
} else if plan[dayOffset+hour] == '1' && startHour == -1 {
// 找到新的开始时间
startHour = hour
}
}
}
// 按时间排序
sort.Slice(slots, func(i, j int) bool {
// 先按周几排序
weekdayI := getWeekdayIndex(slots[i].Weekday)
weekdayJ := getWeekdayIndex(slots[j].Weekday)
if weekdayI != weekdayJ {
return weekdayI < weekdayJ
}
// 同一天按开始时间排序
return slots[i].Start.AsTime().Hour() < slots[j].Start.AsTime().Hour()
})
return slots, nil
}
// 获取下一个时间段
func getNextTimeSlotFromNow(plan string, now time.Time, location *time.Location) (*cronpb.TimeSlotInfo, error) {
if len(plan) != 168 {
return nil, fmt.Errorf("invalid plan format: length should be 168")
}
// 将当前时间转换为本地时间
localNow := now.In(location)
currentWeekday := int(localNow.Weekday())
currentHour := localNow.Hour()
// 检查是否在整点边界附近(前后30秒)
isNearHourBoundary := localNow.Minute() == 59 && localNow.Second() >= 30 || localNow.Minute() == 0 && localNow.Second() <= 30
// 首先检查当前时间是否在某个时间段内
dayOffset := currentWeekday * 24
if currentHour < 24 && plan[dayOffset+currentHour] == '1' {
// 找到当前小时所在的完整时间段
startHour := currentHour
// 向前查找时间段的开始
for h := currentHour - 1; h >= 0; h-- {
if plan[dayOffset+h] == '1' {
startHour = h
} else {
break
}
}
// 向后查找时间段的结束
endHour := currentHour + 1
for h := endHour; h < 24; h++ {
if plan[dayOffset+h] == '1' {
endHour = h + 1
} else {
break
}
}
// 检查是否已经接近当前时间段的结束
isNearEndOfTimeSlot := currentHour == endHour-1 && localNow.Minute() >= 59 && localNow.Second() >= 30
// 如果我们靠近时间段结束且在小时边界附近,我们跳过此时间段,找下一个
if isNearEndOfTimeSlot && isNearHourBoundary {
// 继续查找下一个时间段
} else {
// 创建时间段
startTime := time.Date(localNow.Year(), localNow.Month(), localNow.Day(), startHour, 0, 0, 0, location)
endTime := time.Date(localNow.Year(), localNow.Month(), localNow.Day(), endHour, 0, 0, 0, location)
// 如果当前时间已经接近或超过了结束时间,调整结束时间
if localNow.After(endTime.Add(-30*time.Second)) || localNow.Equal(endTime) {
// 继续查找下一个时间段
} else {
// 返回当前时间段
return &cronpb.TimeSlotInfo{
Start: timestamppb.New(startTime.UTC()),
End: timestamppb.New(endTime.UTC()),
Weekday: getWeekdayName(currentWeekday),
TimeRange: fmt.Sprintf("%02d:00-%02d:00", startHour, endHour),
}, nil
}
}
}
// 查找下一个时间段
// 先查找当天剩余时间
for h := currentHour + 1; h < 24; h++ {
if plan[dayOffset+h] == '1' {
// 找到开始小时
startHour := h
// 查找结束小时
endHour := h + 1
for j := h + 1; j < 24; j++ {
if plan[dayOffset+j] == '1' {
endHour = j + 1
} else {
break
}
}
// 创建时间段
startTime := time.Date(localNow.Year(), localNow.Month(), localNow.Day(), startHour, 0, 0, 0, location)
endTime := time.Date(localNow.Year(), localNow.Month(), localNow.Day(), endHour, 0, 0, 0, location)
return &cronpb.TimeSlotInfo{
Start: timestamppb.New(startTime.UTC()),
End: timestamppb.New(endTime.UTC()),
Weekday: getWeekdayName(currentWeekday),
TimeRange: fmt.Sprintf("%02d:00-%02d:00", startHour, endHour),
}, nil
}
}
// 如果当天没有找到,则查找后续日期
for d := 1; d <= 7; d++ {
nextDay := (currentWeekday + d) % 7
dayOffset := nextDay * 24
for h := 0; h < 24; h++ {
if plan[dayOffset+h] == '1' {
// 找到开始小时
startHour := h
// 查找结束小时
endHour := h + 1
for j := h + 1; j < 24; j++ {
if plan[dayOffset+j] == '1' {
endHour = j + 1
} else {
break
}
}
// 计算日期
nextDate := localNow.AddDate(0, 0, d)
// 创建时间段
startTime := time.Date(nextDate.Year(), nextDate.Month(), nextDate.Day(), startHour, 0, 0, 0, location)
endTime := time.Date(nextDate.Year(), nextDate.Month(), nextDate.Day(), endHour, 0, 0, 0, location)
return &cronpb.TimeSlotInfo{
Start: timestamppb.New(startTime.UTC()),
End: timestamppb.New(endTime.UTC()),
Weekday: getWeekdayName(nextDay),
TimeRange: fmt.Sprintf("%02d:00-%02d:00", startHour, endHour),
}, nil
}
}
}
return nil, nil
}
func (ct *CrontabPlugin) ParsePlanTime(ctx context.Context, req *cronpb.ParsePlanRequest) (*cronpb.ParsePlanResponse, error) {
if len(req.Plan) != 168 {
return &cronpb.ParsePlanResponse{
Code: 400,
Message: "invalid plan format: length should be 168",
}, nil
}
// 检查字符串格式是否正确只包含0和1
for i, c := range req.Plan {
if c != '0' && c != '1' {
return &cronpb.ParsePlanResponse{
Code: 400,
Message: fmt.Sprintf("invalid character at position %d: %c (should be 0 or 1)", i, c),
}, nil
}
}
// 获取所有时间段
slots, err := calculateTimeSlots(req.Plan, time.Now(), time.Local)
if err != nil {
return &cronpb.ParsePlanResponse{
Code: 500,
Message: err.Error(),
}, nil
}
// 获取下一个时间段
nextSlot, err := getNextTimeSlotFromNow(req.Plan, time.Now(), time.Local)
if err != nil {
return &cronpb.ParsePlanResponse{
Code: 500,
Message: err.Error(),
}, nil
}
return &cronpb.ParsePlanResponse{
Code: 0,
Message: "success",
Slots: slots,
NextSlot: nextSlot,
}, nil
}
// 辅助函数:构建任务状态信息
func buildCrontabTaskInfo(crontab *Crontab, now time.Time) *cronpb.CrontabTaskInfo {
// 基础任务信息
taskInfo := &cronpb.CrontabTaskInfo{
PlanId: uint32(crontab.RecordPlan.ID),
PlanName: crontab.RecordPlan.Name,
StreamPath: crontab.StreamPath,
FilePath: crontab.FilePath,
Fragment: crontab.Fragment,
}
// 获取完整计划时间段列表
if crontab.RecordPlan != nil && crontab.RecordPlan.Plan != "" {
planSlots, err := calculateTimeSlots(crontab.RecordPlan.Plan, now, time.Local)
if err == nil && planSlots != nil && len(planSlots) > 0 {
taskInfo.PlanSlots = planSlots
}
}
return taskInfo
}
// GetCrontabStatus 获取当前Crontab任务状态
func (ct *CrontabPlugin) GetCrontabStatus(ctx context.Context, req *cronpb.CrontabStatusRequest) (*cronpb.CrontabStatusResponse, error) {
response := &cronpb.CrontabStatusResponse{
Code: 0,
Message: "success",
RunningTasks: []*cronpb.CrontabTaskInfo{},
NextTasks: []*cronpb.CrontabTaskInfo{},
TotalRunning: 0,
TotalPlanned: 0,
}
// 获取当前正在运行的任务
runningTasks := make([]*cronpb.CrontabTaskInfo, 0)
nextTasks := make([]*cronpb.CrontabTaskInfo, 0)
// 如果只指定了流路径但未找到对应的任务,也返回该流的计划信息
streamPathFound := false
// 遍历所有Crontab任务
ct.crontabs.Range(func(crontab *Crontab) bool {
// 如果指定了stream_path过滤条件且不匹配则跳过
if req.StreamPath != "" && crontab.StreamPath != req.StreamPath {
return true // 继续遍历
}
// 标记已找到指定的流
if req.StreamPath != "" {
streamPathFound = true
}
now := time.Now()
// 构建基本任务信息
taskInfo := buildCrontabTaskInfo(crontab, now)
// 检查是否正在录制
if crontab.recording && crontab.currentSlot != nil {
// 当前正在录制
taskInfo.IsRecording = true
// 设置时间信息
taskInfo.StartTime = timestamppb.New(crontab.currentSlot.Start)
taskInfo.EndTime = timestamppb.New(crontab.currentSlot.End)
// 计算已运行时间和剩余时间
elapsedDuration := now.Sub(crontab.currentSlot.Start)
remainingDuration := crontab.currentSlot.End.Sub(now)
taskInfo.ElapsedSeconds = uint32(elapsedDuration.Seconds())
taskInfo.RemainingSeconds = uint32(remainingDuration.Seconds())
// 设置时间范围和周几
startHour := crontab.currentSlot.Start.Hour()
endHour := crontab.currentSlot.End.Hour()
taskInfo.TimeRange = fmt.Sprintf("%02d:00-%02d:00", startHour, endHour)
taskInfo.Weekday = getWeekdayName(int(crontab.currentSlot.Start.Weekday()))
// 添加到正在运行的任务列表
runningTasks = append(runningTasks, taskInfo)
} else {
// 获取下一个时间段
nextSlot := crontab.getNextTimeSlot()
if nextSlot != nil {
// 设置下一个任务的信息
taskInfo.IsRecording = false
// 设置时间信息
taskInfo.StartTime = timestamppb.New(nextSlot.Start)
taskInfo.EndTime = timestamppb.New(nextSlot.End)
// 计算等待时间
waitingDuration := nextSlot.Start.Sub(now)
taskInfo.RemainingSeconds = uint32(waitingDuration.Seconds())
// 设置时间范围和周几
startHour := nextSlot.Start.Hour()
endHour := nextSlot.End.Hour()
taskInfo.TimeRange = fmt.Sprintf("%02d:00-%02d:00", startHour, endHour)
taskInfo.Weekday = getWeekdayName(int(nextSlot.Start.Weekday()))
// 添加到计划任务列表
nextTasks = append(nextTasks, taskInfo)
}
}
return true // 继续遍历
})
// 如果指定了流路径但未找到对应的任务,查询数据库获取该流的计划信息
if req.StreamPath != "" && !streamPathFound {
// 查询与该流相关的所有计划
var streams []pkg.RecordPlanStream
if err := ct.DB.Where("stream_path = ?", req.StreamPath).Find(&streams).Error; err == nil && len(streams) > 0 {
for _, stream := range streams {
// 获取对应的计划
var plan pkg.RecordPlan
if err := ct.DB.First(&plan, stream.PlanID).Error; err == nil && plan.Enable && stream.Enable {
now := time.Now()
// 构建任务信息
taskInfo := &cronpb.CrontabTaskInfo{
PlanId: uint32(plan.ID),
PlanName: plan.Name,
StreamPath: stream.StreamPath,
FilePath: stream.FilePath,
Fragment: stream.Fragment,
IsRecording: false,
}
// 获取完整计划时间段列表
planSlots, err := calculateTimeSlots(plan.Plan, now, time.Local)
if err == nil && planSlots != nil && len(planSlots) > 0 {
taskInfo.PlanSlots = planSlots
}
// 获取下一个时间段
nextSlot, err := getNextTimeSlotFromNow(plan.Plan, now, time.Local)
if err == nil && nextSlot != nil {
// 设置时间信息
taskInfo.StartTime = nextSlot.Start
taskInfo.EndTime = nextSlot.End
taskInfo.TimeRange = nextSlot.TimeRange
taskInfo.Weekday = nextSlot.Weekday
// 计算等待时间
waitingDuration := nextSlot.Start.AsTime().Sub(now)
taskInfo.RemainingSeconds = uint32(waitingDuration.Seconds())
// 添加到计划任务列表
nextTasks = append(nextTasks, taskInfo)
}
}
}
}
}
// 按开始时间排序下一个任务列表
sort.Slice(nextTasks, func(i, j int) bool {
return nextTasks[i].StartTime.AsTime().Before(nextTasks[j].StartTime.AsTime())
})
// 设置响应结果
response.RunningTasks = runningTasks
response.NextTasks = nextTasks
response.TotalRunning = uint32(len(runningTasks))
response.TotalPlanned = uint32(len(nextTasks))
return response, nil
}

244
plugin/crontab/api_test.go Normal file
View File

@@ -0,0 +1,244 @@
package plugin_crontab
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestCalculateTimeSlots(t *testing.T) {
// 测试案例:周五的凌晨和上午有开启时间段
// 字符串中1的索引是120(0点),122(2点),123(3点),125(5点),130(10点),135(15点)
// 000000000000000000000000 - 周日(0-23小时) - 全0
// 000000000000000000000000 - 周一(24-47小时) - 全0
// 000000000000000000000000 - 周二(48-71小时) - 全0
// 000000000000000000000000 - 周三(72-95小时) - 全0
// 000000000000000000000000 - 周四(96-119小时) - 全0
// 101101000010000100000000 - 周五(120-143小时) - 0,2,3,5,10,15点开启
// 000000000000000000000000 - 周六(144-167小时) - 全0
planStr := "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000101101000010000100000000000000000000000000000000"
now := time.Date(2023, 5, 1, 12, 0, 0, 0, time.Local) // 周一中午
slots, err := calculateTimeSlots(planStr, now, time.Local)
assert.NoError(t, err)
assert.Equal(t, 5, len(slots), "应该有5个时间段")
// 检查结果中的时间段(按实际解析结果排序)
assert.Equal(t, "周五", slots[0].Weekday)
assert.Equal(t, "10:00-11:00", slots[0].TimeRange)
assert.Equal(t, "周五", slots[1].Weekday)
assert.Equal(t, "15:00-16:00", slots[1].TimeRange)
assert.Equal(t, "周五", slots[2].Weekday)
assert.Equal(t, "00:00-01:00", slots[2].TimeRange)
assert.Equal(t, "周五", slots[3].Weekday)
assert.Equal(t, "02:00-04:00", slots[3].TimeRange)
assert.Equal(t, "周五", slots[4].Weekday)
assert.Equal(t, "05:00-06:00", slots[4].TimeRange)
// 打印出所有时间段,便于调试
for i, slot := range slots {
t.Logf("时间段 %d: %s %s", i, slot.Weekday, slot.TimeRange)
}
}
func TestGetNextTimeSlotFromNow(t *testing.T) {
// 测试案例:周五的凌晨和上午有开启时间段
planStr := "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000101101000010000100000000000000000000000000000000"
// 测试1: 当前是周一下一个时间段应该是周五凌晨0点
now1 := time.Date(2023, 5, 1, 12, 0, 0, 0, time.Local) // 周一中午
nextSlot1, err := getNextTimeSlotFromNow(planStr, now1, time.Local)
assert.NoError(t, err)
assert.NotNil(t, nextSlot1)
assert.Equal(t, "周五", nextSlot1.Weekday)
assert.Equal(t, "00:00-01:00", nextSlot1.TimeRange)
// 测试2: 当前是周五凌晨1点下一个时间段应该是周五凌晨2点
now2 := time.Date(2023, 5, 5, 1, 30, 0, 0, time.Local) // 周五凌晨1:30
nextSlot2, err := getNextTimeSlotFromNow(planStr, now2, time.Local)
assert.NoError(t, err)
assert.NotNil(t, nextSlot2)
assert.Equal(t, "周五", nextSlot2.Weekday)
assert.Equal(t, "02:00-04:00", nextSlot2.TimeRange)
// 测试3: 当前是周五凌晨3点此时正在一个时间段内
now3 := time.Date(2023, 5, 5, 3, 0, 0, 0, time.Local) // 周五凌晨3:00
nextSlot3, err := getNextTimeSlotFromNow(planStr, now3, time.Local)
assert.NoError(t, err)
assert.NotNil(t, nextSlot3)
assert.Equal(t, "周五", nextSlot3.Weekday)
assert.Equal(t, "02:00-04:00", nextSlot3.TimeRange)
}
func TestParsePlanFromString(t *testing.T) {
// 测试用户提供的案例字符串的第36-41位表示周一的时间段
// 这个案例中对应周一的12点、14-15点、17点和22点开启
planStr := "000000000000000000000000000000000000101101000010000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
now := time.Now()
slots, err := calculateTimeSlots(planStr, now, time.Local)
assert.NoError(t, err)
// 验证解析结果
var foundMondaySlots bool
for _, slot := range slots {
if slot.Weekday == "周一" {
foundMondaySlots = true
t.Logf("找到周一时间段: %s", slot.TimeRange)
}
}
assert.True(t, foundMondaySlots, "应该找到周一的时间段")
// 预期的周一时间段
var mondaySlots []string
for _, slot := range slots {
if slot.Weekday == "周一" {
mondaySlots = append(mondaySlots, slot.TimeRange)
}
}
// 检查是否包含预期的时间段
expectedSlots := []string{
"12:00-13:00",
"14:00-16:00",
"17:00-18:00",
"22:00-23:00",
}
for _, expected := range expectedSlots {
found := false
for _, actual := range mondaySlots {
if expected == actual {
found = true
break
}
}
assert.True(t, found, "应该找到周一时间段:"+expected)
}
// 获取下一个时间段
nextSlot, err := getNextTimeSlotFromNow(planStr, now, time.Local)
assert.NoError(t, err)
if nextSlot != nil {
t.Logf("下一个时间段: %s %s", nextSlot.Weekday, nextSlot.TimeRange)
} else {
t.Log("没有找到下一个时间段")
}
}
// 手动计算字符串长度的辅助函数
func TestCountStringLength(t *testing.T) {
str1 := "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000101101000010000100000000000000000000000000000000"
assert.Equal(t, 168, len(str1), "第一个测试字符串长度应为168")
str2 := "000000000000000000000000000000000000101101000010000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
assert.Equal(t, 168, len(str2), "第二个测试字符串长度应为168")
}
// 测试用户提供的具体字符串
func TestUserProvidedPlanString(t *testing.T) {
// 用户提供的测试字符串
planStr := "000000000000000000000000000000000000101101000010000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
// 验证字符串长度
assert.Equal(t, 168, len(planStr), "字符串长度应为168")
// 解析时间段
now := time.Now()
slots, err := calculateTimeSlots(planStr, now, time.Local)
assert.NoError(t, err)
// 打印所有时间段
t.Log("所有时间段:")
for i, slot := range slots {
t.Logf("%d: %s %s", i, slot.Weekday, slot.TimeRange)
}
// 获取下一个时间段
nextSlot, err := getNextTimeSlotFromNow(planStr, now, time.Local)
assert.NoError(t, err)
if nextSlot != nil {
t.Logf("下一个执行时间段: %s %s", nextSlot.Weekday, nextSlot.TimeRange)
t.Logf("开始时间: %s", nextSlot.Start.AsTime().In(time.Local).Format("2006-01-02 15:04:05"))
t.Logf("结束时间: %s", nextSlot.End.AsTime().In(time.Local).Format("2006-01-02 15:04:05"))
} else {
t.Log("没有找到下一个时间段")
}
// 验证周一的时间段
var mondaySlots []string
for _, slot := range slots {
if slot.Weekday == "周一" {
mondaySlots = append(mondaySlots, slot.TimeRange)
}
}
// 预期周一应该有这些时间段
expectedMondaySlots := []string{
"12:00-13:00",
"14:00-16:00",
"17:00-18:00",
"22:00-23:00",
}
assert.Equal(t, len(expectedMondaySlots), len(mondaySlots), "周一时间段数量不匹配")
for i, expected := range expectedMondaySlots {
if i < len(mondaySlots) {
t.Logf("期望周一时间段 %s, 实际是 %s", expected, mondaySlots[i])
}
}
}
// 测试用户提供的第二个字符串
func TestUserProvidedPlanString2(t *testing.T) {
// 用户提供的第二个测试字符串
planStr := "000000000000000000000000000000000000000000000000000000000000001011010100001000000000000000000000000100000000000000000000000010000000000000000000000001000000000000000000"
// 验证字符串长度
assert.Equal(t, 168, len(planStr), "字符串长度应为168")
// 解析时间段
now := time.Now()
slots, err := calculateTimeSlots(planStr, now, time.Local)
assert.NoError(t, err)
// 打印所有时间段并按周几分组
weekdaySlots := make(map[string][]string)
for _, slot := range slots {
weekdaySlots[slot.Weekday] = append(weekdaySlots[slot.Weekday], slot.TimeRange)
}
t.Log("所有时间段(按周几分组):")
weekdays := []string{"周日", "周一", "周二", "周三", "周四", "周五", "周六"}
for _, weekday := range weekdays {
if timeRanges, ok := weekdaySlots[weekday]; ok {
t.Logf("%s: %v", weekday, timeRanges)
}
}
// 打印所有时间段的详细信息
t.Log("\n所有时间段详细信息:")
for i, slot := range slots {
t.Logf("%d: %s %s", i, slot.Weekday, slot.TimeRange)
}
// 获取下一个时间段
nextSlot, err := getNextTimeSlotFromNow(planStr, now, time.Local)
assert.NoError(t, err)
if nextSlot != nil {
t.Logf("\n下一个执行时间段: %s %s", nextSlot.Weekday, nextSlot.TimeRange)
t.Logf("开始时间: %s", nextSlot.Start.AsTime().In(time.Local).Format("2006-01-02 15:04:05"))
t.Logf("结束时间: %s", nextSlot.End.AsTime().In(time.Local).Format("2006-01-02 15:04:05"))
} else {
t.Log("没有找到下一个时间段")
}
}

View File

@@ -1,59 +1,422 @@
package plugin_crontab
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/crontab/pkg"
)
// 计划时间段
type TimeSlot struct {
Start time.Time // 开始时间
End time.Time // 结束时间
}
// Crontab 定时任务调度器
type Crontab struct {
task.TickTask
task.Job
ctp *CrontabPlugin
*pkg.RecordPlan
*pkg.RecordPlanStream
stop chan struct{}
running bool
location *time.Location
timer *time.Timer
currentSlot *TimeSlot // 当前执行的时间段
recording bool // 是否正在录制
}
func (r *Crontab) GetTickInterval() time.Duration {
return time.Minute
func (cron *Crontab) GetKey() string {
return strconv.Itoa(int(cron.PlanID)) + "_" + cron.StreamPath
}
func (r *Crontab) Tick(any) {
r.Info("开始检查录制计划")
// 获取当前时间
now := time.Now()
// 计算当前是一周中的第几天(0-6, 0是周日)和当前小时(0-23)
weekday := int(now.Weekday())
if weekday == 0 {
weekday = 7 // 将周日从0改为7以便计算
}
hour := now.Hour()
// 计算当前时间对应的位置索引
// (weekday-1)*24 + hour 得到当前时间在144位字符串中的位置
// weekday-1 是因为我们要从周一开始计算
index := (weekday-1)*24 + hour
// 查询所有启用的录制计划
var plans []pkg.RecordPlan
model := pkg.RecordPlan{
Enable: true,
}
if err := r.ctp.DB.Where(&model).Find(&plans).Error; err != nil {
r.Error("查询录制计划失败:", err)
return
// 初始化
func (cron *Crontab) Start() (err error) {
cron.Info("crontab plugin start")
if cron.running {
return // 已经运行中,不重复启动
}
// 遍历所有计划
for _, plan := range plans {
if len(plan.Plan) != 168 {
r.Error("录制计划格式错误plan长度应为168位:", plan.Name)
// 初始化必要字段
if cron.stop == nil {
cron.stop = make(chan struct{})
}
if cron.location == nil {
cron.location = time.Local
}
cron.running = true
return nil
}
// 阻塞运行
func (cron *Crontab) Run() (err error) {
cron.Info("crontab plugin is running")
// 初始化必要字段
if cron.stop == nil {
cron.stop = make(chan struct{})
}
if cron.location == nil {
cron.location = time.Local
}
cron.Info("调度器启动")
for {
// 获取当前时间
now := time.Now().In(cron.location)
// 首先检查是否需要立即执行操作(如停止录制)
if cron.recording && cron.currentSlot != nil &&
(now.Equal(cron.currentSlot.End) || now.After(cron.currentSlot.End)) {
cron.stopRecording()
continue
}
// 检查当前时间对应的位置是否为1
if plan.Plan[index] == '1' {
r.Info("检测到需要开启录像的计划:", plan.Name)
// TODO: 在这里添加开启录像的逻辑
// 确定下一个事件
var nextEvent time.Time
var isStartEvent bool
if cron.recording {
// 如果正在录制,下一个事件是结束时间
nextEvent = cron.currentSlot.End
isStartEvent = false
} else {
// 如果没有录制,计算下一个开始时间
nextSlot := cron.getNextTimeSlot()
if nextSlot == nil {
// 无法确定下次执行时间,使用默认间隔
cron.timer = time.NewTimer(1 * time.Hour)
cron.Info("无有效计划等待1小时后重试")
// 等待定时器或停止信号
select {
case <-cron.timer.C:
continue // 继续循环
case <-cron.stop:
// 停止调度器
if cron.timer != nil {
cron.timer.Stop()
}
cron.Info("调度器停止")
return
}
}
cron.currentSlot = nextSlot
nextEvent = nextSlot.Start
isStartEvent = true
// 如果已过开始时间,立即开始录制
if now.Equal(nextEvent) || now.After(nextEvent) {
cron.startRecording()
continue
}
}
// 计算等待时间
waitDuration := nextEvent.Sub(now)
// 如果等待时间为负,立即执行
if waitDuration <= 0 {
if isStartEvent {
cron.startRecording()
} else {
cron.stopRecording()
}
continue
}
// 设置定时器
timer := time.NewTimer(waitDuration)
if isStartEvent {
cron.Info("下次开始时间: ", nextEvent, "等待时间:", waitDuration)
} else {
cron.Info("下次结束时间: ", nextEvent, " 等待时间:", waitDuration)
}
// 等待定时器或停止信号
select {
case now = <-timer.C:
// 更新当前时间为定时器触发时间
now = now.In(cron.location)
// 执行任务
if isStartEvent {
cron.startRecording()
} else {
cron.stopRecording()
}
case <-cron.stop:
// 停止调度器
timer.Stop()
cron.Info("调度器停止")
return
}
}
}
// 停止
func (cron *Crontab) Dispose() (err error) {
if cron.running {
cron.stop <- struct{}{}
cron.running = false
if cron.timer != nil {
cron.timer.Stop()
}
// 如果还在录制,停止录制
if cron.recording {
cron.stopRecording()
}
}
return
}
// 获取下一个时间段
func (cron *Crontab) getNextTimeSlot() *TimeSlot {
if cron.RecordPlan == nil || !cron.RecordPlan.Enable || cron.RecordPlan.Plan == "" {
return nil // 无有效计划
}
plan := cron.RecordPlan.Plan
if len(plan) != 168 {
cron.Error("无效的计划格式: %s, 长度应为168", plan)
return nil
}
// 使用当地时间
now := time.Now().In(cron.location)
cron.Debug("当前本地时间: %v, 星期%d, 小时%d", now.Format("2006-01-02 15:04:05"), now.Weekday(), now.Hour())
// 当前小时
currentWeekday := int(now.Weekday())
currentHour := now.Hour()
// 检查是否在整点边界附近(前后30秒)
isNearHourBoundary := now.Minute() == 59 && now.Second() >= 30 || now.Minute() == 0 && now.Second() <= 30
// 首先检查当前时间是否在某个时间段内
dayOffset := currentWeekday * 24
if currentHour < 24 && plan[dayOffset+currentHour] == '1' {
// 找到当前小时所在的完整时间段
startHour := currentHour
// 向前查找时间段的开始
for h := currentHour - 1; h >= 0; h-- {
if plan[dayOffset+h] == '1' {
startHour = h
} else {
break
}
}
// 向后查找时间段的结束
endHour := currentHour + 1
for h := endHour; h < 24; h++ {
if plan[dayOffset+h] == '1' {
endHour = h + 1
} else {
break
}
}
// 检查我们是否已经接近当前时间段的结束
isNearEndOfTimeSlot := currentHour == endHour-1 && now.Minute() == 59 && now.Second() >= 30
// 如果我们靠近时间段结束且在小时边界附近,我们跳过此时间段,找下一个
if isNearEndOfTimeSlot && isNearHourBoundary {
cron.Debug("接近当前时间段结束,准备查找下一个时间段")
} else {
// 创建时间段
startTime := time.Date(now.Year(), now.Month(), now.Day(), startHour, 0, 0, 0, cron.location)
endTime := time.Date(now.Year(), now.Month(), now.Day(), endHour, 0, 0, 0, cron.location)
// 如果当前时间已经接近或超过了结束时间,调整结束时间
if now.After(endTime.Add(-30*time.Second)) || now.Equal(endTime) {
cron.Debug("当前时间已接近或超过结束时间,尝试查找下一个时间段")
} else {
cron.Debug("当前已在有效时间段内: 开始=%v, 结束=%v",
startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"))
return &TimeSlot{
Start: startTime,
End: endTime,
}
}
}
}
// 查找下一个时间段
// 先查找当天剩余时间
for h := currentHour + 1; h < 24; h++ {
if plan[dayOffset+h] == '1' {
// 找到开始小时
startHour := h
// 查找结束小时
endHour := h + 1
for j := h + 1; j < 24; j++ {
if plan[dayOffset+j] == '1' {
endHour = j + 1
} else {
break
}
}
// 创建时间段
startTime := time.Date(now.Year(), now.Month(), now.Day(), startHour, 0, 0, 0, cron.location)
endTime := time.Date(now.Year(), now.Month(), now.Day(), endHour, 0, 0, 0, cron.location)
cron.Debug("找到今天的有效时间段: 开始=%v, 结束=%v",
startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"))
return &TimeSlot{
Start: startTime,
End: endTime,
}
}
}
// 如果当天没有找到,则查找后续日期
for d := 1; d <= 7; d++ {
nextDay := (currentWeekday + d) % 7
dayOffset := nextDay * 24
for h := 0; h < 24; h++ {
if plan[dayOffset+h] == '1' {
// 找到开始小时
startHour := h
// 查找结束小时
endHour := h + 1
for j := h + 1; j < 24; j++ {
if plan[dayOffset+j] == '1' {
endHour = j + 1
} else {
break
}
}
// 计算日期
nextDate := now.AddDate(0, 0, d)
// 创建时间段
startTime := time.Date(nextDate.Year(), nextDate.Month(), nextDate.Day(), startHour, 0, 0, 0, cron.location)
endTime := time.Date(nextDate.Year(), nextDate.Month(), nextDate.Day(), endHour, 0, 0, 0, cron.location)
cron.Debug("找到未来有效时间段: 开始=%v, 结束=%v",
startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"))
return &TimeSlot{
Start: startTime,
End: endTime,
}
}
}
}
cron.Debug("未找到有效的时间段")
return nil
}
// 开始录制
func (cron *Crontab) startRecording() {
if cron.recording {
return // 已经在录制了
}
now := time.Now().In(cron.location)
cron.Info("开始录制任务: %s, 时间: %v, 计划结束时间: %v",
cron.RecordPlan.Name, now, cron.currentSlot.End)
// 构造请求体
reqBody := map[string]string{
"fragment": cron.Fragment,
"filePath": cron.FilePath,
}
jsonBody, err := json.Marshal(reqBody)
if err != nil {
cron.Error("构造请求体失败: %v", err)
return
}
// 获取 HTTP 地址
addr := cron.ctp.Plugin.GetCommonConf().HTTP.ListenAddr
if addr == "" {
addr = ":8080" // 使用默认端口
}
if addr[0] == ':' {
addr = "localhost" + addr
}
// 发送开始录制请求
resp, err := http.Post(fmt.Sprintf("http://%s/mp4/api/start/%s", addr, cron.StreamPath), "application/json", bytes.NewBuffer(jsonBody))
if err != nil {
cron.Error("开始录制失败: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
cron.Error("开始录制失败HTTP状态码: %d", resp.StatusCode)
return
}
cron.recording = true
}
// 停止录制
func (cron *Crontab) stopRecording() {
if !cron.recording {
return // 没有在录制
}
// 立即记录当前时间并重置状态,避免重复调用
now := time.Now().In(cron.location)
cron.Info("停止录制任务: %s, 时间: %v", cron.RecordPlan.Name, now)
// 先重置状态,避免循环中重复检测到停止条件
wasRecording := cron.recording
cron.recording = false
savedSlot := cron.currentSlot
cron.currentSlot = nil
// 获取 HTTP 地址
addr := cron.ctp.Plugin.GetCommonConf().HTTP.ListenAddr
if addr == "" {
addr = ":8080" // 使用默认端口
}
if addr[0] == ':' {
addr = "localhost" + addr
}
// 发送停止录制请求
resp, err := http.Post(fmt.Sprintf("http://%s/mp4/api/stop/%s", addr, cron.StreamPath), "application/json", nil)
if err != nil {
cron.Error("停止录制失败: %v", err)
// 如果请求失败,恢复状态以便下次重试
if wasRecording {
cron.recording = true
cron.currentSlot = savedSlot
}
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
cron.Error("停止录制失败HTTP状态码: %d", resp.StatusCode)
// 如果请求失败,恢复状态以便下次重试
if wasRecording {
cron.recording = true
cron.currentSlot = savedSlot
}
}
}

View File

@@ -3,6 +3,8 @@ package plugin_crontab
import (
"fmt"
"m7s.live/v5/pkg/util"
"m7s.live/v5"
"m7s.live/v5/plugin/crontab/pb"
"m7s.live/v5/plugin/crontab/pkg"
@@ -11,6 +13,8 @@ import (
type CrontabPlugin struct {
m7s.Plugin
pb.UnimplementedApiServer
crontabs util.Collection[string, *Crontab]
recordPlans util.Collection[uint, *pkg.RecordPlan]
}
var _ = m7s.InstallPlugin[CrontabPlugin](m7s.PluginMeta{
@@ -27,9 +31,41 @@ func (ct *CrontabPlugin) OnInit() (err error) {
return fmt.Errorf("auto migrate tables error: %v", err)
}
ct.Info("init database success")
// 查询所有录制计划
var plans []pkg.RecordPlan
if err = ct.DB.Find(&plans).Error; err != nil {
return fmt.Errorf("query record plans error: %v", err)
}
// 遍历所有计划
for _, plan := range plans {
// 将计划存入 recordPlans 集合
ct.recordPlans.Add(&plan)
// 如果计划已启用,查询对应的流信息并创建定时任务
if plan.Enable {
var streams []pkg.RecordPlanStream
model := &pkg.RecordPlanStream{PlanID: plan.ID}
if err = ct.DB.Model(model).Where(model).Find(&streams).Error; err != nil {
ct.Error("query record plan streams error: %v", err)
continue
}
// 为每个流创建定时任务
for _, stream := range streams {
crontab := &Crontab{
ctp: ct,
RecordPlan: &plan,
RecordPlanStream: &stream,
}
crontab.OnStart(func() {
ct.crontabs.Set(crontab)
})
ct.AddTask(crontab)
}
}
}
}
crontab := &Crontab{ctp: ct}
ct.AddTask(crontab)
crontab.Tick(nil)
return
}

View File

@@ -644,6 +644,462 @@ func (x *DeletePlanStreamRequest) GetStreamPath() string {
return ""
}
// 解析计划请求
type ParsePlanRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Plan string `protobuf:"bytes,1,opt,name=plan,proto3" json:"plan,omitempty"` // 168位的0/1字符串表示一周的每个小时是否录制
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ParsePlanRequest) Reset() {
*x = ParsePlanRequest{}
mi := &file_crontab_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ParsePlanRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ParsePlanRequest) ProtoMessage() {}
func (x *ParsePlanRequest) ProtoReflect() protoreflect.Message {
mi := &file_crontab_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ParsePlanRequest.ProtoReflect.Descriptor instead.
func (*ParsePlanRequest) Descriptor() ([]byte, []int) {
return file_crontab_proto_rawDescGZIP(), []int{9}
}
func (x *ParsePlanRequest) GetPlan() string {
if x != nil {
return x.Plan
}
return ""
}
// 时间段信息
type TimeSlotInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
Start *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` // 开始时间
End *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` // 结束时间
Weekday string `protobuf:"bytes,3,opt,name=weekday,proto3" json:"weekday,omitempty"` // 周几(例如:周一)
TimeRange string `protobuf:"bytes,4,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"` // 时间范围例如09:00-10:00
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *TimeSlotInfo) Reset() {
*x = TimeSlotInfo{}
mi := &file_crontab_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *TimeSlotInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TimeSlotInfo) ProtoMessage() {}
func (x *TimeSlotInfo) ProtoReflect() protoreflect.Message {
mi := &file_crontab_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TimeSlotInfo.ProtoReflect.Descriptor instead.
func (*TimeSlotInfo) Descriptor() ([]byte, []int) {
return file_crontab_proto_rawDescGZIP(), []int{10}
}
func (x *TimeSlotInfo) GetStart() *timestamppb.Timestamp {
if x != nil {
return x.Start
}
return nil
}
func (x *TimeSlotInfo) GetEnd() *timestamppb.Timestamp {
if x != nil {
return x.End
}
return nil
}
func (x *TimeSlotInfo) GetWeekday() string {
if x != nil {
return x.Weekday
}
return ""
}
func (x *TimeSlotInfo) GetTimeRange() string {
if x != nil {
return x.TimeRange
}
return ""
}
// 解析计划响应
type ParsePlanResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` // 响应码
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` // 响应消息
Slots []*TimeSlotInfo `protobuf:"bytes,3,rep,name=slots,proto3" json:"slots,omitempty"` // 所有计划的时间段
NextSlot *TimeSlotInfo `protobuf:"bytes,4,opt,name=next_slot,json=nextSlot,proto3" json:"next_slot,omitempty"` // 从当前时间开始的下一个时间段
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ParsePlanResponse) Reset() {
*x = ParsePlanResponse{}
mi := &file_crontab_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ParsePlanResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ParsePlanResponse) ProtoMessage() {}
func (x *ParsePlanResponse) ProtoReflect() protoreflect.Message {
mi := &file_crontab_proto_msgTypes[11]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ParsePlanResponse.ProtoReflect.Descriptor instead.
func (*ParsePlanResponse) Descriptor() ([]byte, []int) {
return file_crontab_proto_rawDescGZIP(), []int{11}
}
func (x *ParsePlanResponse) GetCode() int32 {
if x != nil {
return x.Code
}
return 0
}
func (x *ParsePlanResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *ParsePlanResponse) GetSlots() []*TimeSlotInfo {
if x != nil {
return x.Slots
}
return nil
}
func (x *ParsePlanResponse) GetNextSlot() *TimeSlotInfo {
if x != nil {
return x.NextSlot
}
return nil
}
// 新增的消息定义
// 获取Crontab状态请求
type CrontabStatusRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
// 可以为空,表示获取所有任务
StreamPath string `protobuf:"bytes,1,opt,name=stream_path,json=streamPath,proto3" json:"stream_path,omitempty"` // 可选,按流路径过滤
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CrontabStatusRequest) Reset() {
*x = CrontabStatusRequest{}
mi := &file_crontab_proto_msgTypes[12]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CrontabStatusRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CrontabStatusRequest) ProtoMessage() {}
func (x *CrontabStatusRequest) ProtoReflect() protoreflect.Message {
mi := &file_crontab_proto_msgTypes[12]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CrontabStatusRequest.ProtoReflect.Descriptor instead.
func (*CrontabStatusRequest) Descriptor() ([]byte, []int) {
return file_crontab_proto_rawDescGZIP(), []int{12}
}
func (x *CrontabStatusRequest) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
// 任务信息
type CrontabTaskInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
PlanId uint32 `protobuf:"varint,1,opt,name=plan_id,json=planId,proto3" json:"plan_id,omitempty"` // 计划ID
PlanName string `protobuf:"bytes,2,opt,name=plan_name,json=planName,proto3" json:"plan_name,omitempty"` // 计划名称
StreamPath string `protobuf:"bytes,3,opt,name=stream_path,json=streamPath,proto3" json:"stream_path,omitempty"` // 流路径
IsRecording bool `protobuf:"varint,4,opt,name=is_recording,json=isRecording,proto3" json:"is_recording,omitempty"` // 是否正在录制
StartTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` // 当前/下一个任务开始时间
EndTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"` // 当前/下一个任务结束时间
TimeRange string `protobuf:"bytes,7,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"` // 时间范围例如09:00-10:00
Weekday string `protobuf:"bytes,8,opt,name=weekday,proto3" json:"weekday,omitempty"` // 周几(例如:周一)
FilePath string `protobuf:"bytes,9,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"` // 文件保存路径
Fragment string `protobuf:"bytes,10,opt,name=fragment,proto3" json:"fragment,omitempty"` // 分片设置
ElapsedSeconds uint32 `protobuf:"varint,11,opt,name=elapsed_seconds,json=elapsedSeconds,proto3" json:"elapsed_seconds,omitempty"` // 已运行时间(秒,仅对正在运行的任务有效)
RemainingSeconds uint32 `protobuf:"varint,12,opt,name=remaining_seconds,json=remainingSeconds,proto3" json:"remaining_seconds,omitempty"` // 剩余时间(秒)
PlanSlots []*TimeSlotInfo `protobuf:"bytes,13,rep,name=plan_slots,json=planSlots,proto3" json:"plan_slots,omitempty"` // 完整的计划时间段列表
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CrontabTaskInfo) Reset() {
*x = CrontabTaskInfo{}
mi := &file_crontab_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CrontabTaskInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CrontabTaskInfo) ProtoMessage() {}
func (x *CrontabTaskInfo) ProtoReflect() protoreflect.Message {
mi := &file_crontab_proto_msgTypes[13]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CrontabTaskInfo.ProtoReflect.Descriptor instead.
func (*CrontabTaskInfo) Descriptor() ([]byte, []int) {
return file_crontab_proto_rawDescGZIP(), []int{13}
}
func (x *CrontabTaskInfo) GetPlanId() uint32 {
if x != nil {
return x.PlanId
}
return 0
}
func (x *CrontabTaskInfo) GetPlanName() string {
if x != nil {
return x.PlanName
}
return ""
}
func (x *CrontabTaskInfo) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
func (x *CrontabTaskInfo) GetIsRecording() bool {
if x != nil {
return x.IsRecording
}
return false
}
func (x *CrontabTaskInfo) GetStartTime() *timestamppb.Timestamp {
if x != nil {
return x.StartTime
}
return nil
}
func (x *CrontabTaskInfo) GetEndTime() *timestamppb.Timestamp {
if x != nil {
return x.EndTime
}
return nil
}
func (x *CrontabTaskInfo) GetTimeRange() string {
if x != nil {
return x.TimeRange
}
return ""
}
func (x *CrontabTaskInfo) GetWeekday() string {
if x != nil {
return x.Weekday
}
return ""
}
func (x *CrontabTaskInfo) GetFilePath() string {
if x != nil {
return x.FilePath
}
return ""
}
func (x *CrontabTaskInfo) GetFragment() string {
if x != nil {
return x.Fragment
}
return ""
}
func (x *CrontabTaskInfo) GetElapsedSeconds() uint32 {
if x != nil {
return x.ElapsedSeconds
}
return 0
}
func (x *CrontabTaskInfo) GetRemainingSeconds() uint32 {
if x != nil {
return x.RemainingSeconds
}
return 0
}
func (x *CrontabTaskInfo) GetPlanSlots() []*TimeSlotInfo {
if x != nil {
return x.PlanSlots
}
return nil
}
// 获取Crontab状态响应
type CrontabStatusResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` // 响应码
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` // 响应消息
RunningTasks []*CrontabTaskInfo `protobuf:"bytes,3,rep,name=running_tasks,json=runningTasks,proto3" json:"running_tasks,omitempty"` // 当前正在执行的任务列表
NextTasks []*CrontabTaskInfo `protobuf:"bytes,4,rep,name=next_tasks,json=nextTasks,proto3" json:"next_tasks,omitempty"` // 下一个计划执行的任务列表
TotalRunning uint32 `protobuf:"varint,5,opt,name=total_running,json=totalRunning,proto3" json:"total_running,omitempty"` // 正在运行的任务总数
TotalPlanned uint32 `protobuf:"varint,6,opt,name=total_planned,json=totalPlanned,proto3" json:"total_planned,omitempty"` // 计划中的任务总数
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CrontabStatusResponse) Reset() {
*x = CrontabStatusResponse{}
mi := &file_crontab_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CrontabStatusResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CrontabStatusResponse) ProtoMessage() {}
func (x *CrontabStatusResponse) ProtoReflect() protoreflect.Message {
mi := &file_crontab_proto_msgTypes[14]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CrontabStatusResponse.ProtoReflect.Descriptor instead.
func (*CrontabStatusResponse) Descriptor() ([]byte, []int) {
return file_crontab_proto_rawDescGZIP(), []int{14}
}
func (x *CrontabStatusResponse) GetCode() int32 {
if x != nil {
return x.Code
}
return 0
}
func (x *CrontabStatusResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *CrontabStatusResponse) GetRunningTasks() []*CrontabTaskInfo {
if x != nil {
return x.RunningTasks
}
return nil
}
func (x *CrontabStatusResponse) GetNextTasks() []*CrontabTaskInfo {
if x != nil {
return x.NextTasks
}
return nil
}
func (x *CrontabStatusResponse) GetTotalRunning() uint32 {
if x != nil {
return x.TotalRunning
}
return 0
}
func (x *CrontabStatusResponse) GetTotalPlanned() uint32 {
if x != nil {
return x.TotalPlanned
}
return 0
}
var File_crontab_proto protoreflect.FileDescriptor
const file_crontab_proto_rawDesc = "" +
@@ -710,7 +1166,50 @@ const file_crontab_proto_rawDesc = "" +
"\x06planId\x18\x01 \x01(\rR\x06planId\x12\x1e\n" +
"\n" +
"streamPath\x18\x02 \x01(\tR\n" +
"streamPath2\x88\x06\n" +
"streamPath\"&\n" +
"\x10ParsePlanRequest\x12\x12\n" +
"\x04plan\x18\x01 \x01(\tR\x04plan\"\xa7\x01\n" +
"\fTimeSlotInfo\x120\n" +
"\x05start\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\x05start\x12,\n" +
"\x03end\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\x03end\x12\x18\n" +
"\aweekday\x18\x03 \x01(\tR\aweekday\x12\x1d\n" +
"\n" +
"time_range\x18\x04 \x01(\tR\ttimeRange\"\xa2\x01\n" +
"\x11ParsePlanResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12+\n" +
"\x05slots\x18\x03 \x03(\v2\x15.crontab.TimeSlotInfoR\x05slots\x122\n" +
"\tnext_slot\x18\x04 \x01(\v2\x15.crontab.TimeSlotInfoR\bnextSlot\"7\n" +
"\x14CrontabStatusRequest\x12\x1f\n" +
"\vstream_path\x18\x01 \x01(\tR\n" +
"streamPath\"\xfb\x03\n" +
"\x0fCrontabTaskInfo\x12\x17\n" +
"\aplan_id\x18\x01 \x01(\rR\x06planId\x12\x1b\n" +
"\tplan_name\x18\x02 \x01(\tR\bplanName\x12\x1f\n" +
"\vstream_path\x18\x03 \x01(\tR\n" +
"streamPath\x12!\n" +
"\fis_recording\x18\x04 \x01(\bR\visRecording\x129\n" +
"\n" +
"start_time\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x125\n" +
"\bend_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\aendTime\x12\x1d\n" +
"\n" +
"time_range\x18\a \x01(\tR\ttimeRange\x12\x18\n" +
"\aweekday\x18\b \x01(\tR\aweekday\x12\x1b\n" +
"\tfile_path\x18\t \x01(\tR\bfilePath\x12\x1a\n" +
"\bfragment\x18\n" +
" \x01(\tR\bfragment\x12'\n" +
"\x0felapsed_seconds\x18\v \x01(\rR\x0eelapsedSeconds\x12+\n" +
"\x11remaining_seconds\x18\f \x01(\rR\x10remainingSeconds\x124\n" +
"\n" +
"plan_slots\x18\r \x03(\v2\x15.crontab.TimeSlotInfoR\tplanSlots\"\x87\x02\n" +
"\x15CrontabStatusResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12=\n" +
"\rrunning_tasks\x18\x03 \x03(\v2\x18.crontab.CrontabTaskInfoR\frunningTasks\x127\n" +
"\n" +
"next_tasks\x18\x04 \x03(\v2\x18.crontab.CrontabTaskInfoR\tnextTasks\x12#\n" +
"\rtotal_running\x18\x05 \x01(\rR\ftotalRunning\x12#\n" +
"\rtotal_planned\x18\x06 \x01(\rR\ftotalPlanned2\xe0\a\n" +
"\x03api\x12O\n" +
"\x04List\x12\x14.crontab.ReqPlanList\x1a\x19.crontab.PlanResponseList\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/plan/api/list\x12A\n" +
"\x03Add\x12\r.crontab.Plan\x1a\x11.crontab.Response\"\x18\x82\xd3\xe4\x93\x02\x12:\x01*\"\r/plan/api/add\x12L\n" +
@@ -719,7 +1218,9 @@ const file_crontab_proto_rawDesc = "" +
"\x15ListRecordPlanStreams\x12\x1a.crontab.ReqPlanStreamList\x1a%.crontab.RecordPlanStreamResponseList\"\x1c\x82\xd3\xe4\x93\x02\x16\x12\x14/planstream/api/list\x12]\n" +
"\x13AddRecordPlanStream\x12\x13.crontab.PlanStream\x1a\x11.crontab.Response\"\x1e\x82\xd3\xe4\x93\x02\x18:\x01*\"\x13/planstream/api/add\x12c\n" +
"\x16UpdateRecordPlanStream\x12\x13.crontab.PlanStream\x1a\x11.crontab.Response\"!\x82\xd3\xe4\x93\x02\x1b:\x01*\"\x16/planstream/api/update\x12\x89\x01\n" +
"\x16RemoveRecordPlanStream\x12 .crontab.DeletePlanStreamRequest\x1a\x11.crontab.Response\":\x82\xd3\xe4\x93\x024:\x01*\"//planstream/api/remove/{planId}/{streamPath=**}B\x1fZ\x1dm7s.live/v5/plugin/crontab/pbb\x06proto3"
"\x16RemoveRecordPlanStream\x12 .crontab.DeletePlanStreamRequest\x1a\x11.crontab.Response\":\x82\xd3\xe4\x93\x024:\x01*\"//planstream/api/remove/{planId}/{streamPath=**}\x12f\n" +
"\rParsePlanTime\x12\x19.crontab.ParsePlanRequest\x1a\x1a.crontab.ParsePlanResponse\"\x1e\x82\xd3\xe4\x93\x02\x18\x12\x16/plan/api/parse/{plan}\x12n\n" +
"\x10GetCrontabStatus\x12\x1d.crontab.CrontabStatusRequest\x1a\x1e.crontab.CrontabStatusResponse\"\x1b\x82\xd3\xe4\x93\x02\x15\x12\x13/crontab/api/statusB\x1fZ\x1dm7s.live/v5/plugin/crontab/pbb\x06proto3"
var (
file_crontab_proto_rawDescOnce sync.Once
@@ -733,7 +1234,7 @@ func file_crontab_proto_rawDescGZIP() []byte {
return file_crontab_proto_rawDescData
}
var file_crontab_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_crontab_proto_msgTypes = make([]protoimpl.MessageInfo, 15)
var file_crontab_proto_goTypes = []any{
(*PlanResponseList)(nil), // 0: crontab.PlanResponseList
(*Plan)(nil), // 1: crontab.Plan
@@ -744,36 +1245,55 @@ var file_crontab_proto_goTypes = []any{
(*ReqPlanStreamList)(nil), // 6: crontab.ReqPlanStreamList
(*RecordPlanStreamResponseList)(nil), // 7: crontab.RecordPlanStreamResponseList
(*DeletePlanStreamRequest)(nil), // 8: crontab.DeletePlanStreamRequest
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
(*ParsePlanRequest)(nil), // 9: crontab.ParsePlanRequest
(*TimeSlotInfo)(nil), // 10: crontab.TimeSlotInfo
(*ParsePlanResponse)(nil), // 11: crontab.ParsePlanResponse
(*CrontabStatusRequest)(nil), // 12: crontab.CrontabStatusRequest
(*CrontabTaskInfo)(nil), // 13: crontab.CrontabTaskInfo
(*CrontabStatusResponse)(nil), // 14: crontab.CrontabStatusResponse
(*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp
}
var file_crontab_proto_depIdxs = []int32{
1, // 0: crontab.PlanResponseList.data:type_name -> crontab.Plan
9, // 1: crontab.Plan.createTime:type_name -> google.protobuf.Timestamp
9, // 2: crontab.Plan.updateTime:type_name -> google.protobuf.Timestamp
9, // 3: crontab.PlanStream.created_at:type_name -> google.protobuf.Timestamp
9, // 4: crontab.PlanStream.updated_at:type_name -> google.protobuf.Timestamp
15, // 1: crontab.Plan.createTime:type_name -> google.protobuf.Timestamp
15, // 2: crontab.Plan.updateTime:type_name -> google.protobuf.Timestamp
15, // 3: crontab.PlanStream.created_at:type_name -> google.protobuf.Timestamp
15, // 4: crontab.PlanStream.updated_at:type_name -> google.protobuf.Timestamp
5, // 5: crontab.RecordPlanStreamResponseList.data:type_name -> crontab.PlanStream
2, // 6: crontab.api.List:input_type -> crontab.ReqPlanList
1, // 7: crontab.api.Add:input_type -> crontab.Plan
1, // 8: crontab.api.Update:input_type -> crontab.Plan
3, // 9: crontab.api.Remove:input_type -> crontab.DeleteRequest
6, // 10: crontab.api.ListRecordPlanStreams:input_type -> crontab.ReqPlanStreamList
5, // 11: crontab.api.AddRecordPlanStream:input_type -> crontab.PlanStream
5, // 12: crontab.api.UpdateRecordPlanStream:input_type -> crontab.PlanStream
8, // 13: crontab.api.RemoveRecordPlanStream:input_type -> crontab.DeletePlanStreamRequest
0, // 14: crontab.api.List:output_type -> crontab.PlanResponseList
4, // 15: crontab.api.Add:output_type -> crontab.Response
4, // 16: crontab.api.Update:output_type -> crontab.Response
4, // 17: crontab.api.Remove:output_type -> crontab.Response
7, // 18: crontab.api.ListRecordPlanStreams:output_type -> crontab.RecordPlanStreamResponseList
4, // 19: crontab.api.AddRecordPlanStream:output_type -> crontab.Response
4, // 20: crontab.api.UpdateRecordPlanStream:output_type -> crontab.Response
4, // 21: crontab.api.RemoveRecordPlanStream:output_type -> crontab.Response
14, // [14:22] is the sub-list for method output_type
6, // [6:14] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
15, // 6: crontab.TimeSlotInfo.start:type_name -> google.protobuf.Timestamp
15, // 7: crontab.TimeSlotInfo.end:type_name -> google.protobuf.Timestamp
10, // 8: crontab.ParsePlanResponse.slots:type_name -> crontab.TimeSlotInfo
10, // 9: crontab.ParsePlanResponse.next_slot:type_name -> crontab.TimeSlotInfo
15, // 10: crontab.CrontabTaskInfo.start_time:type_name -> google.protobuf.Timestamp
15, // 11: crontab.CrontabTaskInfo.end_time:type_name -> google.protobuf.Timestamp
10, // 12: crontab.CrontabTaskInfo.plan_slots:type_name -> crontab.TimeSlotInfo
13, // 13: crontab.CrontabStatusResponse.running_tasks:type_name -> crontab.CrontabTaskInfo
13, // 14: crontab.CrontabStatusResponse.next_tasks:type_name -> crontab.CrontabTaskInfo
2, // 15: crontab.api.List:input_type -> crontab.ReqPlanList
1, // 16: crontab.api.Add:input_type -> crontab.Plan
1, // 17: crontab.api.Update:input_type -> crontab.Plan
3, // 18: crontab.api.Remove:input_type -> crontab.DeleteRequest
6, // 19: crontab.api.ListRecordPlanStreams:input_type -> crontab.ReqPlanStreamList
5, // 20: crontab.api.AddRecordPlanStream:input_type -> crontab.PlanStream
5, // 21: crontab.api.UpdateRecordPlanStream:input_type -> crontab.PlanStream
8, // 22: crontab.api.RemoveRecordPlanStream:input_type -> crontab.DeletePlanStreamRequest
9, // 23: crontab.api.ParsePlanTime:input_type -> crontab.ParsePlanRequest
12, // 24: crontab.api.GetCrontabStatus:input_type -> crontab.CrontabStatusRequest
0, // 25: crontab.api.List:output_type -> crontab.PlanResponseList
4, // 26: crontab.api.Add:output_type -> crontab.Response
4, // 27: crontab.api.Update:output_type -> crontab.Response
4, // 28: crontab.api.Remove:output_type -> crontab.Response
7, // 29: crontab.api.ListRecordPlanStreams:output_type -> crontab.RecordPlanStreamResponseList
4, // 30: crontab.api.AddRecordPlanStream:output_type -> crontab.Response
4, // 31: crontab.api.UpdateRecordPlanStream:output_type -> crontab.Response
4, // 32: crontab.api.RemoveRecordPlanStream:output_type -> crontab.Response
11, // 33: crontab.api.ParsePlanTime:output_type -> crontab.ParsePlanResponse
14, // 34: crontab.api.GetCrontabStatus:output_type -> crontab.CrontabStatusResponse
25, // [25:35] is the sub-list for method output_type
15, // [15:25] is the sub-list for method input_type
15, // [15:15] is the sub-list for extension type_name
15, // [15:15] is the sub-list for extension extendee
0, // [0:15] is the sub-list for field type_name
}
func init() { file_crontab_proto_init() }
@@ -787,7 +1307,7 @@ func file_crontab_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_crontab_proto_rawDesc), len(file_crontab_proto_rawDesc)),
NumEnums: 0,
NumMessages: 9,
NumMessages: 15,
NumExtensions: 0,
NumServices: 1,
},

View File

@@ -315,6 +315,76 @@ func local_request_Api_RemoveRecordPlanStream_0(ctx context.Context, marshaler r
return msg, metadata, err
}
func request_Api_ParsePlanTime_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var (
protoReq ParsePlanRequest
metadata runtime.ServerMetadata
err error
)
io.Copy(io.Discard, req.Body)
val, ok := pathParams["plan"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "plan")
}
protoReq.Plan, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "plan", err)
}
msg, err := client.ParsePlanTime(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_ParsePlanTime_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var (
protoReq ParsePlanRequest
metadata runtime.ServerMetadata
err error
)
val, ok := pathParams["plan"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "plan")
}
protoReq.Plan, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "plan", err)
}
msg, err := server.ParsePlanTime(ctx, &protoReq)
return msg, metadata, err
}
var filter_Api_GetCrontabStatus_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
func request_Api_GetCrontabStatus_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var (
protoReq CrontabStatusRequest
metadata runtime.ServerMetadata
)
io.Copy(io.Discard, req.Body)
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_GetCrontabStatus_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.GetCrontabStatus(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_GetCrontabStatus_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var (
protoReq CrontabStatusRequest
metadata runtime.ServerMetadata
)
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_GetCrontabStatus_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.GetCrontabStatus(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@@ -481,6 +551,46 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
}
forward_Api_RemoveRecordPlanStream_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle(http.MethodGet, pattern_Api_ParsePlanTime_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/crontab.Api/ParsePlanTime", runtime.WithHTTPPathPattern("/plan/api/parse/{plan}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_ParsePlanTime_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_ParsePlanTime_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle(http.MethodGet, pattern_Api_GetCrontabStatus_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/crontab.Api/GetCrontabStatus", runtime.WithHTTPPathPattern("/crontab/api/status"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_GetCrontabStatus_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetCrontabStatus_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@@ -657,6 +767,40 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
}
forward_Api_RemoveRecordPlanStream_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle(http.MethodGet, pattern_Api_ParsePlanTime_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/crontab.Api/ParsePlanTime", runtime.WithHTTPPathPattern("/plan/api/parse/{plan}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_ParsePlanTime_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_ParsePlanTime_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle(http.MethodGet, pattern_Api_GetCrontabStatus_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/crontab.Api/GetCrontabStatus", runtime.WithHTTPPathPattern("/crontab/api/status"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_GetCrontabStatus_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_GetCrontabStatus_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@@ -669,6 +813,8 @@ var (
pattern_Api_AddRecordPlanStream_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"planstream", "api", "add"}, ""))
pattern_Api_UpdateRecordPlanStream_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"planstream", "api", "update"}, ""))
pattern_Api_RemoveRecordPlanStream_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3, 3, 0, 4, 1, 5, 4}, []string{"planstream", "api", "remove", "planId", "streamPath"}, ""))
pattern_Api_ParsePlanTime_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 0}, []string{"plan", "api", "parse"}, ""))
pattern_Api_GetCrontabStatus_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"crontab", "api", "status"}, ""))
)
var (
@@ -680,4 +826,6 @@ var (
forward_Api_AddRecordPlanStream_0 = runtime.ForwardResponseMessage
forward_Api_UpdateRecordPlanStream_0 = runtime.ForwardResponseMessage
forward_Api_RemoveRecordPlanStream_0 = runtime.ForwardResponseMessage
forward_Api_ParsePlanTime_0 = runtime.ForwardResponseMessage
forward_Api_GetCrontabStatus_0 = runtime.ForwardResponseMessage
)

View File

@@ -53,6 +53,20 @@ service api {
body: "*"
};
}
// 解析计划字符串,返回时间段信息
rpc ParsePlanTime (ParsePlanRequest) returns (ParsePlanResponse) {
option (google.api.http) = {
get: "/plan/api/parse/{plan}"
};
}
// 获取当前Crontab任务状态
rpc GetCrontabStatus (CrontabStatusRequest) returns (CrontabStatusResponse) {
option (google.api.http) = {
get: "/crontab/api/status"
};
}
}
message PlanResponseList {
@@ -118,4 +132,59 @@ message RecordPlanStreamResponseList {
message DeletePlanStreamRequest {
uint32 planId = 1;
string streamPath = 2;
}
// 解析计划请求
message ParsePlanRequest {
string plan = 1; // 168位的0/1字符串表示一周的每个小时是否录制
}
// 时间段信息
message TimeSlotInfo {
google.protobuf.Timestamp start = 1; // 开始时间
google.protobuf.Timestamp end = 2; // 结束时间
string weekday = 3; // 周几(例如:周一)
string time_range = 4; // 时间范围例如09:00-10:00
}
// 解析计划响应
message ParsePlanResponse {
int32 code = 1; // 响应码
string message = 2; // 响应消息
repeated TimeSlotInfo slots = 3; // 所有计划的时间段
TimeSlotInfo next_slot = 4; // 从当前时间开始的下一个时间段
}
// 新增的消息定义
// 获取Crontab状态请求
message CrontabStatusRequest {
// 可以为空,表示获取所有任务
string stream_path = 1; // 可选,按流路径过滤
}
// 任务信息
message CrontabTaskInfo {
uint32 plan_id = 1; // 计划ID
string plan_name = 2; // 计划名称
string stream_path = 3; // 流路径
bool is_recording = 4; // 是否正在录制
google.protobuf.Timestamp start_time = 5; // 当前/下一个任务开始时间
google.protobuf.Timestamp end_time = 6; // 当前/下一个任务结束时间
string time_range = 7; // 时间范围例如09:00-10:00
string weekday = 8; // 周几(例如:周一)
string file_path = 9; // 文件保存路径
string fragment = 10; // 分片设置
uint32 elapsed_seconds = 11; // 已运行时间(秒,仅对正在运行的任务有效)
uint32 remaining_seconds = 12; // 剩余时间(秒)
repeated TimeSlotInfo plan_slots = 13; // 完整的计划时间段列表
}
// 获取Crontab状态响应
message CrontabStatusResponse {
int32 code = 1; // 响应码
string message = 2; // 响应消息
repeated CrontabTaskInfo running_tasks = 3; // 当前正在执行的任务列表
repeated CrontabTaskInfo next_tasks = 4; // 下一个计划执行的任务列表
uint32 total_running = 5; // 正在运行的任务总数
uint32 total_planned = 6; // 计划中的任务总数
}

View File

@@ -27,6 +27,8 @@ const (
Api_AddRecordPlanStream_FullMethodName = "/crontab.api/AddRecordPlanStream"
Api_UpdateRecordPlanStream_FullMethodName = "/crontab.api/UpdateRecordPlanStream"
Api_RemoveRecordPlanStream_FullMethodName = "/crontab.api/RemoveRecordPlanStream"
Api_ParsePlanTime_FullMethodName = "/crontab.api/ParsePlanTime"
Api_GetCrontabStatus_FullMethodName = "/crontab.api/GetCrontabStatus"
)
// ApiClient is the client API for Api service.
@@ -42,6 +44,10 @@ type ApiClient interface {
AddRecordPlanStream(ctx context.Context, in *PlanStream, opts ...grpc.CallOption) (*Response, error)
UpdateRecordPlanStream(ctx context.Context, in *PlanStream, opts ...grpc.CallOption) (*Response, error)
RemoveRecordPlanStream(ctx context.Context, in *DeletePlanStreamRequest, opts ...grpc.CallOption) (*Response, error)
// 解析计划字符串,返回时间段信息
ParsePlanTime(ctx context.Context, in *ParsePlanRequest, opts ...grpc.CallOption) (*ParsePlanResponse, error)
// 获取当前Crontab任务状态
GetCrontabStatus(ctx context.Context, in *CrontabStatusRequest, opts ...grpc.CallOption) (*CrontabStatusResponse, error)
}
type apiClient struct {
@@ -132,6 +138,26 @@ func (c *apiClient) RemoveRecordPlanStream(ctx context.Context, in *DeletePlanSt
return out, nil
}
func (c *apiClient) ParsePlanTime(ctx context.Context, in *ParsePlanRequest, opts ...grpc.CallOption) (*ParsePlanResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ParsePlanResponse)
err := c.cc.Invoke(ctx, Api_ParsePlanTime_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) GetCrontabStatus(ctx context.Context, in *CrontabStatusRequest, opts ...grpc.CallOption) (*CrontabStatusResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CrontabStatusResponse)
err := c.cc.Invoke(ctx, Api_GetCrontabStatus_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility.
@@ -145,6 +171,10 @@ type ApiServer interface {
AddRecordPlanStream(context.Context, *PlanStream) (*Response, error)
UpdateRecordPlanStream(context.Context, *PlanStream) (*Response, error)
RemoveRecordPlanStream(context.Context, *DeletePlanStreamRequest) (*Response, error)
// 解析计划字符串,返回时间段信息
ParsePlanTime(context.Context, *ParsePlanRequest) (*ParsePlanResponse, error)
// 获取当前Crontab任务状态
GetCrontabStatus(context.Context, *CrontabStatusRequest) (*CrontabStatusResponse, error)
mustEmbedUnimplementedApiServer()
}
@@ -179,6 +209,12 @@ func (UnimplementedApiServer) UpdateRecordPlanStream(context.Context, *PlanStrea
func (UnimplementedApiServer) RemoveRecordPlanStream(context.Context, *DeletePlanStreamRequest) (*Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveRecordPlanStream not implemented")
}
func (UnimplementedApiServer) ParsePlanTime(context.Context, *ParsePlanRequest) (*ParsePlanResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ParsePlanTime not implemented")
}
func (UnimplementedApiServer) GetCrontabStatus(context.Context, *CrontabStatusRequest) (*CrontabStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetCrontabStatus not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
func (UnimplementedApiServer) testEmbeddedByValue() {}
@@ -344,6 +380,42 @@ func _Api_RemoveRecordPlanStream_Handler(srv interface{}, ctx context.Context, d
return interceptor(ctx, in, info, handler)
}
func _Api_ParsePlanTime_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ParsePlanRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).ParsePlanTime(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_ParsePlanTime_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).ParsePlanTime(ctx, req.(*ParsePlanRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_GetCrontabStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CrontabStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).GetCrontabStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_GetCrontabStatus_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).GetCrontabStatus(ctx, req.(*CrontabStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -383,6 +455,14 @@ var Api_ServiceDesc = grpc.ServiceDesc{
MethodName: "RemoveRecordPlanStream",
Handler: _Api_RemoveRecordPlanStream_Handler,
},
{
MethodName: "ParsePlanTime",
Handler: _Api_ParsePlanTime_Handler,
},
{
MethodName: "GetCrontabStatus",
Handler: _Api_GetCrontabStatus_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "crontab.proto",

View File

@@ -11,3 +11,7 @@ type RecordPlan struct {
Plan string `json:"plan" gorm:"type:text"`
Enable bool `json:"enable" gorm:"default:false"` // 是否启用
}
func (r *RecordPlan) GetKey() uint {
return r.ID
}