实现rtsp发送rtcp消息

This commit is contained in:
yangjiechina
2024-04-11 17:40:00 +08:00
parent 9f22cb7829
commit ef9a0fea4f
5 changed files with 78 additions and 23 deletions

View File

@@ -1,7 +1,6 @@
package rtsp package rtsp
import ( import (
"fmt"
"github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/log" "github.com/yangjiechina/live-server/log"
@@ -72,7 +71,7 @@ func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
message, url, header, err := parseMessage(data) message, url, header, err := parseMessage(data)
if err != nil { if err != nil {
println(fmt.Sprintf("failed to prase message:%s. err:%s peer:%s", string(data), err.Error(), conn.RemoteAddr().String())) log.Sugar.Errorf("failed to prase message:%s. err:%s peer:%s", string(data), err.Error(), conn.RemoteAddr().String())
_ = conn.Close() _ = conn.Close()
s.closeSession(conn) s.closeSession(conn)
return return
@@ -80,7 +79,8 @@ func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
err = t.Data.(*session).Input(message, url, header) err = t.Data.(*session).Input(message, url, header)
if err != nil { if err != nil {
println(fmt.Sprintf("failed to process message of RTSP. err:%s peer:%s msg:%s", err.Error(), conn.RemoteAddr().String(), string(data))) log.Sugar.Errorf("failed to process message of RTSP. err:%s peer:%s msg:%s", err.Error(), conn.RemoteAddr().String(), string(data))
_ = conn.Close() _ = conn.Close()
} }
} }

View File

@@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/log"
"github.com/yangjiechina/live-server/stream" "github.com/yangjiechina/live-server/stream"
"net" "net"
"net/http" "net/http"
@@ -60,7 +61,7 @@ type requestHandler interface {
type session struct { type session struct {
conn net.Conn conn net.Conn
sink_ *Sink sink_ *sink
sessionId string sessionId string
writeBuffer *bytes.Buffer writeBuffer *bytes.Buffer
} }
@@ -165,7 +166,7 @@ func (s *session) onDescribe(source string, headers textproto.MIMEHeader) error
}) })
code := utils.HookStateOK code := utils.HookStateOK
s.sink_ = sink_ s.sink_ = sink_.(*sink)
stream.HookPlaying(sink_, func() { stream.HookPlaying(sink_, func() {
@@ -209,15 +210,18 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
} }
_ = port _ = port
port, err = strconv.Atoi(pairPort[1]) port2, err := strconv.Atoi(pairPort[1])
if err != nil { if err != nil {
return err return err
} }
_ = port _ = port2
log.Sugar.Debugf("client port:%d-%d", port, port2)
} }
} }
rtpPort, rtcpPort, err := s.sink_.addTrack(index, tcp) ssrc := 0xFFFFFFFF
rtpPort, rtcpPort, err := s.sink_.addTrack(index, tcp, uint32(ssrc))
if err != nil { if err != nil {
return err return err
} }
@@ -229,7 +233,8 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
} else { } else {
responseHeader += ";server_port=" + fmt.Sprintf("%d-%d", rtpPort, rtcpPort) responseHeader += ";server_port=" + fmt.Sprintf("%d-%d", rtpPort, rtcpPort)
} }
responseHeader += ";ssrc=FFFFFFFF"
responseHeader += ";ssrc=" + strconv.FormatInt(int64(ssrc), 16)
response := NewOKResponse(headers.Get("Cseq")) response := NewOKResponse(headers.Get("Cseq"))
response.Header.Set("Transport", responseHeader) response.Header.Set("Transport", responseHeader)
@@ -297,5 +302,8 @@ func (s *session) Input(method string, url_ *url.URL, headers textproto.MIMEHead
} }
func (s *session) close() { func (s *session) close() {
if s.sink_ != nil {
s.sink_.Close()
s.sink_ = nil
}
} }

View File

