diff --git a/.gitignore b/.gitignore index 1c36090..0762a84 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .history -.vscode \ No newline at end of file +.vscode +logs \ No newline at end of file diff --git a/example/default/config.yaml b/example/default/config.yaml index b7b7a2c..d92c7a6 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -2,6 +2,8 @@ global: loglevel: trace tcp: listenaddr: :50051 +logrotate: + level: trace webrtc: publish: pubaudio: false diff --git a/example/default/main.go b/example/default/main.go index ef31209..560c51a 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -8,6 +8,7 @@ import ( _ "m7s.live/m7s/v5/plugin/hdl" _ "m7s.live/m7s/v5/plugin/webrtc" _ "m7s.live/m7s/v5/plugin/rtmp" + _ "m7s.live/m7s/v5/plugin/logrotate" ) func main() { diff --git a/go.mod b/go.mod index 782d179..023e897 100644 --- a/go.mod +++ b/go.mod @@ -9,16 +9,23 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/pion/interceptor v0.1.29 github.com/q191201771/naza v0.30.48 - github.com/quic-go/quic-go v0.42.0 + github.com/quic-go/quic-go v0.43.1 google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 ) require ( + github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732 // indirect + github.com/chromedp/sysutil v1.0.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect + github.com/gobwas/httphead v0.1.0 // indirect + github.com/gobwas/pool v0.2.1 // indirect + github.com/gobwas/ws v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.10 // indirect github.com/pion/ice/v3 v3.0.6 // indirect @@ -46,7 +53,9 @@ require ( ) require ( + github.com/alchemy/rotoslog v0.2.2 github.com/bluenviron/mediacommon v1.9.2 + github.com/chromedp/chromedp v0.9.5 github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/gorilla/websocket v1.5.1 @@ -57,7 +66,7 @@ require ( github.com/shirou/gopsutil/v3 v3.24.3 go.uber.org/mock v0.4.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sys v0.19.0 // indirect diff --git a/go.sum b/go.sum index 385e12a..fd97955 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,14 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alchemy/rotoslog v0.2.2 h1:yzAOjaQBKgJvAdPi0sF5KSPMq5f2vNJZEnPr73CPDzQ= +github.com/alchemy/rotoslog v0.2.2/go.mod h1:pOHF0DKryPLaQzjcUlidLVRTksvk9yW75YIu1yYiiEQ= github.com/bluenviron/mediacommon v1.9.2 h1:EHcvoC5YMXRcFE010bTNf07ZiSlB/e/AdZyG7GsEYN0= github.com/bluenviron/mediacommon v1.9.2/go.mod h1:lt8V+wMyPw8C69HAqDWV5tsAwzN9u2Z+ca8B6C//+n0= +github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732 h1:XYUCaZrW8ckGWlCRJKCSoh/iFwlpX316a8yY9IFEzv8= +github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs= +github.com/chromedp/chromedp v0.9.5 h1:viASzruPJOiThk7c5bueOUY91jGLJVximoEMGoH93rg= +github.com/chromedp/chromedp v0.9.5/go.mod h1:D4I2qONslauw/C7INoCir1BJkSwBYMyZgx8X276z3+Y= +github.com/chromedp/sysutil v1.0.0 h1:+ZxhTpfpZlmchB58ih/LBHX52ky7w2VhQVKQMucy3Ic= +github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -23,6 +31,12 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q= +github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= @@ -41,6 +55,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0Q github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= @@ -50,8 +66,11 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc= github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -62,6 +81,7 @@ github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3Ro github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= +github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= github.com/phsym/console-slog v0.3.1 h1:Fuzcrjr40xTc004S9Kni8XfNsk+qrptQmyR+wZw9/7A= github.com/phsym/console-slog v0.3.1/go.mod h1:oJskjp/X6e6c0mGpfP8ELkfKUsrkDifYRAqJQgmdDS0= github.com/pion/datachannel v1.5.6 h1:1IxKJntfSlYkpUj8LlYRSWpYiTTC02nUrOE8T3DqGeg= @@ -114,6 +134,8 @@ github.com/q191201771/naza v0.30.48 h1:lbYUaa7A15kJKYwOiU4AbFS1Zo8oQwppl2tLEbJTq github.com/q191201771/naza v0.30.48/go.mod h1:n+dpJjQSh90PxBwxBNuifOwQttywvSIN5TkWSSYCeBk= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= +github.com/quic-go/quic-go v0.43.1 h1:fLiMNfQVe9q2JvSsiXo4fXOEguXHGGl9+6gLp4RPeZQ= +github.com/quic-go/quic-go v0.43.1/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/shirou/gopsutil/v3 v3.24.3 h1:eoUGJSmdfLzJ3mxIhmOAhgKEKgQkeOwKpz1NbhVnuPE= @@ -156,6 +178,8 @@ golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o= golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= @@ -189,6 +213,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/avframe.go b/pkg/avframe.go index b78e1d4..02cf931 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -50,7 +50,7 @@ type ( DataFrame IDR bool Timestamp time.Duration // 绝对时间戳 - Wrap IAVFrame // 封装格式 + Wraps []IAVFrame // 封装格式 } AVRing = util.Ring[AVFrame] DataFrame struct { @@ -66,9 +66,9 @@ type ( func (frame *AVFrame) Reset() { frame.BytesIn = 0 frame.Timestamp = 0 - if frame.Wrap != nil { - frame.Wrap.Recycle() - frame.Wrap = nil + for _, wrap := range frame.Wraps { + wrap.Recycle() + wrap = nil } } diff --git a/pkg/log.go b/pkg/log.go new file mode 100644 index 0000000..b8dada6 --- /dev/null +++ b/pkg/log.go @@ -0,0 +1,67 @@ +package pkg + +import ( + "context" + "log/slog" + "slices" +) + +var _ slog.Handler = (*MultiLogHandler)(nil) + +type MultiLogHandler struct { + handlers []slog.Handler + level slog.Level +} + +func (m *MultiLogHandler) Add(h ...slog.Handler) { + m.handlers = append(m.handlers, h...) +} + +func (m *MultiLogHandler) Remove(h slog.Handler) { + if i := slices.Index(m.handlers, h); i != -1 { + m.handlers = slices.Delete(m.handlers, i, i+1) + } +} + +func (m *MultiLogHandler) SetLevel(level slog.Level) { + m.level = level +} + +// Enabled implements slog.Handler. +func (m *MultiLogHandler) Enabled(_ context.Context, l slog.Level) bool { + return l >= m.level +} + +// Handle implements slog.Handler. +func (m *MultiLogHandler) Handle(ctx context.Context, rec slog.Record) error { + for _, h := range m.handlers { + if err := h.Handle(ctx, rec); err != nil { + return err + } + } + return nil +} + +// WithAttrs implements slog.Handler. +func (m *MultiLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + result := &MultiLogHandler{ + handlers: make([]slog.Handler, len(m.handlers)), + level: m.level, + } + for i, h := range m.handlers { + result.handlers[i] = h.WithAttrs(attrs) + } + return result +} + +// WithGroup implements slog.Handler. +func (m *MultiLogHandler) WithGroup(name string) slog.Handler { + result := &MultiLogHandler{ + handlers: make([]slog.Handler, len(m.handlers)), + level: m.level, + } + for i, h := range m.handlers { + result.handlers[i] = h.WithGroup(name) + } + return result +} diff --git a/pkg/ring-writer.go b/pkg/ring-writer.go index adc630b..06f7c52 100644 --- a/pkg/ring-writer.go +++ b/pkg/ring-writer.go @@ -15,19 +15,22 @@ var EmptyLocker emptyLocker type RingWriter struct { *util.Ring[AVFrame] `json:"-" yaml:"-"` - ReaderCount atomic.Int32 `json:"-" yaml:"-"` + IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染 + ReaderCount atomic.Int32 `json:"-" yaml:"-"` pool *util.Ring[AVFrame] poolSize int Size int LastValue *AVFrame } -func (rb *RingWriter) Init(n int) *RingWriter { - rb.Ring = util.NewRing[AVFrame](n) - rb.Size = n +func NewRingWriter(n int) (rb *RingWriter) { + rb = &RingWriter{ + Size: n, + Ring: util.NewRing[AVFrame](n), + } rb.LastValue = &rb.Value rb.LastValue.StartWrite() - return rb + return } func (rb *RingWriter) Resize(size int) { diff --git a/pkg/ring_test.go b/pkg/ring_test.go index ff9011f..0cc203b 100644 --- a/pkg/ring_test.go +++ b/pkg/ring_test.go @@ -7,8 +7,7 @@ import ( ) func TestRing(t *testing.T) { - var w RingWriter - w.Init(10) + w := NewRingWriter(10) ctx, _ := context.WithTimeout(context.Background(), time.Second*5) go t.Run("writer", func(t *testing.T) { for i := 0; ctx.Err() == nil; i++ { @@ -23,7 +22,7 @@ func TestRing(t *testing.T) { var reader RingReader err := reader.StartRead(w.Ring) if err != nil { - t.Error(err) + t.Error(err) return } for ctx.Err() == nil { @@ -41,9 +40,9 @@ func TestRing(t *testing.T) { // slow reader t.Run("reader2", func(t *testing.T) { var reader RingReader - err := reader.StartRead(w.Ring) + err := reader.StartRead(w.Ring) if err != nil { - t.Error(err) + t.Error(err) return } for ctx.Err() == nil { @@ -61,8 +60,7 @@ func TestRing(t *testing.T) { } func BenchmarkRing(b *testing.B) { - var w RingWriter - w.Init(10) + w := NewRingWriter(10) ctx, _ := context.WithTimeout(context.Background(), time.Second*5) go func() { for i := 0; ctx.Err() == nil; i++ { diff --git a/pkg/track.go b/pkg/track.go index f38ebf6..283eaff 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -29,10 +29,10 @@ type ( AVTrack struct { Track - RingWriter - IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染 + *RingWriter ICodecCtx SequenceFrame IAVFrame + WrapIndex int } ) @@ -46,8 +46,11 @@ func NewAVTrack(args ...any) (t *AVTrack) { t.FrameType = v case *slog.Logger: t.Logger = v + case *AVTrack: + t.Logger = v.Logger.With("subtrack", t.FrameType.String()) + t.RingWriter = v.RingWriter case int: - t.Init(v) + t.RingWriter = NewRingWriter(v) } } t.Ready = util.NewPromise(struct{}{}) diff --git a/pkg/util/sse.go b/pkg/util/sse.go new file mode 100644 index 0000000..fb97be7 --- /dev/null +++ b/pkg/util/sse.go @@ -0,0 +1,71 @@ +package util + +import ( + "context" + "encoding/json" + "net" + "net/http" + "os/exec" + + "gopkg.in/yaml.v3" +) + +var ( + sseEent = []byte("event: ") + sseBegin = []byte("data: ") + sseEnd = []byte("\n\n") +) + +type SSE struct { + http.ResponseWriter + context.Context +} + +func (sse *SSE) Write(data []byte) (n int, err error) { + if err = sse.Err(); err != nil { + return + } + buffers := net.Buffers{sseBegin, data, sseEnd} + nn, err := buffers.WriteTo(sse.ResponseWriter) + if err == nil { + sse.ResponseWriter.(http.Flusher).Flush() + } + return int(nn), err +} + +func (sse *SSE) WriteEvent(event string, data []byte) (err error) { + if err = sse.Err(); err != nil { + return + } + buffers := net.Buffers{sseEent, []byte(event + "\n"), sseBegin, data, sseEnd} + _, err = buffers.WriteTo(sse.ResponseWriter) + if err == nil { + sse.ResponseWriter.(http.Flusher).Flush() + } + return +} + +func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE { + header := w.Header() + header.Set("Content-Type", "text/event-stream") + header.Set("Cache-Control", "no-cache") + header.Set("Connection", "keep-alive") + header.Set("X-Accel-Buffering", "no") + header.Set("Access-Control-Allow-Origin", "*") + return &SSE{ + ResponseWriter: w, + Context: ctx, + } +} + +func (sse *SSE) WriteJSON(data any) error { + return json.NewEncoder(sse).Encode(data) +} +func (sse *SSE) WriteYAML(data any) error { + return yaml.NewEncoder(sse).Encode(data) +} +func (sse *SSE) WriteExec(cmd *exec.Cmd) error { + cmd.Stderr = sse + cmd.Stdout = sse + return cmd.Run() +} diff --git a/plugin.go b/plugin.go index 9a2b623..9196738 100644 --- a/plugin.go +++ b/plugin.go @@ -357,3 +357,7 @@ func (p *Plugin) handle(pattern string, handler http.Handler) { } p.server.apiList = append(p.server.apiList, pattern) } + +func (p *Plugin) PostToServer(event any) { + p.server.PostMessage(event) +} diff --git a/plugin/logrotate/api.go b/plugin/logrotate/api.go new file mode 100644 index 0000000..9d4bd19 --- /dev/null +++ b/plugin/logrotate/api.go @@ -0,0 +1,59 @@ +package plugin_logrotate + +import ( + "context" + "io" + "net/http" + "os" + "path/filepath" + + "github.com/phsym/console-slog" + "google.golang.org/protobuf/types/known/emptypb" + "m7s.live/m7s/v5/pkg/util" + "m7s.live/m7s/v5/plugin/logrotate/pb" +) + +func (h *LogRotatePlugin) List(context.Context, *emptypb.Empty) (*pb.ResponseFileInfo, error) { + dir, err := os.Open(h.Path) + if err == nil { + var files []os.FileInfo + if files, err = dir.Readdir(0); err == nil { + var fileInfos []*pb.FileInfo + for _, info := range files { + fileInfos = append(fileInfos, &pb.FileInfo{ + Name: info.Name(), Size: info.Size(), + }) + } + return &pb.ResponseFileInfo{Files: fileInfos}, nil + } + } + return nil, err +} + +func (h *LogRotatePlugin) Get(_ context.Context, req *pb.RequestFileInfo) (res *pb.ResponseOpen, err error) { + file, err1 := os.Open(filepath.Join(h.Path, req.FileName)) + if err1 == nil { + defer file.Close() + res = &pb.ResponseOpen{} + content, err2 := io.ReadAll(file) + if err2 == nil { + res.Content = string(content) + } else { + err = err2 + } + } else { + err = err1 + } + return +} + +func (h *LogRotatePlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { + +} + +func (l *LogRotatePlugin) API_tail(w http.ResponseWriter, r *http.Request) { + writer := util.NewSSE(w, r.Context()) + h := console.NewHandler(writer, &console.HandlerOptions{NoColor: true}) + l.PostToServer(h) + <-r.Context().Done() +} diff --git a/plugin/logrotate/index.go b/plugin/logrotate/index.go new file mode 100644 index 0000000..8dec201 --- /dev/null +++ b/plugin/logrotate/index.go @@ -0,0 +1,43 @@ +package plugin_logrotate + +import ( + "io" + "log/slog" + + "github.com/alchemy/rotoslog" + + "github.com/phsym/console-slog" + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/plugin/logrotate/pb" +) + +type LogRotatePlugin struct { + pb.UnimplementedLogrotateServer + m7s.Plugin + Path string `default:"./logs" desc:"日志文件存放目录"` + Size uint64 `default:"1048576" desc:"日志文件大小,单位:字节"` + Days int `default:"1" desc:"日志文件保留天数"` + Formatter string `default:"2006-01-02T15" desc:"日志文件名格式"` + MaxFiles uint64 `default:"7" desc:"最大日志文件数量"` + Level string `default:"info" desc:"日志级别"` + handler slog.Handler +} + +var _ = m7s.InstallPlugin[LogRotatePlugin](&pb.Logrotate_ServiceDesc, pb.RegisterLogrotateHandler) + +func (config *LogRotatePlugin) OnInit() (err error) { + var lv slog.LevelVar + lv.UnmarshalText([]byte(config.Level)) + if config.Level == "trace" { + lv.Set(pkg.TraceLevel) + } + builder := func(w io.Writer, opts *slog.HandlerOptions) slog.Handler { + return console.NewHandler(w, &console.HandlerOptions{NoColor: true, Level: lv.Level()}) + } + config.handler, err = rotoslog.NewHandler(rotoslog.LogHandlerBuilder(builder), rotoslog.LogDir(config.Path), rotoslog.MaxFileSize(config.Size), rotoslog.DateTimeLayout(config.Formatter), rotoslog.MaxRotatedFiles(config.MaxFiles)) + if err == nil { + config.PostToServer(config.handler) + } + return +} diff --git a/plugin/logrotate/pb/logrotate.pb.go b/plugin/logrotate/pb/logrotate.pb.go new file mode 100644 index 0000000..373d086 --- /dev/null +++ b/plugin/logrotate/pb/logrotate.pb.go @@ -0,0 +1,365 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.19.1 +// source: logrotate.proto + +package pb + +import ( + _ "google.golang.org/genproto/googleapis/api/annotations" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ResponseOpen struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Content string `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"` +} + +func (x *ResponseOpen) Reset() { + *x = ResponseOpen{} + if protoimpl.UnsafeEnabled { + mi := &file_logrotate_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ResponseOpen) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ResponseOpen) ProtoMessage() {} + +func (x *ResponseOpen) ProtoReflect() protoreflect.Message { + mi := &file_logrotate_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ResponseOpen.ProtoReflect.Descriptor instead. +func (*ResponseOpen) Descriptor() ([]byte, []int) { + return file_logrotate_proto_rawDescGZIP(), []int{0} +} + +func (x *ResponseOpen) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +type RequestFileInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FileName string `protobuf:"bytes,1,opt,name=fileName,proto3" json:"fileName,omitempty"` +} + +func (x *RequestFileInfo) Reset() { + *x = RequestFileInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_logrotate_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestFileInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestFileInfo) ProtoMessage() {} + +func (x *RequestFileInfo) ProtoReflect() protoreflect.Message { + mi := &file_logrotate_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestFileInfo.ProtoReflect.Descriptor instead. +func (*RequestFileInfo) Descriptor() ([]byte, []int) { + return file_logrotate_proto_rawDescGZIP(), []int{1} +} + +func (x *RequestFileInfo) GetFileName() string { + if x != nil { + return x.FileName + } + return "" +} + +type ResponseFileInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Files []*FileInfo `protobuf:"bytes,1,rep,name=files,proto3" json:"files,omitempty"` +} + +func (x *ResponseFileInfo) Reset() { + *x = ResponseFileInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_logrotate_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ResponseFileInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ResponseFileInfo) ProtoMessage() {} + +func (x *ResponseFileInfo) ProtoReflect() protoreflect.Message { + mi := &file_logrotate_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ResponseFileInfo.ProtoReflect.Descriptor instead. +func (*ResponseFileInfo) Descriptor() ([]byte, []int) { + return file_logrotate_proto_rawDescGZIP(), []int{2} +} + +func (x *ResponseFileInfo) GetFiles() []*FileInfo { + if x != nil { + return x.Files + } + return nil +} + +type FileInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` +} + +func (x *FileInfo) Reset() { + *x = FileInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_logrotate_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FileInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileInfo) ProtoMessage() {} + +func (x *FileInfo) ProtoReflect() protoreflect.Message { + mi := &file_logrotate_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileInfo.ProtoReflect.Descriptor instead. +func (*FileInfo) Descriptor() ([]byte, []int) { + return file_logrotate_proto_rawDescGZIP(), []int{3} +} + +func (x *FileInfo) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *FileInfo) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +var File_logrotate_proto protoreflect.FileDescriptor + +var file_logrotate_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x6c, 0x6f, 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x03, 0x6d, 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, + 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x28, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4f, 0x70, 0x65, + 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x2d, 0x0a, 0x0f, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1a, + 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x37, 0x0a, 0x10, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23, + 0x0a, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, + 0x6d, 0x37, 0x73, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x05, 0x66, 0x69, + 0x6c, 0x65, 0x73, 0x22, 0x32, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x32, 0xb9, 0x01, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x72, + 0x6f, 0x74, 0x61, 0x74, 0x65, 0x12, 0x52, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x46, 0x69, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x1b, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x15, 0x12, 0x13, 0x2f, 0x6c, 0x6f, 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6c, 0x69, 0x73, 0x74, 0x12, 0x58, 0x0a, 0x03, 0x47, 0x65, 0x74, + 0x12, 0x14, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x46, 0x69, + 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x11, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4f, 0x70, 0x65, 0x6e, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02, + 0x22, 0x12, 0x20, 0x2f, 0x6c, 0x6f, 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x61, 0x70, + 0x69, 0x2f, 0x67, 0x65, 0x74, 0x2f, 0x7b, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x3d, + 0x2a, 0x2a, 0x7d, 0x42, 0x25, 0x5a, 0x23, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, + 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6c, 0x6f, + 0x67, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_logrotate_proto_rawDescOnce sync.Once + file_logrotate_proto_rawDescData = file_logrotate_proto_rawDesc +) + +func file_logrotate_proto_rawDescGZIP() []byte { + file_logrotate_proto_rawDescOnce.Do(func() { + file_logrotate_proto_rawDescData = protoimpl.X.CompressGZIP(file_logrotate_proto_rawDescData) + }) + return file_logrotate_proto_rawDescData +} + +var file_logrotate_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_logrotate_proto_goTypes = []interface{}{ + (*ResponseOpen)(nil), // 0: m7s.ResponseOpen + (*RequestFileInfo)(nil), // 1: m7s.RequestFileInfo + (*ResponseFileInfo)(nil), // 2: m7s.ResponseFileInfo + (*FileInfo)(nil), // 3: m7s.FileInfo + (*emptypb.Empty)(nil), // 4: google.protobuf.Empty +} +var file_logrotate_proto_depIdxs = []int32{ + 3, // 0: m7s.ResponseFileInfo.files:type_name -> m7s.FileInfo + 4, // 1: m7s.logrotate.List:input_type -> google.protobuf.Empty + 1, // 2: m7s.logrotate.Get:input_type -> m7s.RequestFileInfo + 2, // 3: m7s.logrotate.List:output_type -> m7s.ResponseFileInfo + 0, // 4: m7s.logrotate.Get:output_type -> m7s.ResponseOpen + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_logrotate_proto_init() } +func file_logrotate_proto_init() { + if File_logrotate_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_logrotate_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ResponseOpen); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_logrotate_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RequestFileInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_logrotate_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ResponseFileInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_logrotate_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FileInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_logrotate_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_logrotate_proto_goTypes, + DependencyIndexes: file_logrotate_proto_depIdxs, + MessageInfos: file_logrotate_proto_msgTypes, + }.Build() + File_logrotate_proto = out.File + file_logrotate_proto_rawDesc = nil + file_logrotate_proto_goTypes = nil + file_logrotate_proto_depIdxs = nil +} diff --git a/plugin/logrotate/pb/logrotate.pb.gw.go b/plugin/logrotate/pb/logrotate.pb.gw.go new file mode 100644 index 0000000..b8e170c --- /dev/null +++ b/plugin/logrotate/pb/logrotate.pb.gw.go @@ -0,0 +1,259 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: logrotate.proto + +/* +Package pb is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package pb + +import ( + "context" + "io" + "net/http" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = metadata.Join + +func request_Logrotate_List_0(ctx context.Context, marshaler runtime.Marshaler, client LogrotateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := client.List(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Logrotate_List_0(ctx context.Context, marshaler runtime.Marshaler, server LogrotateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := server.List(ctx, &protoReq) + return msg, metadata, err + +} + +func request_Logrotate_Get_0(ctx context.Context, marshaler runtime.Marshaler, client LogrotateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq RequestFileInfo + var metadata runtime.ServerMetadata + + var ( + val string + ok bool + err error + _ = err + ) + + val, ok = pathParams["fileName"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "fileName") + } + + protoReq.FileName, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "fileName", err) + } + + msg, err := client.Get(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Logrotate_Get_0(ctx context.Context, marshaler runtime.Marshaler, server LogrotateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq RequestFileInfo + var metadata runtime.ServerMetadata + + var ( + val string + ok bool + err error + _ = err + ) + + val, ok = pathParams["fileName"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "fileName") + } + + protoReq.FileName, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "fileName", err) + } + + msg, err := server.Get(ctx, &protoReq) + return msg, metadata, err + +} + +// RegisterLogrotateHandlerServer registers the http handlers for service Logrotate to "mux". +// UnaryRPC :call LogrotateServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterLogrotateHandlerFromEndpoint instead. +func RegisterLogrotateHandlerServer(ctx context.Context, mux *runtime.ServeMux, server LogrotateServer) error { + + mux.Handle("GET", pattern_Logrotate_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Logrotate/List", runtime.WithHTTPPathPattern("/logrotate/api/list")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Logrotate_List_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Logrotate_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + mux.Handle("GET", pattern_Logrotate_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Logrotate/Get", runtime.WithHTTPPathPattern("/logrotate/api/get/{fileName=**}")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Logrotate_Get_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Logrotate_Get_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +// RegisterLogrotateHandlerFromEndpoint is same as RegisterLogrotateHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterLogrotateHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.DialContext(ctx, endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterLogrotateHandler(ctx, mux, conn) +} + +// RegisterLogrotateHandler registers the http handlers for service Logrotate to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterLogrotateHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterLogrotateHandlerClient(ctx, mux, NewLogrotateClient(conn)) +} + +// RegisterLogrotateHandlerClient registers the http handlers for service Logrotate +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "LogrotateClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "LogrotateClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "LogrotateClient" to call the correct interceptors. +func RegisterLogrotateHandlerClient(ctx context.Context, mux *runtime.ServeMux, client LogrotateClient) error { + + mux.Handle("GET", pattern_Logrotate_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Logrotate/List", runtime.WithHTTPPathPattern("/logrotate/api/list")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Logrotate_List_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Logrotate_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + mux.Handle("GET", pattern_Logrotate_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Logrotate/Get", runtime.WithHTTPPathPattern("/logrotate/api/get/{fileName=**}")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Logrotate_Get_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Logrotate_Get_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_Logrotate_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"logrotate", "api", "list"}, "")) + + pattern_Logrotate_Get_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"logrotate", "api", "get", "fileName"}, "")) +) + +var ( + forward_Logrotate_List_0 = runtime.ForwardResponseMessage + + forward_Logrotate_Get_0 = runtime.ForwardResponseMessage +) diff --git a/plugin/logrotate/pb/logrotate.proto b/plugin/logrotate/pb/logrotate.proto new file mode 100644 index 0000000..160e108 --- /dev/null +++ b/plugin/logrotate/pb/logrotate.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; +import "google/api/annotations.proto"; +import "google/protobuf/empty.proto"; +package m7s; +option go_package="m7s.live/m7s/v5/plugin/logrotate/pb"; + +service logrotate { + rpc List (google.protobuf.Empty) returns (ResponseFileInfo) { + option (google.api.http) = { + get: "/logrotate/api/list" + }; + } + rpc Get (RequestFileInfo) returns (ResponseOpen) { + option (google.api.http) = { + get: "/logrotate/api/get/{fileName=**}" + }; + } +} + +message ResponseOpen { + string content = 1; +} + +message RequestFileInfo { + string fileName = 1; +} + +message ResponseFileInfo { + repeated FileInfo files = 1; +} + +message FileInfo { + string name = 1; + int64 size = 2; +} \ No newline at end of file diff --git a/plugin/logrotate/pb/logrotate_grpc.pb.go b/plugin/logrotate/pb/logrotate_grpc.pb.go new file mode 100644 index 0000000..fcef430 --- /dev/null +++ b/plugin/logrotate/pb/logrotate_grpc.pb.go @@ -0,0 +1,142 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.1 +// source: logrotate.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// LogrotateClient is the client API for Logrotate service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type LogrotateClient interface { + List(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ResponseFileInfo, error) + Get(ctx context.Context, in *RequestFileInfo, opts ...grpc.CallOption) (*ResponseOpen, error) +} + +type logrotateClient struct { + cc grpc.ClientConnInterface +} + +func NewLogrotateClient(cc grpc.ClientConnInterface) LogrotateClient { + return &logrotateClient{cc} +} + +func (c *logrotateClient) List(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ResponseFileInfo, error) { + out := new(ResponseFileInfo) + err := c.cc.Invoke(ctx, "/m7s.logrotate/List", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *logrotateClient) Get(ctx context.Context, in *RequestFileInfo, opts ...grpc.CallOption) (*ResponseOpen, error) { + out := new(ResponseOpen) + err := c.cc.Invoke(ctx, "/m7s.logrotate/Get", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// LogrotateServer is the server API for Logrotate service. +// All implementations must embed UnimplementedLogrotateServer +// for forward compatibility +type LogrotateServer interface { + List(context.Context, *emptypb.Empty) (*ResponseFileInfo, error) + Get(context.Context, *RequestFileInfo) (*ResponseOpen, error) + mustEmbedUnimplementedLogrotateServer() +} + +// UnimplementedLogrotateServer must be embedded to have forward compatible implementations. +type UnimplementedLogrotateServer struct { +} + +func (UnimplementedLogrotateServer) List(context.Context, *emptypb.Empty) (*ResponseFileInfo, error) { + return nil, status.Errorf(codes.Unimplemented, "method List not implemented") +} +func (UnimplementedLogrotateServer) Get(context.Context, *RequestFileInfo) (*ResponseOpen, error) { + return nil, status.Errorf(codes.Unimplemented, "method Get not implemented") +} +func (UnimplementedLogrotateServer) mustEmbedUnimplementedLogrotateServer() {} + +// UnsafeLogrotateServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to LogrotateServer will +// result in compilation errors. +type UnsafeLogrotateServer interface { + mustEmbedUnimplementedLogrotateServer() +} + +func RegisterLogrotateServer(s grpc.ServiceRegistrar, srv LogrotateServer) { + s.RegisterService(&Logrotate_ServiceDesc, srv) +} + +func _Logrotate_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LogrotateServer).List(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/m7s.logrotate/List", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LogrotateServer).List(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _Logrotate_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RequestFileInfo) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LogrotateServer).Get(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/m7s.logrotate/Get", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LogrotateServer).Get(ctx, req.(*RequestFileInfo)) + } + return interceptor(ctx, in, info, handler) +} + +// Logrotate_ServiceDesc is the grpc.ServiceDesc for Logrotate service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Logrotate_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "m7s.logrotate", + HandlerType: (*LogrotateServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "List", + Handler: _Logrotate_List_Handler, + }, + { + MethodName: "Get", + Handler: _Logrotate_Get_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "logrotate.proto", +} diff --git a/plugin/rtmp/pb/index.pb.go b/plugin/rtmp/pb/rtmp.pb.go similarity index 53% rename from plugin/rtmp/pb/index.pb.go rename to plugin/rtmp/pb/rtmp.pb.go index 54c80d1..5da3d9b 100644 --- a/plugin/rtmp/pb/index.pb.go +++ b/plugin/rtmp/pb/rtmp.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.28.1 // protoc v3.19.1 -// source: index.proto +// source: rtmp.proto package pb @@ -34,7 +34,7 @@ type PushRequest struct { func (x *PushRequest) Reset() { *x = PushRequest{} if protoimpl.UnsafeEnabled { - mi := &file_index_proto_msgTypes[0] + mi := &file_rtmp_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -47,7 +47,7 @@ func (x *PushRequest) String() string { func (*PushRequest) ProtoMessage() {} func (x *PushRequest) ProtoReflect() protoreflect.Message { - mi := &file_index_proto_msgTypes[0] + mi := &file_rtmp_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -60,7 +60,7 @@ func (x *PushRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PushRequest.ProtoReflect.Descriptor instead. func (*PushRequest) Descriptor() ([]byte, []int) { - return file_index_proto_rawDescGZIP(), []int{0} + return file_rtmp_proto_rawDescGZIP(), []int{0} } func (x *PushRequest) GetStreamPath() string { @@ -86,7 +86,7 @@ type PushResponse struct { func (x *PushResponse) Reset() { *x = PushResponse{} if protoimpl.UnsafeEnabled { - mi := &file_index_proto_msgTypes[1] + mi := &file_rtmp_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -99,7 +99,7 @@ func (x *PushResponse) String() string { func (*PushResponse) ProtoMessage() {} func (x *PushResponse) ProtoReflect() protoreflect.Message { - mi := &file_index_proto_msgTypes[1] + mi := &file_rtmp_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -112,52 +112,52 @@ func (x *PushResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PushResponse.ProtoReflect.Descriptor instead. func (*PushResponse) Descriptor() ([]byte, []int) { - return file_index_proto_rawDescGZIP(), []int{1} + return file_rtmp_proto_rawDescGZIP(), []int{1} } -var File_index_proto protoreflect.FileDescriptor +var File_rtmp_proto protoreflect.FileDescriptor -var file_index_proto_rawDesc = []byte{ - 0x0a, 0x0b, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d, - 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, - 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b, 0x0a, - 0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, - 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, - 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x75, - 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x69, 0x0a, 0x04, 0x72, 0x74, - 0x6d, 0x70, 0x12, 0x61, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x10, 0x2e, - 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x11, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2b, 0x22, 0x1e, 0x2f, 0x72, 0x74, 0x6d, - 0x70, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x09, 0x72, 0x65, 0x6d, 0x6f, - 0x74, 0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, - 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, - 0x72, 0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var file_rtmp_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x72, 0x74, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d, 0x37, + 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, + 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, + 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b, 0x0a, 0x0b, + 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x72, + 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x75, 0x73, + 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x69, 0x0a, 0x04, 0x72, 0x74, 0x6d, + 0x70, 0x12, 0x61, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x10, 0x2e, 0x6d, + 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, + 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2b, 0x22, 0x1e, 0x2f, 0x72, 0x74, 0x6d, 0x70, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, + 0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, + 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x72, + 0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_index_proto_rawDescOnce sync.Once - file_index_proto_rawDescData = file_index_proto_rawDesc + file_rtmp_proto_rawDescOnce sync.Once + file_rtmp_proto_rawDescData = file_rtmp_proto_rawDesc ) -func file_index_proto_rawDescGZIP() []byte { - file_index_proto_rawDescOnce.Do(func() { - file_index_proto_rawDescData = protoimpl.X.CompressGZIP(file_index_proto_rawDescData) +func file_rtmp_proto_rawDescGZIP() []byte { + file_rtmp_proto_rawDescOnce.Do(func() { + file_rtmp_proto_rawDescData = protoimpl.X.CompressGZIP(file_rtmp_proto_rawDescData) }) - return file_index_proto_rawDescData + return file_rtmp_proto_rawDescData } -var file_index_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_index_proto_goTypes = []interface{}{ +var file_rtmp_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_rtmp_proto_goTypes = []interface{}{ (*PushRequest)(nil), // 0: m7s.PushRequest (*PushResponse)(nil), // 1: m7s.PushResponse } -var file_index_proto_depIdxs = []int32{ +var file_rtmp_proto_depIdxs = []int32{ 0, // 0: m7s.rtmp.PushOut:input_type -> m7s.PushRequest 1, // 1: m7s.rtmp.PushOut:output_type -> m7s.PushResponse 1, // [1:2] is the sub-list for method output_type @@ -167,13 +167,13 @@ var file_index_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_index_proto_init() } -func file_index_proto_init() { - if File_index_proto != nil { +func init() { file_rtmp_proto_init() } +func file_rtmp_proto_init() { + if File_rtmp_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_index_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_rtmp_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PushRequest); i { case 0: return &v.state @@ -185,7 +185,7 @@ func file_index_proto_init() { return nil } } - file_index_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_rtmp_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PushResponse); i { case 0: return &v.state @@ -202,18 +202,18 @@ func file_index_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_index_proto_rawDesc, + RawDescriptor: file_rtmp_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_index_proto_goTypes, - DependencyIndexes: file_index_proto_depIdxs, - MessageInfos: file_index_proto_msgTypes, + GoTypes: file_rtmp_proto_goTypes, + DependencyIndexes: file_rtmp_proto_depIdxs, + MessageInfos: file_rtmp_proto_msgTypes, }.Build() - File_index_proto = out.File - file_index_proto_rawDesc = nil - file_index_proto_goTypes = nil - file_index_proto_depIdxs = nil + File_rtmp_proto = out.File + file_rtmp_proto_rawDesc = nil + file_rtmp_proto_goTypes = nil + file_rtmp_proto_depIdxs = nil } diff --git a/plugin/rtmp/pb/index.pb.gw.go b/plugin/rtmp/pb/rtmp.pb.gw.go similarity index 99% rename from plugin/rtmp/pb/index.pb.gw.go rename to plugin/rtmp/pb/rtmp.pb.gw.go index 3d83862..759af76 100644 --- a/plugin/rtmp/pb/index.pb.gw.go +++ b/plugin/rtmp/pb/rtmp.pb.gw.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. -// source: index.proto +// source: rtmp.proto /* Package pb is a reverse proxy. diff --git a/plugin/rtmp/pb/index.proto b/plugin/rtmp/pb/rtmp.proto similarity index 100% rename from plugin/rtmp/pb/index.proto rename to plugin/rtmp/pb/rtmp.proto diff --git a/plugin/rtmp/pb/index_grpc.pb.go b/plugin/rtmp/pb/rtmp_grpc.pb.go similarity index 98% rename from plugin/rtmp/pb/index_grpc.pb.go rename to plugin/rtmp/pb/rtmp_grpc.pb.go index 01615a4..effbc5a 100644 --- a/plugin/rtmp/pb/index_grpc.pb.go +++ b/plugin/rtmp/pb/rtmp_grpc.pb.go @@ -2,7 +2,7 @@ // versions: // - protoc-gen-go-grpc v1.2.0 // - protoc v3.19.1 -// source: index.proto +// source: rtmp.proto package pb @@ -101,5 +101,5 @@ var Rtmp_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: "index.proto", + Metadata: "rtmp.proto", } diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index afc8d1f..e12be57 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -212,7 +212,7 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) { func (h264 *H264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { var rtmpVideo RTMPVideo rtmpVideo.RecyclableBuffers = &util.RecyclableBuffers{} - rtmpVideo.ScalableMemoryAllocator = from.Wrap.GetScalableMemoryAllocator() + rtmpVideo.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() nalus := from.Raw.(Nalus) head := rtmpVideo.Malloc(5) head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(ParseVideoCodec(h264.FourCC())) diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go index 26c31da..ccc7bad 100644 --- a/plugin/rtp/pkg/video.go +++ b/plugin/rtp/pkg/video.go @@ -82,7 +82,7 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { var r RTPVideo - r.ScalableMemoryAllocator = from.Wrap.GetScalableMemoryAllocator() + r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() nalus := from.Raw.(Nalus) nalutype := nalus.H264Type() var lastPacket *rtp.Packet @@ -162,14 +162,14 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) { switch t { case codec.NALU_STAPA, codec.NALU_STAPB: if len(packet.Payload) <= offset { - return nil, errors.New("invalid nalu size") + return nil, fmt.Errorf("invalid nalu size %d", len(packet.Payload)) } for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); { if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize { nalu = [][]byte{buffer.ReadN(nextSize)} gotNalu(codec.ParseH264NALUType(nalu[0][0])) } else { - return nil, errors.New("invalid nalu size") + return nil, fmt.Errorf("invalid nalu size %d", nextSize) } } case codec.NALU_FUA, codec.NALU_FUB: diff --git a/plugin/webrtc/index.go b/plugin/webrtc/index.go index 8ad5eb9..3ae8821 100644 --- a/plugin/webrtc/index.go +++ b/plugin/webrtc/index.go @@ -235,7 +235,9 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) { if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp { frame.Packets = append(frame.Packets, &packet) } else { + t := time.Now() publisher.WriteVideo(frame) + fmt.Println("write video", time.Since(t)) frame = &mrtp.RTPVideo{} frame.Packets = []*rtp.Packet{&packet} frame.RTPCodecParameters = &codecP diff --git a/plugin/webrtc/index_test.go b/plugin/webrtc/index_test.go new file mode 100644 index 0000000..30cf239 --- /dev/null +++ b/plugin/webrtc/index_test.go @@ -0,0 +1,23 @@ +package plugin_webrtc + +import ( + "context" + "testing" + "time" + + "github.com/chromedp/chromedp" + "m7s.live/m7s/v5" +) + +func TestPublish(t *testing.T) { + ctx, cancel := chromedp.NewContext(context.Background()) + go m7s.Run(ctx, "config.yaml") + defer cancel() + err := chromedp.Run(ctx, + chromedp.Navigate("http://localhost:8080/webrtc/test/publish"), + ) + if err != nil { + t.Fatal(err) + } + <-time.After(10 * time.Second) +} diff --git a/publisher.go b/publisher.go index 48f3d7b..355efa4 100644 --- a/publisher.go +++ b/publisher.go @@ -34,8 +34,8 @@ func (s *SpeedControl) speedControl(speed float64, ts time.Duration) { } else { elapsed := time.Since(s.beginTime) should := time.Duration(float64(ts) / speed) - if should > elapsed { - time.Sleep(should - elapsed) + if needSleep := should - elapsed; needSleep > time.Second { + time.Sleep(needSleep) } } } @@ -50,7 +50,8 @@ func (t *AVTracks) IsEmpty() bool { } func (t *AVTracks) CreateSubTrack(dataType reflect.Type) (track *AVTrack) { - track = NewAVTrack(dataType, t.Logger.With("subtrack", dataType.String()), t.AVTrack.Size) + track = NewAVTrack(dataType, t.AVTrack) + track.WrapIndex = len(t.Items) t.Add(track) return } @@ -144,7 +145,7 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) { func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { frame := &t.Value - frame.Wrap = data + frame.Wraps = append(frame.Wraps, data) ts := data.GetTimestamp() if p.lastTs == 0 { p.baseTs -= ts @@ -153,7 +154,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { p.lastTs = frame.Timestamp if p.Enabled(p, TraceLevel) { codec := t.FourCC().String() - size, data := frame.Wrap.GetSize(), frame.Wrap.String() + size, data := frame.Wraps[0].GetSize(), frame.Wraps[0].String() p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", codec, "size", size, "data", data) } t.Step() @@ -215,7 +216,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { p.writeAV(t, data) if p.VideoTrack.Length > 1 && !p.VideoTrack.AVTrack.Ready.Pendding() { if t.LastValue.Raw == nil { - t.LastValue.Raw, err = t.LastValue.Wrap.ToRaw(t.ICodecCtx) + t.LastValue.Raw, err = t.LastValue.Wraps[0].ToRaw(t.ICodecCtx) if err != nil { t.Error("to raw", "err", err) return err @@ -224,15 +225,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { for i, track := range p.VideoTrack.Items[1:] { if track.ICodecCtx == nil { err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx) - if p.BufferTime > 0 { - track.IDRingList.AddIDR(track.Ring) - track.HistoryRing.Store(track.Ring) - } else { - track.IDRing.Store(track.Ring) - } for rf := idr; rf != t.Ring; rf = rf.Next() { if i == 0 && rf.Value.Raw == nil { - rf.Value.Raw, err = rf.Value.Wrap.ToRaw(t.ICodecCtx) + rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) if err != nil { t.Error("to raw", "err", err) return err @@ -259,7 +254,7 @@ func (p *Publisher) writeSubAV(to *AVTrack, frame *AVFrame) (err error) { to.Error("from raw", "err", err) return } - to.Value.Wrap = toFrame + to.Value.Wraps = append(to.Value.Wraps, toFrame) to.Value.IDR = frame.IDR to.Value.Timestamp = frame.Timestamp if p.Enabled(p, TraceLevel) { @@ -347,13 +342,13 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) { snap.Sequence = v.Sequence snap.Timestamp = uint32(v.Timestamp) snap.WriteTime = uint64(v.WriteTime.UnixNano()) - if v.Wrap != nil { - snap.Wrap = &pb.Wrap{ - Timestamp: uint32(v.Wrap.GetTimestamp()), - Size: uint32(v.Wrap.GetSize()), - Data: v.Wrap.String(), - } - } + // if v.Wrap != nil { + // snap.Wrap = &pb.Wrap{ + // Timestamp: uint32(v.Wrap.GetTimestamp()), + // Size: uint32(v.Wrap.GetSize()), + // Data: v.Wrap.String(), + // } + // } ret.VideoTrack = append(ret.VideoTrack, &snap) }) } @@ -364,13 +359,13 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) { snap.Sequence = v.Sequence snap.Timestamp = uint32(v.Timestamp) snap.WriteTime = uint64(v.WriteTime.UnixNano()) - if v.Wrap != nil { - snap.Wrap = &pb.Wrap{ - Timestamp: uint32(v.Wrap.GetTimestamp()), - Size: uint32(v.Wrap.GetSize()), - Data: v.Wrap.String(), - } - } + // if v.Wrap != nil { + // snap.Wrap = &pb.Wrap{ + // Timestamp: uint32(v.Wrap.GetTimestamp()), + // Size: uint32(v.Wrap.GetSize()), + // Data: v.Wrap.String(), + // } + // } ret.AudioTrack = append(ret.AudioTrack, &snap) }) } diff --git a/server.go b/server.go index 1f95df4..442dc93 100644 --- a/server.go +++ b/server.go @@ -53,6 +53,7 @@ type Server struct { Pushs util.Collection[string, *Pusher] Waiting map[string][]*Subscriber Subscribers util.Collection[int, *Subscriber] + LogHandler MultiLogHandler pidG int sidG int apiList []string @@ -68,10 +69,11 @@ func NewServer() (s *Server) { } s.config.HTTP.ListenAddrTLS = ":8443" s.config.HTTP.ListenAddr = ":8080" - s.Logger = slog.With("server", s.ID) s.handler = s s.server = s s.Meta = &serverMeta + s.LogHandler.Add(console.NewHandler(os.Stdout, nil)) + s.Logger = slog.New(&s.LogHandler).With("server", s.ID) Servers[s.ID] = s return } @@ -94,6 +96,9 @@ func (s *Server) reset() { server.Meta = s.Meta server.config.HTTP.ListenAddrTLS = ":8443" server.config.HTTP.ListenAddr = ":8080" + server.LogHandler = MultiLogHandler{} + server.LogHandler.Add(console.NewHandler(os.Stdout, nil)) + // server.Logger = slog.New(&server.LogHandler).With("server", s.ID) *s = server } @@ -144,9 +149,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) { if s.LogLevel == "trace" { lv.Set(TraceLevel) } - s.Logger = slog.New( - console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}), - ).With("server", s.ID) + s.LogHandler.SetLevel(lv.Level()) s.registerHandler() if httpConf.ListenAddrTLS != "" { @@ -332,6 +335,8 @@ func (s *Server) eventLoop() { v.Resolve(&pb.StreamListResponse{List: streams}) continue } + case slog.Handler: + s.LogHandler.Add(v) } for _, plugin := range s.Plugins { if plugin.Disabled { @@ -462,3 +467,7 @@ func (s *Server) Call(arg any) (result any, err error) { } return } + +func (s *Server) PostMessage(msg any) { + s.eventChan <- msg +} diff --git a/subscriber.go b/subscriber.go index f18da91..342c7ce 100644 --- a/subscriber.go +++ b/subscriber.go @@ -60,6 +60,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { var ar, vr *AVRingReader var ah, vh reflect.Value var a1, v1 reflect.Type + var awi, vwi int var initState = 0 var subMode = s.SubMode //订阅模式 if s.Args.Has(s.SubModeArgName) { @@ -77,6 +78,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { return } if at := s.Publisher.GetAudioTrack(a1); at != nil { + awi = at.WrapIndex ar = NewAVRingReader(at) ar.Logger = s.Logger.With("reader", a1.String()) ar.Info("start read") @@ -88,6 +90,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { return } if vt := s.Publisher.GetVideoTrack(v1); vt != nil { + vwi = vt.WrapIndex vr = NewAVRingReader(vt) vr.Logger = s.Logger.With("reader", v1.String()) vr.Info("start read") @@ -110,7 +113,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { if s.Enabled(s, TraceLevel) { s.Trace("send audio frame", "seq", audioFrame.Sequence) } - res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)}) + res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wraps[awi])}) if len(res) > 0 && !res[0].IsNil() { if err := res[0].Interface().(error); err != ErrInterrupt { s.Stop(err) @@ -122,9 +125,9 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { } sendVideoFrame := func() (err error) { if s.Enabled(s, TraceLevel) { - s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.String(), "size", videoFrame.Wrap.GetSize()) + s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wraps[vwi].String(), "size", videoFrame.Wraps[vwi].GetSize()) } - res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)}) + res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wraps[vwi])}) if len(res) > 0 && !res[0].IsNil() { if err = res[0].Interface().(error); err != ErrInterrupt { s.Stop(err)