feat: add logrotate

This commit is contained in:
langhuihui
2024-05-07 08:58:21 +08:00
parent e316214b92
commit d268e786d7
29 changed files with 1234 additions and 115 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.history
.vscode
.vscode
logs

View File

@@ -2,6 +2,8 @@ global:
loglevel: trace
tcp:
listenaddr: :50051
logrotate:
level: trace
webrtc:
publish:
pubaudio: false

View File

@@ -8,6 +8,7 @@ import (
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/webrtc"
_ "m7s.live/m7s/v5/plugin/rtmp"
_ "m7s.live/m7s/v5/plugin/logrotate"
)
func main() {

13
go.mod
View File

@@ -9,16 +9,23 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/pion/interceptor v0.1.29
github.com/q191201771/naza v0.30.48
github.com/quic-go/quic-go v0.42.0
github.com/quic-go/quic-go v0.43.1
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)
require (
github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732 // indirect
github.com/chromedp/sysutil v1.0.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/ice/v3 v3.0.6 // indirect
@@ -46,7 +53,9 @@ require (
)
require (
github.com/alchemy/rotoslog v0.2.2
github.com/bluenviron/mediacommon v1.9.2
github.com/chromedp/chromedp v0.9.5
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/gorilla/websocket v1.5.1
@@ -57,7 +66,7 @@ require (
github.com/shirou/gopsutil/v3 v3.24.3
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect

25
go.sum
View File

@@ -1,6 +1,14 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alchemy/rotoslog v0.2.2 h1:yzAOjaQBKgJvAdPi0sF5KSPMq5f2vNJZEnPr73CPDzQ=
github.com/alchemy/rotoslog v0.2.2/go.mod h1:pOHF0DKryPLaQzjcUlidLVRTksvk9yW75YIu1yYiiEQ=
github.com/bluenviron/mediacommon v1.9.2 h1:EHcvoC5YMXRcFE010bTNf07ZiSlB/e/AdZyG7GsEYN0=
github.com/bluenviron/mediacommon v1.9.2/go.mod h1:lt8V+wMyPw8C69HAqDWV5tsAwzN9u2Z+ca8B6C//+n0=
github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732 h1:XYUCaZrW8ckGWlCRJKCSoh/iFwlpX316a8yY9IFEzv8=
github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs=
github.com/chromedp/chromedp v0.9.5 h1:viASzruPJOiThk7c5bueOUY91jGLJVximoEMGoH93rg=
github.com/chromedp/chromedp v0.9.5/go.mod h1:D4I2qONslauw/C7INoCir1BJkSwBYMyZgx8X276z3+Y=
github.com/chromedp/sysutil v1.0.0 h1:+ZxhTpfpZlmchB58ih/LBHX52ky7w2VhQVKQMucy3Ic=
github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -23,6 +31,12 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q=
github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
@@ -41,6 +55,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0Q
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE=
github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es=
github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k=
@@ -50,8 +66,11 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc=
github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@@ -62,6 +81,7 @@ github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3Ro
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=
github.com/phsym/console-slog v0.3.1 h1:Fuzcrjr40xTc004S9Kni8XfNsk+qrptQmyR+wZw9/7A=
github.com/phsym/console-slog v0.3.1/go.mod h1:oJskjp/X6e6c0mGpfP8ELkfKUsrkDifYRAqJQgmdDS0=
github.com/pion/datachannel v1.5.6 h1:1IxKJntfSlYkpUj8LlYRSWpYiTTC02nUrOE8T3DqGeg=
@@ -114,6 +134,8 @@ github.com/q191201771/naza v0.30.48 h1:lbYUaa7A15kJKYwOiU4AbFS1Zo8oQwppl2tLEbJTq
github.com/q191201771/naza v0.30.48/go.mod h1:n+dpJjQSh90PxBwxBNuifOwQttywvSIN5TkWSSYCeBk=
github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM=
github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/quic-go/quic-go v0.43.1 h1:fLiMNfQVe9q2JvSsiXo4fXOEguXHGGl9+6gLp4RPeZQ=
github.com/quic-go/quic-go v0.43.1/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/shirou/gopsutil/v3 v3.24.3 h1:eoUGJSmdfLzJ3mxIhmOAhgKEKgQkeOwKpz1NbhVnuPE=
@@ -156,6 +178,8 @@ golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
@@ -189,6 +213,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@@ -50,7 +50,7 @@ type (
DataFrame
IDR bool
Timestamp time.Duration // 绝对时间戳
Wrap IAVFrame // 封装格式
Wraps []IAVFrame // 封装格式
}
AVRing = util.Ring[AVFrame]
DataFrame struct {
@@ -66,9 +66,9 @@ type (
func (frame *AVFrame) Reset() {
frame.BytesIn = 0
frame.Timestamp = 0
if frame.Wrap != nil {
frame.Wrap.Recycle()
frame.Wrap = nil
for _, wrap := range frame.Wraps {
wrap.Recycle()
wrap = nil
}
}

67
pkg/log.go Normal file
View File

@@ -0,0 +1,67 @@
package pkg
import (
"context"
"log/slog"
"slices"
)
var _ slog.Handler = (*MultiLogHandler)(nil)
type MultiLogHandler struct {
handlers []slog.Handler
level slog.Level
}
func (m *MultiLogHandler) Add(h ...slog.Handler) {
m.handlers = append(m.handlers, h...)
}
func (m *MultiLogHandler) Remove(h slog.Handler) {
if i := slices.Index(m.handlers, h); i != -1 {
m.handlers = slices.Delete(m.handlers, i, i+1)
}
}
func (m *MultiLogHandler) SetLevel(level slog.Level) {
m.level = level
}
// Enabled implements slog.Handler.
func (m *MultiLogHandler) Enabled(_ context.Context, l slog.Level) bool {
return l >= m.level
}
// Handle implements slog.Handler.
func (m *MultiLogHandler) Handle(ctx context.Context, rec slog.Record) error {
for _, h := range m.handlers {
if err := h.Handle(ctx, rec); err != nil {
return err
}
}
return nil
}
// WithAttrs implements slog.Handler.
func (m *MultiLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
result := &MultiLogHandler{
handlers: make([]slog.Handler, len(m.handlers)),
level: m.level,
}
for i, h := range m.handlers {
result.handlers[i] = h.WithAttrs(attrs)
}
return result
}
// WithGroup implements slog.Handler.
func (m *MultiLogHandler) WithGroup(name string) slog.Handler {
result := &MultiLogHandler{
handlers: make([]slog.Handler, len(m.handlers)),
level: m.level,
}
for i, h := range m.handlers {
result.handlers[i] = h.WithGroup(name)
}
return result
}

View File

@@ -15,19 +15,22 @@ var EmptyLocker emptyLocker
type RingWriter struct {
*util.Ring[AVFrame] `json:"-" yaml:"-"`
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
pool *util.Ring[AVFrame]
poolSize int
Size int
LastValue *AVFrame
}
func (rb *RingWriter) Init(n int) *RingWriter {
rb.Ring = util.NewRing[AVFrame](n)
rb.Size = n
func NewRingWriter(n int) (rb *RingWriter) {
rb = &RingWriter{
Size: n,
Ring: util.NewRing[AVFrame](n),
}
rb.LastValue = &rb.Value
rb.LastValue.StartWrite()
return rb
return
}
func (rb *RingWriter) Resize(size int) {

View File

@@ -7,8 +7,7 @@ import (
)
func TestRing(t *testing.T) {
var w RingWriter
w.Init(10)
w := NewRingWriter(10)
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
go t.Run("writer", func(t *testing.T) {
for i := 0; ctx.Err() == nil; i++ {
@@ -23,7 +22,7 @@ func TestRing(t *testing.T) {
var reader RingReader
err := reader.StartRead(w.Ring)
if err != nil {
t.Error(err)
t.Error(err)
return
}
for ctx.Err() == nil {
@@ -41,9 +40,9 @@ func TestRing(t *testing.T) {
// slow reader
t.Run("reader2", func(t *testing.T) {
var reader RingReader
err := reader.StartRead(w.Ring)
err := reader.StartRead(w.Ring)
if err != nil {
t.Error(err)
t.Error(err)
return
}
for ctx.Err() == nil {
@@ -61,8 +60,7 @@ func TestRing(t *testing.T) {
}
func BenchmarkRing(b *testing.B) {
var w RingWriter
w.Init(10)
w := NewRingWriter(10)
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
go func() {
for i := 0; ctx.Err() == nil; i++ {

View File

@@ -29,10 +29,10 @@ type (
AVTrack struct {
Track
RingWriter
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
*RingWriter
ICodecCtx
SequenceFrame IAVFrame
WrapIndex int
}
)
@@ -46,8 +46,11 @@ func NewAVTrack(args ...any) (t *AVTrack) {
t.FrameType = v
case *slog.Logger:
t.Logger = v
case *AVTrack:
t.Logger = v.Logger.With("subtrack", t.FrameType.String())
t.RingWriter = v.RingWriter
case int:
t.Init(v)
t.RingWriter = NewRingWriter(v)
}
}
t.Ready = util.NewPromise(struct{}{})

71
pkg/util/sse.go Normal file
View File

@@ -0,0 +1,71 @@
package util
import (
"context"
"encoding/json"
"net"
"net/http"
"os/exec"
"gopkg.in/yaml.v3"
)
var (
sseEent = []byte("event: ")
sseBegin = []byte("data: ")
sseEnd = []byte("\n\n")
)
type SSE struct {
http.ResponseWriter
context.Context
}
func (sse *SSE) Write(data []byte) (n int, err error) {
if err = sse.Err(); err != nil {
return
}
buffers := net.Buffers{sseBegin, data, sseEnd}
nn, err := buffers.WriteTo(sse.ResponseWriter)
if err == nil {
sse.ResponseWriter.(http.Flusher).Flush()
}
return int(nn), err
}
func (sse *SSE) WriteEvent(event string, data []byte) (err error) {
if err = sse.Err(); err != nil {
return
}
buffers := net.Buffers{sseEent, []byte(event + "\n"), sseBegin, data, sseEnd}
_, err = buffers.WriteTo(sse.ResponseWriter)
if err == nil {
sse.ResponseWriter.(http.Flusher).Flush()
}
return
}
func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE {
header := w.Header()
header.Set("Content-Type", "text/event-stream")
header.Set("Cache-Control", "no-cache")
header.Set("Connection", "keep-alive")
header.Set("X-Accel-Buffering", "no")
header.Set("Access-Control-Allow-Origin", "*")
return &SSE{
ResponseWriter: w,
Context: ctx,
}
}
func (sse *SSE) WriteJSON(data any) error {
return json.NewEncoder(sse).Encode(data)
}
func (sse *SSE) WriteYAML(data any) error {
return yaml.NewEncoder(sse).Encode(data)
}
func (sse *SSE) WriteExec(cmd *exec.Cmd) error {
cmd.Stderr = sse
cmd.Stdout = sse
return cmd.Run()
}

View File

@@ -357,3 +357,7 @@ func (p *Plugin) handle(pattern string, handler http.Handler) {
}
p.server.apiList = append(p.server.apiList, pattern)
}
func (p *Plugin) PostToServer(event any) {
p.server.PostMessage(event)
}

59
plugin/logrotate/api.go Normal file
View File

@@ -0,0 +1,59 @@
package plugin_logrotate
import (
"context"
"io"
"net/http"
"os"
"path/filepath"
"github.com/phsym/console-slog"
"google.golang.org/protobuf/types/known/emptypb"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/logrotate/pb"
)
func (h *LogRotatePlugin) List(context.Context, *emptypb.Empty) (*pb.ResponseFileInfo, error) {
dir, err := os.Open(h.Path)
if err == nil {
var files []os.FileInfo
if files, err = dir.Readdir(0); err == nil {
var fileInfos []*pb.FileInfo
for _, info := range files {
fileInfos = append(fileInfos, &pb.FileInfo{
Name: info.Name(), Size: info.Size(),
})
}
return &pb.ResponseFileInfo{Files: fileInfos}, nil
}
}
return nil, err
}
func (h *LogRotatePlugin) Get(_ context.Context, req *pb.RequestFileInfo) (res *pb.ResponseOpen, err error) {
file, err1 := os.Open(filepath.Join(h.Path, req.FileName))
if err1 == nil {
defer file.Close()
res = &pb.ResponseOpen{}
content, err2 := io.ReadAll(file)
if err2 == nil {
res.Content = string(content)
} else {
err = err2
}
} else {
err = err1
}
return
}
func (h *LogRotatePlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (l *LogRotatePlugin) API_tail(w http.ResponseWriter, r *http.Request) {
writer := util.NewSSE(w, r.Context())
h := console.NewHandler(writer, &console.HandlerOptions{NoColor: true})
l.PostToServer(h)
<-r.Context().Done()
}

43
plugin/logrotate/index.go Normal file
View File

@@ -0,0 +1,43 @@
package plugin_logrotate
import (
"io"
"log/slog"
"github.com/alchemy/rotoslog"
"github.com/phsym/console-slog"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/plugin/logrotate/pb"
)
type LogRotatePlugin struct {
pb.UnimplementedLogrotateServer
m7s.Plugin
Path string `default:"./logs" desc:"日志文件存放目录"`
Size uint64 `default:"1048576" desc:"日志文件大小,单位:字节"`
Days int `default:"1" desc:"日志文件保留天数"`
Formatter string `default:"2006-01-02T15" desc:"日志文件名格式"`
MaxFiles uint64 `default:"7" desc:"最大日志文件数量"`
Level string `default:"info" desc:"日志级别"`
handler slog.Handler
}
var _ = m7s.InstallPlugin[LogRotatePlugin](&pb.Logrotate_ServiceDesc, pb.RegisterLogrotateHandler)
func (config *LogRotatePlugin) OnInit() (err error) {
var lv slog.LevelVar
lv.UnmarshalText([]byte(config.Level))
if config.Level == "trace" {
lv.Set(pkg.TraceLevel)
}
builder := func(w io.Writer, opts *slog.HandlerOptions) slog.Handler {
return console.NewHandler(w, &console.HandlerOptions{NoColor: true, Level: lv.Level()})
}
config.handler, err = rotoslog.NewHandler(rotoslog.LogHandlerBuilder(builder), rotoslog.LogDir(config.Path), rotoslog.MaxFileSize(config.Size), rotoslog.DateTimeLayout(config.Formatter), rotoslog.MaxRotatedFiles(config.MaxFiles))
if err == nil {
config.PostToServer(config.handler)
}
return
}

View File

@@ -0,0 +1,365 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.1
// source: logrotate.proto
package pb
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ResponseOpen struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Content string `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"`
}
func (x *ResponseOpen) Reset() {
*x = ResponseOpen{}
if protoimpl.UnsafeEnabled {
mi := &file_logrotate_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ResponseOpen) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ResponseOpen) ProtoMessage() {}
func (x *ResponseOpen) ProtoReflect() protoreflect.Message {
mi := &file_logrotate_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ResponseOpen.ProtoReflect.Descriptor instead.
func (*ResponseOpen) Descriptor() ([]byte, []int) {
return file_logrotate_proto_rawDescGZIP(), []int{0}
}
func (x *ResponseOpen) GetContent() string {
if x != nil {
return x.Content
}
return ""
}
type RequestFileInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
FileName string `protobuf:"bytes,1,opt,name=fileName,proto3" json:"fileName,omitempty"`
}
func (x *RequestFileInfo) Reset() {
*x = RequestFileInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_logrotate_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RequestFileInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RequestFileInfo) ProtoMessage() {}
func (x *RequestFileInfo) ProtoReflect() protoreflect.Message {
mi := &file_logrotate_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RequestFileInfo.ProtoReflect.Descriptor instead.
func (*RequestFileInfo) Descriptor() ([]byte, []int) {
return file_logrotate_proto_rawDescGZIP(), []int{1}
}
func (x *RequestFileInfo) GetFileName() string {
if x != nil {
return x.FileName
}
return ""
}
type ResponseFileInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Files []*FileInfo `protobuf:"bytes,1,rep,name=files,proto3" json:"files,omitempty"`
}
func (x *ResponseFileInfo) Reset() {
*x = ResponseFileInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_logrotate_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ResponseFileInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ResponseFileInfo) ProtoMessage() {}
func (x *ResponseFileInfo) ProtoReflect() protoreflect.Message {
mi := &file_logrotate_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ResponseFileInfo.ProtoReflect.Descriptor instead.
func (*ResponseFileInfo) Descriptor() ([]byte, []int) {
return file_logrotate_proto_rawDescGZIP(), []int{2}
}
func (x *ResponseFileInfo) GetFiles() []*FileInfo {
if x != nil {
return x.Files
}
return nil
}
type FileInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"`
}
func (x *FileInfo) Reset() {
*x = FileInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_logrotate_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FileInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FileInfo) ProtoMessage() {}
func (x *FileInfo) ProtoReflect() protoreflect.Message {
mi := &file_logrotate_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FileInfo.ProtoReflect.Descriptor instead.
func (*FileInfo) Descriptor() ([]byte, []int) {
return file_logrotate_proto_rawDescGZIP(), []int{3}
}
func (x *FileInfo) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *FileInfo) GetSize() int64 {
if x != nil {
return x.Size
}
return 0
}
var File_logrotate_proto protoreflect.FileDescriptor
var file_logrotate_proto_rawDesc = []byte{
0x0a, 0x0f, 0x6c, 0x6f, 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x03, 0x6d, 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x22, 0x28, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4f, 0x70, 0x65,
0x6e, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x2d, 0x0a, 0x0f, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1a,
0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x37, 0x0a, 0x10, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23,
0x0a, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e,
0x6d, 0x37, 0x73, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, 0x66, 0x69,
0x6c, 0x65, 0x73, 0x22, 0x32, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12,
0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x03, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x32, 0xb9, 0x01, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x72,
0x6f, 0x74, 0x61, 0x74, 0x65, 0x12, 0x52, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x1b, 0x82, 0xd3,
0xe4, 0x93, 0x02, 0x15, 0x12, 0x13, 0x2f, 0x6c, 0x6f, 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65,
0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6c, 0x69, 0x73, 0x74, 0x12, 0x58, 0x0a, 0x03, 0x47, 0x65, 0x74,
0x12, 0x14, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x46, 0x69,
0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x11, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4f, 0x70, 0x65, 0x6e, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02,
0x22, 0x12, 0x20, 0x2f, 0x6c, 0x6f, 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x61, 0x70,
0x69, 0x2f, 0x67, 0x65, 0x74, 0x2f, 0x7b, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x3d,
0x2a, 0x2a, 0x7d, 0x42, 0x25, 0x5a, 0x23, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f,
0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6c, 0x6f,
0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
file_logrotate_proto_rawDescOnce sync.Once
file_logrotate_proto_rawDescData = file_logrotate_proto_rawDesc
)
func file_logrotate_proto_rawDescGZIP() []byte {
file_logrotate_proto_rawDescOnce.Do(func() {
file_logrotate_proto_rawDescData = protoimpl.X.CompressGZIP(file_logrotate_proto_rawDescData)
})
return file_logrotate_proto_rawDescData
}
var file_logrotate_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_logrotate_proto_goTypes = []interface{}{
(*ResponseOpen)(nil), // 0: m7s.ResponseOpen
(*RequestFileInfo)(nil), // 1: m7s.RequestFileInfo
(*ResponseFileInfo)(nil), // 2: m7s.ResponseFileInfo
(*FileInfo)(nil), // 3: m7s.FileInfo
(*emptypb.Empty)(nil), // 4: google.protobuf.Empty
}
var file_logrotate_proto_depIdxs = []int32{
3, // 0: m7s.ResponseFileInfo.files:type_name -> m7s.FileInfo
4, // 1: m7s.logrotate.List:input_type -> google.protobuf.Empty
1, // 2: m7s.logrotate.Get:input_type -> m7s.RequestFileInfo
2, // 3: m7s.logrotate.List:output_type -> m7s.ResponseFileInfo
0, // 4: m7s.logrotate.Get:output_type -> m7s.ResponseOpen
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_logrotate_proto_init() }
func file_logrotate_proto_init() {
if File_logrotate_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_logrotate_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ResponseOpen); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_logrotate_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RequestFileInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_logrotate_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ResponseFileInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_logrotate_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FileInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_logrotate_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_logrotate_proto_goTypes,
DependencyIndexes: file_logrotate_proto_depIdxs,
MessageInfos: file_logrotate_proto_msgTypes,
}.Build()
File_logrotate_proto = out.File
file_logrotate_proto_rawDesc = nil
file_logrotate_proto_goTypes = nil
file_logrotate_proto_depIdxs = nil
}

View File

@@ -0,0 +1,259 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: logrotate.proto
/*
Package pb is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pb
import (
"context"
"io"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Logrotate_List_0(ctx context.Context, marshaler runtime.Marshaler, client LogrotateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.List(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Logrotate_List_0(ctx context.Context, marshaler runtime.Marshaler, server LogrotateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.List(ctx, &protoReq)
return msg, metadata, err
}
func request_Logrotate_Get_0(ctx context.Context, marshaler runtime.Marshaler, client LogrotateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestFileInfo
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["fileName"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "fileName")
}
protoReq.FileName, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "fileName", err)
}
msg, err := client.Get(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Logrotate_Get_0(ctx context.Context, marshaler runtime.Marshaler, server LogrotateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestFileInfo
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["fileName"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "fileName")
}
protoReq.FileName, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "fileName", err)
}
msg, err := server.Get(ctx, &protoReq)
return msg, metadata, err
}
// RegisterLogrotateHandlerServer registers the http handlers for service Logrotate to "mux".
// UnaryRPC :call LogrotateServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterLogrotateHandlerFromEndpoint instead.
func RegisterLogrotateHandlerServer(ctx context.Context, mux *runtime.ServeMux, server LogrotateServer) error {
mux.Handle("GET", pattern_Logrotate_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Logrotate/List", runtime.WithHTTPPathPattern("/logrotate/api/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Logrotate_List_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Logrotate_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Logrotate_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Logrotate/Get", runtime.WithHTTPPathPattern("/logrotate/api/get/{fileName=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Logrotate_Get_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Logrotate_Get_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterLogrotateHandlerFromEndpoint is same as RegisterLogrotateHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterLogrotateHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterLogrotateHandler(ctx, mux, conn)
}
// RegisterLogrotateHandler registers the http handlers for service Logrotate to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterLogrotateHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterLogrotateHandlerClient(ctx, mux, NewLogrotateClient(conn))
}
// RegisterLogrotateHandlerClient registers the http handlers for service Logrotate
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "LogrotateClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "LogrotateClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "LogrotateClient" to call the correct interceptors.
func RegisterLogrotateHandlerClient(ctx context.Context, mux *runtime.ServeMux, client LogrotateClient) error {
mux.Handle("GET", pattern_Logrotate_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Logrotate/List", runtime.WithHTTPPathPattern("/logrotate/api/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Logrotate_List_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Logrotate_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Logrotate_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Logrotate/Get", runtime.WithHTTPPathPattern("/logrotate/api/get/{fileName=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Logrotate_Get_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Logrotate_Get_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Logrotate_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"logrotate", "api", "list"}, ""))
pattern_Logrotate_Get_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"logrotate", "api", "get", "fileName"}, ""))
)
var (
forward_Logrotate_List_0 = runtime.ForwardResponseMessage
forward_Logrotate_Get_0 = runtime.ForwardResponseMessage
)

View File

@@ -0,0 +1,35 @@
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
package m7s;
option go_package="m7s.live/m7s/v5/plugin/logrotate/pb";
service logrotate {
rpc List (google.protobuf.Empty) returns (ResponseFileInfo) {
option (google.api.http) = {
get: "/logrotate/api/list"
};
}
rpc Get (RequestFileInfo) returns (ResponseOpen) {
option (google.api.http) = {
get: "/logrotate/api/get/{fileName=**}"
};
}
}
message ResponseOpen {
string content = 1;
}
message RequestFileInfo {
string fileName = 1;
}
message ResponseFileInfo {
repeated FileInfo files = 1;
}
message FileInfo {
string name = 1;
int64 size = 2;
}

View File

@@ -0,0 +1,142 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.1
// source: logrotate.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// LogrotateClient is the client API for Logrotate service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type LogrotateClient interface {
List(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ResponseFileInfo, error)
Get(ctx context.Context, in *RequestFileInfo, opts ...grpc.CallOption) (*ResponseOpen, error)
}
type logrotateClient struct {
cc grpc.ClientConnInterface
}
func NewLogrotateClient(cc grpc.ClientConnInterface) LogrotateClient {
return &logrotateClient{cc}
}
func (c *logrotateClient) List(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ResponseFileInfo, error) {
out := new(ResponseFileInfo)
err := c.cc.Invoke(ctx, "/m7s.logrotate/List", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *logrotateClient) Get(ctx context.Context, in *RequestFileInfo, opts ...grpc.CallOption) (*ResponseOpen, error) {
out := new(ResponseOpen)
err := c.cc.Invoke(ctx, "/m7s.logrotate/Get", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// LogrotateServer is the server API for Logrotate service.
// All implementations must embed UnimplementedLogrotateServer
// for forward compatibility
type LogrotateServer interface {
List(context.Context, *emptypb.Empty) (*ResponseFileInfo, error)
Get(context.Context, *RequestFileInfo) (*ResponseOpen, error)
mustEmbedUnimplementedLogrotateServer()
}
// UnimplementedLogrotateServer must be embedded to have forward compatible implementations.
type UnimplementedLogrotateServer struct {
}
func (UnimplementedLogrotateServer) List(context.Context, *emptypb.Empty) (*ResponseFileInfo, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedLogrotateServer) Get(context.Context, *RequestFileInfo) (*ResponseOpen, error) {
return nil, status.Errorf(codes.Unimplemented, "method Get not implemented")
}
func (UnimplementedLogrotateServer) mustEmbedUnimplementedLogrotateServer() {}
// UnsafeLogrotateServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to LogrotateServer will
// result in compilation errors.
type UnsafeLogrotateServer interface {
mustEmbedUnimplementedLogrotateServer()
}
func RegisterLogrotateServer(s grpc.ServiceRegistrar, srv LogrotateServer) {
s.RegisterService(&Logrotate_ServiceDesc, srv)
}
func _Logrotate_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LogrotateServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.logrotate/List",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LogrotateServer).List(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Logrotate_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestFileInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LogrotateServer).Get(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.logrotate/Get",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LogrotateServer).Get(ctx, req.(*RequestFileInfo))
}
return interceptor(ctx, in, info, handler)
}
// Logrotate_ServiceDesc is the grpc.ServiceDesc for Logrotate service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Logrotate_ServiceDesc = grpc.ServiceDesc{
ServiceName: "m7s.logrotate",
HandlerType: (*LogrotateServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "List",
Handler: _Logrotate_List_Handler,
},
{
MethodName: "Get",
Handler: _Logrotate_Get_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "logrotate.proto",
}

View File

@@ -2,7 +2,7 @@
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.1
// source: index.proto
// source: rtmp.proto
package pb
@@ -34,7 +34,7 @@ type PushRequest struct {
func (x *PushRequest) Reset() {
*x = PushRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_index_proto_msgTypes[0]
mi := &file_rtmp_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -47,7 +47,7 @@ func (x *PushRequest) String() string {
func (*PushRequest) ProtoMessage() {}
func (x *PushRequest) ProtoReflect() protoreflect.Message {
mi := &file_index_proto_msgTypes[0]
mi := &file_rtmp_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -60,7 +60,7 @@ func (x *PushRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use PushRequest.ProtoReflect.Descriptor instead.
func (*PushRequest) Descriptor() ([]byte, []int) {
return file_index_proto_rawDescGZIP(), []int{0}
return file_rtmp_proto_rawDescGZIP(), []int{0}
}
func (x *PushRequest) GetStreamPath() string {
@@ -86,7 +86,7 @@ type PushResponse struct {
func (x *PushResponse) Reset() {
*x = PushResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_index_proto_msgTypes[1]
mi := &file_rtmp_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -99,7 +99,7 @@ func (x *PushResponse) String() string {
func (*PushResponse) ProtoMessage() {}
func (x *PushResponse) ProtoReflect() protoreflect.Message {
mi := &file_index_proto_msgTypes[1]
mi := &file_rtmp_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -112,52 +112,52 @@ func (x *PushResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use PushResponse.ProtoReflect.Descriptor instead.
func (*PushResponse) Descriptor() ([]byte, []int) {
return file_index_proto_rawDescGZIP(), []int{1}
return file_rtmp_proto_rawDescGZIP(), []int{1}
}
var File_index_proto protoreflect.FileDescriptor
var File_rtmp_proto protoreflect.FileDescriptor
var file_index_proto_rawDesc = []byte{
0x0a, 0x0b, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d,
0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61,
0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b, 0x0a,
0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09,
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x75,
0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x69, 0x0a, 0x04, 0x72, 0x74,
0x6d, 0x70, 0x12, 0x61, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x10, 0x2e,
0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x11, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2b, 0x22, 0x1e, 0x2f, 0x72, 0x74, 0x6d,
0x70, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x09, 0x72, 0x65, 0x6d, 0x6f,
0x74, 0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76,
0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f,
0x72, 0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
var file_rtmp_proto_rawDesc = []byte{
0x0a, 0x0a, 0x72, 0x74, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d, 0x37,
0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e,
0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a,
0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b, 0x0a, 0x0b,
0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x72,
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09,
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x75, 0x73,
0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x69, 0x0a, 0x04, 0x72, 0x74, 0x6d,
0x70, 0x12, 0x61, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x10, 0x2e, 0x6d,
0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11,
0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2b, 0x22, 0x1e, 0x2f, 0x72, 0x74, 0x6d, 0x70,
0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74,
0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65,
0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x72,
0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_index_proto_rawDescOnce sync.Once
file_index_proto_rawDescData = file_index_proto_rawDesc
file_rtmp_proto_rawDescOnce sync.Once
file_rtmp_proto_rawDescData = file_rtmp_proto_rawDesc
)
func file_index_proto_rawDescGZIP() []byte {
file_index_proto_rawDescOnce.Do(func() {
file_index_proto_rawDescData = protoimpl.X.CompressGZIP(file_index_proto_rawDescData)
func file_rtmp_proto_rawDescGZIP() []byte {
file_rtmp_proto_rawDescOnce.Do(func() {
file_rtmp_proto_rawDescData = protoimpl.X.CompressGZIP(file_rtmp_proto_rawDescData)
})
return file_index_proto_rawDescData
return file_rtmp_proto_rawDescData
}
var file_index_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_index_proto_goTypes = []interface{}{
var file_rtmp_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_rtmp_proto_goTypes = []interface{}{
(*PushRequest)(nil), // 0: m7s.PushRequest
(*PushResponse)(nil), // 1: m7s.PushResponse
}
var file_index_proto_depIdxs = []int32{
var file_rtmp_proto_depIdxs = []int32{
0, // 0: m7s.rtmp.PushOut:input_type -> m7s.PushRequest
1, // 1: m7s.rtmp.PushOut:output_type -> m7s.PushResponse
1, // [1:2] is the sub-list for method output_type
@@ -167,13 +167,13 @@ var file_index_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for field type_name
}
func init() { file_index_proto_init() }
func file_index_proto_init() {
if File_index_proto != nil {
func init() { file_rtmp_proto_init() }
func file_rtmp_proto_init() {
if File_rtmp_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_index_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
file_rtmp_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushRequest); i {
case 0:
return &v.state
@@ -185,7 +185,7 @@ func file_index_proto_init() {
return nil
}
}
file_index_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
file_rtmp_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushResponse); i {
case 0:
return &v.state
@@ -202,18 +202,18 @@ func file_index_proto_init() {
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_index_proto_rawDesc,
RawDescriptor: file_rtmp_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_index_proto_goTypes,
DependencyIndexes: file_index_proto_depIdxs,
MessageInfos: file_index_proto_msgTypes,
GoTypes: file_rtmp_proto_goTypes,
DependencyIndexes: file_rtmp_proto_depIdxs,
MessageInfos: file_rtmp_proto_msgTypes,
}.Build()
File_index_proto = out.File
file_index_proto_rawDesc = nil
file_index_proto_goTypes = nil
file_index_proto_depIdxs = nil
File_rtmp_proto = out.File
file_rtmp_proto_rawDesc = nil
file_rtmp_proto_goTypes = nil
file_rtmp_proto_depIdxs = nil
}

View File

@@ -1,5 +1,5 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: index.proto
// source: rtmp.proto
/*
Package pb is a reverse proxy.

View File

@@ -2,7 +2,7 @@
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.1
// source: index.proto
// source: rtmp.proto
package pb
@@ -101,5 +101,5 @@ var Rtmp_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "index.proto",
Metadata: "rtmp.proto",
}

View File

@@ -212,7 +212,7 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) {
func (h264 *H264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
var rtmpVideo RTMPVideo
rtmpVideo.RecyclableBuffers = &util.RecyclableBuffers{}
rtmpVideo.ScalableMemoryAllocator = from.Wrap.GetScalableMemoryAllocator()
rtmpVideo.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
nalus := from.Raw.(Nalus)
head := rtmpVideo.Malloc(5)
head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(ParseVideoCodec(h264.FourCC()))

View File

@@ -82,7 +82,7 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
var r RTPVideo
r.ScalableMemoryAllocator = from.Wrap.GetScalableMemoryAllocator()
r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
nalus := from.Raw.(Nalus)
nalutype := nalus.H264Type()
var lastPacket *rtp.Packet
@@ -162,14 +162,14 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
switch t {
case codec.NALU_STAPA, codec.NALU_STAPB:
if len(packet.Payload) <= offset {
return nil, errors.New("invalid nalu size")
return nil, fmt.Errorf("invalid nalu size %d", len(packet.Payload))
}
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalu = [][]byte{buffer.ReadN(nextSize)}
gotNalu(codec.ParseH264NALUType(nalu[0][0]))
} else {
return nil, errors.New("invalid nalu size")
return nil, fmt.Errorf("invalid nalu size %d", nextSize)
}
}
case codec.NALU_FUA, codec.NALU_FUB:

View File

@@ -235,7 +235,9 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) {
if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp {
frame.Packets = append(frame.Packets, &packet)
} else {
t := time.Now()
publisher.WriteVideo(frame)
fmt.Println("write video", time.Since(t))
frame = &mrtp.RTPVideo{}
frame.Packets = []*rtp.Packet{&packet}
frame.RTPCodecParameters = &codecP

View File

@@ -0,0 +1,23 @@
package plugin_webrtc
import (
"context"
"testing"
"time"
"github.com/chromedp/chromedp"
"m7s.live/m7s/v5"
)
func TestPublish(t *testing.T) {
ctx, cancel := chromedp.NewContext(context.Background())
go m7s.Run(ctx, "config.yaml")
defer cancel()
err := chromedp.Run(ctx,
chromedp.Navigate("http://localhost:8080/webrtc/test/publish"),
)
if err != nil {
t.Fatal(err)
}
<-time.After(10 * time.Second)
}

View File

@@ -34,8 +34,8 @@ func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
} else {
elapsed := time.Since(s.beginTime)
should := time.Duration(float64(ts) / speed)
if should > elapsed {
time.Sleep(should - elapsed)
if needSleep := should - elapsed; needSleep > time.Second {
time.Sleep(needSleep)
}
}
}
@@ -50,7 +50,8 @@ func (t *AVTracks) IsEmpty() bool {
}
func (t *AVTracks) CreateSubTrack(dataType reflect.Type) (track *AVTrack) {
track = NewAVTrack(dataType, t.Logger.With("subtrack", dataType.String()), t.AVTrack.Size)
track = NewAVTrack(dataType, t.AVTrack)
track.WrapIndex = len(t.Items)
t.Add(track)
return
}
@@ -144,7 +145,7 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
frame := &t.Value
frame.Wrap = data
frame.Wraps = append(frame.Wraps, data)
ts := data.GetTimestamp()
if p.lastTs == 0 {
p.baseTs -= ts
@@ -153,7 +154,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
p.lastTs = frame.Timestamp
if p.Enabled(p, TraceLevel) {
codec := t.FourCC().String()
size, data := frame.Wrap.GetSize(), frame.Wrap.String()
size, data := frame.Wraps[0].GetSize(), frame.Wraps[0].String()
p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", codec, "size", size, "data", data)
}
t.Step()
@@ -215,7 +216,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
p.writeAV(t, data)
if p.VideoTrack.Length > 1 && !p.VideoTrack.AVTrack.Ready.Pendding() {
if t.LastValue.Raw == nil {
t.LastValue.Raw, err = t.LastValue.Wrap.ToRaw(t.ICodecCtx)
t.LastValue.Raw, err = t.LastValue.Wraps[0].ToRaw(t.ICodecCtx)
if err != nil {
t.Error("to raw", "err", err)
return err
@@ -224,15 +225,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
for i, track := range p.VideoTrack.Items[1:] {
if track.ICodecCtx == nil {
err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx)
if p.BufferTime > 0 {
track.IDRingList.AddIDR(track.Ring)
track.HistoryRing.Store(track.Ring)
} else {
track.IDRing.Store(track.Ring)
}
for rf := idr; rf != t.Ring; rf = rf.Next() {
if i == 0 && rf.Value.Raw == nil {
rf.Value.Raw, err = rf.Value.Wrap.ToRaw(t.ICodecCtx)
rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx)
if err != nil {
t.Error("to raw", "err", err)
return err
@@ -259,7 +254,7 @@ func (p *Publisher) writeSubAV(to *AVTrack, frame *AVFrame) (err error) {
to.Error("from raw", "err", err)
return
}
to.Value.Wrap = toFrame
to.Value.Wraps = append(to.Value.Wraps, toFrame)
to.Value.IDR = frame.IDR
to.Value.Timestamp = frame.Timestamp
if p.Enabled(p, TraceLevel) {
@@ -347,13 +342,13 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) {
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp)
snap.WriteTime = uint64(v.WriteTime.UnixNano())
if v.Wrap != nil {
snap.Wrap = &pb.Wrap{
Timestamp: uint32(v.Wrap.GetTimestamp()),
Size: uint32(v.Wrap.GetSize()),
Data: v.Wrap.String(),
}
}
// if v.Wrap != nil {
// snap.Wrap = &pb.Wrap{
// Timestamp: uint32(v.Wrap.GetTimestamp()),
// Size: uint32(v.Wrap.GetSize()),
// Data: v.Wrap.String(),
// }
// }
ret.VideoTrack = append(ret.VideoTrack, &snap)
})
}
@@ -364,13 +359,13 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) {
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp)
snap.WriteTime = uint64(v.WriteTime.UnixNano())
if v.Wrap != nil {
snap.Wrap = &pb.Wrap{
Timestamp: uint32(v.Wrap.GetTimestamp()),
Size: uint32(v.Wrap.GetSize()),
Data: v.Wrap.String(),
}
}
// if v.Wrap != nil {
// snap.Wrap = &pb.Wrap{
// Timestamp: uint32(v.Wrap.GetTimestamp()),
// Size: uint32(v.Wrap.GetSize()),
// Data: v.Wrap.String(),
// }
// }
ret.AudioTrack = append(ret.AudioTrack, &snap)
})
}

View File

@@ -53,6 +53,7 @@ type Server struct {
Pushs util.Collection[string, *Pusher]
Waiting map[string][]*Subscriber
Subscribers util.Collection[int, *Subscriber]
LogHandler MultiLogHandler
pidG int
sidG int
apiList []string
@@ -68,10 +69,11 @@ func NewServer() (s *Server) {
}
s.config.HTTP.ListenAddrTLS = ":8443"
s.config.HTTP.ListenAddr = ":8080"
s.Logger = slog.With("server", s.ID)
s.handler = s
s.server = s
s.Meta = &serverMeta
s.LogHandler.Add(console.NewHandler(os.Stdout, nil))
s.Logger = slog.New(&s.LogHandler).With("server", s.ID)
Servers[s.ID] = s
return
}
@@ -94,6 +96,9 @@ func (s *Server) reset() {
server.Meta = s.Meta
server.config.HTTP.ListenAddrTLS = ":8443"
server.config.HTTP.ListenAddr = ":8080"
server.LogHandler = MultiLogHandler{}
server.LogHandler.Add(console.NewHandler(os.Stdout, nil))
// server.Logger = slog.New(&server.LogHandler).With("server", s.ID)
*s = server
}
@@ -144,9 +149,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
if s.LogLevel == "trace" {
lv.Set(TraceLevel)
}
s.Logger = slog.New(
console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}),
).With("server", s.ID)
s.LogHandler.SetLevel(lv.Level())
s.registerHandler()
if httpConf.ListenAddrTLS != "" {
@@ -332,6 +335,8 @@ func (s *Server) eventLoop() {
v.Resolve(&pb.StreamListResponse{List: streams})
continue
}
case slog.Handler:
s.LogHandler.Add(v)
}
for _, plugin := range s.Plugins {
if plugin.Disabled {
@@ -462,3 +467,7 @@ func (s *Server) Call(arg any) (result any, err error) {
}
return
}
func (s *Server) PostMessage(msg any) {
s.eventChan <- msg
}

View File

@@ -60,6 +60,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
var ar, vr *AVRingReader
var ah, vh reflect.Value
var a1, v1 reflect.Type
var awi, vwi int
var initState = 0
var subMode = s.SubMode //订阅模式
if s.Args.Has(s.SubModeArgName) {
@@ -77,6 +78,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
return
}
if at := s.Publisher.GetAudioTrack(a1); at != nil {
awi = at.WrapIndex
ar = NewAVRingReader(at)
ar.Logger = s.Logger.With("reader", a1.String())
ar.Info("start read")
@@ -88,6 +90,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
return
}
if vt := s.Publisher.GetVideoTrack(v1); vt != nil {
vwi = vt.WrapIndex
vr = NewAVRingReader(vt)
vr.Logger = s.Logger.With("reader", v1.String())
vr.Info("start read")
@@ -110,7 +113,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
if s.Enabled(s, TraceLevel) {
s.Trace("send audio frame", "seq", audioFrame.Sequence)
}
res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)})
res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wraps[awi])})
if len(res) > 0 && !res[0].IsNil() {
if err := res[0].Interface().(error); err != ErrInterrupt {
s.Stop(err)
@@ -122,9 +125,9 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
}
sendVideoFrame := func() (err error) {
if s.Enabled(s, TraceLevel) {
s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.String(), "size", videoFrame.Wrap.GetSize())
s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wraps[vwi].String(), "size", videoFrame.Wraps[vwi].GetSize())
}
res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wraps[vwi])})
if len(res) > 0 && !res[0].IsNil() {
if err = res[0].Interface().(error); err != ErrInterrupt {
s.Stop(err)