refactor: starter interface (#21)

This commit is contained in:
Han Gyoung-Su
2025-06-22 22:19:17 +09:00
committed by GitHub
parent eda72c5449
commit 497a4cd51b
5 changed files with 49 additions and 32 deletions

65
main.go
View File

@@ -71,7 +71,7 @@ func main() {
"app": "liveflow",
})
log.Info(ctx, "liveflow is started")
hub := hub.NewHub()
sourceHub := hub.NewHub()
var tracks map[string][]*webrtc.TrackLocalStaticRTP
tracks = make(map[string][]*webrtc.TrackLocalStaticRTP)
// Egress is started by streamID notification
@@ -93,7 +93,7 @@ func main() {
api.GET("/streams", hlsHandler.HandleListStreams)
whipServer := whip.NewWHIP(whip.WHIPArgs{
Hub: hub,
Hub: sourceHub,
Tracks: tracks,
DockerMode: conf.Docker.Mode,
Echo: api,
@@ -112,55 +112,56 @@ func main() {
fmt.Println("----------------", conf.Service.Port)
api.Start("0.0.0.0:" + strconv.Itoa(conf.Service.Port))
}()
type Starter interface {
Start(ctx context.Context, source hub.Source) error
Name() string
}
// ingress 의 rtmp, whip 서비스로부터 streamID를 받아 Service, ContainerMP4, WHEP 서비스 시작
for source := range hub.SubscribeToStreamID() {
for source := range sourceHub.SubscribeToStreamID() {
log.Infof(ctx, "New streamID received: %s", source.StreamID())
var starters []Starter
if conf.MP4.Record {
mp4 := mp4.NewMP4(mp4.MP4Args{
Hub: hub,
starters = append(starters, mp4.NewMP4(mp4.MP4Args{
Hub: sourceHub,
SplitIntervalMS: 3000,
})
err = mp4.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start mp4: %v", err)
}
}))
}
if conf.EBML.Record {
webmStarter := webm.NewWEBM(webm.WebMArgs{
Hub: hub,
starters = append(starters, webm.NewWEBM(webm.WebMArgs{
Hub: sourceHub,
SplitIntervalMS: 6000,
StreamID: source.StreamID(),
})
err = webmStarter.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start webm: %v", err)
}
}))
}
hls := hls.NewHLS(hls.HLSArgs{
Hub: hub,
starters = append(starters, hls.NewHLS(hls.HLSArgs{
Hub: sourceHub,
HLSHub: hlsHub,
Port: conf.Service.Port,
LLHLS: conf.Service.LLHLS,
DiskRam: conf.Service.DiskRam,
})
err := hls.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start hls: %v", err)
}
whep := whep.NewWHEP(whep.WHEPArgs{
}))
starters = append(starters, whep.NewWHEP(whep.WHEPArgs{
Tracks: tracks,
Hub: hub,
})
_ = whep
err = whep.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start whep: %v", err)
Hub: sourceHub,
}))
for _, starter := range starters {
if err := starter.Start(ctx, source); err != nil {
log.Errorf(ctx, "failed to start %s for stream %s: %v", starter.Name(), source.StreamID(), err)
}
}
}
}()
rtmpServer := rtmp.NewRTMP(rtmp.RTMPArgs{
Hub: hub,
Hub: sourceHub,
Port: conf.RTMP.Port,
HLSHub: hlsHub,
})

View File

@@ -212,3 +212,7 @@ func (h *HLS) makeMuxer(extraData []byte) (*gohlslib.Muxer, error) {
}
return muxer, nil
}
func (h *HLS) Name() string {
return "hls-server"
}

View File

@@ -278,3 +278,7 @@ func (m *MP4) onOPUSAudio(ctx context.Context, audioTranscodingProcess *processe
})
}
}
func (m *MP4) Name() string {
return "mp4-recorder"
}

View File

@@ -236,3 +236,7 @@ func (w *WebM) onAACAudio(ctx context.Context, aac *hub.AACAudio) {
})
}
}
func (w *WebM) Name() string {
return "webm-recorder"
}

View File

@@ -256,3 +256,7 @@ func (w *WHEP) syncAndSendPackets() error {
}
return nil
}
func (w *WHEP) Name() string {
return "whep-server"
}