feat: /hls/streams (#19)

This commit is contained in:
Han Gyoung-Su
2025-06-22 20:47:40 +09:00
committed by GitHub
parent 1359426f1b
commit e8876671b9
5 changed files with 44 additions and 6 deletions

View File

@@ -19,6 +19,12 @@ const (
cacheControl = "CDN-Cache-Control" cacheControl = "CDN-Cache-Control"
) )
type APIResponse struct {
ErrorCode int `json:"error_code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
type Handler struct { type Handler struct {
endpoint *hlshub.HLSHub endpoint *hlshub.HLSHub
} }
@@ -29,6 +35,19 @@ func NewHandler(hlsEndpoint *hlshub.HLSHub) *Handler {
} }
} }
type StreamsResponse struct {
Streams []string `json:"streams"`
}
func (h *Handler) HandleListStreams(c echo.Context) error {
streams := h.endpoint.WorkIDs()
return c.JSON(http.StatusOK, APIResponse{
ErrorCode: 0,
Message: "success",
Data: StreamsResponse{Streams: streams},
})
}
func (h *Handler) HandleMasterM3U8(c echo.Context) error { func (h *Handler) HandleMasterM3U8(c echo.Context) error {
ctx := context.Background() ctx := context.Background()
log.Info(ctx, "HandleMasterM3U8") log.Info(ctx, "HandleMasterM3U8")

View File

@@ -75,12 +75,11 @@ func main() {
hub := hub.NewHub() hub := hub.NewHub()
var tracks map[string][]*webrtc.TrackLocalStaticRTP var tracks map[string][]*webrtc.TrackLocalStaticRTP
tracks = make(map[string][]*webrtc.TrackLocalStaticRTP) tracks = make(map[string][]*webrtc.TrackLocalStaticRTP)
// ingress // Egress is started by streamID notification
// Egress 서비스는 streamID 알림을 구독하여 처리 시작 hlsHub := hlshub.NewHLSHub()
go func() { go func() {
api := echo.New() api := echo.New()
api.HideBanner = true api.HideBanner = true
hlsHub := hlshub.NewHLSHub()
hlsHandler := httpsrv.NewHandler(hlsHub) hlsHandler := httpsrv.NewHandler(hlsHub)
hlsRoute := api.Group("/hls", middleware.CORSWithConfig(middleware.CORSConfig{ hlsRoute := api.Group("/hls", middleware.CORSWithConfig(middleware.CORSConfig{
AllowOrigins: []string{"*"}, // Adjust origins as necessary AllowOrigins: []string{"*"}, // Adjust origins as necessary
@@ -89,6 +88,7 @@ func main() {
api.GET("/prometheus", echo.WrapHandler(promhttp.Handler())) api.GET("/prometheus", echo.WrapHandler(promhttp.Handler()))
api.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) api.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))
// Enable CORS only for /hls routes // Enable CORS only for /hls routes
hlsRoute.GET("/streams", hlsHandler.HandleListStreams)
hlsRoute.GET("/:streamID/master.m3u8", hlsHandler.HandleMasterM3U8) hlsRoute.GET("/:streamID/master.m3u8", hlsHandler.HandleMasterM3U8)
hlsRoute.GET("/:streamID/:playlistName/stream.m3u8", hlsHandler.HandleM3U8) hlsRoute.GET("/:streamID/:playlistName/stream.m3u8", hlsHandler.HandleM3U8)
hlsRoute.GET("/:streamID/:playlistName/:resourceName", hlsHandler.HandleM3U8) hlsRoute.GET("/:streamID/:playlistName/:resourceName", hlsHandler.HandleM3U8)
@@ -153,6 +153,7 @@ func main() {
rtmpServer := rtmp.NewRTMP(rtmp.RTMPArgs{ rtmpServer := rtmp.NewRTMP(rtmp.RTMPArgs{
Hub: hub, Hub: hub,
Port: conf.RTMP.Port, Port: conf.RTMP.Port,
HLSHub: hlsHub,
}) })
rtmpServer.Serve(ctx) rtmpServer.Serve(ctx)
} }

View File

@@ -63,3 +63,13 @@ func (s *HLSHub) MuxersByWorkID(workID string) (map[string]*gohlslib.Muxer, erro
} }
return muxers, nil return muxers, nil
} }
func (s *HLSHub) WorkIDs() []string {
s.mu.RLock()
defer s.mu.RUnlock()
keys := make([]string, 0, len(s.hlsMuxers))
for k := range s.hlsMuxers {
keys = append(keys, k)
}
return keys
}

View File

@@ -18,11 +18,13 @@ import (
rtmpmsg "github.com/yutopp/go-rtmp/message" rtmpmsg "github.com/yutopp/go-rtmp/message"
"liveflow/log" "liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub" "liveflow/media/hub"
) )
type Handler struct { type Handler struct {
hub *hub.Hub hub *hub.Hub
HLSHub *hlshub.HLSHub
streamID string streamID string
rtmp.DefaultHandler rtmp.DefaultHandler
flvFile *os.File flvFile *os.File
@@ -358,6 +360,7 @@ func (h *Handler) OnClose() {
_ = h.flvFile.Close() _ = h.flvFile.Close()
} }
h.hub.Unpublish(h.streamID) h.hub.Unpublish(h.streamID)
h.HLSHub.DeleteMuxer(h.streamID)
} }
func flvSampleRate(soundRate flvtag.SoundRate) uint32 { func flvSampleRate(soundRate flvtag.SoundRate) uint32 {

View File

@@ -9,6 +9,7 @@ import (
"github.com/yutopp/go-rtmp" "github.com/yutopp/go-rtmp"
"liveflow/log" "liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub" "liveflow/media/hub"
) )
@@ -20,12 +21,14 @@ type RTMP struct {
serverConfig *rtmp.ServerConfig serverConfig *rtmp.ServerConfig
hub *hub.Hub hub *hub.Hub
port int port int
args RTMPArgs
} }
type RTMPArgs struct { type RTMPArgs struct {
ServerConfig *rtmp.ServerConfig ServerConfig *rtmp.ServerConfig
Hub *hub.Hub Hub *hub.Hub
Port int Port int
HLSHub *hlshub.HLSHub
} }
func NewRTMP(args RTMPArgs) *RTMP { func NewRTMP(args RTMPArgs) *RTMP {
@@ -33,6 +36,7 @@ func NewRTMP(args RTMPArgs) *RTMP {
//serverConfig: args.ServerConfig, //serverConfig: args.ServerConfig,
hub: args.Hub, hub: args.Hub,
port: args.Port, port: args.Port,
args: args,
} }
} }
@@ -49,6 +53,7 @@ func (r *RTMP) Serve(ctx context.Context) error {
OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) { OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) {
h := &Handler{ h := &Handler{
hub: r.hub, hub: r.hub,
HLSHub: r.args.HLSHub,
} }
return conn, &rtmp.ConnConfig{ return conn, &rtmp.ConnConfig{
Handler: h, Handler: h,