Clone
1
Source Code Analysis
yangjiechina edited this page 2024-11-20 15:43:22 +08:00
This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

前言

      在介绍系统架构之前,我们先回忆日常对流媒体服务的使用场景,无非就是推流端推流到服务器,再由服务器输出拉流端所需的流协议。单论业务逻辑,流媒体服务相比其他动辄数以百计的接口的业务系统可谓是简单之极。本文也将按照此逻辑流程,着重介绍每个角色在代码中是如何实现的,交互时是如何处理的。再多提一句,LKM定位是直播服务器,不涉及复杂的传输协议。初衷是为大家提供一款简单、纯粹的流媒体服务器,这点大家在代码中将深有体会,所以大家大可不必因陌生而产生恐惧。

如何做到高并发

      高并发的评判标准是什么需要参考哪些指标在提供同等服务质量的情况下所消耗的CPU、内存更低。其中CPU指标更为重要因为CPU过载将对整个系统所有服务带来影响。同样如果毫无节制的使用内存造成内存不足进而引发与磁盘交换空间带来的影响也是系统级的。两者不是互斥关系内存方面更多的是注意避免内存泄漏以及合理有效的使用内存缓存。所以下面只会围绕CPU的影响条件来展开讲。常见的消耗CPU多的地方

  1. 锁开销大
  2. 与内核态的交互频率过高

降低锁开销

      以生产者和消费者模型为例,假如一个生产者,多个消费者,分别运行在不同的线程,为保证读写安全,会加以锁保护。锁的开销会随着消费者数量的增加,蹭蹭往上涨。想要优化降低锁的开销,需要对业务有深刻的理解,以及较强的代码功底。除此之外,还有另外一种常见做法,通过线程切换来做到无锁并发。举个例子,生产者线程内部维护着一个任务队列(cas或锁的粒度小),将生产和消费任务都交给任务队列由生产线程处理。这样生产者和消费者之间不会有锁竞争,影响业务运作。当然这样做,也并非十全十美,其本质是将矛盾转移给了操作系统,同样线程切换的消耗也是系统级的。如何抉择,需要结合系统业务,自行评估。而协程切换是在用户态进行的,创建和切换开销小,避免了频繁的线程切换对系统性能的影响。

降低与内核态的交互频率

      无他加缓存。将多次写给fd的数据合并一次写入。LKM中合并写的目的就在此,一次发送数百毫秒的传输流,降低频率,减少开销。

推流端封装

      LKM中推流源抽象为Source接口,实现类PublishSource。具体协议的的推流源负责握手、协议解析、解析出通用的AVStream和AVPacket回调给PublishSource再生成各种协议的输出流,转发给拉流端。


type Source interface {
	// GetID 返回SourceID
	GetID() string

	SetID(id string)

	// Input 输入推流数据
	Input(data []byte) error

	// GetType 返回推流类型
	GetType() SourceType

	SetType(sourceType SourceType)

	// OriginStreams 返回推流的原始Streams
	OriginStreams() []utils.AVStream

	// TranscodeStreams 返回转码的Streams
	TranscodeStreams() []utils.AVStream

	// AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader先将Sink添加到等待队列.
	// 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink
	AddSink(sink Sink)

	// RemoveSink 删除Sink
	RemoveSink(sink Sink)
    ...
}

针对每个推流源都将启动一个事件协程从StreamPipe读取推流数据、交给推流源的Input函数处理。从MainContextEvents管道读取需要在事件协程执行的函数。


// LoopEvent 循环读取事件
func LoopEvent(source Source) {
	// 将超时计时器放在此处开启, 方便在退出的时候关闭
	var receiveTimer *time.Timer
	var idleTimer *time.Timer

	defer func() {
		log.Sugar.Debugf("主协程执行结束 source: %s", source.GetID())

		// 关闭计时器
		if receiveTimer != nil {
			receiveTimer.Stop()
		}
		if idleTimer != nil {
			idleTimer.Stop()
		}
	}()

	// 开启收流超时计时器
	if AppConfig.Hooks.IsEnableOnReceiveTimeout() && AppConfig.ReceiveTimeout > 0 {
		receiveTimer = StartReceiveDataTimer(source)
	}

	// 开启拉流空闲超时计时器
	if AppConfig.Hooks.IsEnableOnIdleTimeout() && AppConfig.IdleTimeout > 0 {
		idleTimer = StartIdleTimer(source)
	}

	for {
		select {
		// 读取推流数据
		case data := <-source.StreamPipe():
			if AppConfig.ReceiveTimeout > 0 {
				source.SetLastPacketTime(time.Now())
			}
            
			if err := source.Input(data); err != nil {
				log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", source.GetID(), err.Error())
				source.DoClose()
				return
			}

			break
		// 切换到主协程,执行该函数. 目的是用于无锁化处理推拉流的连接与断开, 推流源断开, 查询推流源信息等事件. 不要做耗时操作, 否则会影响推拉流.
		case event := <-source.MainContextEvents():
			event()

			if source.IsClosed() {
				// 处理推流管道剩余的数据?
				return
			}

			break
		}
	}
}

