From edbdd64acf3c99d0a16abc021bceedc9a7926cdf Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Sun, 21 Jul 2024 23:12:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0HLS=E6=92=AD=E6=94=BE?= =?UTF-8?q?=E6=8B=89=E6=B5=81=E8=AE=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 93 ++++++++++++++++++++++++++++++----------------- hls/hls_sink.go | 67 +++++++++++++++++++++++++++------- hls/hls_stream.go | 39 +++++++++----------- hls/m3u8.go | 2 +- 4 files changed, 131 insertions(+), 70 deletions(-) diff --git a/api.go b/api.go index 7e0dd07..e4198cb 100644 --- a/api.go +++ b/api.go @@ -359,6 +359,17 @@ func (api *ApiServer) onTS(source string, w http.ResponseWriter, r *http.Request return } + sid := r.URL.Query().Get(hls.SessionIdKey) + var sink stream.Sink + if sid != "" { + sink = stream.SinkManager.Find(stream.SinkId(sid)) + } + if sink == nil { + log.Sugar.Errorf("hls session with id '%s' has expired.", sid) + w.WriteHeader(http.StatusForbidden) + return + } + index := strings.LastIndex(source, "_") if index < 0 || index == len(source)-1 { w.WriteHeader(http.StatusBadRequest) @@ -366,19 +377,19 @@ func (api *ApiServer) onTS(source string, w http.ResponseWriter, r *http.Request } seq := source[index+1:] - sourceId := source[:index] - tsPath := stream.AppConfig.Hls.TSPath(sourceId, seq) + tsPath := stream.AppConfig.Hls.TSPath(sink.SourceId(), seq) if _, err := os.Stat(tsPath); err != nil { w.WriteHeader(http.StatusNotFound) return } - //链路复用无法获取http断开回调 - //Hijack需要自行解析http + sink.(*hls.M3U8Sink).RefreshPlayTime() + w.Header().Set("Content-Type", "video/MP2T") http.ServeFile(w, r, tsPath) } func (api *ApiServer) onHLS(sourceId string, w http.ResponseWriter, r *http.Request) { + log.Sugar.Infof("请求m3u8") if !stream.AppConfig.Hls.Enable { log.Sugar.Warnf("处理hls请求失败 server未开启hls") http.Error(w, "hls disable", http.StatusInternalServerError) @@ -386,39 +397,53 @@ func (api *ApiServer) onHLS(sourceId string, w http.ResponseWriter, r *http.Requ } w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") - //m3u8和ts会一直刷新, 每个请求只hook一次. - sinkId := api.generateSinkId(r.RemoteAddr) + //hls_sid是流媒体服务器让播放端, 携带的会话id, 如果没有携带说明是第一次请求播放. + //播放端不要使用hls_sid这个key, 否则会一直拉流失败 + sid := r.URL.Query().Get(hls.SessionIdKey) + if sid == "" { + //让播放端携带会话id + sid = utils.RandStringBytes(10) - //hook成功后, 如果还没有m3u8文件,等生成m3u8文件 - //后续直接返回当前m3u8文件 - if stream.SinkManager.Exist(sinkId) { - http.ServeFile(w, r, stream.AppConfig.Hls.M3U8Path(sourceId)) + query := r.URL.Query() + query.Add(hls.SessionIdKey, sid) + path := fmt.Sprintf("/%s.m3u8?%s", sourceId, query.Encode()) + + response := "#EXTM3U\r\n" + + "#EXT-X-STREAM-INF:BANDWIDTH=1,AVERAGE-BANDWIDTH=1\r\n" + + path + "\r\n" + w.Write([]byte(response)) + return + } + + sink := stream.SinkManager.Find(sid) + if sink != nil { + w.Write([]byte(sink.(*hls.M3U8Sink).GetM3U8String())) + return + } + + context := r.Context() + done := make(chan int, 0) + sink = hls.NewM3U8Sink(sid, sourceId, func(m3u8 []byte) { + w.Write(m3u8) + done <- 0 + }, sid) + + sink.SetUrlValues(r.URL.Query()) + _, state := stream.PreparePlaySink(sink) + if utils.HookStateOK != state { + log.Sugar.Warnf("m3u8 请求失败 sink:%s", sink.PrintInfo()) + + w.WriteHeader(http.StatusForbidden) + return } else { - context := r.Context() - done := make(chan int, 0) + err := stream.SinkManager.Add(sink) + utils.Assert(err == nil) + } - sink := hls.NewM3U8Sink(sinkId, sourceId, func(m3u8 []byte) { - w.Write(m3u8) - done <- 0 - }) - - sink.SetUrlValues(r.URL.Query()) - _, state := stream.PreparePlaySink(sink) - if utils.HookStateOK != state { - log.Sugar.Warnf("m3u8 请求失败 sink:%s", sink.PrintInfo()) - - w.WriteHeader(http.StatusForbidden) - return - } else { - err := stream.SinkManager.Add(sink) - utils.Assert(err == nil) - } - - select { - case <-done: - case <-context.Done(): - break - } + select { + case <-done: + case <-context.Done(): + break } } diff --git a/hls/hls_sink.go b/hls/hls_sink.go index 8bf3abe..48fc327 100644 --- a/hls/hls_sink.go +++ b/hls/hls_sink.go @@ -1,31 +1,70 @@ package hls import ( + "fmt" + "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" + "strings" + "time" ) -type tsSink struct { +const ( + SessionIdKey = "hls_sid" +) + +type M3U8Sink struct { stream.BaseSink + cb func(m3u8 []byte) //生成m3u8文件的发送回调 + sessionId string + playtime time.Time + playTimer *time.Timer + m3u8StringFormat *string } -func NewTSSink(id stream.SinkId, sourceId string) stream.Sink { - return &tsSink{stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}} -} - -func (s *tsSink) Input(data []byte) error { +func (s *M3U8Sink) SendM3U8Data(data *string) error { + s.m3u8StringFormat = data + s.cb([]byte(s.GetM3U8String())) return nil } -type m3u8Sink struct { - stream.BaseSink - cb func(m3u8 []byte) +func (s *M3U8Sink) Start() { + timeout := time.Duration(stream.AppConfig.IdleTimeout) + if timeout < time.Second { + timeout = time.Duration(stream.AppConfig.Hls.Duration) * 2 * 3 * time.Second + } + + s.playTimer = time.AfterFunc(timeout, func() { + sub := time.Now().Sub(s.playtime) + if sub > timeout { + log.Sugar.Errorf("长时间没有拉取TS切片 sink:%d 超时", s.Id_) + s.Close() + return + } + + s.playTimer.Reset(timeout) + }) } -func (s *m3u8Sink) Input(data []byte) error { - s.cb(data) - return nil +func (s *M3U8Sink) GetM3U8String() string { + param := fmt.Sprintf("?%s=%s", SessionIdKey, s.sessionId) + all := strings.ReplaceAll(string(*s.m3u8StringFormat), "%s", param) + log.Sugar.Infof("m3u8 list:%s", all) + return all } -func NewM3U8Sink(id stream.SinkId, sourceId string, cb func(m3u8 []byte)) stream.Sink { - return &m3u8Sink{stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}, cb} +func (s *M3U8Sink) RefreshPlayTime() { + s.playtime = time.Now() +} + +func (s *M3U8Sink) Close() { + stream.SinkManager.Remove(s.Id_) + s.BaseSink.Close() +} + +func NewM3U8Sink(id stream.SinkId, sourceId string, cb func(m3u8 []byte), sessionId string) stream.Sink { + return &M3U8Sink{ + BaseSink: stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}, + cb: cb, + sessionId: sessionId, + } } diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 674873e..f55d4b9 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -35,7 +35,8 @@ type transStream struct { duration int //切片时长, 单位秒 playlistLength int //最大切片文件个数 - m3u8Sinks map[stream.SinkId]stream.Sink + m3u8Sinks map[stream.SinkId]*M3U8Sink //等待响应m3u8文件的sink + m3u8StringFormat string //一个协程写, 多个协程读, 不用加锁保护 } func (t *transStream) Input(packet utils.AVPacket) error { @@ -89,16 +90,14 @@ func (t *transStream) WriteHeader() error { } func (t *transStream) AddSink(sink stream.Sink) error { - if sink_, ok := sink.(*m3u8Sink); ok { - if t.m3u8.Size() > 0 { - return sink.Input([]byte(t.m3u8.ToString())) - } else { - t.m3u8Sinks[sink.Id()] = sink_ - return nil - } + t.BaseTransStream.AddSink(sink) + + if t.m3u8.Size() > 0 { + return sink.Input([]byte(t.m3u8.ToString())) } - return t.BaseTransStream.AddSink(sink) + t.m3u8Sinks[sink.Id()] = sink.(*M3U8Sink) + return nil } func (t *transStream) onTSWrite(data []byte) { @@ -139,19 +138,17 @@ func (t *transStream) flushSegment(end bool) error { duration := float32(t.muxer.Duration()) / 90000 t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path) - if _, err := t.m3u8File.Seek(0, 0); err != nil { - return err - } - if err := t.m3u8File.Truncate(0); err != nil { - return err - } - m3u8Txt := t.m3u8.ToString() if end { m3u8Txt += "#EXT-X-ENDLIST" } + t.m3u8StringFormat = m3u8Txt - if _, err := t.m3u8File.Write([]byte(m3u8Txt)); err != nil { + if _, err := t.m3u8File.Seek(0, 0); err != nil { + return err + } else if err := t.m3u8File.Truncate(0); err != nil { + return err + } else if _, err := t.m3u8File.Write([]byte(m3u8Txt)); err != nil { return err } @@ -159,9 +156,10 @@ func (t *transStream) flushSegment(end bool) error { //缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿. if len(t.m3u8Sinks) > 0 && t.m3u8.Size() > 1 { for _, sink := range t.m3u8Sinks { - sink.Input([]byte(m3u8Txt)) + sink.SendM3U8Data(&t.m3u8StringFormat) } - t.m3u8Sinks = make(map[stream.SinkId]stream.Sink, 0) + + t.m3u8Sinks = make(map[stream.SinkId]*M3U8Sink, 0) } return nil } @@ -283,8 +281,7 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play stream_.m3u8 = NewM3U8Writer(playlistLength) stream_.m3u8File = file - //等待响应m3u8文件的sink - stream_.m3u8Sinks = make(map[stream.SinkId]stream.Sink, 24) + stream_.m3u8Sinks = make(map[stream.SinkId]*M3U8Sink, 24) return stream_, nil } diff --git a/hls/m3u8.go b/hls/m3u8.go index 3912e14..49135a7 100644 --- a/hls/m3u8.go +++ b/hls/m3u8.go @@ -131,7 +131,7 @@ func (m *m3u8Writer) ToString() string { m.stringBuffer.WriteString("#EXTINF:") m.stringBuffer.WriteString(strconv.FormatFloat(float64(segment.(Segment).duration), 'f', -1, 32)) m.stringBuffer.WriteString(",\r\n") - m.stringBuffer.WriteString(segment.(Segment).url) + m.stringBuffer.WriteString(segment.(Segment).url + "%s") m.stringBuffer.WriteString("\r\n") } }