feat: rtmp add pb

This commit is contained in:
langhuihui
2024-04-18 20:40:19 +08:00
parent f4eab4cf51
commit 788923749e
33 changed files with 990 additions and 59 deletions

2
api.go
View File

@@ -5,7 +5,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
"m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/pb" "m7s.live/m7s/v5/pb"
) )
func (s *Server) StreamSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamSnapShot, err error) { func (s *Server) StreamSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamSnapShot, err error) {

View File

@@ -1,5 +1,7 @@
global: global:
loglevel: debug loglevel: debug
tcp:
listenaddr: :50051
http: http:
listenaddr: :8081 listenaddr: :8081
listenaddrtls: :8555 listenaddrtls: :8555

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"m7s.live/m7s/v5" "m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl" _ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/rtmp" _ "m7s.live/m7s/v5/plugin/rtmp"
) )

View File

@@ -472,9 +472,9 @@ var file_global_proto_rawDesc = []byte{
0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d,
0x22, 0x18, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x22, 0x18, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x3a, 0x01, 0x2a, 0x42, 0x18, 0x5a, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x3a, 0x01, 0x2a, 0x42, 0x14, 0x5a,
0x16, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x12, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35,
0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

View File

@@ -2,7 +2,7 @@ syntax = "proto3";
import "google/api/annotations.proto"; import "google/api/annotations.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
package m7s; package m7s;
option go_package="m7s.live/m7s/v5/pkg/pb"; option go_package="m7s.live/m7s/v5/pb";
service Global { service Global {
rpc Shutdown (RequestWithId) returns (google.protobuf.Empty) { rpc Shutdown (RequestWithId) returns (google.protobuf.Empty) {

45
pb/text_plain.go Normal file
View File

@@ -0,0 +1,45 @@
package pb
import (
"encoding/json"
"io"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
)
var _ runtime.Marshaler = (*TextPlain)(nil)
type TextPlain struct {
}
// ContentType implements runtime.Marshaler.
func (t *TextPlain) ContentType(v interface{}) string {
return "text/plain"
}
// Marshal implements runtime.Marshaler.
func (t *TextPlain) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
// NewDecoder implements runtime.Marshaler.
func (t *TextPlain) NewDecoder(r io.Reader) runtime.Decoder {
return runtime.DecoderFunc(func(v interface{}) error {
b, err := io.ReadAll(r)
*v.(*string) = string(b)
return err
})
}
// NewEncoder implements runtime.Marshaler.
func (t *TextPlain) NewEncoder(w io.Writer) runtime.Encoder {
return runtime.EncoderFunc(func(v interface{}) error {
_, err := w.Write([]byte(v.(string)))
return err
})
}
// Unmarshal implements runtime.Marshaler.
func (t *TextPlain) Unmarshal(data []byte, v interface{}) error {
return json.Unmarshal(data, v)
}

View File

@@ -66,7 +66,7 @@ func (r *AVRingReader) ReadFrame(mode int) (err error) {
if r.Track.IDRing != nil { if r.Track.IDRing != nil {
startRing = r.Track.IDRing startRing = r.Track.IDRing
} else { } else {
r.Warn("no IDRring") r.Warn("no IDRring", "track", r.Track.Codec.String())
} }
switch mode { switch mode {
case SUBMODE_REAL: case SUBMODE_REAL:

View File

@@ -5,7 +5,6 @@ import (
"crypto/subtle" "crypto/subtle"
"crypto/tls" "crypto/tls"
"net/http" "net/http"
"strings"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -26,7 +25,8 @@ type HTTP struct {
ReadTimeout time.Duration `desc:"读取超时"` ReadTimeout time.Duration `desc:"读取超时"`
WriteTimeout time.Duration `desc:"写入超时"` WriteTimeout time.Duration `desc:"写入超时"`
IdleTimeout time.Duration `desc:"空闲超时"` IdleTimeout time.Duration `desc:"空闲超时"`
mux http.Handler mux *http.ServeMux
grpcMux *runtime.ServeMux
server *http.Server server *http.Server
serverTLS *http.Server serverTLS *http.Server
middlewares []Middleware middlewares []Middleware
@@ -38,15 +38,35 @@ type HTTPConfig interface {
// AddMiddleware(Middleware) // AddMiddleware(Middleware)
} }
func (config *HTTP) SetMux(mux http.Handler) { func (config *HTTP) GetHandler() http.Handler {
config.mux = mux if config.grpcMux != nil {
return config.grpcMux
}
return config.mux
}
func (config *HTTP) GetHttpMux() *http.ServeMux {
return config.mux
}
func (config *HTTP) GetGRPCMux() *runtime.ServeMux {
return config.grpcMux
}
func (config *HTTP) SetMux(mux *runtime.ServeMux) {
config.grpcMux = mux
handler := func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
config.mux.ServeHTTP(w, r)
}
mux.HandlePath("GET", "/", handler)
mux.HandlePath("POST", "/", handler)
} }
func (config *HTTP) AddMiddleware(middleware Middleware) { func (config *HTTP) AddMiddleware(middleware Middleware) {
config.middlewares = append(config.middlewares, middleware) config.middlewares = append(config.middlewares, middleware)
} }
func (config *HTTP) Handle(path string, f http.Handler) { func (config *HTTP) Handle(path string, f http.Handler, last bool) {
if config.mux == nil { if config.mux == nil {
config.mux = http.NewServeMux() config.mux = http.NewServeMux()
} }
@@ -59,19 +79,7 @@ func (config *HTTP) Handle(path string, f http.Handler) {
for _, middleware := range config.middlewares { for _, middleware := range config.middlewares {
f = middleware(path, f) f = middleware(path, f)
} }
switch mux := config.mux.(type) { config.mux.Handle(path, f)
case *http.ServeMux:
mux.Handle(path, f)
case *runtime.ServeMux:
if strings.HasSuffix(path, "/") {
path += "{streamPath=**}"
}
handler := func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
f.ServeHTTP(w, r)
}
mux.HandlePath("GET", path, handler)
mux.HandlePath("POST", path, handler)
}
} }
func (config *HTTP) GetHTTPConfig() *HTTP { func (config *HTTP) GetHTTPConfig() *HTTP {
@@ -97,7 +105,7 @@ func (config *HTTP) ListenTLS() error {
ReadTimeout: config.ReadTimeout, ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout, WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout, IdleTimeout: config.IdleTimeout,
Handler: config.mux, Handler: config.GetHandler(),
TLSConfig: &tls.Config{ TLSConfig: &tls.Config{
Certificates: []tls.Certificate{cer}, Certificates: []tls.Certificate{cer},
CipherSuites: []uint16{ CipherSuites: []uint16{
@@ -132,7 +140,7 @@ func (config *HTTP) Listen() error {
ReadTimeout: config.ReadTimeout, ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout, WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout, IdleTimeout: config.IdleTimeout,
Handler: config.mux, Handler: config.GetHandler(),
} }
return config.server.ListenAndServe() return config.server.ListenAndServe()
} }

66
pkg/util/buf-reader.go Normal file
View File

@@ -0,0 +1,66 @@
package util
import (
"io"
)
const defaultBufSize = 4096
type BufReader struct {
reader io.Reader
buf RecyclableBuffers
BufLen int
}
func NewBufReader(reader io.Reader) (r *BufReader) {
r = &BufReader{}
r.reader = reader
r.buf.ScalableMemoryAllocator = NewScalableMemoryAllocator(4096)
r.BufLen = defaultBufSize
return
}
func (r *BufReader) eat() error {
buf := r.buf.Malloc(r.BufLen)
if n, err := r.reader.Read(buf); err != nil {
return err
} else if n < r.BufLen {
r.buf.RecycleBack(r.BufLen - n)
}
return nil
}
func (r *BufReader) ReadByte() (byte, error) {
if r.buf.Length > 0 {
return r.buf.ReadByte()
}
err := r.eat()
if err != nil {
return 0, err
}
return r.buf.ReadByte()
}
func (r *BufReader) ReadBE(n int) (num int, err error) {
for i := range n {
b, err := r.ReadByte()
if err != nil {
return -1, err
}
num += int(b) << ((n - i - 1) << 3)
}
return
}
func (r *BufReader) ReadBytes(n int) (mem *RecyclableBuffers, err error) {
mem = &RecyclableBuffers{ScalableMemoryAllocator: r.buf.ScalableMemoryAllocator}
for r.buf.RecycleFront(); n > 0 && err == nil; err = r.eat() {
if r.buf.Length >= n {
mem.ReadFromBytes(r.buf.Buffers.Cut(n)...)
return
}
n -= r.buf.Length
mem.ReadFromBytes(r.buf.CutAll()...)
}
return
}

View File

@@ -0,0 +1,72 @@
package util
import (
"bytes"
"testing"
)
func TestBufRead(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var testData = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
testReader := bytes.NewReader(testData)
reader := NewBufReader(testReader)
reader.BufLen = 5
b, err := reader.ReadByte()
if err != nil {
t.Error(err)
return
}
if b != 1 {
t.Error("byte read error")
return
}
num, err := reader.ReadBE(4)
if err != nil {
t.Error(err)
return
}
if num != 0x02030405 {
t.Error("read be error")
return
}
if reader.buf.Length != 0 {
t.Error("reader.buf.Length != 0")
return
}
b, err = reader.ReadByte()
if err != nil {
t.Error(err)
return
}
if b != 6 {
t.Error("byte read error")
return
}
mem, err := reader.ReadBytes(5)
if err != nil {
t.Error(err)
return
}
if len(mem.Buffers.Buffers) != 2 {
t.Error("len(mem.Buffers.Buffers) != 2")
return
}
if mem.Buffers.Buffers[0][0] != 7 {
t.Error("mem.Buffers.Buffers[0][0] != 7")
return
}
if mem.Buffers.Buffers[1][0] != 11 {
t.Error("mem.Buffers.Buffers[1][0] != 10")
return
}
b, err = reader.ReadByte()
if err != nil {
t.Error(err)
return
}
if b != 12 {
t.Error("byte read error")
return
}
})
}

View File

@@ -3,6 +3,7 @@ package util
import ( import (
"io" "io"
"net" "net"
"slices"
) )
type Buffers struct { type Buffers struct {
@@ -183,6 +184,120 @@ func (buffers *Buffers) ReadBE(n int) (num int, err error) {
return return
} }
func (buffers *Buffers) Consumes() (r net.Buffers) {
for i := range buffers.offset {
r = append(r, buffers.Buffers[i])
}
if buffers.curBufLen > 0 {
r = append(r, buffers.curBuf[:len(buffers.curBuf)-buffers.curBufLen])
}
return
}
func (buffers *Buffers) ClipFront() (r net.Buffers) {
if buffers.Offset == 0 {
return
}
if buffers.Length == 0 {
r = buffers.Buffers
buffers.Buffers = buffers.Buffers[:0]
buffers.curBuf = nil
buffers.curBufLen = 0
buffers.offset = 0
buffers.Offset = 0
return
}
for i := range buffers.offset {
r = append(r, buffers.Buffers[i])
l := len(buffers.Buffers[i])
buffers.Offset -= l
}
if buffers.curBufLen > 0 {
l := len(buffers.Buffers[buffers.offset]) - buffers.curBufLen
r = append(r, buffers.Buffers[buffers.offset][:l])
buffers.Offset -= l
}
buffers.Buffers = buffers.Buffers[buffers.offset:]
buffers.Buffers[0] = buffers.curBuf
buffers.offset = 0
buffers.Offset = 0
return r
}
func (buffers *Buffers) ClipBack(n int) []byte {
lastBuf := buffers.Buffers[len(buffers.Buffers)-1]
lastBufLen := len(lastBuf)
if lastBufLen < n {
panic("ClipBack: n > lastBufLen")
}
ret := lastBuf[lastBufLen-n:]
buffers.Buffers[len(buffers.Buffers)-1] = lastBuf[:lastBufLen-n]
buffers.Length -= n
if buffers.Length > 0 {
if buffers.offset == len(buffers.Buffers)-1 {
buffers.curBuf = buffers.curBuf[:buffers.curBufLen-n]
buffers.curBufLen -= n
}
} else {
buffers.curBuf = nil
buffers.curBufLen = 0
buffers.Length = 0
}
return ret
}
func (buffers *Buffers) CutAll() (r net.Buffers) {
r = append(r, buffers.curBuf)
for i := buffers.offset+1; i < len(buffers.Buffers); i++ {
r = append(r, buffers.Buffers[i])
}
if len(buffers.Buffers[buffers.offset]) == buffers.curBufLen {
buffers.Buffers = buffers.Buffers[:buffers.offset]
} else {
buffers.Buffers[buffers.offset] = buffers.Buffers[buffers.offset][:buffers.curBufLen]
buffers.offset++
}
buffers.Length = 0
buffers.curBuf = nil
buffers.curBufLen = 0
return
}
func (buffers *Buffers) Cut(n int) (r net.Buffers) {
buffers.CutTo(n, &r)
return
}
func (buffers *Buffers) CutTo(n int, result *net.Buffers) (actual int) {
for actual = n; buffers.Length > 0 && n > 0; {
if buffers.curBufLen > n {
*result = append(*result, buffers.curBuf[:n])
buffers.curBuf = buffers.curBuf[n:]
buffers.curBufLen -= n
buffers.Buffers[buffers.offset] = buffers.curBuf
buffers.Length -= n
return actual
}
*result = append(*result, buffers.curBuf)
n -= buffers.curBufLen
buffers.Length -= buffers.curBufLen
if len(buffers.Buffers[buffers.offset]) == buffers.curBufLen {
buffers.Buffers = slices.Delete(buffers.Buffers, buffers.offset, 1)
} else {
buffers.Buffers[buffers.offset] = buffers.Buffers[buffers.offset][:buffers.curBufLen]
buffers.offset++
}
if buffers.Length > 0 {
buffers.curBuf = buffers.Buffers[buffers.offset]
buffers.curBufLen = len(buffers.curBuf)
} else {
buffers.curBuf = nil
buffers.curBufLen = 0
}
}
return actual - n
}
func (buffers *Buffers) ToBytes() []byte { func (buffers *Buffers) ToBytes() []byte {
ret := make([]byte, buffers.Length) ret := make([]byte, buffers.Length)
buffers.Read(ret) buffers.Read(ret)

View File

@@ -156,3 +156,36 @@ func (r *RecyclableMemory) RecycleBack(n int) {
r.Free2(r.mem[l-3], start, *end) r.Free2(r.mem[l-3], start, *end)
*end = start *end = start
} }
type RecyclableBuffers struct {
*ScalableMemoryAllocator
Buffers
}
func (r *RecyclableBuffers) Malloc(size int) (memory []byte) {
memory = r.ScalableMemoryAllocator.Malloc(size)
r.Buffers.ReadFromBytes(memory)
return
}
func (r *RecyclableBuffers) Recycle() {
for _, buf := range r.Buffers.Buffers {
r.Free(buf)
}
}
func (r *RecyclableBuffers) RecycleBack(n int) {
r.Free(r.ClipBack(n))
}
func (r *RecyclableBuffers) RecycleFront() {
for _, buf := range r.Buffers.ClipFront() {
r.Free(buf)
}
}
func (r *RecyclableBuffers) Cut(n int) (child *RecyclableBuffers) {
child = &RecyclableBuffers{ScalableMemoryAllocator: r.ScalableMemoryAllocator}
child.ReadFromBytes(r.Buffers.Cut(n)...)
return
}

View File

@@ -10,7 +10,9 @@ import (
"runtime" "runtime"
"strings" "strings"
gatewayRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/mcuadros/go-defaults" "github.com/mcuadros/go-defaults"
"google.golang.org/grpc"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
@@ -19,10 +21,12 @@ import (
type DefaultYaml string type DefaultYaml string
type PluginMeta struct { type PluginMeta struct {
Name string Name string
Version string //插件版本 Version string //插件版本
Type reflect.Type Type reflect.Type
defaultYaml DefaultYaml //默认配置 defaultYaml DefaultYaml //默认配置
ServiceDesc *grpc.ServiceDesc
RegisterGRPCHandler func(context.Context, *gatewayRuntime.ServeMux, *grpc.ClientConn) error
} }
func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) { func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
@@ -78,6 +82,16 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
p.Stop(err) p.Stop(err)
return return
} }
if plugin.ServiceDesc != nil && s.grpcServer != nil {
s.grpcServer.RegisterService(plugin.ServiceDesc, instance)
if plugin.RegisterGRPCHandler != nil {
err = plugin.RegisterGRPCHandler(s.Context, s.config.HTTP.GetGRPCMux(), s.grpcClientConn)
if err != nil {
p.Error("init", "error", err)
p.Stop(err)
}
}
}
s.Plugins = append(s.Plugins, p) s.Plugins = append(s.Plugins, p)
p.Start() p.Start()
} }
@@ -117,6 +131,10 @@ func InstallPlugin[C iPlugin](options ...any) error {
switch v := option.(type) { switch v := option.(type) {
case DefaultYaml: case DefaultYaml:
meta.defaultYaml = v meta.defaultYaml = v
case *grpc.ServiceDesc:
meta.ServiceDesc = v
case func(context.Context, *gatewayRuntime.ServeMux, *grpc.ClientConn) error:
meta.RegisterGRPCHandler = v
} }
} }
plugins = append(plugins, meta) plugins = append(plugins, meta)
@@ -326,15 +344,16 @@ func (p *Plugin) handle(pattern string, handler http.Handler) {
if p == nil { if p == nil {
return return
} }
last := pattern == "/"
if !strings.HasPrefix(pattern, "/") { if !strings.HasPrefix(pattern, "/") {
pattern = "/" + pattern pattern = "/" + pattern
} }
handler = p.logHandler(handler) handler = p.logHandler(handler)
p.GetCommonConf().Handle(pattern, handler) p.config.HTTP.Handle(pattern, handler, last)
if p.server != p.handler { if p.server != p.handler {
pattern = "/" + strings.ToLower(p.Meta.Name) + pattern pattern = "/" + strings.ToLower(p.Meta.Name) + pattern
p.Debug("http handle added to server", "pattern", pattern) p.Debug("http handle added to server", "pattern", pattern)
p.server.GetCommonConf().Handle(pattern, handler) p.server.config.HTTP.Handle(pattern, handler, last)
} }
p.server.apiList = append(p.server.apiList, pattern) p.server.apiList = append(p.server.apiList, pattern)
} }

13
plugin/rtmp/api.go Normal file
View File

@@ -0,0 +1,13 @@
package plugin_rtmp
import (
"context"
"m7s.live/m7s/v5/plugin/rtmp/pb"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *pb.PushResponse, err error) {
go r.Push(req.StreamPath, req.RemoteURL, &rtmp.Client{})
return &pb.PushResponse{}, nil
}

View File

@@ -6,17 +6,19 @@ import (
"net" "net"
"m7s.live/m7s/v5" "m7s.live/m7s/v5"
"m7s.live/m7s/v5/plugin/rtmp/pb"
. "m7s.live/m7s/v5/plugin/rtmp/pkg" . "m7s.live/m7s/v5/plugin/rtmp/pkg"
) )
type RTMPPlugin struct { type RTMPPlugin struct {
pb.UnimplementedRtmpServer
m7s.Plugin m7s.Plugin
ChunkSize int `default:"1024"` ChunkSize int `default:"1024"`
KeepAlive bool KeepAlive bool
} }
var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp: var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp:
listenaddr: :1935`)) listenaddr: :1935`), &pb.Rtmp_ServiceDesc, pb.RegisterRtmpHandler)
func (p *RTMPPlugin) OnInit() error { func (p *RTMPPlugin) OnInit() error {
for streamPath, url := range p.GetCommonConf().PullOnStart { for streamPath, url := range p.GetCommonConf().PullOnStart {

219
plugin/rtmp/pb/index.pb.go Normal file
View File

@@ -0,0 +1,219 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.1
// source: index.proto
package pb
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
_ "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type PushRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
RemoteURL string `protobuf:"bytes,2,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"`
}
func (x *PushRequest) Reset() {
*x = PushRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_index_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PushRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PushRequest) ProtoMessage() {}
func (x *PushRequest) ProtoReflect() protoreflect.Message {
mi := &file_index_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PushRequest.ProtoReflect.Descriptor instead.
func (*PushRequest) Descriptor() ([]byte, []int) {
return file_index_proto_rawDescGZIP(), []int{0}
}
func (x *PushRequest) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
func (x *PushRequest) GetRemoteURL() string {
if x != nil {
return x.RemoteURL
}
return ""
}
type PushResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *PushResponse) Reset() {
*x = PushResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_index_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PushResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PushResponse) ProtoMessage() {}
func (x *PushResponse) ProtoReflect() protoreflect.Message {
mi := &file_index_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PushResponse.ProtoReflect.Descriptor instead.
func (*PushResponse) Descriptor() ([]byte, []int) {
return file_index_proto_rawDescGZIP(), []int{1}
}
var File_index_proto protoreflect.FileDescriptor
var file_index_proto_rawDesc = []byte{
0x0a, 0x0b, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d,
0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61,
0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b, 0x0a,
0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09,
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x75,
0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x69, 0x0a, 0x04, 0x72, 0x74,
0x6d, 0x70, 0x12, 0x61, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x10, 0x2e,
0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x11, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2b, 0x22, 0x1e, 0x2f, 0x72, 0x74, 0x6d,
0x70, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x09, 0x72, 0x65, 0x6d, 0x6f,
0x74, 0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76,
0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f,
0x72, 0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_index_proto_rawDescOnce sync.Once
file_index_proto_rawDescData = file_index_proto_rawDesc
)
func file_index_proto_rawDescGZIP() []byte {
file_index_proto_rawDescOnce.Do(func() {
file_index_proto_rawDescData = protoimpl.X.CompressGZIP(file_index_proto_rawDescData)
})
return file_index_proto_rawDescData
}
var file_index_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_index_proto_goTypes = []interface{}{
(*PushRequest)(nil), // 0: m7s.PushRequest
(*PushResponse)(nil), // 1: m7s.PushResponse
}
var file_index_proto_depIdxs = []int32{
0, // 0: m7s.rtmp.PushOut:input_type -> m7s.PushRequest
1, // 1: m7s.rtmp.PushOut:output_type -> m7s.PushResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_index_proto_init() }
func file_index_proto_init() {
if File_index_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_index_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_index_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_index_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_index_proto_goTypes,
DependencyIndexes: file_index_proto_depIdxs,
MessageInfos: file_index_proto_msgTypes,
}.Build()
File_index_proto = out.File
file_index_proto_rawDesc = nil
file_index_proto_goTypes = nil
file_index_proto_depIdxs = nil
}

View File

@@ -0,0 +1,197 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: index.proto
/*
Package pb is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pb
import (
"context"
"io"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Rtmp_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, client RtmpClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq.RemoteURL); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := client.PushOut(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Rtmp_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, server RtmpServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq.RemoteURL); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := server.PushOut(ctx, &protoReq)
return msg, metadata, err
}
// RegisterRtmpHandlerServer registers the http handlers for service Rtmp to "mux".
// UnaryRPC :call RtmpServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterRtmpHandlerFromEndpoint instead.
func RegisterRtmpHandlerServer(ctx context.Context, mux *runtime.ServeMux, server RtmpServer) error {
mux.Handle("POST", pattern_Rtmp_PushOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Rtmp/PushOut", runtime.WithHTTPPathPattern("/rtmp/api/push/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Rtmp_PushOut_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Rtmp_PushOut_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterRtmpHandlerFromEndpoint is same as RegisterRtmpHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterRtmpHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterRtmpHandler(ctx, mux, conn)
}
// RegisterRtmpHandler registers the http handlers for service Rtmp to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterRtmpHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterRtmpHandlerClient(ctx, mux, NewRtmpClient(conn))
}
// RegisterRtmpHandlerClient registers the http handlers for service Rtmp
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "RtmpClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "RtmpClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "RtmpClient" to call the correct interceptors.
func RegisterRtmpHandlerClient(ctx context.Context, mux *runtime.ServeMux, client RtmpClient) error {
mux.Handle("POST", pattern_Rtmp_PushOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Rtmp/PushOut", runtime.WithHTTPPathPattern("/rtmp/api/push/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Rtmp_PushOut_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Rtmp_PushOut_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Rtmp_PushOut_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"rtmp", "api", "push", "streamPath"}, ""))
)
var (
forward_Rtmp_PushOut_0 = runtime.ForwardResponseMessage
)

View File

@@ -0,0 +1,23 @@
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
package m7s;
option go_package="m7s.live/m7s/v5/plugin/rtmp/pb";
service rtmp {
rpc PushOut (PushRequest) returns (PushResponse) {
option (google.api.http) = {
post: "/rtmp/api/push/{streamPath=**}"
body: "remoteURL"
};
}
}
message PushRequest {
string streamPath = 1;
string remoteURL = 2;
}
message PushResponse {
}

View File

@@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.1
// source: index.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// RtmpClient is the client API for Rtmp service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RtmpClient interface {
PushOut(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*PushResponse, error)
}
type rtmpClient struct {
cc grpc.ClientConnInterface
}
func NewRtmpClient(cc grpc.ClientConnInterface) RtmpClient {
return &rtmpClient{cc}
}
func (c *rtmpClient) PushOut(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*PushResponse, error) {
out := new(PushResponse)
err := c.cc.Invoke(ctx, "/m7s.rtmp/PushOut", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RtmpServer is the server API for Rtmp service.
// All implementations must embed UnimplementedRtmpServer
// for forward compatibility
type RtmpServer interface {
PushOut(context.Context, *PushRequest) (*PushResponse, error)
mustEmbedUnimplementedRtmpServer()
}
// UnimplementedRtmpServer must be embedded to have forward compatible implementations.
type UnimplementedRtmpServer struct {
}
func (UnimplementedRtmpServer) PushOut(context.Context, *PushRequest) (*PushResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PushOut not implemented")
}
func (UnimplementedRtmpServer) mustEmbedUnimplementedRtmpServer() {}
// UnsafeRtmpServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RtmpServer will
// result in compilation errors.
type UnsafeRtmpServer interface {
mustEmbedUnimplementedRtmpServer()
}
func RegisterRtmpServer(s grpc.ServiceRegistrar, srv RtmpServer) {
s.RegisterService(&Rtmp_ServiceDesc, srv)
}
func _Rtmp_PushOut_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PushRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RtmpServer).PushOut(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.rtmp/PushOut",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RtmpServer).PushOut(ctx, req.(*PushRequest))
}
return interceptor(ctx, in, info, handler)
}
// Rtmp_ServiceDesc is the grpc.ServiceDesc for Rtmp service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Rtmp_ServiceDesc = grpc.ServiceDesc{
ServiceName: "m7s.rtmp",
HandlerType: (*RtmpServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "PushOut",
Handler: _Rtmp_PushOut_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "index.proto",
}

View File

@@ -7,7 +7,7 @@ import (
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/pb" "m7s.live/m7s/v5/pb"
) )
type PublisherState int type PublisherState int

View File

@@ -21,9 +21,9 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"m7s.live/m7s/v5/pb"
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/pb"
"m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/pkg/util"
) )
@@ -45,18 +45,19 @@ type Server struct {
pb.UnimplementedGlobalServer pb.UnimplementedGlobalServer
Plugin Plugin
config.Engine config.Engine
ID int ID int
eventChan chan any eventChan chan any
Plugins []*Plugin Plugins []*Plugin
Streams util.Collection[string, *Publisher] Streams util.Collection[string, *Publisher]
Pulls util.Collection[string, *Puller] Pulls util.Collection[string, *Puller]
Pushs util.Collection[string, *Pusher] Pushs util.Collection[string, *Pusher]
Waiting map[string][]*Subscriber Waiting map[string][]*Subscriber
Subscribers util.Collection[int, *Subscriber] Subscribers util.Collection[int, *Subscriber]
pidG int pidG int
sidG int sidG int
apiList []string apiList []string
grpcServer *grpc.Server grpcServer *grpc.Server
grpcClientConn *grpc.ClientConn
} }
func NewServer() (s *Server) { func NewServer() (s *Server) {
@@ -104,7 +105,9 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
} }
func (s *Server) run(ctx context.Context, conf any) (err error) { func (s *Server) run(ctx context.Context, conf any) (err error) {
mux := runtime.NewServeMux() mux := runtime.NewServeMux(runtime.WithMarshalerOption("text/plain", &pb.TextPlain{}), runtime.WithRoutingErrorHandler(runtime.RoutingErrorHandlerFunc(func(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, r *http.Request, _ int) {
s.config.HTTP.GetHttpMux().ServeHTTP(w, r)
})))
httpConf, tcpConf := &s.config.HTTP, &s.config.TCP httpConf, tcpConf := &s.config.HTTP, &s.config.TCP
httpConf.SetMux(mux) httpConf.SetMux(mux)
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx) s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
@@ -144,7 +147,6 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
s.Logger = slog.New( s.Logger = slog.New(
console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}), console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}),
) )
// slog.SetLogLoggerLevel(lv.Level())
s.registerHandler() s.registerHandler()
if httpConf.ListenAddrTLS != "" { if httpConf.ListenAddrTLS != "" {
@@ -165,31 +167,40 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
s.Info("http stop listen at ", "addr", addr) s.Info("http stop listen at ", "addr", addr)
}(httpConf.ListenAddr) }(httpConf.ListenAddr)
} }
var tcplis net.Listener
if tcpConf.ListenAddr != "" { if tcpConf.ListenAddr != "" {
var opts []grpc.ServerOption var opts []grpc.ServerOption
s.grpcServer = grpc.NewServer(opts...) s.grpcServer = grpc.NewServer(opts...)
pb.RegisterGlobalServer(s.grpcServer, s) pb.RegisterGlobalServer(s.grpcServer, s)
gwopts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if err = pb.RegisterGlobalHandlerFromEndpoint(ctx, mux, tcpConf.ListenAddr, gwopts); err != nil { s.grpcClientConn, err = grpc.DialContext(ctx, tcpConf.ListenAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
s.Error("failed to dial", "error", err)
return err
}
defer s.grpcClientConn.Close()
if err = pb.RegisterGlobalHandler(ctx, mux, s.grpcClientConn); err != nil {
s.Error("register handler faild", "error", err) s.Error("register handler faild", "error", err)
return err return err
} }
lis, err := net.Listen("tcp", tcpConf.ListenAddr) tcplis, err = net.Listen("tcp", tcpConf.ListenAddr)
if err != nil { if err != nil {
s.Error("failed to listen", "error", err) s.Error("failed to listen", "error", err)
return err return err
} }
defer lis.Close() defer tcplis.Close()
}
for _, plugin := range plugins {
plugin.Init(s, cg[strings.ToLower(plugin.Name)])
}
if tcplis != nil {
go func(addr string) { go func(addr string) {
if err := s.grpcServer.Serve(lis); err != nil { if err := s.grpcServer.Serve(tcplis); err != nil {
s.Stop(err) s.Stop(err)
} }
s.Info("grpc stop listen at ", "addr", addr) s.Info("grpc stop listen at ", "addr", addr)
}(tcpConf.ListenAddr) }(tcpConf.ListenAddr)
} }
for _, plugin := range plugins {
plugin.Init(s, cg[strings.ToLower(plugin.Name)])
}
s.eventLoop() s.eventLoop()
err = context.Cause(s) err = context.Cause(s)
s.Warn("Server is done", "reason", err) s.Warn("Server is done", "reason", err)

View File

@@ -106,7 +106,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
}() }()
sendAudioFrame := func() (err error) { sendAudioFrame := func() (err error) {
lastSentAF = audioFrame lastSentAF = audioFrame
s.Debug("send audio frame", "seq", audioFrame.Sequence) s.Trace("send audio frame", "seq", audioFrame.Sequence)
res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)}) res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)})
if len(res) > 0 && !res[0].IsNil() { if len(res) > 0 && !res[0].IsNil() {
if err := res[0].Interface().(error); err != ErrInterrupt { if err := res[0].Interface().(error); err != ErrInterrupt {
@@ -117,7 +117,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
} }
sendVideoFrame := func() (err error) { sendVideoFrame := func() (err error) {
lastSentVF = videoFrame lastSentVF = videoFrame
s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.String(), "size", videoFrame.Wrap.GetSize()) s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.String(), "size", videoFrame.Wrap.GetSize())
res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)}) res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
if len(res) > 0 && !res[0].IsNil() { if len(res) > 0 && !res[0].IsNil() {
if err = res[0].Interface().(error); err != ErrInterrupt { if err = res[0].Interface().(error); err != ErrInterrupt {