支持rtsp over tcp

This commit is contained in:
yangjiechina
2024-03-17 20:20:43 +08:00
parent d325daeb8b
commit 7694237dc7
5 changed files with 102 additions and 56 deletions

View File

@@ -19,12 +19,13 @@ type rtpTrack struct {
tmp [][]byte tmp [][]byte
} }
func NewRTPTrack(muxer librtp.Muxer, pt byte, rate int) *rtpTrack { func NewRTPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *rtpTrack {
stream := &rtpTrack{ stream := &rtpTrack{
pt: pt, pt: pt,
rate: rate, rate: rate,
muxer: muxer, muxer: muxer,
buffer: make([]byte, 1500), buffer: make([]byte, 1500),
mediaType: mediaType,
} }
return stream return stream

View File

@@ -15,6 +15,8 @@ import (
"time" "time"
) )
type SessionState int
const ( const (
MethodOptions = "OPTIONS" MethodOptions = "OPTIONS"
MethodDescribe = "DESCRIBE" MethodDescribe = "DESCRIBE"
@@ -29,6 +31,13 @@ const (
MethodRecord = "RECORD" MethodRecord = "RECORD"
Version = "RTSP/1.0" Version = "RTSP/1.0"
SessionSateOptions = SessionState(0x1)
SessionSateDescribe = SessionState(0x2)
SessionSateSetup = SessionState(0x3)
SessionSatePlay = SessionState(0x4)
SessionSateTeardown = SessionState(0x5)
SessionSatePause = SessionState(0x6)
) )
type requestHandler interface { type requestHandler interface {
@@ -181,27 +190,29 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
var clientRtpPort int var clientRtpPort int
var clientRtcpPort int var clientRtcpPort int
tcp := "RTP/AVP" != split[0] && "RTP/AVP/UDP" != split[0] tcp := "RTP/AVP" != split[0] && "RTP/AVP/UDP" != split[0]
for _, value := range split { if !tcp {
if !strings.HasPrefix(value, "client_port=") { for _, value := range split {
continue if !strings.HasPrefix(value, "client_port=") {
} continue
}
pairPort := strings.Split(value[len("client_port="):], "-") pairPort := strings.Split(value[len("client_port="):], "-")
if len(pairPort) != 2 { if len(pairPort) != 2 {
return fmt.Errorf("failed to parsing client_port:%s", value) return fmt.Errorf("failed to parsing client_port:%s", value)
} }
port, err := strconv.Atoi(pairPort[0]) port, err := strconv.Atoi(pairPort[0])
if err != nil { if err != nil {
return err return err
} }
clientRtpPort = port clientRtpPort = port
port, err = strconv.Atoi(pairPort[1]) port, err = strconv.Atoi(pairPort[1])
if err != nil { if err != nil {
return err return err
}
clientRtcpPort = port
} }
clientRtcpPort = port
} }
rtpPort, rtcpPort, err := s.sink_.addTrack(index, tcp) rtpPort, rtcpPort, err := s.sink_.addTrack(index, tcp)
@@ -211,7 +222,15 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
println(clientRtpPort) println(clientRtpPort)
println(clientRtcpPort) println(clientRtcpPort)
responseHeader := transportHeader + ";server_port=" + fmt.Sprintf("%d-%d", rtpPort, rtcpPort) + ";ssrc=FFFFFFFF" responseHeader := transportHeader
if tcp {
//修改interleaved为实际的stream index
responseHeader += ";interleaved=" + fmt.Sprintf("%d-%d", index, index)
} else {
responseHeader += ";server_port=" + fmt.Sprintf("%d-%d", rtpPort, rtcpPort)
}
responseHeader += ";ssrc=FFFFFFFF"
response := NewOKResponse(headers.Get("Cseq")) response := NewOKResponse(headers.Get("Cseq"))
response.Header.Set("Transport", responseHeader) response.Header.Set("Transport", responseHeader)
response.Header.Set("Session", s.sessionId) response.Header.Set("Session", s.sessionId)
@@ -226,7 +245,11 @@ func (s *session) onPlay(sourceId string, headers textproto.MIMEHeader) error {
response.Header.Set("Session", sessionHeader) response.Header.Set("Session", sessionHeader)
} }
return s.response(response, nil) err := s.response(response, nil)
if err == nil {
s.sink_.playing = true
}
return err
} }
func (s *session) onTeardown() { func (s *session) onTeardown() {

View File

@@ -15,7 +15,10 @@ type sink struct {
//一个rtsp源可能存在多个流, 每个流都需要拉取拉取 //一个rtsp源可能存在多个流, 每个流都需要拉取拉取
tracks []*rtspTrack tracks []*rtspTrack
sdpCB func(sdp string) sdpCb func(sdp string)
tcp bool
playing bool
} }
func NewSink(id stream.SinkId, sourceId string, conn net.Conn, cb func(sdp string)) stream.ISink { func NewSink(id stream.SinkId, sourceId string, conn net.Conn, cb func(sdp string)) stream.ISink {
@@ -23,6 +26,8 @@ func NewSink(id stream.SinkId, sourceId string, conn net.Conn, cb func(sdp strin
stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtsp, Conn: conn}, stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtsp, Conn: conn},
nil, nil,
cb, cb,
false,
false,
} }
} }
@@ -40,18 +45,7 @@ func (s *sink) addTrack(index int, tcp bool) (int, int, error) {
track := rtspTrack{} track := rtspTrack{}
if tcp { if tcp {
err = rtspTransportManger.AllocTransport(true, func(port int) { s.tcp = true
var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", port))
if err == nil {
track.rtp = &transport.TCPServer{}
track.rtp.SetHandler2(track.onTCPConnected, nil, track.onTCPDisconnected)
err = track.rtp.Bind(addr)
}
rtpPort = port
})
} else { } else {
err = rtspTransportManger.AllocPairTransport(func(port int) { err = rtspTransportManger.AllocPairTransport(func(port int) {
//rtp port //rtp port
@@ -91,16 +85,18 @@ func (s *sink) addTrack(index int, tcp bool) (int, int, error) {
func (s *sink) input(index int, data []byte) error { func (s *sink) input(index int, data []byte) error {
utils.Assert(index < cap(s.tracks)) utils.Assert(index < cap(s.tracks))
//拉流方还没有连上来 //拉流方还没有连上来
s.tracks[index].pktCount++ s.tracks[index].pktCount++
s.tracks[index].rtpConn.Write(data) if s.tcp {
s.Conn.Write(data)
} else {
s.tracks[index].rtpConn.Write(data)
}
return nil return nil
} }
func (s *sink) isConnected(index int) bool { func (s *sink) isConnected(index int) bool {
return s.tracks[index] != nil && s.tracks[index].rtpConn != nil return s.playing && (s.tcp || (s.tracks[index] != nil && s.tracks[index].rtpConn != nil))
} }
func (s *sink) pktCount(index int) int { func (s *sink) pktCount(index int) int {
@@ -109,7 +105,7 @@ func (s *sink) pktCount(index int) int {
// SendHeader 回调rtsp流的sdp信息 // SendHeader 回调rtsp流的sdp信息
func (s *sink) SendHeader(data []byte) error { func (s *sink) SendHeader(data []byte) error {
s.sdpCB(string(data)) s.sdpCb(string(data))
return nil return nil
} }

View File

@@ -10,6 +10,11 @@ import (
"net" "net"
) )
const (
OverTcpHeaderSize = 4
OverTcpMagic = 0x24
)
// 低延迟是rtsp特性, 不考虑实现GOP缓存 // 低延迟是rtsp特性, 不考虑实现GOP缓存
type tranStream struct { type tranStream struct {
stream.TransStreamImpl stream.TransStreamImpl
@@ -38,39 +43,59 @@ func NewTransStream(addr net.IPAddr, urlFormat string) stream.ITransStream {
} }
func (t *tranStream) onAllocBuffer(params interface{}) []byte { func (t *tranStream) onAllocBuffer(params interface{}) []byte {
return t.rtpTracks[params.(int)].buffer return t.rtpTracks[params.(int)].buffer[OverTcpHeaderSize:]
} }
func (t *tranStream) onRtpPacket(data []byte, timestamp uint32, params interface{}) { func (t *tranStream) onRtpPacket(data []byte, timestamp uint32, params interface{}) {
index := params.(int) index := params.(int)
track := t.rtpTracks[index]
if t.rtpTracks[index].cache && t.rtpTracks[index].header == nil { if track.cache && track.header == nil {
bytes := make([]byte, len(data)) bytes := make([]byte, OverTcpHeaderSize+len(data))
copy(bytes, data) copy(bytes[OverTcpHeaderSize:], data)
t.rtpTracks[index].tmp = append(t.rtpTracks[index].tmp, bytes) track.tmp = append(track.tmp, bytes)
t.overTCP(bytes, index)
return return
} }
for _, iSink := range t.Sinks { for _, value := range t.Sinks {
if !iSink.(*sink).isConnected(index) { sink_ := value.(*sink)
if !sink_.isConnected(index) {
continue continue
} }
if iSink.(*sink).pktCount(index) < 1 && utils.AVMediaTypeVideo == t.rtpTracks[index].mediaType { if sink_.pktCount(index) < 1 && utils.AVMediaTypeVideo == track.mediaType {
seq := binary.BigEndian.Uint16(data[2:]) seq := binary.BigEndian.Uint16(data[2:])
count := len(t.rtpTracks[index].header) count := len(track.header)
for i, rtp := range t.rtpTracks[index].header { for i, rtp := range track.header {
librtp.RollbackSeq(rtp, int(seq)-(count-i-1)) librtp.RollbackSeq(rtp[OverTcpHeaderSize:], int(seq)-(count-i-1))
iSink.(*sink).input(index, rtp) if sink_.tcp {
sink_.input(index, rtp)
} else {
sink_.input(index, rtp[OverTcpHeaderSize:])
}
} }
} }
iSink.(*sink).input(index, data) end := OverTcpHeaderSize + len(data)
t.overTCP(track.buffer[:end], index)
if sink_.tcp {
sink_.input(index, track.buffer[:end])
} else {
sink_.input(index, data)
}
} }
} }
func (t *tranStream) overTCP(data []byte, channel int) {
data[0] = OverTcpMagic
data[1] = byte(channel)
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
}
func (t *tranStream) Input(packet utils.AVPacket) error { func (t *tranStream) Input(packet utils.AVPacket) error {
stream_ := t.rtpTracks[packet.Index()] stream_ := t.rtpTracks[packet.Index()]
if utils.AVMediaTypeAudio == packet.MediaType() { if utils.AVMediaTypeAudio == packet.MediaType() {
@@ -135,7 +160,7 @@ func (t *tranStream) AddTrack(stream utils.AVStream) error {
muxer.SetAllocHandler(t.onAllocBuffer) muxer.SetAllocHandler(t.onAllocBuffer)
muxer.SetWriteHandler(t.onRtpPacket) muxer.SetWriteHandler(t.onRtpPacket)
t.rtpTracks = append(t.rtpTracks, NewRTPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate)) t.rtpTracks = append(t.rtpTracks, NewRTPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, stream.Type()))
muxer.SetParams(len(t.rtpTracks) - 1) muxer.SetParams(len(t.rtpTracks) - 1)
return nil return nil
} }

View File

@@ -59,6 +59,7 @@ func PopWaitingSinks(sourceId string) []ISink {
var index = 0 var index = 0
for _, sink := range source { for _, sink := range source {
sinks[index] = sink sinks[index] = sink
index++
} }
delete(waitingSinks, sourceId) delete(waitingSinks, sourceId)