feat: rtsp client

This commit is contained in:
langhuihui
2024-07-02 13:29:07 +08:00
parent bbe1d785ce
commit 87dc204fc0
15 changed files with 137 additions and 74 deletions

4
api.go
View File

@@ -182,7 +182,7 @@ func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest)
}
func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Call(func() {
if pub, ok := s.Streams.Get(req.StreamPath); ok && !pub.AudioTrack.IsEmpty() {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() {
res = &pb.TrackSnapShotResponse{}
for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock
@@ -260,7 +260,7 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Call(func() {
if pub, ok := s.Streams.Get(req.StreamPath); ok && !pub.VideoTrack.IsEmpty() {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() {
res = &pb.TrackSnapShotResponse{}
for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock

View File

@@ -0,0 +1,10 @@
global:
loglevel: info
tcp:
listenaddr: :50050
hdl:
publish:
pubaudio: false
pull:
pullonstart:
live/test: /Users/dexter/Movies/jb-demo.flv

View File

@@ -0,0 +1,12 @@
global:
tcp:
listenaddr: :50051
http:
listenaddr: :8081
listenaddrtls: :8555
rtsp:
tcp:
listenaddr: :8554
pull:
pullonstart:
live/test: rtsp://localhost/live/test

24
example/rtsp-pull/main.go Normal file
View File

@@ -0,0 +1,24 @@
package main
import (
"context"
"flag"
"time"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/rtsp"
)
func main() {
ctx := context.Background()
var multi bool
flag.BoolVar(&multi, "multi", false, "debug")
flag.Parse()
if multi {
go m7s.Run(ctx, "config1.yaml")
}
time.Sleep(time.Second)
m7s.NewServer().Run(ctx, "config2.yaml")
}

View File

@@ -2,4 +2,3 @@ global:
loglevel: info
tcp:
listenaddr: :50050

View File

@@ -8,6 +8,16 @@ import (
var _ slog.Handler = (*MultiLogHandler)(nil)
func ParseLevel(level string) slog.Level {
var lv slog.LevelVar
if level == "trace" {
lv.Set(TraceLevel)
} else {
lv.UnmarshalText([]byte(level))
}
return lv.Level()
}
type MultiLogHandler struct {
handlers []slog.Handler
parentLevel *slog.Level

View File

@@ -123,11 +123,8 @@ func (r *BufReader) ReadRange(n int, yield func([]byte)) (err error) {
return
}
n -= r.buf.Length
if yield != nil {
r.buf.Range(yield)
}
r.buf.MoveToEnd()
}
}
return
}

View File

@@ -241,6 +241,17 @@ func (r *MemoryReader) ReadBE(n int) (num int, err error) {
return
}
func (r *MemoryReader) Range(yield func([]byte)) {
if yield != nil {
for r.Length > 0 {
yield(r.GetCurrent())
r.skipBuf()
}
} else {
r.MoveToEnd()
}
}
func (r *MemoryReader) RangeN(n int, yield func([]byte)) {
for good := yield != nil; r.Length > 0 && n > 0; r.skipBuf() {
curBuf := r.GetCurrent()

View File

@@ -218,9 +218,9 @@ func (sma *ScalableMemoryAllocator) Read(reader io.Reader, n int) (mem []byte, e
}
func (sma *ScalableMemoryAllocator) FreeRest(mem *[]byte, keep int) {
if keep < len(*mem) {
sma.Free((*mem)[keep:])
*mem = (*mem)[:keep]
if m := *mem; keep < len(m) {
sma.Free(m[keep:])
*mem = m[:keep]
}
}

View File

@@ -2,7 +2,6 @@ package m7s
import (
"context"
"log/slog"
"net"
"net/http"
"os"
@@ -64,12 +63,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
}
p.Config.ParseUserFile(userConfig)
finalConfig, _ := yaml.Marshal(p.Config.GetMap())
var lv slog.LevelVar
_ = lv.UnmarshalText([]byte(p.config.LogLevel))
if p.config.LogLevel == "trace" {
lv.Set(TraceLevel)
}
p.Logger.Handler().(*MultiLogHandler).SetLevel(lv.Level())
p.Logger.Handler().(*MultiLogHandler).SetLevel(ParseLevel(p.config.LogLevel))
p.Debug("config", "detail", string(finalConfig))
if s.DisableAll {
p.Disabled = true

View File

@@ -27,13 +27,8 @@ type LogRotatePlugin struct {
var _ = m7s.InstallPlugin[LogRotatePlugin](&pb.Logrotate_ServiceDesc, pb.RegisterLogrotateHandler)
func (config *LogRotatePlugin) OnInit() (err error) {
var lv slog.LevelVar
lv.UnmarshalText([]byte(config.Level))
if config.Level == "trace" {
lv.Set(pkg.TraceLevel)
}
builder := func(w io.Writer, opts *slog.HandlerOptions) slog.Handler {
return console.NewHandler(w, &console.HandlerOptions{NoColor: true, Level: lv.Level(),TimeFormat: "2006-01-02 15:04:05.000"})
return console.NewHandler(w, &console.HandlerOptions{NoColor: true, Level: pkg.ParseLevel(config.Level), TimeFormat: "2006-01-02 15:04:05.000"})
}
config.handler, err = rotoslog.NewHandler(rotoslog.LogHandlerBuilder(builder), rotoslog.LogDir(config.Path), rotoslog.MaxFileSize(config.Size), rotoslog.DateTimeLayout(config.Formatter), rotoslog.MaxRotatedFiles(config.MaxFiles))
if err == nil {

View File

@@ -319,6 +319,7 @@ func (c *NetConnection) Receive(sendMode bool) (channelID byte, buf []byte, err
}
buf = c.MemoryAllocator.Malloc(size)
if err = c.ReadNto(size, buf); err != nil {
c.MemoryAllocator.Free(buf)
return
}
}

View File

@@ -13,8 +13,8 @@ import (
type Stream struct {
*NetConnection
AudioChannelID byte
VideoChannelID byte
AudioChannelID int
VideoChannelID int
}
type Sender struct {
*m7s.Subscriber
@@ -36,7 +36,7 @@ func (ns *Stream) Close() error {
}
func (s *Sender) GetMedia() (medias []*core.Media, err error) {
if s.SubAudio && s.Publisher.PubAudio {
if s.SubAudio && s.Publisher.PubAudio && s.Publisher.HasAudioTrack() {
audioTrack := s.Publisher.GetAudioTrack(reflect.TypeOf((*mrtp.RTPAudio)(nil)))
if err = audioTrack.WaitReady(); err != nil {
return
@@ -54,11 +54,11 @@ func (s *Sender) GetMedia() (medias []*core.Media, err error) {
}},
ID: fmt.Sprintf("trackID=%d", len(medias)),
}
s.AudioChannelID = len(medias) << 1
medias = append(medias, media)
s.AudioChannelID = 0
}
if s.SubVideo && s.Publisher.PubVideo {
if s.SubVideo && s.Publisher.PubVideo && s.Publisher.HasVideoTrack() {
videoTrack := s.Publisher.GetVideoTrack(reflect.TypeOf((*mrtp.RTPVideo)(nil)))
if err = videoTrack.WaitReady(); err != nil {
return
@@ -77,20 +77,19 @@ func (s *Sender) GetMedia() (medias []*core.Media, err error) {
Codecs: []*core.Codec{&c},
ID: fmt.Sprintf("trackID=%d", len(medias)),
}
s.VideoChannelID = byte(len(medias)) << 1
s.VideoChannelID = len(medias) << 1
medias = append(medias, media)
}
return
}
func (s *Sender) Send() error {
sendRTP := func(pack *mrtp.RTPData, channel byte) (err error) {
func (s *Sender) sendRTP(pack *mrtp.RTPData, channel int) (err error) {
s.StartWrite()
defer s.StopWrite()
for _, packet := range pack.Packets {
size := packet.MarshalSize()
chunk := s.MemoryAllocator.Borrow(size + 4)
chunk[0], chunk[1], chunk[2], chunk[3] = '$', channel, byte(size>>8), byte(size)
chunk[0], chunk[1], chunk[2], chunk[3] = '$', byte(channel), byte(size>>8), byte(size)
if _, err = packet.MarshalTo(chunk[4:]); err != nil {
return
}
@@ -99,19 +98,31 @@ func (s *Sender) Send() error {
}
}
return
}
go func(err error) {
}
func (s *Sender) send() error {
return m7s.PlayBlock(s.Subscriber, func(audio *mrtp.RTPAudio) error {
return s.sendRTP(&audio.RTPData, s.AudioChannelID)
}, func(video *mrtp.RTPVideo) error {
return s.sendRTP(&video.RTPData, s.VideoChannelID)
})
}
func (s *Sender) receive() {
var err error
for err == nil {
_, _, err = s.NetConnection.Receive(true)
}
}(nil)
return m7s.PlayBlock(s.Subscriber, func(audio *mrtp.RTPAudio) error {
return sendRTP(&audio.RTPData, s.AudioChannelID)
}, func(video *mrtp.RTPVideo) error {
return sendRTP(&video.RTPData, s.VideoChannelID)
})
}
func (s *Sender) Send() (err error) {
go s.receive()
return s.send()
}
func (r *Receiver) SetMedia(medias []*core.Media) (err error) {
r.AudioChannelID = -1
r.VideoChannelID = -1
for i, media := range medias {
if codec := media.Codecs[0]; codec.IsAudio() {
r.AudioCodecParameters = &webrtc.RTPCodecParameters{
@@ -124,9 +135,9 @@ func (r *Receiver) SetMedia(medias []*core.Media) (err error) {
},
PayloadType: webrtc.PayloadType(codec.PayloadType),
}
r.AudioChannelID = byte(i) << 1
r.AudioChannelID = i << 1
} else if codec.IsVideo() {
r.VideoChannelID = byte(i) << 1
r.VideoChannelID = i << 1
r.VideoCodecParameters = &webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: "video/" + codec.Name,
@@ -160,7 +171,7 @@ func (r *Receiver) Receive() (err error) {
continue
}
if channelID&1 == 0 {
switch channelID {
switch int(channelID) {
case r.AudioChannelID:
if !r.PubAudio {
continue

View File

@@ -57,10 +57,6 @@ type AVTracks struct {
util.Collection[reflect.Type, *AVTrack]
}
func (t *AVTracks) IsEmpty() bool {
return t.Length == 0
}
func (t *AVTracks) CreateSubTrack(dataType reflect.Type) (track *AVTrack) {
track = NewAVTrack(dataType, t.AVTrack)
track.WrapIndex = t.Length
@@ -116,11 +112,11 @@ func (p *Publisher) checkTimeout() (err error) {
err = p.timeout()
default:
if p.PublishTimeout > 0 {
if !p.VideoTrack.IsEmpty() && !p.VideoTrack.LastValue.WriteTime.IsZero() && time.Since(p.VideoTrack.LastValue.WriteTime) > p.PublishTimeout {
if p.HasVideoTrack() && !p.VideoTrack.LastValue.WriteTime.IsZero() && time.Since(p.VideoTrack.LastValue.WriteTime) > p.PublishTimeout {
p.Error("video timeout", "writeTime", p.VideoTrack.LastValue.WriteTime)
err = ErrPublishTimeout
}
if !p.AudioTrack.IsEmpty() && !p.AudioTrack.LastValue.WriteTime.IsZero() && time.Since(p.AudioTrack.LastValue.WriteTime) > p.PublishTimeout {
if p.HasAudioTrack() && !p.AudioTrack.LastValue.WriteTime.IsZero() && time.Since(p.AudioTrack.LastValue.WriteTime) > p.PublishTimeout {
p.Error("audio timeout", "writeTime", p.AudioTrack.LastValue.WriteTime)
err = ErrPublishTimeout
}
@@ -144,10 +140,10 @@ func (p *Publisher) RemoveSubscriber(subscriber *Subscriber) {
} else {
p.BufferTime = p.Plugin.GetCommonConf().Publish.BufferTime
}
if !p.AudioTrack.IsEmpty() {
if p.HasAudioTrack() {
p.AudioTrack.AVTrack.BufferRange[0] = p.BufferTime
}
if !p.VideoTrack.IsEmpty() {
if p.HasVideoTrack() {
p.VideoTrack.AVTrack.BufferRange[0] = p.BufferTime
}
if p.State == PublisherStateSubscribed && p.Subscribers.Length == 0 {
@@ -166,10 +162,10 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) {
p.Info("subscriber +1", "count", p.Subscribers.Length)
if subscriber.BufferTime > p.BufferTime {
p.BufferTime = subscriber.BufferTime
if !p.AudioTrack.IsEmpty() {
if p.HasAudioTrack() {
p.AudioTrack.AVTrack.BufferRange[0] = p.BufferTime
}
if !p.VideoTrack.IsEmpty() {
if p.HasVideoTrack() {
p.VideoTrack.AVTrack.BufferRange[0] = p.BufferTime
}
}
@@ -420,7 +416,7 @@ func (p *Publisher) GetAudioTrack(dataType reflect.Type) (t *AVTrack) {
if t, ok := p.AudioTrack.Get(dataType); ok {
return t
}
if !p.AudioTrack.IsEmpty() {
if p.HasAudioTrack() {
return p.AudioTrack.CreateSubTrack(dataType)
}
return
@@ -432,12 +428,20 @@ func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) {
if t, ok := p.VideoTrack.Get(dataType); ok {
return t
}
if !p.VideoTrack.IsEmpty() {
if p.HasVideoTrack() {
return p.VideoTrack.CreateSubTrack(dataType)
}
return
}
func (p *Publisher) HasAudioTrack() bool {
return p.AudioTrack.Length > 0
}
func (p *Publisher) HasVideoTrack() bool {
return p.VideoTrack.Length > 0
}
func (p *Publisher) Dispose(err error) {
p.Lock()
defer p.Unlock()
@@ -448,10 +452,10 @@ func (p *Publisher) Dispose(err error) {
return
}
if p.IsStopped() {
if !p.AudioTrack.IsEmpty() {
if p.HasAudioTrack() {
p.AudioTrack.Dispose()
}
if !p.VideoTrack.IsEmpty() {
if p.HasVideoTrack() {
p.VideoTrack.Dispose()
}
p.State = PublisherStateDisposed

View File

@@ -150,12 +150,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
if cg != nil {
s.Config.ParseUserFile(cg["global"])
}
var lv slog.LevelVar
lv.UnmarshalText([]byte(s.config.LogLevel))
if s.config.LogLevel == "trace" {
lv.Set(TraceLevel)
}
s.LogHandler.SetLevel(lv.Level())
s.LogHandler.SetLevel(ParseLevel(s.config.LogLevel))
s.registerHandler(map[string]http.HandlerFunc{
"/api/config/json/{name}": s.api_Config_JSON_,
"/api/stream/annexb/{streamPath...}": s.api_Stream_AnnexB_,