feat: add server func

This commit is contained in:
langhuihui
2023-10-06 09:40:39 +08:00
parent 23c97cc61a
commit ddc541776d
4 changed files with 204 additions and 139 deletions

150
client.go
View File

@@ -1,179 +1,53 @@
package rtsp2
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"go.uber.org/zap"
"m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track"
)
type RTSPClient struct {
*rtsp.Conn `json:"-" yaml:"-"`
// DialContext func(ctx context.Context, network, address string) (net.Conn, error) `json:"-" yaml:"-"`
}
type RTSPPuller struct {
engine.Publisher
RTSPPublisher
engine.Puller
RTSPClient
}
func (p *RTSPClient) Disconnect() {
func (p *RTSPPuller) Disconnect() {
if p.Conn != nil {
p.Conn.Close()
p.Conn.Stop()
}
}
func (p *RTSPPuller) Connect() (err error) {
p.Conn = rtsp.NewClient(p.RemoteURL)
p.SetIO(p.Conn)
return p.Conn.Dial()
}
func (p *RTSPPuller) Pull() (err error) {
if err = p.Options(); err != nil {
if err = p.Conn.Options(); err != nil {
return
}
if err = p.Describe(); err != nil {
if err = p.Conn.Describe(); err != nil {
return
}
p.setTracks()
return p.Start()
}
func (p *RTSPPuller) setTracks() {
for _, m := range p.Conn.Medias {
for _, c := range m.Codecs {
sender := core.NewSender(m, c)
rec, err := p.Conn.GetTrack(m, c)
if err != nil {
p.Error("get track", zap.Error(err))
continue
}
switch c.Name {
case core.CodecH264:
p.VideoTrack = track.NewH264(p.Stream, c.PayloadType)
sender.Handler = p.VideoTrack.WriteRTPPack
case core.CodecH265:
p.VideoTrack = track.NewH265(p.Stream, c.PayloadType)
sender.Handler = p.VideoTrack.WriteRTPPack
case core.CodecAAC:
p.AudioTrack = track.NewAAC(p.Stream, c.PayloadType)
sender.Handler = p.AudioTrack.WriteRTPPack
case core.CodecPCMA:
p.AudioTrack = track.NewG711(p.Stream, true, c.PayloadType)
sender.Handler = p.AudioTrack.WriteRTPPack
case core.CodecPCMU:
p.AudioTrack = track.NewG711(p.Stream, false, c.PayloadType)
sender.Handler = p.AudioTrack.WriteRTPPack
}
sender.HandleRTP(rec)
}
}
return p.Conn.Start()
}
type RTSPPusher struct {
engine.Subscriber
RTSPSubscriber
engine.Pusher
RTSPClient
videoSender *core.Receiver
audioSender *core.Receiver
}
func (p *RTSPPusher) OnEvent(event any) {
switch v := event.(type) {
case *track.Audio:
if p.audioSender != nil {
break
}
var c *core.Codec
var media *core.Media
switch v.CodecID {
case codec.CodecID_AAC:
c = &core.Codec{
Name: core.CodecAAC,
ClockRate: v.SampleRate,
Channels: uint16(v.Channels),
PayloadType: v.PayloadType,
}
case codec.CodecID_PCMA:
c = &core.Codec{
Name: core.CodecPCMA,
ClockRate: v.SampleRate,
Channels: uint16(v.Channels),
PayloadType: v.PayloadType,
}
case codec.CodecID_PCMU:
c = &core.Codec{
Name: core.CodecPCMU,
ClockRate: v.SampleRate,
Channels: uint16(v.Channels),
PayloadType: v.PayloadType,
}
}
media = &core.Media{
Kind: "audio",
Direction: "sendonly",
Codecs: []*core.Codec{c},
}
if p.videoSender == nil {
media.ID = "0"
} else {
media.ID = "1"
}
p.audioSender = core.NewReceiver(media, c)
p.Conn.AddTrack(media, c, p.audioSender)
p.AddTrack(v)
case *track.Video:
if p.videoSender != nil {
break
}
var c *core.Codec
var media *core.Media
switch v.CodecID {
case codec.CodecID_H264:
c = &core.Codec{
Name: core.CodecH264,
ClockRate: v.SampleRate,
PayloadType: v.PayloadType,
}
case codec.CodecID_H265:
c = &core.Codec{
Name: core.CodecH265,
ClockRate: v.SampleRate,
PayloadType: v.PayloadType,
}
}
media = &core.Media{
Kind: "video",
Direction: "sendonly",
Codecs: []*core.Codec{c},
}
if p.audioSender == nil {
media.ID = "0"
} else {
media.ID = "1"
}
p.videoSender = core.NewReceiver(media, c)
p.Conn.AddTrack(media, c, p.videoSender)
p.AddTrack(v)
case engine.VideoRTP:
p.videoSender.WriteRTP(v.Packet)
case engine.AudioRTP:
p.audioSender.WriteRTP(v.Packet)
default:
p.Subscriber.OnEvent(event)
func (p *RTSPPusher) Disconnect() {
if p.Conn != nil {
p.Conn.Stop()
}
}
func (p *RTSPPusher) Connect() (err error) {
p.Conn = rtsp.NewClient(p.RemoteURL)
p.SetIO(p.Conn)
return p.Conn.Dial()
}
func (p *RTSPPuller) Push() (err error) {
return p.Announce()
func (p *RTSPPusher) Push() (err error) {
return p.Conn.Announce()
}

34
main.go
View File

@@ -1,6 +1,9 @@
package rtsp2
import (
"net"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"go.uber.org/zap"
"m7s.live/engine/v4"
"m7s.live/engine/v4/config"
@@ -8,14 +11,17 @@ import (
type RTSP2Config struct {
config.Publish
config.Subscribe
config.Pull
config.Push
config.TCP
}
var conf RTSP2Config
var RTSP2Plugin = engine.InstallPlugin(&conf)
func (*RTSP2Config) OnEvent(event any) {
func (c *RTSP2Config) OnEvent(event any) {
switch v := event.(type) {
case engine.FirstConfig:
for streamPath, url := range conf.PullOnStart {
@@ -23,6 +29,12 @@ func (*RTSP2Config) OnEvent(event any) {
RTSP2Plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
}
case engine.SEpublish:
if url, ok := conf.PushList[v.Target.Path]; ok {
if err := RTSP2Plugin.Push(v.Target.Path, url, new(RTSPPusher), false); err != nil {
RTSP2Plugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", url), zap.Error(err))
}
}
case engine.InvitePublish: //按需拉流
if url, ok := conf.PullOnSub[v.Target]; ok {
if err := RTSP2Plugin.Pull(v.Target, url, new(RTSPPuller), 0); err != nil {
@@ -31,3 +43,23 @@ func (*RTSP2Config) OnEvent(event any) {
}
}
}
func (c *RTSP2Config) ServeTCP(conn net.Conn) {
server := rtsp.NewServer(conn)
server.Listen(func(msg any) {
switch msg {
case rtsp.MethodPlay:
var suber RTSPSubscriber
if err := RTSP2Plugin.Subscribe(server.URL.Path, &suber); err != nil {
server.Stop()
}
case rtsp.MethodRecord:
var puber RTSPPublisher
if err := RTSP2Plugin.Publish(server.URL.Path, &puber); err != nil {
server.Stop()
} else {
puber.setTracks()
}
}
})
}

57
publisher.go Normal file
View File

@@ -0,0 +1,57 @@
package rtsp2
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"go.uber.org/zap"
"m7s.live/engine/v4"
"m7s.live/engine/v4/track"
)
type RTSPPublisher struct {
engine.Publisher
Conn *rtsp.Conn `json:"-" yaml:"-"`
}
func (p *RTSPPublisher) OnEvent(event any) {
switch event.(type) {
case engine.SEclose, engine.SEKick:
p.Conn.Stop()
}
p.Publisher.OnEvent(event)
}
func (p *RTSPPublisher) setTracks() {
for _, m := range p.Conn.Medias {
for _, c := range m.Codecs {
var handler core.HandlerFunc
switch c.Name {
case core.CodecH264:
p.VideoTrack = track.NewH264(p.Stream, c.PayloadType)
handler = p.VideoTrack.WriteRTPPack
case core.CodecH265:
p.VideoTrack = track.NewH265(p.Stream, c.PayloadType)
handler = p.VideoTrack.WriteRTPPack
case core.CodecAAC:
p.AudioTrack = track.NewAAC(p.Stream, c.PayloadType)
handler = p.AudioTrack.WriteRTPPack
case core.CodecPCMA:
p.AudioTrack = track.NewG711(p.Stream, true, c.PayloadType)
handler = p.AudioTrack.WriteRTPPack
case core.CodecPCMU:
p.AudioTrack = track.NewG711(p.Stream, false, c.PayloadType)
handler = p.AudioTrack.WriteRTPPack
}
if handler != nil {
sender := core.NewSender(m, c)
rec, err := p.Conn.GetTrack(m, c)
if err != nil {
p.Error("get track", zap.Error(err))
continue
}
sender.Handler = handler
sender.HandleRTP(rec)
}
}
}
}

102
subscriber.go Normal file
View File

@@ -0,0 +1,102 @@
package rtsp2
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track"
)
type RTSPSubscriber struct {
engine.Subscriber
videoSender *core.Receiver
audioSender *core.Receiver
Conn *rtsp.Conn `json:"-" yaml:"-"`
}
func (p *RTSPSubscriber) OnEvent(event any) {
switch v := event.(type) {
case *track.Audio:
if p.audioSender != nil {
break
}
var c *core.Codec
var media *core.Media
switch v.CodecID {
case codec.CodecID_AAC:
c = &core.Codec{
Name: core.CodecAAC,
ClockRate: v.SampleRate,
Channels: uint16(v.Channels),
PayloadType: v.PayloadType,
}
case codec.CodecID_PCMA:
c = &core.Codec{
Name: core.CodecPCMA,
ClockRate: v.SampleRate,
Channels: uint16(v.Channels),
PayloadType: v.PayloadType,
}
case codec.CodecID_PCMU:
c = &core.Codec{
Name: core.CodecPCMU,
ClockRate: v.SampleRate,
Channels: uint16(v.Channels),
PayloadType: v.PayloadType,
}
}
media = &core.Media{
Kind: "audio",
Direction: "sendonly",
Codecs: []*core.Codec{c},
}
if p.videoSender == nil {
media.ID = "0"
} else {
media.ID = "1"
}
p.audioSender = core.NewReceiver(media, c)
p.Conn.AddTrack(media, c, p.audioSender)
p.AddTrack(v)
case *track.Video:
if p.videoSender != nil {
break
}
var c *core.Codec
var media *core.Media
switch v.CodecID {
case codec.CodecID_H264:
c = &core.Codec{
Name: core.CodecH264,
ClockRate: v.SampleRate,
PayloadType: v.PayloadType,
}
case codec.CodecID_H265:
c = &core.Codec{
Name: core.CodecH265,
ClockRate: v.SampleRate,
PayloadType: v.PayloadType,
}
}
media = &core.Media{
Kind: "video",
Direction: "sendonly",
Codecs: []*core.Codec{c},
}
if p.audioSender == nil {
media.ID = "0"
} else {
media.ID = "1"
}
p.videoSender = core.NewReceiver(media, c)
p.Conn.AddTrack(media, c, p.videoSender)
p.AddTrack(v)
case engine.VideoRTP:
p.videoSender.WriteRTP(v.Packet)
case engine.AudioRTP:
p.audioSender.WriteRTP(v.Packet)
default:
p.Subscriber.OnEvent(event)
}
}