@@ -2,10 +2,13 @@ package rtsp
import ( import (
"fmt" "fmt"
"github.com/pion/rtcp"
"github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/log"
"github.com/yangjiechina/live-server/stream" "github.com/yangjiechina/live-server/stream"
"net" "net"
"time"
) )
// 对于UDP而言, 每个sink维护一对UDPTransport // 对于UDP而言, 每个sink维护一对UDPTransport
@@ -17,6 +20,7 @@ type sink struct {
tracks []*rtspTrack tracks []*rtspTrack
sdpCb func(sdp string) sdpCb func(sdp string)
//是否是TCP拉流
tcp bool tcp bool
playing bool playing bool
} }
@@ -35,7 +39,7 @@ func (s *sink) setTrackCount(count int) {
s.tracks = make([]*rtspTrack, count) s.tracks = make([]*rtspTrack, count)
} }
func (s *sink) addTrack(index int, tcp bool) (int, int, error) { func (s *sink) addTrack(index int, tcp bool, ssrc uint32) (int, int, error) {
utils.Assert(index < cap(s.tracks)) utils.Assert(index < cap(s.tracks))
utils.Assert(s.tracks[index] == nil) utils.Assert(s.tracks[index] == nil)
@@ -43,7 +47,9 @@ func (s *sink) addTrack(index int, tcp bool) (int, int, error) {
var rtpPort int var rtpPort int
var rtcpPort int var rtcpPort int
track := rtspTrack{} track := rtspTrack{
ssrc: ssrc,
}
if tcp { if tcp {
s.tcp = true s.tcp = true
} else { } else {
@@ -83,14 +89,38 @@ func (s *sink) addTrack(index int, tcp bool) (int, int, error) {
return rtpPort, rtcpPort, err return rtpPort, rtcpPort, err
} }
func (s *sink) input(index int, data []byte) error { func (s *sink) input(index int, data []byte, rtpTime uint32) error {
utils.Assert(index < cap(s.tracks))
//拉流方还没有连上来 //拉流方还没有连上来
s.tracks[index].pktCount++ utils.Assert(index < cap(s.tracks))
track := s.tracks[index]
track.pktCount++
track.octetCount += len(data)
if s.tcp { if s.tcp {
s.Conn.Write(data) s.Conn.Write(data)
} else { } else {
s.tracks[index].rtpConn.Write(data) track.rtpConn.Write(data)
if track.rtcpConn == nil || track.pktCount%100 != 0 {
return nil
}
nano := uint64(time.Now().UnixNano())
ntp := (nano/1000000000 + 2208988800<<32) | (nano % 1000000000)
sr := rtcp.SenderReport{
SSRC: track.ssrc,
NTPTime: ntp,
RTPTime: rtpTime,
PacketCount: uint32(track.pktCount),
OctetCount: uint32(track.octetCount),
}
marshal, err := sr.Marshal()
if err != nil {
log.Sugar.Errorf("创建rtcp sr消息失败 err:%s msg:%v", err.Error(), sr)
}
track.rtcpConn.Write(marshal)
} }
return nil return nil
} }
@@ -117,6 +147,8 @@ func (s *sink) TrackConnected(index int) bool {
} }
func (s *sink) Close() { func (s *sink) Close() {
s.SinkImpl.Close()
for _, track := range s.tracks { for _, track := range s.tracks {
if track.rtp != nil { if track.rtp != nil {
track.rtp.Close() track.rtp.Close()

View File

@@ -73,9 +73,9 @@ func (t *tranStream) onRtpPacket(data []byte, timestamp uint32, params interface
for i, rtp := range track.header { for i, rtp := range track.header {
librtp.RollbackSeq(rtp[OverTcpHeaderSize:], int(seq)-(count-i-1)) librtp.RollbackSeq(rtp[OverTcpHeaderSize:], int(seq)-(count-i-1))
if sink_.tcp { if sink_.tcp {
sink_.input(index, rtp) sink_.input(index, rtp, 0)
} else { } else {
sink_.input(index, rtp[OverTcpHeaderSize:]) sink_.input(index, rtp[OverTcpHeaderSize:], timestamp)
} }
} }
} }
@@ -84,9 +84,9 @@ func (t *tranStream) onRtpPacket(data []byte, timestamp uint32, params interface
t.overTCP(track.buffer[:end], index) t.overTCP(track.buffer[:end], index)
if sink_.tcp { if sink_.tcp {
sink_.input(index, track.buffer[:end]) sink_.input(index, track.buffer[:end], 0)
} else { } else {
sink_.input(index, data) sink_.input(index, data, timestamp)
} }
} }
} }
@@ -133,8 +133,8 @@ func (t *tranStream) Input(packet utils.AVPacket) error {
} }
func (t *tranStream) AddSink(sink_ stream.ISink) error { func (t *tranStream) AddSink(sink_ stream.ISink) error {
sink_.(*Sink).setTrackCount(len(t.TransStreamImpl.Tracks)) sink_.(*sink).setTrackCount(len(t.TransStreamImpl.Tracks))
if err := sink_.(*Sink).SendHeader([]byte(t.sdp)); err != nil { if err := sink_.(*sink).SendHeader([]byte(t.sdp)); err != nil {
return err return err
} }

View File

@@ -13,7 +13,9 @@ type rtspTrack struct {
rtcpConn net.Conn rtcpConn net.Conn
//rtcp //rtcp
pktCount int pktCount int
ssrc uint32
octetCount int
} }
func (s *rtspTrack) onRTPPacket(conn net.Conn, data []byte) { func (s *rtspTrack) onRTPPacket(conn net.Conn, data []byte) {
@@ -26,6 +28,19 @@ func (s *rtspTrack) onRTCPPacket(conn net.Conn, data []byte) {
if s.rtcpConn == nil { if s.rtcpConn == nil {
s.rtcpConn = conn s.rtcpConn = conn
} }
//packs, err := rtcp.Unmarshal(data)
//if err != nil {
// log.Sugar.Warnf("解析rtcp包失败 err:%s conn:%s pkt:%s", err.Error(), conn.RemoteAddr().String(), hex.EncodeToString(data))
// return
//}
//
//for _, pkt := range packs {
// if _, ok := pkt.(*rtcp.ReceiverReport); ok {
// } else if _, ok := pkt.(*rtcp.SourceDescription); ok {
// } else if _, ok := pkt.(*rtcp.Goodbye); ok {
// }
//}
} }
// tcp链接成功回调 // tcp链接成功回调