feat: plugin can use diffent loglevel

This commit is contained in:
langhuihui
2024-06-28 17:47:29 +08:00
parent 0e28086d02
commit bbe1d785ce
20 changed files with 869 additions and 374 deletions

View File

@@ -16,8 +16,8 @@ rtsp:
subscribe:
subaudio: false
rtmp:
tcp:
listenaddr: :11935
# tcp:
# listenaddr: :11935
publish:
# idletimeout: 10s
# closedelaytimeout: 4s

View File

@@ -1,5 +1,5 @@
global:
loglevel: info
loglevel: trace
tcp:
listenaddr: :50051
hdl:

View File

@@ -0,0 +1,5 @@
global:
loglevel: info
tcp:
listenaddr: :50050

View File

@@ -0,0 +1,20 @@
global:
tcp:
listenaddr: :50051
http:
listenaddr: :8081
listenaddrtls: :8555
rtsp:
loglevel: trace
tcp:
listenaddr:
push:
pushlist:
live/test: rtsp://localhost/live/test
hdl:
publish:
pubaudio: false
pull:
pullonstart:
live/test: /Users/dexter/Movies/jb-demo.flv

24
example/rtsp-push/main.go Normal file
View File

@@ -0,0 +1,24 @@
package main
import (
"context"
"flag"
"time"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/rtsp"
)
func main() {
ctx := context.Background()
var multi bool
flag.BoolVar(&multi, "multi", false, "debug")
flag.Parse()
if multi {
go m7s.Run(ctx, "config1.yaml")
}
time.Sleep(time.Second)
m7s.NewServer().Run(ctx, "config2.yaml")
}

View File

@@ -32,6 +32,7 @@ type Config struct {
var durationType = reflect.TypeOf(time.Duration(0))
var regexpType = reflect.TypeOf(Regexp{})
var regexpYaml = regexp.MustCompile(`^(.+: )"(.+)"$`)
func (config *Config) Range(f func(key string, value Config)) {
if m, ok := config.GetValue().(map[string]Config); ok {
@@ -325,7 +326,15 @@ func (config *Config) assign(k string, v any) (target reflect.Value) {
},
})
tmpValue := reflect.New(tmpStruct)
yaml.Unmarshal([]byte(fmt.Sprintf("%s: %v", k, v)), tmpValue.Interface())
if v != nil {
var out []byte
if vv, ok := v.(string); ok {
out = []byte(fmt.Sprintf("%s: %s", k, vv))
} else {
out, _ = yaml.Marshal(map[string]any{k: v})
}
_ = yaml.Unmarshal(out, tmpValue.Interface())
}
target = tmpValue.Elem().Field(0)
}
return

View File