RTMP对Input处理从rtmp_stack解析出Audio和Video的Message


func (p *Publisher) Input(data []byte) error {
	return p.stack.Input(nil, data)
}

GB28181对Input处理解析rtp包再解析PS流


func (source *BaseGBSource) Input(data []byte) error {
	// 国标级联转发
	for _, transStream := range source.TransStreams {
		if transStream.GetProtocol() != stream.TransStreamGBStreamForward {
			continue
		}

		bytes := transStream.(*ForwardStream).WrapData(data)
		rtpPacket := [1][]byte{bytes}
		source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true)
	}

	packet := rtp.Packet{}
	_ = packet.Unmarshal(data)
	return source.deMuxerCtx.Input(packet.Payload)
}

1078对Input处理丢给粘包解码器


func (s *Session) Input(data []byte) error {
	return s.decoder.Input(data)
}

所有推流源解析出AVpacket都将回调到OnDeMuxPacket函数再生成各种协议的输出流转发给拉流端。

func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
	if AppConfig.GOPCache && s.existVideo {
		s.gopBuffer.AddPacket(packet)
	}

	// 分发给各个传输流
	for _, transStream := range s.TransStreams {
		s.DispatchPacket(transStream, packet)
	}

	// 未开启GOP缓存或只存在音频流, 释放掉内存
	if !AppConfig.GOPCache || !s.existVideo {
		s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
	}
}

// DispatchPacket 分发AVPacket
func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket) {
	data, timestamp, videoKey, err := transStream.Input(packet)
	if err != nil || len(data) < 1 {
		return
	}

	s.DispatchBuffer(transStream, packet.Index(), data, timestamp, videoKey)
}

// DispatchBuffer 分发传输流
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool) {
	sinks := s.TransStreamSinks[transStream.GetID()]
	exist := transStream.IsExistVideo()
	for _, sink := range sinks {

		// 如果存在视频, 确保向sink发送的第一帧是关键帧
		if exist && sink.GetSentPacketCount() < 1 {
			if !videoKey {
				continue
			}

			if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
				s.write(sink, index, extraData, timestamp)
			}
		}

		s.write(sink, index, data, timestamp)
	}
}

// 向sink推流
func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int64) {
	err := sink.Write(index, data, timestamp)
	if err == nil {
		sink.IncreaseSentPacketCount()
		//return
	}

	// 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞.
	// 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流.
	_, ok := err.(*transport.ZeroWindowSizeError)
	if ok {
		log.Sugar.Errorf("向sink推流超时,关闭连接. sink: %s", sink.GetID())
		sink.Close()
	}
}

拉流和结束拉流都切换到事件协程执行。


func (s *PublishSource) PostEvent(cb func()) {
	s.mainContextEvents <- cb
}

func (s *PublishSource) AddSink(sink Sink) {
	s.PostEvent(func() {
		if !s.completed {
			AddSinkToWaitingQueue(sink.GetSourceID(), sink)
		} else {
			if !s.doAddSink(sink) {
				sink.Close()
			}
		}
	})
}

func (s *PublishSource) RemoveSink(sink Sink) {
	s.PostEvent(func() {
		s.doRemoveSink(sink)
	})
}

拉流端封装

拉流端封装为Sink。每个拉流Server创建对应协议的Sink,添加到Source根据拉流协议创建对应的TransStream。

RTMP创建Sink


func (s *Session) OnPlay(app, stream_ string) utils.HookState {
	streamName, values := stream.ParseUrl(stream_)

	sourceId := s.generateSourceID(app, streamName)
	sink := NewSink(stream.NetAddr2SinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack)
	sink.SetUrlValues(values)

	log.Sugar.Infof("rtmp onplay app: %s stream: %s sink: %v conn: %s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String())

	_, state := stream.PreparePlaySink(sink)
	if utils.HookStateOK != state {
		log.Sugar.Errorf("rtmp拉流失败 source: %s sink: %s", sourceId, sink.GetID())
	} else {
		s.handle = sink
	}

	return state
}

Http-FLV创建Sink


func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "video/x-flv")
	w.Header().Set("Connection", "Keep-Alive")
	w.Header().Set("Transfer-Encoding", "chunked")

	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	conn, _, err := hj.Hijack()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	sink := flv.NewFLVSink(api.generateSinkID(r.RemoteAddr), sourceId, conn)
	sink.SetUrlValues(r.URL.Query())
	log.Sugar.Infof("http-flv 连接 sink:%s", sink.String())

	_, state := stream.PreparePlaySink(sink)
	if utils.HookStateOK != state {
		log.Sugar.Warnf("http-flv 播放失败 sink:%s", sink.String())

		w.WriteHeader(http.StatusForbidden)
		return
	}

	bytes := make([]byte, 64)
	for {
		if _, err := conn.Read(bytes); err != nil {
			log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.String())
			sink.Close()
			break
		}
	}
}

Source创建对应的TransStream。


