diff --git a/client.go b/client.go index 9a09c1d..4193d8f 100644 --- a/client.go +++ b/client.go @@ -1,179 +1,53 @@ package rtsp2 import ( - "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/rtsp" - "go.uber.org/zap" "m7s.live/engine/v4" - "m7s.live/engine/v4/codec" - "m7s.live/engine/v4/track" ) -type RTSPClient struct { - *rtsp.Conn `json:"-" yaml:"-"` - // DialContext func(ctx context.Context, network, address string) (net.Conn, error) `json:"-" yaml:"-"` -} - type RTSPPuller struct { - engine.Publisher + RTSPPublisher engine.Puller - RTSPClient } -func (p *RTSPClient) Disconnect() { +func (p *RTSPPuller) Disconnect() { if p.Conn != nil { - p.Conn.Close() + p.Conn.Stop() } } func (p *RTSPPuller) Connect() (err error) { p.Conn = rtsp.NewClient(p.RemoteURL) - p.SetIO(p.Conn) return p.Conn.Dial() } func (p *RTSPPuller) Pull() (err error) { - if err = p.Options(); err != nil { + if err = p.Conn.Options(); err != nil { return } - if err = p.Describe(); err != nil { + if err = p.Conn.Describe(); err != nil { return } p.setTracks() - return p.Start() -} - -func (p *RTSPPuller) setTracks() { - for _, m := range p.Conn.Medias { - for _, c := range m.Codecs { - sender := core.NewSender(m, c) - rec, err := p.Conn.GetTrack(m, c) - if err != nil { - p.Error("get track", zap.Error(err)) - continue - } - switch c.Name { - case core.CodecH264: - p.VideoTrack = track.NewH264(p.Stream, c.PayloadType) - sender.Handler = p.VideoTrack.WriteRTPPack - case core.CodecH265: - p.VideoTrack = track.NewH265(p.Stream, c.PayloadType) - sender.Handler = p.VideoTrack.WriteRTPPack - case core.CodecAAC: - p.AudioTrack = track.NewAAC(p.Stream, c.PayloadType) - sender.Handler = p.AudioTrack.WriteRTPPack - case core.CodecPCMA: - p.AudioTrack = track.NewG711(p.Stream, true, c.PayloadType) - sender.Handler = p.AudioTrack.WriteRTPPack - case core.CodecPCMU: - p.AudioTrack = track.NewG711(p.Stream, false, c.PayloadType) - sender.Handler = p.AudioTrack.WriteRTPPack - } - sender.HandleRTP(rec) - } - } + return p.Conn.Start() } type RTSPPusher struct { - engine.Subscriber + RTSPSubscriber engine.Pusher - RTSPClient - videoSender *core.Receiver - audioSender *core.Receiver } -func (p *RTSPPusher) OnEvent(event any) { - switch v := event.(type) { - case *track.Audio: - if p.audioSender != nil { - break - } - var c *core.Codec - var media *core.Media - switch v.CodecID { - case codec.CodecID_AAC: - c = &core.Codec{ - Name: core.CodecAAC, - ClockRate: v.SampleRate, - Channels: uint16(v.Channels), - PayloadType: v.PayloadType, - } - case codec.CodecID_PCMA: - c = &core.Codec{ - Name: core.CodecPCMA, - ClockRate: v.SampleRate, - Channels: uint16(v.Channels), - PayloadType: v.PayloadType, - } - case codec.CodecID_PCMU: - c = &core.Codec{ - Name: core.CodecPCMU, - ClockRate: v.SampleRate, - Channels: uint16(v.Channels), - PayloadType: v.PayloadType, - } - } - media = &core.Media{ - Kind: "audio", - Direction: "sendonly", - Codecs: []*core.Codec{c}, - } - if p.videoSender == nil { - media.ID = "0" - } else { - media.ID = "1" - } - p.audioSender = core.NewReceiver(media, c) - p.Conn.AddTrack(media, c, p.audioSender) - p.AddTrack(v) - case *track.Video: - if p.videoSender != nil { - break - } - var c *core.Codec - var media *core.Media - switch v.CodecID { - case codec.CodecID_H264: - c = &core.Codec{ - Name: core.CodecH264, - ClockRate: v.SampleRate, - PayloadType: v.PayloadType, - } - case codec.CodecID_H265: - c = &core.Codec{ - Name: core.CodecH265, - ClockRate: v.SampleRate, - PayloadType: v.PayloadType, - } - } - media = &core.Media{ - Kind: "video", - Direction: "sendonly", - Codecs: []*core.Codec{c}, - } - if p.audioSender == nil { - media.ID = "0" - } else { - media.ID = "1" - } - p.videoSender = core.NewReceiver(media, c) - p.Conn.AddTrack(media, c, p.videoSender) - p.AddTrack(v) - case engine.VideoRTP: - p.videoSender.WriteRTP(v.Packet) - case engine.AudioRTP: - p.audioSender.WriteRTP(v.Packet) - default: - p.Subscriber.OnEvent(event) +func (p *RTSPPusher) Disconnect() { + if p.Conn != nil { + p.Conn.Stop() } } func (p *RTSPPusher) Connect() (err error) { p.Conn = rtsp.NewClient(p.RemoteURL) - p.SetIO(p.Conn) return p.Conn.Dial() } -func (p *RTSPPuller) Push() (err error) { - return p.Announce() +func (p *RTSPPusher) Push() (err error) { + return p.Conn.Announce() } diff --git a/main.go b/main.go index 2e5a4df..da40a70 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,9 @@ package rtsp2 import ( + "net" + + "github.com/AlexxIT/go2rtc/pkg/rtsp" "go.uber.org/zap" "m7s.live/engine/v4" "m7s.live/engine/v4/config" @@ -8,14 +11,17 @@ import ( type RTSP2Config struct { config.Publish + config.Subscribe config.Pull + config.Push + config.TCP } var conf RTSP2Config var RTSP2Plugin = engine.InstallPlugin(&conf) -func (*RTSP2Config) OnEvent(event any) { +func (c *RTSP2Config) OnEvent(event any) { switch v := event.(type) { case engine.FirstConfig: for streamPath, url := range conf.PullOnStart { @@ -23,6 +29,12 @@ func (*RTSP2Config) OnEvent(event any) { RTSP2Plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) } } + case engine.SEpublish: + if url, ok := conf.PushList[v.Target.Path]; ok { + if err := RTSP2Plugin.Push(v.Target.Path, url, new(RTSPPusher), false); err != nil { + RTSP2Plugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", url), zap.Error(err)) + } + } case engine.InvitePublish: //按需拉流 if url, ok := conf.PullOnSub[v.Target]; ok { if err := RTSP2Plugin.Pull(v.Target, url, new(RTSPPuller), 0); err != nil { @@ -31,3 +43,23 @@ func (*RTSP2Config) OnEvent(event any) { } } } + +func (c *RTSP2Config) ServeTCP(conn net.Conn) { + server := rtsp.NewServer(conn) + server.Listen(func(msg any) { + switch msg { + case rtsp.MethodPlay: + var suber RTSPSubscriber + if err := RTSP2Plugin.Subscribe(server.URL.Path, &suber); err != nil { + server.Stop() + } + case rtsp.MethodRecord: + var puber RTSPPublisher + if err := RTSP2Plugin.Publish(server.URL.Path, &puber); err != nil { + server.Stop() + } else { + puber.setTracks() + } + } + }) +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..cc84e4d --- /dev/null +++ b/publisher.go @@ -0,0 +1,57 @@ +package rtsp2 + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/rtsp" + "go.uber.org/zap" + "m7s.live/engine/v4" + "m7s.live/engine/v4/track" +) + +type RTSPPublisher struct { + engine.Publisher + Conn *rtsp.Conn `json:"-" yaml:"-"` +} + +func (p *RTSPPublisher) OnEvent(event any) { + switch event.(type) { + case engine.SEclose, engine.SEKick: + p.Conn.Stop() + } + p.Publisher.OnEvent(event) +} + +func (p *RTSPPublisher) setTracks() { + for _, m := range p.Conn.Medias { + for _, c := range m.Codecs { + var handler core.HandlerFunc + switch c.Name { + case core.CodecH264: + p.VideoTrack = track.NewH264(p.Stream, c.PayloadType) + handler = p.VideoTrack.WriteRTPPack + case core.CodecH265: + p.VideoTrack = track.NewH265(p.Stream, c.PayloadType) + handler = p.VideoTrack.WriteRTPPack + case core.CodecAAC: + p.AudioTrack = track.NewAAC(p.Stream, c.PayloadType) + handler = p.AudioTrack.WriteRTPPack + case core.CodecPCMA: + p.AudioTrack = track.NewG711(p.Stream, true, c.PayloadType) + handler = p.AudioTrack.WriteRTPPack + case core.CodecPCMU: + p.AudioTrack = track.NewG711(p.Stream, false, c.PayloadType) + handler = p.AudioTrack.WriteRTPPack + } + if handler != nil { + sender := core.NewSender(m, c) + rec, err := p.Conn.GetTrack(m, c) + if err != nil { + p.Error("get track", zap.Error(err)) + continue + } + sender.Handler = handler + sender.HandleRTP(rec) + } + } + } +} diff --git a/subscriber.go b/subscriber.go new file mode 100644 index 0000000..75fb279 --- /dev/null +++ b/subscriber.go @@ -0,0 +1,102 @@ +package rtsp2 + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/rtsp" + "m7s.live/engine/v4" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/track" +) + +type RTSPSubscriber struct { + engine.Subscriber + videoSender *core.Receiver + audioSender *core.Receiver + Conn *rtsp.Conn `json:"-" yaml:"-"` +} + +func (p *RTSPSubscriber) OnEvent(event any) { + switch v := event.(type) { + case *track.Audio: + if p.audioSender != nil { + break + } + var c *core.Codec + var media *core.Media + switch v.CodecID { + case codec.CodecID_AAC: + c = &core.Codec{ + Name: core.CodecAAC, + ClockRate: v.SampleRate, + Channels: uint16(v.Channels), + PayloadType: v.PayloadType, + } + case codec.CodecID_PCMA: + c = &core.Codec{ + Name: core.CodecPCMA, + ClockRate: v.SampleRate, + Channels: uint16(v.Channels), + PayloadType: v.PayloadType, + } + case codec.CodecID_PCMU: + c = &core.Codec{ + Name: core.CodecPCMU, + ClockRate: v.SampleRate, + Channels: uint16(v.Channels), + PayloadType: v.PayloadType, + } + } + media = &core.Media{ + Kind: "audio", + Direction: "sendonly", + Codecs: []*core.Codec{c}, + } + if p.videoSender == nil { + media.ID = "0" + } else { + media.ID = "1" + } + p.audioSender = core.NewReceiver(media, c) + p.Conn.AddTrack(media, c, p.audioSender) + p.AddTrack(v) + case *track.Video: + if p.videoSender != nil { + break + } + var c *core.Codec + var media *core.Media + switch v.CodecID { + case codec.CodecID_H264: + c = &core.Codec{ + Name: core.CodecH264, + ClockRate: v.SampleRate, + PayloadType: v.PayloadType, + } + case codec.CodecID_H265: + c = &core.Codec{ + Name: core.CodecH265, + ClockRate: v.SampleRate, + PayloadType: v.PayloadType, + } + } + media = &core.Media{ + Kind: "video", + Direction: "sendonly", + Codecs: []*core.Codec{c}, + } + if p.audioSender == nil { + media.ID = "0" + } else { + media.ID = "1" + } + p.videoSender = core.NewReceiver(media, c) + p.Conn.AddTrack(media, c, p.videoSender) + p.AddTrack(v) + case engine.VideoRTP: + p.videoSender.WriteRTP(v.Packet) + case engine.AudioRTP: + p.audioSender.WriteRTP(v.Packet) + default: + p.Subscriber.OnEvent(event) + } +}