From 6eedd16cbff2c6cb1cb183b09226968a634c2cb9 Mon Sep 17 00:00:00 2001 From: Han Gyoung-Su Date: Sun, 22 Jun 2025 23:25:32 +0900 Subject: [PATCH] Feature/thumbnail (#22) * feat: thumbnail structure * feat: make thumbnail, serve thumbnail --- config.toml | 7 + config/config.go | 20 +- httpsrv/thumbnailhandler.go | 81 ++++++++ main.go | 30 ++- media/streamer/egress/thumbnail/handler.go | 229 +++++++++++++++++++++ media/streamer/ingress/whip/serve.go | 4 +- 6 files changed, 360 insertions(+), 11 deletions(-) create mode 100644 httpsrv/thumbnailhandler.go create mode 100644 media/streamer/egress/thumbnail/handler.go diff --git a/config.toml b/config.toml index 9238598..ba272f5 100644 --- a/config.toml +++ b/config.toml @@ -10,3 +10,10 @@ mode=false record=true [ebml] record=false + +[thumbnail] +enable=true +output_path="./thumbnails" +interval_seconds=5 +width=320 +height=180 diff --git a/config/config.go b/config/config.go index 168fa98..cb3cfdb 100644 --- a/config/config.go +++ b/config/config.go @@ -2,11 +2,12 @@ package config // Struct to hold the configuration type Config struct { - RTMP RTMP `mapstructure:"rtmp"` - Service Service `mapstructure:"service"` - Docker DockerConfig `mapstructure:"docker"` - MP4 MP4 `mapstructure:"mp4"` - EBML EBML `mapstructure:"ebml"` + RTMP RTMP `mapstructure:"rtmp"` + Service Service `mapstructure:"service"` + Docker DockerConfig `mapstructure:"docker"` + MP4 MP4 `mapstructure:"mp4"` + EBML EBML `mapstructure:"ebml"` + Thumbnail Thumbnail `mapstructure:"thumbnail"` } type RTMP struct { @@ -30,3 +31,12 @@ type MP4 struct { type EBML struct { Record bool `mapstructure:"record"` } + +// Thumbnail configuration for thumbnail generation service +type Thumbnail struct { + Enable bool `mapstructure:"enable"` + OutputPath string `mapstructure:"output_path"` + IntervalSeconds int `mapstructure:"interval_seconds"` + Width int `mapstructure:"width"` + Height int `mapstructure:"height"` +} diff --git a/httpsrv/thumbnailhandler.go b/httpsrv/thumbnailhandler.go new file mode 100644 index 0000000..4bf7b0a --- /dev/null +++ b/httpsrv/thumbnailhandler.go @@ -0,0 +1,81 @@ +package httpsrv + +import ( + "fmt" + "net/http" + "strings" + + "github.com/labstack/echo/v4" + + "liveflow/media/streamer/egress/thumbnail" +) + +// ThumbnailHandler handles thumbnail HTTP requests +type ThumbnailHandler struct { + store *thumbnail.ThumbnailStore +} + +// NewThumbnailHandler creates a new thumbnail handler +func NewThumbnailHandler(store *thumbnail.ThumbnailStore) *ThumbnailHandler { + return &ThumbnailHandler{ + store: store, + } +} + +// HandleThumbnail serves the latest thumbnail for a given stream ID +func (h *ThumbnailHandler) HandleThumbnail(c echo.Context) error { + streamID := c.Param("streamID") + + // Fallback: parse URL path directly if Echo param extraction fails + if streamID == "" { + path := c.Request().URL.Path + // Remove /thumbnail/ prefix and extract streamID + if strings.HasPrefix(path, "/thumbnail/") { + remaining := strings.TrimPrefix(path, "/thumbnail/") + // Remove .jpg suffix if present + if strings.HasSuffix(remaining, ".jpg") { + remaining = strings.TrimSuffix(remaining, ".jpg") + } + streamID = remaining + } + } + + if streamID == "" { + return c.JSON(http.StatusBadRequest, APIResponse{ + ErrorCode: 400, + Message: "stream ID is required", + }) + } + + // Get thumbnail from memory store + thumbnailData, exists := h.store.Get(streamID) + if !exists { + return c.JSON(http.StatusNotFound, APIResponse{ + ErrorCode: 404, + Message: fmt.Sprintf("no thumbnail found for stream %s", streamID), + }) + } + + // Set appropriate headers + c.Response().Header().Set("Content-Type", "image/jpeg") + c.Response().Header().Set("Cache-Control", "max-age=30") // Cache for 30 seconds + c.Response().Header().Set("Content-Length", fmt.Sprintf("%d", len(thumbnailData.Data))) + + // Serve the thumbnail data + return c.Blob(http.StatusOK, "image/jpeg", thumbnailData.Data) +} + +// HandleThumbnailWithExtension serves thumbnail with .jpg extension in URL +func (h *ThumbnailHandler) HandleThumbnailWithExtension(c echo.Context) error { + streamIDWithExt := c.Param("streamID") + + // Remove .jpg extension if present + streamID := strings.TrimSuffix(streamIDWithExt, ".jpg") + + // Set the cleaned stream ID back to the context + c.SetParamNames("streamID") + c.SetParamValues(streamID) + + // Call the main handler + return h.HandleThumbnail(c) +} diff --git a/main.go b/main.go index eff1ee8..4bbddec 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ import ( "liveflow/media/streamer/egress/record/mp4" "liveflow/media/streamer/egress/record/webm" "liveflow/media/streamer/egress/whep" - "liveflow/media/streamer/ingress/whip" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -27,7 +26,9 @@ import ( "liveflow/log" "liveflow/media/hlshub" "liveflow/media/hub" + "liveflow/media/streamer/egress/thumbnail" "liveflow/media/streamer/ingress/rtmp" + "liveflow/media/streamer/ingress/whip" ) /* @@ -76,10 +77,12 @@ func main() { tracks = make(map[string][]*webrtc.TrackLocalStaticRTP) // Egress is started by streamID notification hlsHub := hlshub.NewHLSHub() + thumbnailStore := thumbnail.NewThumbnailStore() go func() { api := echo.New() api.HideBanner = true hlsHandler := httpsrv.NewHandler(hlsHub) + thumbnailHandler := httpsrv.NewThumbnailHandler(thumbnailStore) api.Use(middleware.Logger()) // 1. API routes @@ -100,11 +103,17 @@ func main() { }) whipServer.RegisterRoute() - // 2. Serve static files - api.Static("/", "front/dist") + // Thumbnail routes - simplified without middleware + api.GET("/thumbnail/:streamID", thumbnailHandler.HandleThumbnail) + api.GET("/thumbnail/:streamID.jpg", thumbnailHandler.HandleThumbnailWithExtension) - // 3. SPA fallback for all other routes - api.GET("/*", func(c echo.Context) error { + // 2. Serve static files for specific paths only - avoid wildcard conflicts + // api.Static("/static", "front/dist") + // Use more specific routes to avoid interfering with API routes + api.GET("/assets/*", func(c echo.Context) error { + return c.File("front/dist" + c.Request().URL.Path) + }) + api.GET("/", func(c echo.Context) error { return c.File("front/dist/index.html") }) @@ -139,6 +148,17 @@ func main() { })) } + if conf.Thumbnail.Enable { + starters = append(starters, thumbnail.NewThumbnail(thumbnail.ThumbnailArgs{ + Hub: sourceHub, + Store: thumbnailStore, + OutputPath: conf.Thumbnail.OutputPath, + IntervalSeconds: conf.Thumbnail.IntervalSeconds, + Width: conf.Thumbnail.Width, + Height: conf.Thumbnail.Height, + })) + } + starters = append(starters, hls.NewHLS(hls.HLSArgs{ Hub: sourceHub, HLSHub: hlsHub, diff --git a/media/streamer/egress/thumbnail/handler.go b/media/streamer/egress/thumbnail/handler.go new file mode 100644 index 0000000..244b6f9 --- /dev/null +++ b/media/streamer/egress/thumbnail/handler.go @@ -0,0 +1,229 @@ +package thumbnail + +import ( + "bytes" + "context" + "errors" + "image" + "image/jpeg" + "sync" + "time" + + "liveflow/log" + "liveflow/media/hub" + "liveflow/media/streamer/fields" + "liveflow/media/streamer/processes" + + "github.com/asticode/go-astiav" + "github.com/sirupsen/logrus" +) + +var ( + ErrUnsupportedCodec = errors.New("unsupported codec") +) + +// ThumbnailData represents thumbnail data in memory +type ThumbnailData struct { + Data []byte + Timestamp time.Time +} + +// ThumbnailStore manages thumbnails in memory +type ThumbnailStore struct { + mu sync.RWMutex + thumbnails map[string]*ThumbnailData +} + +// NewThumbnailStore creates a new thumbnail store +func NewThumbnailStore() *ThumbnailStore { + return &ThumbnailStore{ + thumbnails: make(map[string]*ThumbnailData), + } +} + +// Set stores thumbnail data for a stream +func (ts *ThumbnailStore) Set(streamID string, data []byte) { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.thumbnails[streamID] = &ThumbnailData{ + Data: data, + Timestamp: time.Now(), + } +} + +// Get retrieves thumbnail data for a stream +func (ts *ThumbnailStore) Get(streamID string) (*ThumbnailData, bool) { + ts.mu.RLock() + defer ts.mu.RUnlock() + data, exists := ts.thumbnails[streamID] + return data, exists +} + +// Delete removes thumbnail data for a stream +func (ts *ThumbnailStore) Delete(streamID string) { + ts.mu.Lock() + defer ts.mu.Unlock() + delete(ts.thumbnails, streamID) +} + +// ThumbnailArgs contains arguments for initializing thumbnail service +type ThumbnailArgs struct { + Hub *hub.Hub + Store *ThumbnailStore + OutputPath string // Not used anymore, kept for compatibility + IntervalSeconds int // Thumbnail generation interval in seconds + Width int // Thumbnail width + Height int // Thumbnail height +} + +// Thumbnail represents thumbnail generation service +type Thumbnail struct { + hub *hub.Hub + intervalSeconds int + width int + height int + decoder *processes.VideoDecodingProcess + lastThumbnailTime int64 + store *ThumbnailStore +} + +// NewThumbnail creates a new thumbnail service instance +func NewThumbnail(args ThumbnailArgs) *Thumbnail { + return &Thumbnail{ + hub: args.Hub, + intervalSeconds: args.IntervalSeconds, + width: args.Width, + height: args.Height, + store: args.Store, + } +} + +// Start starts the thumbnail service +func (t *Thumbnail) Start(ctx context.Context, source hub.Source) error { + if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) { + return ErrUnsupportedCodec + } + + ctx = log.WithFields(ctx, logrus.Fields{ + fields.StreamID: source.StreamID(), + fields.SourceName: source.Name(), + }) + log.Info(ctx, "start thumbnail") + + // Initialize video decoder + t.decoder = processes.NewVideoDecodingProcess(astiav.CodecIDH264) + if err := t.decoder.Init(); err != nil { + return err + } + + sub := t.hub.Subscribe(source.StreamID()) + go func() { + intervalMS := int64(t.intervalSeconds * 1000) + + for data := range sub { + if data.H264Video != nil { + // Check if enough time has passed for next thumbnail + if data.H264Video.RawDTS()-t.lastThumbnailTime >= intervalMS { + // Check if this is a keyframe for better thumbnail quality + isKeyFrame := false + for _, sliceType := range data.H264Video.SliceTypes { + if sliceType == hub.SliceI { + isKeyFrame = true + break + } + } + + if isKeyFrame { + t.onVideo(ctx, data.H264Video, source.StreamID()) + t.lastThumbnailTime = data.H264Video.RawDTS() + } + } + } + } + + // Clean up thumbnail when stream ends + t.store.Delete(source.StreamID()) + log.Infof(ctx, "thumbnail cleaned up for stream %s", source.StreamID()) + }() + + return nil +} + +// encodeImageToJPEG encodes an image to JPEG bytes +func (t *Thumbnail) encodeImageToJPEG(img image.Image) ([]byte, error) { + var buf bytes.Buffer + + // JPEG encode options + options := &jpeg.Options{ + Quality: 85, // Good quality for thumbnails + } + + if err := jpeg.Encode(&buf, img, options); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// onVideo processes H264 video data and generates thumbnail +func (t *Thumbnail) onVideo(ctx context.Context, h264Video *hub.H264Video, streamID string) { + // Decode H264 to AVFrame + frames, err := t.decoder.Process(*h264Video) + if err != nil { + log.Error(ctx, err, "failed to decode video for thumbnail") + return + } + + // Process each decoded frame + for _, frame := range frames { + if frame != nil { + t.generateThumbnail(ctx, frame, streamID) + // Only generate one thumbnail per interval + break + } + } +} + +// generateThumbnail creates thumbnail from AVFrame +func (t *Thumbnail) generateThumbnail(ctx context.Context, frame *astiav.Frame, streamID string) { + // Get frame data + frameData := frame.Data() + + // Guess the image format from pixel format + img, err := frameData.GuessImageFormat() + if err != nil { + log.Error(ctx, err, "failed to guess image format") + return + } + + // Convert AVFrame to Go image + err = frameData.ToImage(img) + if err != nil { + log.Error(ctx, err, "failed to convert frame to image") + return + } + + // Encode image to JPEG bytes + jpegData, err := t.encodeImageToJPEG(img) + if err != nil { + log.Error(ctx, err, "failed to encode image to JPEG") + return + } + + // Store in memory + t.store.Set(streamID, jpegData) + + log.Infof(ctx, "thumbnail updated in memory for stream %s (size: %d bytes, image: %dx%d)", + streamID, len(jpegData), img.Bounds().Dx(), img.Bounds().Dy()) +} + +// Name returns the service name +func (t *Thumbnail) Name() string { + return "thumbnail" +} + +// Stop stops the thumbnail service +func (t *Thumbnail) Stop() error { + // TODO: implement cleanup logic + return nil +} diff --git a/media/streamer/ingress/whip/serve.go b/media/streamer/ingress/whip/serve.go index 226fc89..8496e19 100644 --- a/media/streamer/ingress/whip/serve.go +++ b/media/streamer/ingress/whip/serve.go @@ -57,7 +57,9 @@ func NewWHIP(args WHIPArgs) *WHIP { func (r *WHIP) RegisterRoute() { whipServer := r.echo - whipServer.Static("/wv", "static") + whipServer.GET("/wv", func(c echo.Context) error { + return c.File("static/index.html") + }) whipServer.POST("/whip", r.whipHandler) whipServer.POST("/whep", r.whepHandler) }