// 创建sink需要的输出流
func (s *PublishSource) doAddSink(sink Sink) bool {
    ....

	var streams [5]utils.AVStream
	var size int

	for _, stream_ := range s.originStreams.All() {
		if disableVideo && stream_.Type() == utils.AVMediaTypeVideo {
			continue
		}

		streams[size] = stream_
		size++
	}

    // 查找或创建对应的TransStream
	transStreamId := GenerateTransStreamID(sink.GetProtocol(), streams[:size]...)
	transStream, exist := s.TransStreams[transStreamId]
	if !exist {
		var err error
		transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), streams[:size])
		if err != nil {
			log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
			return false
		}

		s.TransStreams[transStreamId] = transStream
	}

	sink.SetTransStreamID(transStreamId)

	{
		sink.Lock()
		defer sink.UnLock()

		if SessionStateClosed == sink.GetState() {
			log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String())
		} else {
			sink.SetState(SessionStateTransferring)
		}
	}

	err := sink.StartStreaming(transStream)
	if err != nil {
		log.Sugar.Errorf("开始推流失败 err: %s", err.Error())
		return false
	}

	// 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流
	if !sink.IsReady() {
		return true
	}

	// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
	conn, ok := sink.GetConn().(*transport.Conn)
	if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
		conn.EnableAsyncWriteMode(transStream.OutStreamBufferCapacity() - 2)
	}

	// 发送已有的缓存数据
	// 此处发送缓存数据必须要存在关键帧的输出流才发否则等DispatchPacket时再发送extra。
	data, timestamp, _ := transStream.ReadKeyFrameBuffer()
	if len(data) > 0 {
		if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
			s.write(sink, 0, extraData, timestamp)
		}

		s.write(sink, 0, data, timestamp)
	}

	// 累加拉流计数
	if s.recordSink != sink {
		s.sinkCount++
		log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
	}

	s.sinks[sink.GetID()] = sink
	s.TransStreamSinks[transStreamId][sink.GetID()] = sink

	// 新建传输流,发送已经缓存的音视频帧
	if !exist && AppConfig.GOPCache && s.existVideo {
		s.DispatchGOPBuffer(transStream)
	}

	return true
}

传输流封装

TranStream 调用Input函数输入AVPacket按照流协议封装音视频帧返回传输流。需要注意一点目前传输流只会生成一份转发给该协议的所有Sink如果某个Sink拉流从未断开而Source是重新推流该Sink会出现时间戳错误可能会影响到播放。

RTMP封装传输流Input返回值分别为由chunk组成的合并写块、时间戳、合并写块是否存在关键视频帧。ReadExtraData函数返回传输流的编码器信息ReadKeyFrameBuffer函数返回最近的包含视频关键帧的合并写切片。Sink在拉流时会优先调用这2个函数将最新的传输流缓存发送给Sink实现视频秒开。

func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
	t.ClearOutStreamBuffer()

	var data []byte
	var chunk *librtmp.Chunk
	var videoPkt bool
	var videoKey bool
	// rtmp chunk消息体的数据大小
	var payloadSize int
	// 先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
	var chunkPayloadOffset int
	var dts int64
	var pts int64

	dts = packet.ConvertDts(1000)
	pts = packet.ConvertPts(1000)
	ct := pts - dts

	if utils.AVMediaTypeAudio == packet.MediaType() {
		data = packet.Data()
		chunk = &t.audioChunk
		chunkPayloadOffset = 2
		payloadSize += chunkPayloadOffset + len(data)
	} else if utils.AVMediaTypeVideo == packet.MediaType() {
		videoPkt = true
		videoKey = packet.KeyFrame()
		data = packet.AVCCPacketData()
		chunk = &t.videoChunk
		chunkPayloadOffset = t.muxer.ComputeVideoDataSize(uint32(ct))
		payloadSize += chunkPayloadOffset + len(data)
	}

	// 遇到视频关键帧, 发送剩余的流, 创建新切片
	if videoKey {
		if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
			t.AppendOutStreamBuffer(segment)
		}
	}

	// 分配内存
	// 固定type0
	chunkHeaderSize := 12
	// type3chunk数量
	numChunks := (payloadSize - 1) / t.chunkSize
	rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks
	// 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
	if dts >= 0xFFFFFF && dts <= 0xFFFFFFFF {
		rtmpMsgSize += (1 + numChunks) * 4
	}

	allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey)

	// 写chunk header
	chunk.Length = payloadSize
	chunk.Timestamp = uint32(dts)
	n := chunk.ToBytes(allocate)

	// 写flv
	if videoPkt {
		n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false)
	} else {
		n += t.muxer.WriteAudioData(allocate[n:], false)
	}

	n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
	utils.Assert(len(allocate) == n)

	// 合并写满了再发
	if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
		t.AppendOutStreamBuffer(segment)
	}

	return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}

func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
	utils.Assert(t.headerSize > 0)

	// 发送sequence header
	return [][]byte{t.header[:t.headerSize]}, 0, nil
}

func (t *transStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
	t.ClearOutStreamBuffer()

	// 发送当前内存池已有的合并写切片
	t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
		t.AppendOutStreamBuffer(bytes)
	})

	return t.OutBuffer[:t.OutBufferSize], 0, nil
}