mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
158 lines
3.7 KiB
Go
158 lines
3.7 KiB
Go
package rtsp
|
|
|
|
import (
|
|
"github.com/lkmio/avformat/collections"
|
|
"github.com/lkmio/avformat/utils"
|
|
"github.com/lkmio/lkm/log"
|
|
"github.com/lkmio/lkm/stream"
|
|
"github.com/lkmio/rtp"
|
|
"github.com/lkmio/transport"
|
|
"github.com/pion/rtcp"
|
|
"net"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
TransportManger transport.Manager
|
|
)
|
|
|
|
// Sink rtsp拉流sink
|
|
// 对于udp而言, 每个sink维护多个transport
|
|
// tcp使用信令链路传输
|
|
type Sink struct {
|
|
stream.BaseSink
|
|
|
|
senders []*rtp.RtpSender // 一个rtsp源, 可能存在多个流, 每个流都需要拉取
|
|
cb func(sdp string) // sdp回调, 响应describe
|
|
}
|
|
|
|
func (s *Sink) StartStreaming(transStream stream.TransStream) error {
|
|
utils.Assert(transStream.TrackSize() > 0)
|
|
if s.senders != nil {
|
|
return nil
|
|
}
|
|
|
|
s.senders = make([]*rtp.RtpSender, transStream.TrackSize())
|
|
// sdp回调给sink, sink应答给describe请求
|
|
if s.cb != nil {
|
|
s.cb(transStream.(*TransStream).sdp)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, error) {
|
|
utils.Assert(index < cap(s.senders))
|
|
utils.Assert(s.senders[index] == nil)
|
|
|
|
var err error
|
|
var rtpPort uint16
|
|
var rtcpPort uint16
|
|
|
|
sender := rtp.RtpSender{
|
|
SSRC: ssrc,
|
|
}
|
|
|
|
if tcp {
|
|
s.TCPStreaming = true
|
|
} else {
|
|
sender.Rtp, err = TransportManger.NewUDPServer()
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
sender.Rtcp, err = TransportManger.NewUDPServer()
|
|
if err != nil {
|
|
sender.Rtp.Close()
|
|
sender.Rtp = nil
|
|
return 0, 0, err
|
|
}
|
|
|
|
sender.Rtp.SetHandler2(nil, sender.OnRTPPacket, nil)
|
|
sender.Rtcp.SetHandler2(nil, sender.OnRTCPPacket, nil)
|
|
sender.Rtp.(*transport.UDPServer).Receive()
|
|
sender.Rtcp.(*transport.UDPServer).Receive()
|
|
|
|
rtpPort = uint16(sender.Rtp.ListenPort())
|
|
rtcpPort = uint16(sender.Rtcp.ListenPort())
|
|
}
|
|
|
|
s.senders[index] = &sender
|
|
return rtpPort, rtcpPort, err
|
|
}
|
|
|
|
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rtpTime int64, keyVideo bool) error {
|
|
// 拉流方还没有连接上来
|
|
if index >= cap(s.senders) || s.senders[index] == nil {
|
|
return nil
|
|
}
|
|
|
|
for _, bytes := range data {
|
|
sender := s.senders[index]
|
|
sender.PktCount++
|
|
sender.OctetCount += len(bytes.Get())
|
|
if s.TCPStreaming {
|
|
return s.BaseSink.Write(index, data, rtpTime, keyVideo)
|
|
} else if sender.RtpConn != nil {
|
|
// 发送rtcp sr包
|
|
sender.RtpConn.Write(bytes.Get()[OverTcpHeaderSize:])
|
|
|
|
if sender.RtcpConn == nil || sender.PktCount%100 != 0 {
|
|
continue
|
|
}
|
|
|
|
nano := time.Now().UnixNano()
|
|
seconds := uint64(nano/1e9 + 2208988800)
|
|
fraction := uint64((nano % 1e9) * (1 << 32) / 1e9)
|
|
ntp := (seconds << 32) | fraction
|
|
sr := rtcp.SenderReport{
|
|
SSRC: sender.SSRC,
|
|
NTPTime: ntp,
|
|
RTPTime: uint32(rtpTime),
|
|
PacketCount: uint32(sender.PktCount),
|
|
OctetCount: uint32(sender.OctetCount),
|
|
}
|
|
|
|
marshal, err := sr.Marshal()
|
|
if err != nil {
|
|
log.Sugar.Errorf("创建rtcp sr消息失败 err:%s msg:%v", err.Error(), sr)
|
|
}
|
|
|
|
sender.RtcpConn.Write(marshal)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// 拉流链路是否已经连接上
|
|
// 拉流测发送了play请求, 并且对于udp而言, 还需要收到nat穿透包
|
|
func (s *Sink) isConnected(index int) bool {
|
|
return s.TCPStreaming || (s.senders[index] != nil && s.senders[index].RtpConn != nil)
|
|
}
|
|
|
|
func (s *Sink) Close() {
|
|
s.BaseSink.Close()
|
|
|
|
for _, sender := range s.senders {
|
|
if sender == nil {
|
|
continue
|
|
}
|
|
|
|
if sender.Rtp != nil {
|
|
sender.Rtp.Close()
|
|
}
|
|
|
|
if sender.Rtcp != nil {
|
|
sender.Rtcp.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
|
|
return &Sink{
|
|
stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtsp, Conn: conn},
|
|
nil,
|
|
cb,
|
|
}
|
|
}
|