@@ -24,6 +24,7 @@ type PushConfig interface {
}
type Publish struct {
MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量
PubAudio bool `default:"true" desc:"是否发布音频"`
PubVideo bool `default:"true" desc:"是否发布视频"`
KickExist bool `desc:"是否踢掉已经存在的发布者"` // 是否踢掉已经存在的发布者
@@ -44,6 +45,7 @@ func (c *Publish) GetPublishConfig() *Publish {
}
type Subscribe struct {
MaxCount int `default:"0" desc:"最大订阅者数量"` // 最大订阅者数量
SubAudio bool `default:"true" desc:"是否订阅音频"`
SubVideo bool `default:"true" desc:"是否订阅视频"`
BufferTime time.Duration `desc:"缓冲时长,从缓冲时长的关键帧开始播放"`
@@ -163,9 +165,7 @@ type Console struct {
type Engine struct {
EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能
EnableAuth bool `default:"true" desc:"启用鉴权"` //启用鉴权
LogLang string `default:"zh" desc:"日志语言" enum:"zh:中文,en:英文"` //日志语言
LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别
SettingDir string `default:".m7s" desc:""`
EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小
PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔
@@ -175,6 +175,8 @@ type Engine struct {
type Common struct {
PublicIP string
LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别
EnableAuth bool `desc:"启用鉴权"` //启用鉴权
Publish
Subscribe
HTTP

View File

@@ -10,6 +10,7 @@ var _ slog.Handler = (*MultiLogHandler)(nil)
type MultiLogHandler struct {
handlers []slog.Handler
parentLevel *slog.Level
level *slog.Level
}
@@ -33,8 +34,11 @@ func (m *MultiLogHandler) SetLevel(level slog.Level) {
// Enabled implements slog.Handler.
func (m *MultiLogHandler) Enabled(_ context.Context, l slog.Level) bool {
if m.level != nil {
return l >= *m.level
}
return l >= *m.parentLevel
}
// Handle implements slog.Handler.
func (m *MultiLogHandler) Handle(ctx context.Context, rec slog.Record) error {
@@ -50,7 +54,10 @@ func (m *MultiLogHandler) Handle(ctx context.Context, rec slog.Record) error {
func (m *MultiLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
result := &MultiLogHandler{
handlers: make([]slog.Handler, len(m.handlers)),
level: m.level,
parentLevel: m.parentLevel,
}
if m.level != nil {
result.parentLevel = m.level
}
for i, h := range m.handlers {
result.handlers[i] = h.WithAttrs(attrs)
@@ -62,7 +69,10 @@ func (m *MultiLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
func (m *MultiLogHandler) WithGroup(name string) slog.Handler {
result := &MultiLogHandler{
handlers: make([]slog.Handler, len(m.handlers)),
level: m.level,
parentLevel: m.parentLevel,
}
if m.level != nil {
result.parentLevel = m.level
}
for i, h := range m.handlers {
result.handlers[i] = h.WithGroup(name)

View File

@@ -2,6 +2,7 @@ package m7s
import (
"context"
"log/slog"
"net"
"net/http"
"os"
@@ -40,14 +41,14 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
p.server = s
p.Logger = s.Logger.With("plugin", plugin.Name)
p.Context, p.CancelCauseFunc = context.WithCancelCause(s.Context)
if os.Getenv(strings.ToUpper(plugin.Name)+"_ENABLE") == "false" {
upperName := strings.ToUpper(plugin.Name)
if os.Getenv(upperName+"_ENABLE") == "false" {
p.Disabled = true
p.Warn("disabled by env")
return
}
p.Config.Parse(p.GetCommonConf())
p.Config.Parse(instance, strings.ToUpper(plugin.Name))
p.Config.Parse(p.GetCommonConf(), upperName)
p.Config.Parse(instance, upperName)
for _, fname := range MergeConfigs {
if name := strings.ToLower(fname); p.Config.Has(name) {
p.Config.Get(name).ParseGlobal(s.Config.Get(name))
@@ -63,6 +64,12 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
}
p.Config.ParseUserFile(userConfig)
finalConfig, _ := yaml.Marshal(p.Config.GetMap())
var lv slog.LevelVar
_ = lv.UnmarshalText([]byte(p.config.LogLevel))
if p.config.LogLevel == "trace" {
lv.Set(TraceLevel)
}
p.Logger.Handler().(*MultiLogHandler).SetLevel(lv.Level())
p.Debug("config", "detail", string(finalConfig))
if s.DisableAll {
p.Disabled = true
@@ -274,7 +281,7 @@ func (p *Plugin) OnTCPConnect(conn *net.TCPConn) {
func (p *Plugin) Publish(streamPath string, options ...any) (publisher *Publisher, err error) {
publisher = &Publisher{Publish: p.config.Publish}
if p.server.EnableAuth {
if p.config.EnableAuth {
if onAuthPub, ok := p.server.OnAuthPubs[p.Meta.Name]; ok {
authPromise := util.NewPromise(publisher)
onAuthPub(authPromise)
@@ -320,7 +327,7 @@ func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Pu
func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subscriber, err error) {
subscriber = &Subscriber{Subscribe: p.config.Subscribe}
if p.server.EnableAuth {
if p.config.EnableAuth {
if onAuthSub, ok := p.server.OnAuthSubs[p.Meta.Name]; ok {
authPromise := util.NewPromise(subscriber)
onAuthSub(authPromise)

View File

@@ -17,6 +17,9 @@ type HDLPlugin struct {
m7s.Plugin
}
const defaultConfig m7s.DefaultYaml = `publish:
speed: 1`
func (p *HDLPlugin) OnInit() error {
for streamPath, url := range p.GetCommonConf().PullOnStart {
go p.Pull(streamPath, url, NewHDLPuller())
@@ -24,7 +27,7 @@ func (p *HDLPlugin) OnInit() error {
return nil
}
var _ = m7s.InstallPlugin[HDLPlugin]()
var _ = m7s.InstallPlugin[HDLPlugin](defaultConfig)
func (p *HDLPlugin) WriteFlvHeader(sub *m7s.Subscriber) (flv net.Buffers) {
at, vt := &sub.Publisher.AudioTrack, &sub.Publisher.VideoTrack

View File

@@ -23,7 +23,7 @@ type HDLPuller struct {
func NewHDLPuller() *HDLPuller {
return &HDLPuller{
ScalableMemoryAllocator: util.NewScalableMemoryAllocator(1024),
ScalableMemoryAllocator: util.NewScalableMemoryAllocator(1 << 10),
}
}

View File

@@ -214,9 +214,29 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) {
if err = reader.Skip(3); err != nil {
return nil, err
}
// if _, err = avcc.DecodeConfig(nil); err != nil {
// return nil, err
// }
var nalus Nalus
if codecCtx.FourCC() == codec.FourCC_H265 {
var ctx = codecCtx.(*H265Ctx)
var spsM util.Memory
spsM.Append(ctx.SPS[0])
var ppsM util.Memory
ppsM.Append(ctx.PPS[0])
var vpsM util.Memory
vpsM.Append(ctx.VPS[0])
nalus.PTS = time.Duration(avcc.Timestamp) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
nalus.Nalus = append(nalus.Nalus, spsM, ppsM, vpsM)
} else {
var ctx = codecCtx.(*H264Ctx)
var spsM util.Memory
spsM.Append(ctx.SPS[0])
var ppsM util.Memory
ppsM.Append(ctx.PPS[0])
nalus.PTS = time.Duration(avcc.Timestamp) * 90
nalus.DTS = time.Duration(avcc.Timestamp) * 90
nalus.Nalus = append(nalus.Nalus, spsM, ppsM)
}
return nalus, nil
} else {
if codecCtx.FourCC() == codec.FourCC_H265 {
return avcc.parseH265(codecCtx.(*H265Ctx), reader)

View File

@@ -1,24 +1,17 @@
package plugin_rtsp
import (
"encoding/binary"
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
mrtp "m7s.live/m7s/v5/plugin/rtp/pkg"
. "m7s.live/m7s/v5/plugin/rtsp/pkg"
"net"
"net/http"
"reflect"
"runtime/debug"
"strconv"
"strings"
"time"
)
const defaultConfig = m7s.DefaultYaml(`tcp:
@@ -30,6 +23,23 @@ type RTSPPlugin struct {
m7s.Plugin
}
func (p *RTSPPlugin) OnInit() error {
for streamPath, url := range p.GetCommonConf().PullOnStart {
go p.Pull(streamPath, url, &Client{})
}
return nil
}
func (p *RTSPPlugin) OnPull(puller *m7s.Puller) {
p.OnPublish(&puller.Publisher)
}
func (p *RTSPPlugin) OnPublish(puber *m7s.Publisher) {
if remoteURL, ok := p.GetCommonConf().PushList[puber.StreamPath]; ok {
go p.Push(puber.StreamPath, remoteURL, &Client{})
}
}
func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
logger := p.Logger.With("remote", conn.RemoteAddr().String())
var receiver *Receiver
@@ -47,10 +57,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
}
}()
var req *util.Request
var timeout time.Duration
var sendMode bool
mem := util.NewScalableMemoryAllocator(1 << 12)
defer mem.Recycle()
for {
req, err = nc.ReadRequest()
if err != nil {
@@ -97,8 +104,8 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
}
nc.SDP = string(req.Body) // for info
if nc.Medias, err = UnmarshalSDP(req.Body); err != nil {
var medias []*core.Media
if medias, err = UnmarshalSDP(req.Body); err != nil {
return
}
@@ -111,37 +118,9 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
})
return
}
for i, media := range nc.Medias {
if codec := media.Codecs[0]; codec.IsAudio() {
receiver.AudioCodecParameters = &webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: "audio/" + codec.Name,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.FmtpLine,
RTCPFeedback: nil,
},
PayloadType: webrtc.PayloadType(codec.PayloadType),
if err = receiver.SetMedia(medias); err != nil {
return
}
receiver.AudioChannelID = byte(i) << 1
} else if codec.IsVideo() {
receiver.VideoChannelID = byte(i) << 1
receiver.VideoCodecParameters = &webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: "video/" + codec.Name,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.FmtpLine,
RTCPFeedback: nil,
},
PayloadType: webrtc.PayloadType(codec.PayloadType),
}
}
}
timeout = time.Second * 15
res := &util.Response{Request: req}
if err = nc.WriteResponse(res); err != nil {
return
@@ -149,7 +128,6 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
case MethodDescribe:
sendMode = true
timeout = time.Second * 60
var subscriber *m7s.Subscriber
subscriber, err = p.Subscribe(strings.TrimPrefix(nc.URL.Path, "/"), conn)
@@ -174,53 +152,10 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
sender.NetConnection = nc
// convert tracks to real output medias
var medias []*core.Media
if subscriber.SubAudio && subscriber.Publisher.PubAudio {
audioTrack := subscriber.Publisher.GetAudioTrack(reflect.TypeOf((*mrtp.RTPAudio)(nil)))
if err = audioTrack.WaitReady(); err != nil {
if medias, err = sender.GetMedia(); err != nil {
return
}
parameter := audioTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
media := &core.Media{
Kind: "audio",
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{{
Name: parameter.MimeType[6:],
ClockRate: parameter.ClockRate,
Channels: parameter.Channels,
FmtpLine: parameter.SDPFmtpLine,
PayloadType: uint8(parameter.PayloadType),
}},
ID: fmt.Sprintf("trackID=%d", len(medias)),
}
medias = append(medias, media)
sender.AudioChannelID = 0
}
if subscriber.SubVideo && subscriber.Publisher.PubVideo {
videoTrack := subscriber.Publisher.GetVideoTrack(reflect.TypeOf((*mrtp.RTPVideo)(nil)))
if err = videoTrack.WaitReady(); err != nil {
return
}
parameter := videoTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
c := core.Codec{
Name: parameter.MimeType[6:],
ClockRate: parameter.ClockRate,
Channels: parameter.Channels,
FmtpLine: parameter.SDPFmtpLine,
PayloadType: uint8(parameter.PayloadType),
}
media := &core.Media{
Kind: "video",
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{&c},
ID: fmt.Sprintf("trackID=%d", len(medias)),
}
sender.VideoChannelID = byte(len(medias)) << 1
medias = append(medias, media)
}
res.Body, err = core.MarshalSDP(nc.SessionName, medias)
if err != nil {
if res.Body, err = core.MarshalSDP(nc.SessionName, medias); err != nil {
return
}
@@ -260,234 +195,20 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) {
return
}
case MethodRecord, MethodPlay:
case MethodRecord:
res := &util.Response{Request: req}
err = nc.WriteResponse(res)
var audioFrame *mrtp.RTPAudio
var videoFrame *mrtp.RTPVideo
if sendMode {
go func() {
mem := util.NewScalableMemoryAllocator(1 << 11)
sendRTP := func(pack *mrtp.RTPData, channel byte) (err error) {
nc.StartWrite()
defer nc.StopWrite()
for _, packet := range pack.Packets {
size := packet.MarshalSize()
chunk := mem.Borrow(size + 4)
chunk[0], chunk[1], chunk[2], chunk[3] = '$', channel, byte(size>>8), byte(size)
if _, err = packet.MarshalTo(chunk[4:]); err != nil {
return
}
if _, err = nc.Write(chunk); err != nil {
return
}
}
return
}
m7s.PlayBlock(sender.Subscriber, func(audio *mrtp.RTPAudio) error {
return sendRTP(&audio.RTPData, sender.AudioChannelID)
}, func(video *mrtp.RTPVideo) error {
return sendRTP(&video.RTPData, sender.VideoChannelID)
})
mem.Recycle()
}()
} else {
audioFrame = &mrtp.RTPAudio{}
audioFrame.ScalableMemoryAllocator = mem
audioFrame.RTPCodecParameters = receiver.AudioCodecParameters
videoFrame = &mrtp.RTPVideo{}
videoFrame.ScalableMemoryAllocator = mem
videoFrame.RTPCodecParameters = receiver.VideoCodecParameters
}
for err == nil {
ts := time.Now()
if err = conn.SetReadDeadline(ts.Add(timeout)); err != nil {
return
}
var magic []byte
// we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK
// 3. RTSP request: OPTIONS ...
if magic, err = nc.Peek(4); err != nil {
return
}
var channelID byte
var size int
var buf []byte
if magic[0] != '$' {
magicWord := string(magic)
logger.Warn("not magic", "magic", magicWord)
switch magicWord {
case "RTSP":
var res *util.Response
if res, err = nc.ReadResponse(); err != nil {
return
}
logger.Warn(string(res.Body))
// for playing backchannel only after OK response on play
continue
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
var req *util.Request
if req, err = nc.ReadRequest(); err != nil {
return
}
if req.Method == MethodOptions {
res := &util.Response{Request: req}
if sendMode {
nc.StartWrite()
}
if err = nc.WriteResponse(res); err != nil {
return
}
if sendMode {
nc.StopWrite()
}
}
continue
default:
logger.Error("wrong input")
//c.Fire("RTSP wrong input")
//
//for i := 0; ; i++ {
// // search next start symbol
// if _, err = c.reader.ReadBytes('$'); err != nil {
// return err
// }
//
// if channelID, err = c.reader.ReadByte(); err != nil {
// return err
// }
//
// // TODO: better check maximum good channel ID
// if channelID >= 20 {
// continue
// }
//
// buf4 = make([]byte, 2)
// if _, err = io.ReadFull(c.reader, buf4); err != nil {
// return err
// }
//
// // check if size good for RTP
// size = binary.BigEndian.Uint16(buf4)
// if size <= 1500 {
// break
// }
//
// // 10 tries to find good packet
// if i >= 10 {
// return fmt.Errorf("RTSP wrong input")
// }
//}
for err = nc.Skip(1); err == nil; {
if magic[0], err = nc.ReadByte(); magic[0] == '*' {
channelID, err = nc.ReadByte()
magic[2], err = nc.ReadByte()
magic[3], err = nc.ReadByte()
size = int(binary.BigEndian.Uint16(magic[2:]))
buf = mem.Malloc(size)
if err = nc.ReadNto(size, buf); err != nil {
err = receiver.Receive()
return
case MethodPlay:
res := &util.Response{Request: req}
if err = nc.WriteResponse(res); err != nil {
return
}
break
}
}
}
} else {
// hope that the odd channels are always RTCP
channelID = magic[1]
// get data size
size = int(binary.BigEndian.Uint16(magic[2:]))
// skip 4 bytes from c.reader.Peek
if err = nc.Skip(4); err != nil {
err = sender.Send()
return
}
buf = mem.Malloc(size)
if err = nc.ReadNto(size, buf); err != nil {
return
}
}
if channelID&1 == 0 {
switch channelID {
case receiver.AudioChannelID:
if !receiver.PubAudio {
continue
}
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return
}
if len(audioFrame.Packets) == 0 || packet.Timestamp == audioFrame.Packets[0].Timestamp {
audioFrame.AddRecycleBytes(buf)
audioFrame.Packets = append(audioFrame.Packets, packet)
} else {
err = receiver.WriteAudio(audioFrame)
audioFrame = &mrtp.RTPAudio{}
audioFrame.AddRecycleBytes(buf)
audioFrame.Packets = []*rtp.Packet{packet}
audioFrame.RTPCodecParameters = receiver.VideoCodecParameters
audioFrame.ScalableMemoryAllocator = mem
}
case receiver.VideoChannelID:
if !receiver.PubVideo {
continue
}
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return
}
if len(videoFrame.Packets) == 0 || packet.Timestamp == videoFrame.Packets[0].Timestamp {
videoFrame.AddRecycleBytes(buf)
videoFrame.Packets = append(videoFrame.Packets, packet)
} else {
// t := time.Now()
err = receiver.WriteVideo(videoFrame)
// fmt.Println("write video", time.Since(t))
videoFrame = &mrtp.RTPVideo{}
videoFrame.AddRecycleBytes(buf)
videoFrame.Packets = []*rtp.Packet{packet}
videoFrame.RTPCodecParameters = receiver.VideoCodecParameters
videoFrame.ScalableMemoryAllocator = mem
}
default:
}
} else {
msg := &RTCP{Channel: channelID}
mem.Free(buf)
if err = msg.Header.Unmarshal(buf); err != nil {
return
}
if msg.Packets, err = rtcp.Unmarshal(buf); err != nil {
return
}
logger.Debug("rtcp", "type", msg.Header.Type, "length", msg.Header.Length)
// TODO: rtcp msg
}
//if keepaliveDT != 0 && ts.After(keepaliveTS) {
// req := &tcp.Request{Method: MethodOptions, URL: c.URL}
// if err = c.WriteRequest(req); err != nil {
// return
// }
//
// keepaliveTS = ts.Add(keepaliveDT)
//}
}
return
case MethodTeardown:
res := &util.Response{Request: req}
_ = nc.WriteResponse(res)

336
plugin/rtsp/pkg/client.go Normal file
View File

@@ -0,0 +1,336 @@
package rtsp
import (
"crypto/tls"
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"net"
"net/http"
"net/url"
"strconv"
"strings"
)
type Client struct {
Stream
}
func (c *Client) Connect(p *m7s.Client) (err error) {
addr := p.RemoteURL
var rtspURL *url.URL
rtspURL, err = url.Parse(addr)
if err != nil {
return err
}
//ps := strings.Split(u.Path, "/")
//if len(ps) < 3 {
// return errors.New("illegal rtsp url")
//}
istls := rtspURL.Scheme == "rtsps"
if strings.Count(rtspURL.Host, ":") == 0 {
if istls {
rtspURL.Host += ":443"
} else {
rtspURL.Host += ":554"
}
}
var conn net.Conn
if istls {
var tlsconn *tls.Conn
tlsconn, err = tls.Dial("tcp", rtspURL.Host, &tls.Config{})
conn = tlsconn
} else {
conn, err = net.Dial("tcp", rtspURL.Host)
}
if err != nil {
return err
}
defer func() {
if err != nil {
conn.Close()
}
}()
c.NetConnection = NewNetConnection(conn, p.Logger)
c.URL = rtspURL
c.auth = util.NewAuth(c.URL.User)
c.Backchannel = true
return c.Options()
}
func (c *Client) Pull(p *m7s.Puller) (err error) {
defer func() {
c.Close()
if p := recover(); p != nil {
err = p.(error)
}
p.Dispose(err)
}()
var media []*core.Media
if media, err = c.Describe(); err != nil {
return
}
receiver := &Receiver{Publisher: &p.Publisher, Stream: c.Stream}
if err = receiver.SetMedia(media); err != nil {
return
}
if err = c.Play(); err != nil {
return
}
return receiver.Receive()
}
func (c *Client) Push(p *m7s.Pusher) (err error) {
defer c.Close()
sender := &Sender{Subscriber: &p.Subscriber, Stream: c.Stream}
var medias []*core.Media
medias, err = sender.GetMedia()
err = c.Announce(medias)
if err != nil {
return
}
for i, media := range medias {
switch media.Kind {
case "audio", "video":
_, err = c.SetupMedia(media, i)
if err != nil {
return
}
default:
c.Warn("media kind not support", "kind", media.Kind)
}
}
if err = c.Record(); err != nil {
return
}
return sender.Send()
}
func (c *Client) Do(req *util.Request) (*util.Response, error) {
if err := c.WriteRequest(req); err != nil {
return nil, err
}
res, err := c.ReadResponse()
if err != nil {
return nil, err
}
if res.StatusCode == http.StatusUnauthorized {
switch c.auth.Method {
case tcp.AuthNone:
if c.auth.ReadNone(res) {
return c.Do(req)
}
return nil, errors.New("user/pass not provided")
case tcp.AuthUnknown:
if c.auth.Read(res) {
return c.Do(req)
}
default:
return nil, errors.New("wrong user/pass")
}
}
if res.StatusCode != http.StatusOK {
return res, fmt.Errorf("wrong response on %s", req.Method)
}
return res, nil
}
func (c *Client) Options() error {
req := &util.Request{Method: MethodOptions, URL: c.URL}
res, err := c.Do(req)
if err != nil {
return err
}
if val := res.Header.Get("Content-Base"); val != "" {
c.URL, err = urlParse(val)
if err != nil {
return err
}
}
return nil
}
func (c *Client) Describe() (medias []*core.Media, err error) {
// 5.3 Back channel connection
// https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec.pdf
req := &util.Request{
Method: MethodDescribe,
URL: c.URL,
Header: map[string][]string{
"Accept": {"application/sdp"},
},
}
if c.Backchannel {
req.Header.Set("Require", "www.onvif.org/ver20/backchannel")
}
if c.UserAgent != "" {
// this camera will answer with 401 on DESCRIBE without User-Agent
// https://github.com/AlexxIT/go2rtc/issues/235
req.Header.Set("User-Agent", c.UserAgent)
}
var res *util.Response
res, err = c.Do(req)
if err != nil {
return
}
if val := res.Header.Get("Content-Base"); val != "" {
c.URL, err = urlParse(val)
if err != nil {
return
}
}
c.sdp = string(res.Body) // for info
medias, err = UnmarshalSDP(res.Body)
if err != nil {
return
}
if c.Media != "" {
clone := make([]*core.Media, 0, len(medias))
for _, media := range medias {
if strings.Contains(c.Media, media.Kind) {
clone = append(clone, media)
}
}
medias = clone
}
return
}
func (c *Client) Announce(medias []*core.Media) (err error) {
req := &util.Request{
Method: MethodAnnounce,
URL: c.URL,
Header: map[string][]string{
"Content-Type": {"application/sdp"},
},
}
req.Body, err = core.MarshalSDP(c.SessionName, medias)
if err != nil {
return err
}
_, err = c.Do(req)
return
}
func (c *Client) SetupMedia(media *core.Media, index int) (byte, error) {
var transport string
transport = fmt.Sprintf(
// i - RTP (data channel)
// i+1 - RTCP (control channel)
"RTP/AVP/TCP;unicast;interleaved=%d-%d", index*2, index*2+1,
)
if transport == "" {
return 0, fmt.Errorf("wrong media: %v", media)
}
rawURL := media.ID // control
if !strings.Contains(rawURL, "://") {
rawURL = c.URL.String()
if !strings.HasSuffix(rawURL, "/") {
rawURL += "/"
}
rawURL += media.ID
}
trackURL, err := urlParse(rawURL)
if err != nil {
return 0, err
}
req := &util.Request{
Method: MethodSetup,
URL: trackURL,
Header: map[string][]string{
"Transport": {transport},
},
}
res, err := c.Do(req)
if err != nil {
// some Dahua/Amcrest cameras fail here because two simultaneous
// backchannel connections
//if c.Backchannel {
// c.Backchannel = false
// if err = c.Connect(); err != nil {
// return 0, err
// }
// return c.SetupMedia(media)
//}
return 0, err
}
if c.Session == "" {
// Session: 7116520596809429228
// Session: 216525287999;timeout=60
if s := res.Header.Get("Session"); s != "" {
if i := strings.IndexByte(s, ';'); i > 0 {
c.Session = s[:i]
if i = strings.Index(s, "timeout="); i > 0 {
c.keepalive, _ = strconv.Atoi(s[i+8:])
}
} else {
c.Session = s
}
}
}
// we send our `interleaved`, but camera can answer with another
// Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0
// Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1
transport = res.Header.Get("Transport")
if !strings.HasPrefix(transport, "RTP/AVP/TCP;") {
// Escam Q6 has a bug:
// Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1
if !strings.Contains(transport, ";interleaved=") {
return 0, fmt.Errorf("wrong transport: %s", transport)
}
}
channel := core.Between(transport, "interleaved=", "-")
i, err := strconv.Atoi(channel)
if err != nil {
return 0, err
}
return byte(i), nil
}
func (c *Client) Play() (err error) {
return c.WriteRequest(&util.Request{Method: MethodPlay, URL: c.URL})
}
func (c *Client) Record() (err error) {
return c.WriteRequest(&util.Request{Method: MethodRecord, URL: c.URL})
}
func (c *Client) Teardown() (err error) {
// allow TEARDOWN from any state (ex. ANNOUNCE > SETUP)
return c.WriteRequest(&util.Request{Method: MethodTeardown, URL: c.URL})
}
func (c *Client) Destroy() {
_ = c.Teardown()
c.NetConnection.Destroy()
}

View File

@@ -1,8 +1,9 @@
package rtsp
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"encoding/binary"
"log/slog"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"net"
"net/url"
@@ -21,6 +22,8 @@ func NewNetConnection(conn net.Conn, logger *slog.Logger) *NetConnection {
conn: conn,
Logger: logger,
BufReader: util.NewBufReader(conn),
MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
UserAgent: "monibuca" + m7s.Version,
}
}
@@ -33,8 +36,7 @@ type NetConnection struct {
SessionName string
Timeout int
Transport string // custom transport support, ex. RTSP over WebSocket
Medias []*core.Media
MemoryAllocator *util.ScalableMemoryAllocator
UserAgent string
URL *url.URL
@@ -66,6 +68,7 @@ func (c *NetConnection) StopWrite() {
func (c *NetConnection) Destroy() {
c.conn.Close()
c.BufReader.Recycle()
c.MemoryAllocator.Recycle()
c.Info("destroy connection")
}
@@ -105,7 +108,7 @@ const (
StatePlay
)
func (c *NetConnection) WriteRequest(req *util.Request) error {
func (c *NetConnection) WriteRequest(req *util.Request) (err error) {
if req.Proto == "" {
req.Proto = ProtoRTSP
}
@@ -130,11 +133,13 @@ func (c *NetConnection) WriteRequest(req *util.Request) error {
req.Header.Set("Content-Length", val)
}
if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
if err = c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
return err
}
return req.Write(c.conn)
reqStr := req.String()
c.Debug("->", "req", reqStr)
_, err = c.conn.Write([]byte(reqStr))
return
}
func (c *NetConnection) ReadRequest() (req *util.Request, err error) {
@@ -145,11 +150,11 @@ func (c *NetConnection) ReadRequest() (req *util.Request, err error) {
if err != nil {
return
}
c.Debug(req.String())
c.Debug("<-", "req", req.String())
return
}
func (c *NetConnection) WriteResponse(res *util.Response) error {
func (c *NetConnection) WriteResponse(res *util.Response) (err error) {
if res.Proto == "" {
res.Proto = ProtoRTSP
}
@@ -182,18 +187,151 @@ func (c *NetConnection) WriteResponse(res *util.Response) error {
res.Header.Set("Content-Length", val)
}
if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
if err = c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
return err
}
c.Debug(res.String())
return res.Write(c.conn)
resStr := res.String()
c.Debug("->", "res", resStr)
_, err = c.conn.Write([]byte(resStr))
return
}
func (c *NetConnection) ReadResponse() (*util.Response, error) {
func (c *NetConnection) ReadResponse() (res *util.Response, err error) {
if err := c.conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil {
return nil, err
}
return util.ReadResponse(c.BufReader)
res, err = util.ReadResponse(c.BufReader)
if err == nil {
c.Debug("<-", "res", res.String())
}
return
}
func (c *NetConnection) Receive(sendMode bool) (channelID byte, buf []byte, err error) {
ts := time.Now()
if err = c.conn.SetReadDeadline(ts.Add(util.Conditoinal(sendMode, time.Second*60, time.Second*15))); err != nil {
return
}
var magic []byte
// we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK
// 3. RTSP request: OPTIONS ...
if magic, err = c.Peek(4); err != nil {
return
}
var size int
if magic[0] != '$' {
magicWord := string(magic)
c.Warn("not magic", "magic", magicWord)
switch magicWord {
case "RTSP":
var res *util.Response
if res, err = c.ReadResponse(); err != nil {
return
}
c.Warn(string(res.Body))
// for playing backchannel only after OK response on play
return
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
var req *util.Request
if req, err = c.ReadRequest(); err != nil {
return
}
if req.Method == MethodOptions {
res := &util.Response{Request: req}
if sendMode {
c.StartWrite()
}
if err = c.WriteResponse(res); err != nil {
return
}
if sendMode {
c.StopWrite()
}
}
return
default:
c.Error("wrong input")
//c.Fire("RTSP wrong input")
//
//for i := 0; ; i++ {
// // search next start symbol
// if _, err = c.reader.ReadBytes('$'); err != nil {
// return err
// }
//
// if channelID, err = c.reader.ReadByte(); err != nil {
// return err
// }
//
// // TODO: better check maximum good channel ID
// if channelID >= 20 {
// continue
// }
//
// buf4 = make([]byte, 2)
// if _, err = io.ReadFull(c.reader, buf4); err != nil {
// return err
// }
//
// // check if size good for RTP
// size = binary.BigEndian.Uint16(buf4)
// if size <= 1500 {
// break
// }
//
// // 10 tries to find good packet
// if i >= 10 {
// return fmt.Errorf("RTSP wrong input")
// }
//}
for err = c.Skip(1); err == nil; {
if magic[0], err = c.ReadByte(); magic[0] == '*' {
channelID, err = c.ReadByte()
magic[2], err = c.ReadByte()
magic[3], err = c.ReadByte()
size = int(binary.BigEndian.Uint16(magic[2:]))
buf = c.MemoryAllocator.Malloc(size)
if err = c.ReadNto(size, buf); err != nil {
return
}
break
}
}
}
} else {
// hope that the odd channels are always RTCP
channelID = magic[1]
// get data size
size = int(binary.BigEndian.Uint16(magic[2:]))
// skip 4 bytes from c.reader.Peek
if err = c.Skip(4); err != nil {
return
}
buf = c.MemoryAllocator.Malloc(size)
if err = c.ReadNto(size, buf); err != nil {
return
}
}
//if keepaliveDT != 0 && ts.After(keepaliveTS) {
// req := &tcp.Request{Method: MethodOptions, URL: c.URL}
// if err = c.WriteRequest(req); err != nil {
// return
// }
//
// keepaliveTS = ts.Add(keepaliveDT)
//}
return
}
func (c *NetConnection) Write(chunk []byte) (int, error) {

View File

@@ -1,8 +1,14 @@
package rtsp
import (
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"m7s.live/m7s/v5"
mrtp "m7s.live/m7s/v5/plugin/rtp/pkg"
"reflect"
)
type Stream struct {
@@ -21,3 +27,195 @@ type Receiver struct {
AudioCodecParameters *webrtc.RTPCodecParameters
VideoCodecParameters *webrtc.RTPCodecParameters
}
func (ns *Stream) Close() error {
if ns.NetConnection != nil {
ns.NetConnection.Destroy()
}
return nil
}
func (s *Sender) GetMedia() (medias []*core.Media, err error) {
if s.SubAudio && s.Publisher.PubAudio {
audioTrack := s.Publisher.GetAudioTrack(reflect.TypeOf((*mrtp.RTPAudio)(nil)))
if err = audioTrack.WaitReady(); err != nil {
return
}
parameter := audioTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
media := &core.Media{
Kind: "audio",
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{{
Name: parameter.MimeType[6:],
ClockRate: parameter.ClockRate,
Channels: parameter.Channels,
FmtpLine: parameter.SDPFmtpLine,
PayloadType: uint8(parameter.PayloadType),
}},
ID: fmt.Sprintf("trackID=%d", len(medias)),
}
medias = append(medias, media)
s.AudioChannelID = 0
}
if s.SubVideo && s.Publisher.PubVideo {
videoTrack := s.Publisher.GetVideoTrack(reflect.TypeOf((*mrtp.RTPVideo)(nil)))
if err = videoTrack.WaitReady(); err != nil {
return
}
parameter := videoTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter()
c := core.Codec{
Name: parameter.MimeType[6:],
ClockRate: parameter.ClockRate,
Channels: parameter.Channels,
FmtpLine: parameter.SDPFmtpLine,
PayloadType: uint8(parameter.PayloadType),
}
media := &core.Media{
Kind: "video",
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{&c},
ID: fmt.Sprintf("trackID=%d", len(medias)),
}
s.VideoChannelID = byte(len(medias)) << 1
medias = append(medias, media)
}
return
}
func (s *Sender) Send() error {
sendRTP := func(pack *mrtp.RTPData, channel byte) (err error) {
s.StartWrite()
defer s.StopWrite()
for _, packet := range pack.Packets {
size := packet.MarshalSize()
chunk := s.MemoryAllocator.Borrow(size + 4)
chunk[0], chunk[1], chunk[2], chunk[3] = '$', channel, byte(size>>8), byte(size)
if _, err = packet.MarshalTo(chunk[4:]); err != nil {
return
}
if _, err = s.Write(chunk); err != nil {
return
}
}
return
}
go func(err error) {
for err == nil {
_, _, err = s.NetConnection.Receive(true)
}
}(nil)
return m7s.PlayBlock(s.Subscriber, func(audio *mrtp.RTPAudio) error {
return sendRTP(&audio.RTPData, s.AudioChannelID)
}, func(video *mrtp.RTPVideo) error {
return sendRTP(&video.RTPData, s.VideoChannelID)
})
}
func (r *Receiver) SetMedia(medias []*core.Media) (err error) {
for i, media := range medias {
if codec := media.Codecs[0]; codec.IsAudio() {
r.AudioCodecParameters = &webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: "audio/" + codec.Name,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.FmtpLine,
RTCPFeedback: nil,
},
PayloadType: webrtc.PayloadType(codec.PayloadType),
}
r.AudioChannelID = byte(i) << 1
} else if codec.IsVideo() {
r.VideoChannelID = byte(i) << 1
r.VideoCodecParameters = &webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: "video/" + codec.Name,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.FmtpLine,
RTCPFeedback: nil,
},
PayloadType: webrtc.PayloadType(codec.PayloadType),
}
} else {
r.Warn("media kind not support", "kind", codec.Kind())
}
}
return
}
func (r *Receiver) Receive() (err error) {
audioFrame, videoFrame := &mrtp.RTPAudio{}, &mrtp.RTPVideo{}
audioFrame.ScalableMemoryAllocator = r.MemoryAllocator
audioFrame.RTPCodecParameters = r.AudioCodecParameters
videoFrame.ScalableMemoryAllocator = r.MemoryAllocator
videoFrame.RTPCodecParameters = r.VideoCodecParameters
var channelID byte
var buf []byte
for err == nil {
channelID, buf, err = r.NetConnection.Receive(false)
if err != nil {
return
}
if len(buf) == 0 {
continue
}
if channelID&1 == 0 {
switch channelID {
case r.AudioChannelID:
if !r.PubAudio {
continue
}
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return
}
if len(audioFrame.Packets) == 0 || packet.Timestamp == audioFrame.Packets[0].Timestamp {
audioFrame.AddRecycleBytes(buf)
audioFrame.Packets = append(audioFrame.Packets, packet)
} else {
err = r.WriteAudio(audioFrame)
audioFrame = &mrtp.RTPAudio{}
audioFrame.AddRecycleBytes(buf)
audioFrame.Packets = []*rtp.Packet{packet}
audioFrame.RTPCodecParameters = r.VideoCodecParameters
audioFrame.ScalableMemoryAllocator = r.MemoryAllocator
}
case r.VideoChannelID:
if !r.PubVideo {
continue
}
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return
}
if len(videoFrame.Packets) == 0 || packet.Timestamp == videoFrame.Packets[0].Timestamp {
videoFrame.AddRecycleBytes(buf)
videoFrame.Packets = append(videoFrame.Packets, packet)
} else {
// t := time.Now()
err = r.WriteVideo(videoFrame)
// fmt.Println("write video", time.Since(t))
videoFrame = &mrtp.RTPVideo{}
videoFrame.AddRecycleBytes(buf)
videoFrame.Packets = []*rtp.Packet{packet}
videoFrame.RTPCodecParameters = r.VideoCodecParameters
videoFrame.ScalableMemoryAllocator = r.MemoryAllocator
}
default:
}
} else {
msg := &RTCP{Channel: channelID}
r.MemoryAllocator.Free(buf)
if err = msg.Header.Unmarshal(buf); err != nil {
return
}
if msg.Packets, err = rtcp.Unmarshal(buf); err != nil {
return
}
r.Debug("rtcp", "type", msg.Header.Type, "length", msg.Header.Length)
// TODO: rtcp msg
}
}
return
}

View File

@@ -23,6 +23,8 @@ const (
PublisherStateDisposed
)
const threshold = 100 * time.Millisecond
type SpeedControl struct {
speed float64
beginTime time.Time
@@ -43,7 +45,7 @@ func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
}
should := time.Duration(float64(ts) / speed)
s.Delta = should - elapsed
if s.Delta > time.Second {
if s.Delta > threshold {
time.Sleep(s.Delta)
}
}

View File

@@ -27,7 +27,7 @@ import (
var (
Version = "v5.0.0"
MergeConfigs = []string{"Publish", "Subscribe", "HTTP", "PublicIP"}
MergeConfigs = []string{"Publish", "Subscribe", "HTTP", "PublicIP", "LogLevel", "EnableAuth"}
ExecPath = os.Args[0]
ExecDir = filepath.Dir(ExecPath)
serverIndexG atomic.Uint32
@@ -145,14 +145,14 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
s.Error("parsing yml error:", err)
}
}
s.Config.Parse(&s.config)
s.Config.Parse(&s.config, "GLOBAL")
s.Config.Parse(&s.Engine, "GLOBAL")
if cg != nil {
s.Config.ParseUserFile(cg["global"])
}
var lv slog.LevelVar
lv.UnmarshalText([]byte(s.LogLevel))
if s.LogLevel == "trace" {
lv.UnmarshalText([]byte(s.config.LogLevel))
if s.config.LogLevel == "trace" {
lv.Set(TraceLevel)
}
s.LogHandler.SetLevel(lv.Level())

View File

@@ -89,7 +89,7 @@ type Subscriber struct {
VideoReader *AVRingReader
}
func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) {
func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) (err error) {
var ar, vr *AVRingReader
var a1, v1 reflect.Type
var at, vt *AVTrack
@@ -218,7 +218,6 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
prePublisher = s.Publisher
}
}
var err error
for err == nil {
err = s.Err()
if vr != nil {
@@ -311,4 +310,5 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
checkPublisherChange()
runtime.Gosched()
}
return
}