diff --git a/api.go b/api.go index 6af643f..fbc051a 100644 --- a/api.go +++ b/api.go @@ -1,25 +1,30 @@ package main import ( + "context" "github.com/gorilla/mux" "github.com/yangjiechina/avformat/utils" - "github.com/yangjiechina/live-server/hls" + "github.com/yangjiechina/live-server/flv" "github.com/yangjiechina/live-server/stream" + "net" "net/http" + "strings" "time" ) func startApiServer(addr string) { r := mux.NewRouter() - r.HandleFunc("/live/hls/{id}", onHLS) + r.HandleFunc("/live/flv/{source}", onFLV) + r.HandleFunc("/live/hls/{source}", onHLS) + http.Handle("/", r) srv := &http.Server{ Handler: r, Addr: addr, // Good practice: enforce timeouts for servers you create! - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, + WriteTimeout: 30 * time.Second, + ReadTimeout: 30 * time.Second, } err := srv.ListenAndServe() @@ -29,15 +34,21 @@ func startApiServer(addr string) { } } -func onHLS(w http.ResponseWriter, r *http.Request) { +func onFLV(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - sourceId := vars["id"] + source := vars["source"] + + w.Header().Set("Content-Type", "video/x-flv") + w.Header().Set("Connection", "Keep-Alive") + w.Header().Set("Transfer-Encoding", "chunked") hj, ok := w.(http.Hijacker) if !ok { http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError) return } + context_ := r.Context() + w.WriteHeader(http.StatusOK) conn, _, err := hj.Hijack() if err != nil { @@ -45,19 +56,43 @@ func onHLS(w http.ResponseWriter, r *http.Request) { return } + var sourceId string + if index := strings.LastIndex(source, "."); index > -1 { + sourceId = source[:index] + } + + tcpAddr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr) + sinkId := stream.GenerateSinkId(tcpAddr) + sink := flv.NewFLVSink(sinkId, sourceId, conn) + + go func(ctx context.Context) { + sink.(*stream.SinkImpl).Play(sink, func() { + //sink.(*stream.SinkImpl).PlayDone(sink, nil, nil) + }, func(state utils.HookState) { + conn.Close() + }) + }(context_) +} + +func onHLS(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + source := vars["source"] + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") - sinkId := stream.GenerateSinkId(conn) - /* requestTS := strings.HasSuffix(r.URL.Path, ".ts") - if requestTS { - stream.sink - }*/ - - sink := hls.NewSink(sinkId, sourceId, w) - sink.(*stream.SinkImpl).Play(sink, func() { - - }, func(state utils.HookState) { - w.WriteHeader(http.StatusForbidden) - }) + //删除末尾的.ts/.m3u8, 请确保id中不存在. + //var sourceId string + //if index := strings.LastIndex(source, "."); index > -1 { + // sourceId = source[:index] + //} + // + //tcpAddr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr) + //sinkId := stream.GenerateSinkId(tcpAddr) + if strings.HasSuffix(source, ".m3u8") { + //查询是否存在hls流, 不存在-等生成后再响应m3u8文件. 存在-直接响应m3u8文件 + http.ServeFile(w, r, "../tmp/"+source) + } else if strings.HasSuffix(source, ".ts") { + http.ServeFile(w, r, "../tmp/"+source) + } } diff --git a/flv/flv_sink.go b/flv/flv_sink.go new file mode 100644 index 0000000..ed3eb51 --- /dev/null +++ b/flv/flv_sink.go @@ -0,0 +1,10 @@ +package flv + +import ( + "github.com/yangjiechina/live-server/stream" + "net" +) + +func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.ISink { + return &stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: conn} +} diff --git a/flv/http_flv.go b/flv/http_flv.go new file mode 100644 index 0000000..11aa187 --- /dev/null +++ b/flv/http_flv.go @@ -0,0 +1,191 @@ +package flv + +import ( + "encoding/binary" + "fmt" + "github.com/yangjiechina/avformat/libflv" + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/live-server/stream" +) + +const ( + // HttpFlvBlockLengthSize 响应flv数据时,需要添加flv块长度, 缓存时预留该大小字节 + HttpFlvBlockLengthSize = 20 +) + +var separator []byte + +func init() { + separator = make([]byte, 2) + separator[0] = 0x0D + separator[1] = 0x0A +} + +type httpTransStream struct { + stream.CacheTransStream + muxer libflv.Muxer + header []byte + headerSize int +} + +func NewHttpTransStream() stream.ITransStream { + return &httpTransStream{ + muxer: libflv.NewMuxer(), + header: make([]byte, 1024), + headerSize: HttpFlvBlockLengthSize + 9, + } +} + +func (t *httpTransStream) Input(packet utils.AVPacket) error { + var flvSize int + var data []byte + var videoKey bool + + if utils.AVMediaTypeAudio == packet.MediaType() { + flvSize = 17 + len(packet.Data()) + data = packet.Data() + } else if utils.AVMediaTypeVideo == packet.MediaType() { + flvSize = 20 + len(packet.AVCCPacketData()) + data = packet.AVCCPacketData() + videoKey = packet.KeyFrame() + } + + if videoKey { + head, _ := t.StreamBuffers[0].Data() + if len(head) > t.SegmentOffset { + t.StreamBuffers[0].Mark() + t.StreamBuffers[0].Allocate(2) + t.StreamBuffers[0].Fetch() + + head, _ = t.StreamBuffers[0].Data() + t.writeSeparator(head[t.SegmentOffset:]) + skip := t.computeSikCount(head[t.SegmentOffset:]) + t.SendPacketWithOffset(head, t.SegmentOffset+skip) + } + + t.SwapStreamBuffer() + } + + var n int + var separatorSize int + full := t.Full(packet.Pts()) + if head, _ := t.StreamBuffers[0].Data(); t.SegmentOffset == len(head) { + separatorSize = HttpFlvBlockLengthSize + //10字节描述flv包长, 前2个字节描述无效字节长度 + n = HttpFlvBlockLengthSize + } + if full { + separatorSize = 2 + } + + t.StreamBuffers[0].Mark() + allocate := t.StreamBuffers[0].Allocate(separatorSize + flvSize) + n += t.muxer.Input(allocate[n:], packet.MediaType(), len(data), packet.Dts(), packet.Pts(), packet.KeyFrame(), false) + copy(allocate[n:], data) + _ = t.StreamBuffers[0].Fetch() + + if !full { + return nil + } + + head, _ := t.StreamBuffers[0].Data() + //添加长度和换行符 + //每一个合并写切片开始和预留长度所需的字节数 + //合并写切片末尾加上换行符 + //长度是16进制字符串 + t.writeSeparator(head[t.SegmentOffset:]) + + skip := t.computeSikCount(head[t.SegmentOffset:]) + t.SendPacketWithOffset(head, t.SegmentOffset+skip) + return nil +} + +func (t *httpTransStream) AddTrack(stream utils.AVStream) error { + if err := t.TransStreamImpl.AddTrack(stream); err != nil { + return err + } + + var data []byte + if utils.AVMediaTypeAudio == stream.Type() { + t.muxer.AddAudioTrack(stream.CodecId(), 0, 0, 0) + data = stream.Extra() + } else if utils.AVMediaTypeVideo == stream.Type() { + t.muxer.AddVideoTrack(stream.CodecId()) + data, _ = stream.M4VCExtraData() + } + + t.headerSize += t.muxer.Input(t.header[t.headerSize:], stream.Type(), len(data), 0, 0, false, true) + copy(t.header[t.headerSize:], data) + t.headerSize += len(data) + return nil +} + +func (t *httpTransStream) sendBuffer(sink stream.ISink, data []byte) error { + return sink.Input(data[t.computeSikCount(data):]) +} + +func (t *httpTransStream) computeSikCount(data []byte) int { + return int(6 + binary.BigEndian.Uint16(data[4:])) +} + +func (t *httpTransStream) AddSink(sink stream.ISink) error { + utils.Assert(t.headerSize > 0) + + t.TransStreamImpl.AddSink(sink) + //发送sequence header + t.sendBuffer(sink, t.header[:t.headerSize]) + + send := func(sink stream.ISink, data []byte) { + var index int + for ; index < len(data); index += 4 { + size := binary.BigEndian.Uint32(data[index:]) + t.sendBuffer(sink, data[index:index+4+int(size)]) + index += int(size) + } + } + + //发送当前内存池已有的合并写切片 + if t.SegmentOffset > 0 { + data, _ := t.StreamBuffers[0].Data() + utils.Assert(len(data) > 0) + send(sink, data[:t.SegmentOffset]) + return nil + } + + //发送上一组GOP + if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() { + data, _ := t.StreamBuffers[1].Data() + utils.Assert(len(data) > 0) + send(sink, data) + return nil + } + + return nil +} + +func (t *httpTransStream) writeSeparator(dst []byte) { + + dst[HttpFlvBlockLengthSize-2] = 0x0D + dst[HttpFlvBlockLengthSize-1] = 0x0A + + flvSize := len(dst) - HttpFlvBlockLengthSize - 2 + hexStr := fmt.Sprintf("%X", flvSize) + //长度+换行符 + n := len(hexStr) + 2 + binary.BigEndian.PutUint16(dst[4:], uint16(HttpFlvBlockLengthSize-n-6)) + copy(dst[HttpFlvBlockLengthSize-n:], hexStr) + + dst[HttpFlvBlockLengthSize+flvSize] = 0x0D + dst[HttpFlvBlockLengthSize+flvSize+1] = 0x0A + + binary.BigEndian.PutUint32(dst, uint32(len(dst)-4)) +} + +func (t *httpTransStream) WriteHeader() error { + t.Init() + + _ = t.muxer.WriteHeader(t.header[HttpFlvBlockLengthSize:]) + t.headerSize += 2 + t.writeSeparator(t.header[:t.headerSize]) + return nil +} diff --git a/main.go b/main.go index 3d036a2..82105c2 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/yangjiechina/live-server/flv" "github.com/yangjiechina/live-server/hls" "net" "net/http" @@ -27,6 +28,8 @@ func CreateTransStream(source stream.ISource, protocol stream.Protocol, streams } return transStream + } else if stream.ProtocolFlv == protocol { + return flv.NewHttpTransStream() } return nil diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 3af487d..c179fe8 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -55,7 +55,7 @@ func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) sourceId := app + "_" + stream_ //拉流事件Sink统一处理 - sink := NewSink(stream.GenerateSinkId(s.conn), sourceId, s.conn) + sink := NewSink(stream.GenerateSinkId(s.conn.RemoteAddr()), sourceId, s.conn) sink.(*stream.SinkImpl).Play(sink, func() { s.handle = sink response <- utils.HookStateOK diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index 6222071..e280223 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -1,7 +1,6 @@ package rtmp import ( - "fmt" "github.com/yangjiechina/avformat/libflv" "github.com/yangjiechina/avformat/librtmp" "github.com/yangjiechina/avformat/utils" @@ -9,41 +8,19 @@ import ( ) type TransStream struct { - stream.TransStreamImpl + stream.CacheTransStream chunkSize int //sequence header header []byte headerSize int - muxer *libflv.Muxer + muxer libflv.Muxer - //只存在音频流 - onlyAudio bool audioChunk librtmp.Chunk videoChunk librtmp.Chunk - - //只需要缓存一组GOP+第2组GOP的第一个合并写切片 - //当缓存到第2组GOP的第二个合并写切片时,将上一个GOP缓存释放掉 - //使用2块内存池,分别缓存2个GOP,保证内存连续,一次发送 - //不开启GOP缓存和只有音频包的情况下,创建使用一个MemoryPool - memoryPool [2]stream.MemoryPool - - //当前合并写切片的缓存时长 - segmentDuration int - //当前合并写切片位于memoryPool的开始偏移量 - segmentOffset int - //前一个包的时间戳 - prePacketTS int64 - - firstVideoPacket bool - - //发送未完整切片的Sinks - //当AddSink时,还未缓存到一组切片,有多少先发多少. 后续切片未满之前的生成的rtmp包都将直接发送给sink. - //只要满了一组切片后,这些sink都不单独发包, 统一发送切片. - incompleteSinks []stream.ISink } func NewTransStream(chunkSize int) stream.ITransStream { - transStream := &TransStream{chunkSize: chunkSize, TransStreamImpl: stream.TransStreamImpl{Sinks: make(map[stream.SinkId]stream.ISink, 64)}} + transStream := &TransStream{chunkSize: chunkSize} return transStream } @@ -64,15 +41,6 @@ func (t *TransStream) Input(packet utils.AVPacket) error { chunk = &t.audioChunk payloadSize += 2 + length } else if utils.AVMediaTypeVideo == packet.MediaType() { - //首帧必须要视频关键帧 - if !t.firstVideoPacket { - if !packet.KeyFrame() { - return fmt.Errorf("the first video frame must be a keyframe") - } - - t.firstVideoPacket = true - } - videoPkt = true videoKey = packet.KeyFrame() data = packet.AVCCPacketData() @@ -81,33 +49,16 @@ func (t *TransStream) Input(packet utils.AVPacket) error { payloadSize += 5 + length } - //即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送 - if !stream.AppConfig.GOPCache || t.onlyAudio { - //首帧视频帧必须要关键帧 - return nil - } - if videoKey { - tmp := t.memoryPool[0] + tmp := t.StreamBuffers[0] head, _ := tmp.Data() - if len(head) > t.segmentOffset { - for _, sink := range t.Sinks { - sink.Input(head[t.segmentOffset:]) - } - } - - t.memoryPool[0].Clear() - //交替使用缓存 - t.memoryPool[0] = t.memoryPool[1] - t.memoryPool[1] = tmp - - t.segmentDuration = 0 - t.segmentOffset = 0 + t.SendPacket(head[t.SegmentOffset:]) + t.SwapStreamBuffer() } //分配内存 - t.memoryPool[0].Mark() - allocate := t.memoryPool[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) + t.StreamBuffers[0].Mark() + allocate := t.StreamBuffers[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) //写chunk头 chunk.Length = payloadSize @@ -119,77 +70,19 @@ func (t *TransStream) Input(packet utils.AVPacket) error { ct := packet.Pts() - packet.Dts() if videoPkt { n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false) + n += chunk.WriteData(allocate[n:], data, t.chunkSize) } else { n += t.muxer.WriteAudioData(allocate[12:], false) + n += chunk.WriteData(allocate[n:], data, t.chunkSize) } - first := true - for length > 0 { - var min int - if first { - min = utils.MinInt(length, t.chunkSize-5) - first = false - } else { - min = utils.MinInt(length, t.chunkSize) - } - - copy(allocate[n:], data[:min]) - n += min - - length -= min - data = data[min:] - - //写一个ChunkType3用作分割 - if length > 0 { - if videoPkt { - allocate[n] = (0x3 << 6) | byte(librtmp.ChunkStreamIdVideo) - } else { - allocate[n] = (0x3 << 6) | byte(librtmp.ChunkStreamIdAudio) - } - n++ - } - } - - rtmpData := t.memoryPool[0].Fetch()[:n] - t.segmentDuration += int(packet.Pts() - t.prePacketTS) - t.prePacketTS = packet.Pts() - - //给不完整切片的Sink补齐包 - if len(t.incompleteSinks) > 0 { - for _, sink := range t.incompleteSinks { - sink.Input(rtmpData) - } - - if t.segmentDuration >= stream.AppConfig.MergeWriteLatency { - head, tail := t.memoryPool[0].Data() - utils.Assert(len(tail) == 0) - - t.segmentOffset = len(head) - t.segmentDuration = 0 - t.incompleteSinks = nil - } - + _ = t.StreamBuffers[0].Fetch()[:n] + if t.Full(packet.Pts()) { return nil } - if t.segmentDuration < stream.AppConfig.MergeWriteLatency { - return nil - } - - head, tail := t.memoryPool[0].Data() - utils.Assert(len(tail) == 0) - for _, sink := range t.Sinks { - sink.Input(head[t.segmentOffset:]) - } - - t.segmentOffset = len(head) - t.segmentDuration = 0 - - //当缓存到第2组GOP的第二个合并写切片时,将上一个GOP缓存释放掉 - if t.segmentOffset > len(head) && t.memoryPool[1] != nil && !t.memoryPool[1].Empty() { - t.memoryPool[1].Clear() - } - + head, _ := t.StreamBuffers[0].Data() + t.SendPacketWithOffset(head[:], t.SegmentOffset) return nil } @@ -200,36 +93,22 @@ func (t *TransStream) AddSink(sink stream.ISink) error { //发送sequence header sink.Input(t.header[:t.headerSize]) - if !stream.AppConfig.GOPCache || t.onlyAudio { - return nil - } - //发送当前内存池已有的合并写切片 - if t.segmentOffset > 0 { - data, tail := t.memoryPool[0].Data() + if t.SegmentOffset > 0 { + data, _ := t.StreamBuffers[0].Data() utils.Assert(len(data) > 0) - utils.Assert(len(tail) == 0) - sink.Input(data[:t.segmentOffset]) + sink.Input(data[:t.SegmentOffset]) return nil } //发送上一组GOP - if t.memoryPool[1] != nil && !t.memoryPool[1].Empty() { - data, tail := t.memoryPool[0].Data() + if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() { + data, _ := t.StreamBuffers[0].Data() utils.Assert(len(data) > 0) - utils.Assert(len(tail) == 0) sink.Input(data) return nil } - //不足一个合并写切片, 有多少发多少 - data, tail := t.memoryPool[0].Data() - utils.Assert(len(tail) == 0) - if len(data) > 0 { - sink.Input(data) - t.incompleteSinks = append(t.incompleteSinks, sink) - } - return nil } @@ -237,6 +116,8 @@ func (t *TransStream) WriteHeader() error { utils.Assert(t.Tracks != nil) utils.Assert(!t.TransStreamImpl.Completed) + t.Init() + var audioStream utils.AVStream var videoStream utils.AVStream var audioCodecId utils.AVCodecID @@ -259,16 +140,16 @@ func (t *TransStream) WriteHeader() error { //初始化 t.TransStreamImpl.Completed = true t.header = make([]byte, 1024) - t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) - - if stream.AppConfig.GOPCache { - //创建2块内存 - t.memoryPool[0] = stream.NewMemoryPoolWithDirect(1024*4000, true) - t.memoryPool[1] = stream.NewMemoryPoolWithDirect(1024*4000, true) - } else { - + t.muxer = libflv.NewMuxer() + if utils.AVCodecIdNONE != audioCodecId { + t.muxer.AddAudioTrack(audioCodecId, 0, 0, 0) } + if utils.AVCodecIdNONE != videoCodecId { + t.muxer.AddVideoTrack(videoCodecId) + } + + //统一生成rtmp拉流需要的数据头(chunk+sequence header) var n int if audioStream != nil { n += t.muxer.WriteAudioData(t.header[12:], true) diff --git a/stream/hook.go b/stream/hook.go index 79ddbf9..8c88b73 100644 --- a/stream/hook.go +++ b/stream/hook.go @@ -8,23 +8,6 @@ import ( ) type HookFunc func(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - -type Hook interface { - DoPublish(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - DoPublishDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - DoPlay(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - DoPlayDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - DoRecord(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - DoIdleTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - DoRecvTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error -} - type HookEvent int const ( @@ -44,8 +27,12 @@ type eventInfo struct { remoteAddr string //peer地址 } -func NewHookEventInfo(stream, protocol, remoteAddr string) eventInfo { - return eventInfo{stream: stream, protocol: protocol, remoteAddr: remoteAddr} +func NewPlayHookEventInfo(stream, remoteAddr string, protocol Protocol) eventInfo { + return eventInfo{stream: stream, protocol: streamTypeToStr(protocol), remoteAddr: remoteAddr} +} + +func NewPublishHookEventInfo(stream, remoteAddr string, protocol SourceType) eventInfo { + return eventInfo{stream: stream, protocol: sourceTypeToStr(protocol), remoteAddr: remoteAddr} } type HookSession interface { diff --git a/stream/memory_pool.go b/stream/memory_pool.go index cec23fd..80a93ad 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -187,12 +187,11 @@ func (m *memoryPool) FreeTail() { } func (m *memoryPool) Data() ([]byte, []byte) { - if m.tail <= m.head { + if m.tail <= m.head && !m.blockQueue.IsEmpty() { return m.data[m.head:m.capacity], m.data[:m.tail] } else { return m.data[m.head:m.tail], nil } - } func (m *memoryPool) Clear() { diff --git a/stream/sink.go b/stream/sink.go index fe71271..814f00f 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -5,7 +5,6 @@ import ( "github.com/yangjiechina/avformat/utils" "net" "net/http" - "sync/atomic" ) type SinkId interface{} @@ -41,24 +40,24 @@ type ISink interface { Close() } -// GenerateSinkId 根据Conn生成SinkId IPV4使用一个uint64, IPV6使用String -func GenerateSinkId(conn net.Conn) SinkId { - network := conn.RemoteAddr().Network() +// GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String +func GenerateSinkId(addr net.Addr) SinkId { + network := addr.Network() if "tcp" == network { - id := uint64(utils.BytesToInt(conn.RemoteAddr().(*net.TCPAddr).IP.To4())) + id := uint64(utils.BytesToInt(addr.(*net.TCPAddr).IP.To4())) id <<= 32 - id |= uint64(conn.RemoteAddr().(*net.TCPAddr).Port << 16) + id |= uint64(addr.(*net.TCPAddr).Port << 16) return id } else if "udp" == network { - id := uint64(utils.BytesToInt(conn.RemoteAddr().(*net.UDPAddr).IP.To4())) + id := uint64(utils.BytesToInt(addr.(*net.UDPAddr).IP.To4())) id <<= 32 - id |= uint64(conn.RemoteAddr().(*net.UDPAddr).Port << 16) + id |= uint64(addr.(*net.UDPAddr).Port << 16) return id } - return conn.RemoteAddr().String() + return addr.String() } type SinkImpl struct { @@ -74,7 +73,7 @@ type SinkImpl struct { //Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全 //如果Sink在等待队列-Sink断开,这个过程是非线程安全的 //SetState的时候,如果closed为true,返回false, 调用者自行删除sink - closed atomic.Bool + //closed atomic.Bool //HasSentKeyVideo 是否已经发送视频关键帧 //未开启GOP缓存的情况下,为避免播放花屏,发送的首个视频帧必须为关键帧 @@ -121,10 +120,10 @@ func (s *SinkImpl) State() SessionState { } func (s *SinkImpl) SetState(state SessionState) bool { - load := s.closed.Load() - if load { - return false - } + //load := s.closed.Load() + //if load { + // return false + //} if s.State_ < SessionStateClose { s.State_ = state @@ -136,7 +135,8 @@ func (s *SinkImpl) SetState(state SessionState) bool { // //} - return !s.closed.Load() + //return !s.closed.Load() + return true } func (s *SinkImpl) EnableVideo() bool { @@ -166,7 +166,7 @@ func (s *SinkImpl) Close() { //从等待队列中删除sink RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_) s.State_ = SessionStateClose - s.closed.Store(true) + //s.closed.Store(true) } } @@ -188,7 +188,7 @@ func (s *SinkImpl) Play(sink ISink, success func(), failure func(state utils.Hoo return } - err := s.Hook(HookEventPlay, NewHookEventInfo(sink.SourceId(), streamTypeToStr(sink.Protocol()), ""), func(response *http.Response) { + err := s.Hook(HookEventPlay, NewPlayHookEventInfo(sink.SourceId(), "", sink.Protocol()), func(response *http.Response) { f() success() }, func(response *http.Response, err error) { diff --git a/stream/sink_manager.go b/stream/sink_manager.go index e3ff3ac..3231761 100644 --- a/stream/sink_manager.go +++ b/stream/sink_manager.go @@ -1,10 +1,14 @@ package stream -import "sync" +import ( + "fmt" + "sync" +) +// 等待队列所有的Sink var waitingSinks map[string]map[SinkId]ISink -var mutex sync.Mutex +var mutex sync.RWMutex func init() { waitingSinks = make(map[string]map[SinkId]ISink, 1024) @@ -56,5 +60,83 @@ func PopWaitingSinks(sourceId string) []ISink { for _, sink := range source { sinks[index] = sink } + + delete(waitingSinks, sourceId) return sinks } + +func ExistSinkInWaitingQueue(sourceId string, sinkId SinkId) bool { + mutex.RLock() + defer mutex.RUnlock() + + source, ok := waitingSinks[sourceId] + if !ok { + return false + } + + _, ok = source[sinkId] + return ok +} + +func ExistSink(sourceId string, sinkId SinkId) bool { + if sourceId != "" { + if exist := ExistSinkInWaitingQueue(sourceId, sinkId); exist { + return true + } + } + + return SinkManager.Exist(sinkId) +} + +// ISinkManager 添加到TransStream的所有Sink +type ISinkManager interface { + Add(source ISink) error + + Find(id SinkId) ISink + + Remove(id SinkId) (ISink, error) + + Exist(id SinkId) bool +} + +var SinkManager ISinkManager + +func init() { + SinkManager = &sinkManagerImpl{} +} + +type sinkManagerImpl struct { + m sync.Map +} + +func (s *sinkManagerImpl) Add(source ISink) error { + _, ok := s.m.LoadOrStore(source.Id(), source) + if ok { + return fmt.Errorf("the source %s has been exist", source.Id()) + } + + return nil +} + +func (s *sinkManagerImpl) Find(id SinkId) ISink { + value, ok := s.m.Load(id) + if ok { + return value.(ISink) + } + + return nil +} + +func (s *sinkManagerImpl) Remove(id SinkId) (ISink, error) { + value, loaded := s.m.LoadAndDelete(id) + if loaded { + return value.(ISink), nil + } + + return nil, fmt.Errorf("source with id %s was not find", id) +} + +func (s *sinkManagerImpl) Exist(id SinkId) bool { + _, ok := s.m.Load(id) + return ok +} diff --git a/stream/source.go b/stream/source.go index 27db6ed..4e4b8a3 100644 --- a/stream/source.go +++ b/stream/source.go @@ -494,7 +494,7 @@ func (s *SourceImpl) Publish(source ISource, success func(), failure func(state return } - err := s.Hook(HookEventPublish, NewHookEventInfo(source.Id(), sourceTypeToStr(source.Type()), ""), + err := s.Hook(HookEventPublish, NewPublishHookEventInfo(source.Id(), "", source.Type()), func(response *http.Response) { if err := SourceManager.Add(source); err == nil { success() diff --git a/stream/stream_manager.go b/stream/stream_manager.go index 673a90b..47162c8 100644 --- a/stream/stream_manager.go +++ b/stream/stream_manager.go @@ -4,9 +4,22 @@ import ( "github.com/yangjiechina/avformat/utils" ) +type IStreamManager interface { + Add(stream utils.AVStream) + + FindStream(id utils.AVCodecID) utils.AVStream + + FindStreamWithType(mediaType utils.AVMediaType) utils.AVStream + + FindStreams(id utils.AVCodecID) []utils.AVStream + + FindStreamsWithType(mediaType utils.AVMediaType) []utils.AVStream + + All() []utils.AVStream +} + type StreamManager struct { - streams []utils.AVStream - completed bool + streams []utils.AVStream } func (s *StreamManager) Add(stream utils.AVStream) { diff --git a/stream/trans_stream.go b/stream/trans_stream.go index aec34ce..a0e7463 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -66,6 +66,8 @@ var TransStreamFactory func(source ISource, protocol Protocol, streams []utils.A // ITransStream 讲AVPacket封装成传输流,转发给各个Sink type ITransStream interface { + Init() + Input(packet utils.AVPacket) error AddTrack(stream utils.AVStream) error @@ -81,6 +83,8 @@ type ITransStream interface { AllSink() []ISink Close() error + + SendPacket(data []byte) error } type TransStreamImpl struct { @@ -92,6 +96,10 @@ type TransStreamImpl struct { ExistVideo bool } +func (t *TransStreamImpl) Init() { + t.Sinks = make(map[SinkId]ISink, 64) +} + func (t *TransStreamImpl) Input(packet utils.AVPacket) error { return nil } @@ -134,3 +142,68 @@ func (t *TransStreamImpl) AllSink() []ISink { func (t *TransStreamImpl) Close() error { return nil } +func (t *TransStreamImpl) SendPacket(data []byte) error { + for _, sink := range t.Sinks { + sink.Input(data) + } + + return nil +} + +// CacheTransStream 针对RTMP/FLV/HLS等基于TCP传输的带缓存传输流. +type CacheTransStream struct { + TransStreamImpl + + //作为封装流的内存缓存区, 即使没有开启GOP缓存也创建一个, 开启GOP缓存的情况下, 创建2个, 反复交替使用. + StreamBuffers []MemoryPool + + //当前合并写切片位于memoryPool的开始偏移量 + SegmentOffset int + //前一个包的时间戳 + PrePacketTS int64 +} + +func (c *CacheTransStream) Init() { + c.TransStreamImpl.Init() + + c.StreamBuffers = make([]MemoryPool, 2) + c.StreamBuffers[0] = NewMemoryPoolWithDirect(1024*4000, true) + + if c.ExistVideo && AppConfig.MergeWriteLatency > 0 { + c.StreamBuffers[1] = NewMemoryPoolWithDirect(1024*4000, true) + } + + c.SegmentOffset = 0 + c.PrePacketTS = -1 +} + +func (c *CacheTransStream) Full(ts int64) bool { + if c.PrePacketTS == -1 { + c.PrePacketTS = ts + } + + if ts < c.PrePacketTS { + c.PrePacketTS = ts + } + + return int(ts-c.PrePacketTS) >= AppConfig.MergeWriteLatency +} + +func (c *CacheTransStream) SwapStreamBuffer() { + utils.Assert(c.ExistVideo) + + tmp := c.StreamBuffers[0] + c.StreamBuffers[0] = c.StreamBuffers[1] + c.StreamBuffers[1] = tmp + c.StreamBuffers[0].Clear() + + c.PrePacketTS = -1 + c.SegmentOffset = 0 +} + +func (c *CacheTransStream) SendPacketWithOffset(data []byte, offset int) error { + c.TransStreamImpl.SendPacket(data[offset:]) + c.SegmentOffset = len(data) + c.PrePacketTS = -1 + return nil +}