diff --git a/client.go b/client.go index 4193d8f..c4e5597 100644 --- a/client.go +++ b/client.go @@ -29,6 +29,12 @@ func (p *RTSPPuller) Pull() (err error) { return } p.setTracks() + if p.AudioTrack == nil { + p.Publisher.Config.PubAudio = false + } + if p.VideoTrack == nil { + p.Publisher.Config.PubVideo = false + } return p.Conn.Start() } diff --git a/main.go b/main.go index da40a70..9b00709 100644 --- a/main.go +++ b/main.go @@ -2,14 +2,19 @@ package rtsp2 import ( "net" + "net/http" + "strconv" "github.com/AlexxIT/go2rtc/pkg/rtsp" + "github.com/AlexxIT/go2rtc/pkg/tcp" "go.uber.org/zap" "m7s.live/engine/v4" "m7s.live/engine/v4/config" + "m7s.live/engine/v4/util" ) type RTSP2Config struct { + config.HTTP config.Publish config.Subscribe config.Pull @@ -17,7 +22,9 @@ type RTSP2Config struct { config.TCP } -var conf RTSP2Config +var conf = RTSP2Config{ + TCP: config.TCP{ListenAddr: ":554"}, +} var RTSP2Plugin = engine.InstallPlugin(&conf) @@ -47,19 +54,72 @@ func (c *RTSP2Config) OnEvent(event any) { func (c *RTSP2Config) ServeTCP(conn net.Conn) { server := rtsp.NewServer(conn) server.Listen(func(msg any) { - switch msg { - case rtsp.MethodPlay: - var suber RTSPSubscriber - if err := RTSP2Plugin.Subscribe(server.URL.Path, &suber); err != nil { - server.Stop() + RTSP2Plugin.Debug("rtsp", zap.Any("msg", msg)) + switch msg := msg.(type) { + case *tcp.Response: + switch msg.Request.Method { + case rtsp.MethodRecord, rtsp.MethodPlay: + go server.Start() } - case rtsp.MethodRecord: - var puber RTSPPublisher - if err := RTSP2Plugin.Publish(server.URL.Path, &puber); err != nil { - server.Stop() - } else { - puber.setTracks() + case string: + switch msg { + case rtsp.MethodDescribe: + var suber RTSPSubscriber + suber.Conn = server + if err := RTSP2Plugin.Subscribe(server.URL.Path, &suber); err != nil { + server.Stop() + } + case rtsp.MethodAnnounce: + var puber RTSPPublisher + puber.Conn = server + if err := RTSP2Plugin.Publish(server.URL.Path, &puber); err != nil { + server.Stop() + } else { + puber.setTracks() + if puber.AudioTrack == nil { + puber.Publisher.Config.PubAudio = false + } + if puber.VideoTrack == nil { + puber.Publisher.Config.PubVideo = false + } + } } } }) + server.Accept() +} + +func filterStreams() (ss []*engine.Stream) { + engine.Streams.Range(func(key string, s *engine.Stream) { + switch s.Publisher.(type) { + case *RTSPPublisher, *RTSPPuller: + ss = append(ss, s) + } + }) + return +} + +func (*RTSP2Config) API_list(w http.ResponseWriter, r *http.Request) { + util.ReturnFetchValue(filterStreams, w, r) +} + +func (*RTSP2Config) API_Pull(rw http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + save, _ := strconv.Atoi(query.Get("save")) + err := RTSP2Plugin.Pull(query.Get("streamPath"), query.Get("target"), new(RTSPPuller), save) + if err != nil { + util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r) + } else { + util.ReturnOK(rw, r) + } +} + +func (*RTSP2Config) API_Push(rw http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + err := RTSP2Plugin.Push(query.Get("streamPath"), query.Get("target"), new(RTSPPusher), query.Has("save")) + if err != nil { + util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r) + } else { + util.ReturnOK(rw, r) + } } diff --git a/publisher.go b/publisher.go index 9bb2f18..6114f52 100644 --- a/publisher.go +++ b/publisher.go @@ -1,6 +1,8 @@ package rtsp2 import ( + "encoding/hex" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h265" @@ -32,8 +34,12 @@ func (p *RTSPPublisher) setTracks() { if p.VideoTrack == nil { p.VideoTrack = track.NewH264(p.Stream, c.PayloadType) sps, pps := h264.GetParameterSet(c.FmtpLine) - p.VideoTrack.WriteSliceBytes(sps) - p.VideoTrack.WriteSliceBytes(pps) + if len(sps) > 0 { + p.VideoTrack.WriteSliceBytes(sps) + } + if len(pps) > 0 { + p.VideoTrack.WriteSliceBytes(pps) + } } handler = p.VideoTrack.WriteRTPPack case core.CodecH265: @@ -43,23 +49,40 @@ func (p *RTSPPublisher) setTracks() { if len(vps) > 0 { p.VideoTrack.WriteSliceBytes(vps) } - p.VideoTrack.WriteSliceBytes(sps) - p.VideoTrack.WriteSliceBytes(pps) + if len(sps) > 0 { + p.VideoTrack.WriteSliceBytes(sps) + } + if len(pps) > 0 { + p.VideoTrack.WriteSliceBytes(pps) + } } handler = p.VideoTrack.WriteRTPPack case core.CodecAAC: if p.AudioTrack == nil { - p.AudioTrack = track.NewAAC(p.Stream, c.PayloadType) + s := core.Between(c.FmtpLine, "config=", ";") + asc, _ := hex.DecodeString(s) + // var aacConfig mpeg4audio.AudioSpecificConfig + // aacConfig.ChannelCount = int(c.Channels) + // aacConfig.SampleRate = int(c.ClockRate) + // aacConfig.Type = mpeg4audio.ObjectTypeAACLC + // asc, _ := aacConfig.Marshal() + aac := track.NewAAC(p.Stream, c.PayloadType, c.ClockRate) + aac.WriteSequenceHead(append([]byte{0xAF, 0x00}, asc...)) + p.AudioTrack = aac } handler = p.AudioTrack.WriteRTPPack case core.CodecPCMA: if p.AudioTrack == nil { - p.AudioTrack = track.NewG711(p.Stream, true, c.PayloadType) + g711 := track.NewG711(p.Stream, true, c.PayloadType, c.ClockRate) + g711.Channels = byte(c.Channels) + p.AudioTrack = g711 } handler = p.AudioTrack.WriteRTPPack case core.CodecPCMU: if p.AudioTrack == nil { - p.AudioTrack = track.NewG711(p.Stream, false, c.PayloadType) + g711 := track.NewG711(p.Stream, false, c.PayloadType, c.ClockRate) + g711.Channels = byte(c.Channels) + p.AudioTrack = g711 } handler = p.AudioTrack.WriteRTPPack }