mirror of
https://github.com/hsnks100/liveflow.git
synced 2025-09-26 20:21:12 +08:00
Feature/thumbnail (#22)
* feat: thumbnail structure * feat: make thumbnail, serve thumbnail
This commit is contained in:
@@ -10,3 +10,10 @@ mode=false
|
||||
record=true
|
||||
[ebml]
|
||||
record=false
|
||||
|
||||
[thumbnail]
|
||||
enable=true
|
||||
output_path="./thumbnails"
|
||||
interval_seconds=5
|
||||
width=320
|
||||
height=180
|
||||
|
@@ -7,6 +7,7 @@ type Config struct {
|
||||
Docker DockerConfig `mapstructure:"docker"`
|
||||
MP4 MP4 `mapstructure:"mp4"`
|
||||
EBML EBML `mapstructure:"ebml"`
|
||||
Thumbnail Thumbnail `mapstructure:"thumbnail"`
|
||||
}
|
||||
|
||||
type RTMP struct {
|
||||
@@ -30,3 +31,12 @@ type MP4 struct {
|
||||
type EBML struct {
|
||||
Record bool `mapstructure:"record"`
|
||||
}
|
||||
|
||||
// Thumbnail configuration for thumbnail generation service
|
||||
type Thumbnail struct {
|
||||
Enable bool `mapstructure:"enable"`
|
||||
OutputPath string `mapstructure:"output_path"`
|
||||
IntervalSeconds int `mapstructure:"interval_seconds"`
|
||||
Width int `mapstructure:"width"`
|
||||
Height int `mapstructure:"height"`
|
||||
}
|
||||
|
81
httpsrv/thumbnailhandler.go
Normal file
81
httpsrv/thumbnailhandler.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package httpsrv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
|
||||
"liveflow/media/streamer/egress/thumbnail"
|
||||
)
|
||||
|
||||
// ThumbnailHandler handles thumbnail HTTP requests
|
||||
type ThumbnailHandler struct {
|
||||
store *thumbnail.ThumbnailStore
|
||||
}
|
||||
|
||||
// NewThumbnailHandler creates a new thumbnail handler
|
||||
func NewThumbnailHandler(store *thumbnail.ThumbnailStore) *ThumbnailHandler {
|
||||
return &ThumbnailHandler{
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
// HandleThumbnail serves the latest thumbnail for a given stream ID
|
||||
func (h *ThumbnailHandler) HandleThumbnail(c echo.Context) error {
|
||||
streamID := c.Param("streamID")
|
||||
|
||||
// Fallback: parse URL path directly if Echo param extraction fails
|
||||
if streamID == "" {
|
||||
path := c.Request().URL.Path
|
||||
// Remove /thumbnail/ prefix and extract streamID
|
||||
if strings.HasPrefix(path, "/thumbnail/") {
|
||||
remaining := strings.TrimPrefix(path, "/thumbnail/")
|
||||
// Remove .jpg suffix if present
|
||||
if strings.HasSuffix(remaining, ".jpg") {
|
||||
remaining = strings.TrimSuffix(remaining, ".jpg")
|
||||
}
|
||||
streamID = remaining
|
||||
}
|
||||
}
|
||||
|
||||
if streamID == "" {
|
||||
return c.JSON(http.StatusBadRequest, APIResponse{
|
||||
ErrorCode: 400,
|
||||
Message: "stream ID is required",
|
||||
})
|
||||
}
|
||||
|
||||
// Get thumbnail from memory store
|
||||
thumbnailData, exists := h.store.Get(streamID)
|
||||
if !exists {
|
||||
return c.JSON(http.StatusNotFound, APIResponse{
|
||||
ErrorCode: 404,
|
||||
Message: fmt.Sprintf("no thumbnail found for stream %s", streamID),
|
||||
})
|
||||
}
|
||||
|
||||
// Set appropriate headers
|
||||
c.Response().Header().Set("Content-Type", "image/jpeg")
|
||||
c.Response().Header().Set("Cache-Control", "max-age=30") // Cache for 30 seconds
|
||||
c.Response().Header().Set("Content-Length", fmt.Sprintf("%d", len(thumbnailData.Data)))
|
||||
|
||||
// Serve the thumbnail data
|
||||
return c.Blob(http.StatusOK, "image/jpeg", thumbnailData.Data)
|
||||
}
|
||||
|
||||
// HandleThumbnailWithExtension serves thumbnail with .jpg extension in URL
|
||||
func (h *ThumbnailHandler) HandleThumbnailWithExtension(c echo.Context) error {
|
||||
streamIDWithExt := c.Param("streamID")
|
||||
|
||||
// Remove .jpg extension if present
|
||||
streamID := strings.TrimSuffix(streamIDWithExt, ".jpg")
|
||||
|
||||
// Set the cleaned stream ID back to the context
|
||||
c.SetParamNames("streamID")
|
||||
c.SetParamValues(streamID)
|
||||
|
||||
// Call the main handler
|
||||
return h.HandleThumbnail(c)
|
||||
}
|
30
main.go
30
main.go
@@ -15,7 +15,6 @@ import (
|
||||
"liveflow/media/streamer/egress/record/mp4"
|
||||
"liveflow/media/streamer/egress/record/webm"
|
||||
"liveflow/media/streamer/egress/whep"
|
||||
"liveflow/media/streamer/ingress/whip"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/labstack/echo/v4/middleware"
|
||||
@@ -27,7 +26,9 @@ import (
|
||||
"liveflow/log"
|
||||
"liveflow/media/hlshub"
|
||||
"liveflow/media/hub"
|
||||
"liveflow/media/streamer/egress/thumbnail"
|
||||
"liveflow/media/streamer/ingress/rtmp"
|
||||
"liveflow/media/streamer/ingress/whip"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -76,10 +77,12 @@ func main() {
|
||||
tracks = make(map[string][]*webrtc.TrackLocalStaticRTP)
|
||||
// Egress is started by streamID notification
|
||||
hlsHub := hlshub.NewHLSHub()
|
||||
thumbnailStore := thumbnail.NewThumbnailStore()
|
||||
go func() {
|
||||
api := echo.New()
|
||||
api.HideBanner = true
|
||||
hlsHandler := httpsrv.NewHandler(hlsHub)
|
||||
thumbnailHandler := httpsrv.NewThumbnailHandler(thumbnailStore)
|
||||
api.Use(middleware.Logger())
|
||||
|
||||
// 1. API routes
|
||||
@@ -100,11 +103,17 @@ func main() {
|
||||
})
|
||||
whipServer.RegisterRoute()
|
||||
|
||||
// 2. Serve static files
|
||||
api.Static("/", "front/dist")
|
||||
// Thumbnail routes - simplified without middleware
|
||||
api.GET("/thumbnail/:streamID", thumbnailHandler.HandleThumbnail)
|
||||
api.GET("/thumbnail/:streamID.jpg", thumbnailHandler.HandleThumbnailWithExtension)
|
||||
|
||||
// 3. SPA fallback for all other routes
|
||||
api.GET("/*", func(c echo.Context) error {
|
||||
// 2. Serve static files for specific paths only - avoid wildcard conflicts
|
||||
// api.Static("/static", "front/dist")
|
||||
// Use more specific routes to avoid interfering with API routes
|
||||
api.GET("/assets/*", func(c echo.Context) error {
|
||||
return c.File("front/dist" + c.Request().URL.Path)
|
||||
})
|
||||
api.GET("/", func(c echo.Context) error {
|
||||
return c.File("front/dist/index.html")
|
||||
})
|
||||
|
||||
@@ -139,6 +148,17 @@ func main() {
|
||||
}))
|
||||
}
|
||||
|
||||
if conf.Thumbnail.Enable {
|
||||
starters = append(starters, thumbnail.NewThumbnail(thumbnail.ThumbnailArgs{
|
||||
Hub: sourceHub,
|
||||
Store: thumbnailStore,
|
||||
OutputPath: conf.Thumbnail.OutputPath,
|
||||
IntervalSeconds: conf.Thumbnail.IntervalSeconds,
|
||||
Width: conf.Thumbnail.Width,
|
||||
Height: conf.Thumbnail.Height,
|
||||
}))
|
||||
}
|
||||
|
||||
starters = append(starters, hls.NewHLS(hls.HLSArgs{
|
||||
Hub: sourceHub,
|
||||
HLSHub: hlsHub,
|
||||
|
229
media/streamer/egress/thumbnail/handler.go
Normal file
229
media/streamer/egress/thumbnail/handler.go
Normal file
@@ -0,0 +1,229 @@
|
||||
package thumbnail
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"image"
|
||||
"image/jpeg"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"liveflow/log"
|
||||
"liveflow/media/hub"
|
||||
"liveflow/media/streamer/fields"
|
||||
"liveflow/media/streamer/processes"
|
||||
|
||||
"github.com/asticode/go-astiav"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrUnsupportedCodec = errors.New("unsupported codec")
|
||||
)
|
||||
|
||||
// ThumbnailData represents thumbnail data in memory
|
||||
type ThumbnailData struct {
|
||||
Data []byte
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// ThumbnailStore manages thumbnails in memory
|
||||
type ThumbnailStore struct {
|
||||
mu sync.RWMutex
|
||||
thumbnails map[string]*ThumbnailData
|
||||
}
|
||||
|
||||
// NewThumbnailStore creates a new thumbnail store
|
||||
func NewThumbnailStore() *ThumbnailStore {
|
||||
return &ThumbnailStore{
|
||||
thumbnails: make(map[string]*ThumbnailData),
|
||||
}
|
||||
}
|
||||
|
||||
// Set stores thumbnail data for a stream
|
||||
func (ts *ThumbnailStore) Set(streamID string, data []byte) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
ts.thumbnails[streamID] = &ThumbnailData{
|
||||
Data: data,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves thumbnail data for a stream
|
||||
func (ts *ThumbnailStore) Get(streamID string) (*ThumbnailData, bool) {
|
||||
ts.mu.RLock()
|
||||
defer ts.mu.RUnlock()
|
||||
data, exists := ts.thumbnails[streamID]
|
||||
return data, exists
|
||||
}
|
||||
|
||||
// Delete removes thumbnail data for a stream
|
||||
func (ts *ThumbnailStore) Delete(streamID string) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
delete(ts.thumbnails, streamID)
|
||||
}
|
||||
|
||||
// ThumbnailArgs contains arguments for initializing thumbnail service
|
||||
type ThumbnailArgs struct {
|
||||
Hub *hub.Hub
|
||||
Store *ThumbnailStore
|
||||
OutputPath string // Not used anymore, kept for compatibility
|
||||
IntervalSeconds int // Thumbnail generation interval in seconds
|
||||
Width int // Thumbnail width
|
||||
Height int // Thumbnail height
|
||||
}
|
||||
|
||||
// Thumbnail represents thumbnail generation service
|
||||
type Thumbnail struct {
|
||||
hub *hub.Hub
|
||||
intervalSeconds int
|
||||
width int
|
||||
height int
|
||||
decoder *processes.VideoDecodingProcess
|
||||
lastThumbnailTime int64
|
||||
store *ThumbnailStore
|
||||
}
|
||||
|
||||
// NewThumbnail creates a new thumbnail service instance
|
||||
func NewThumbnail(args ThumbnailArgs) *Thumbnail {
|
||||
return &Thumbnail{
|
||||
hub: args.Hub,
|
||||
intervalSeconds: args.IntervalSeconds,
|
||||
width: args.Width,
|
||||
height: args.Height,
|
||||
store: args.Store,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the thumbnail service
|
||||
func (t *Thumbnail) Start(ctx context.Context, source hub.Source) error {
|
||||
if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) {
|
||||
return ErrUnsupportedCodec
|
||||
}
|
||||
|
||||
ctx = log.WithFields(ctx, logrus.Fields{
|
||||
fields.StreamID: source.StreamID(),
|
||||
fields.SourceName: source.Name(),
|
||||
})
|
||||
log.Info(ctx, "start thumbnail")
|
||||
|
||||
// Initialize video decoder
|
||||
t.decoder = processes.NewVideoDecodingProcess(astiav.CodecIDH264)
|
||||
if err := t.decoder.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sub := t.hub.Subscribe(source.StreamID())
|
||||
go func() {
|
||||
intervalMS := int64(t.intervalSeconds * 1000)
|
||||
|
||||
for data := range sub {
|
||||
if data.H264Video != nil {
|
||||
// Check if enough time has passed for next thumbnail
|
||||
if data.H264Video.RawDTS()-t.lastThumbnailTime >= intervalMS {
|
||||
// Check if this is a keyframe for better thumbnail quality
|
||||
isKeyFrame := false
|
||||
for _, sliceType := range data.H264Video.SliceTypes {
|
||||
if sliceType == hub.SliceI {
|
||||
isKeyFrame = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if isKeyFrame {
|
||||
t.onVideo(ctx, data.H264Video, source.StreamID())
|
||||
t.lastThumbnailTime = data.H264Video.RawDTS()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up thumbnail when stream ends
|
||||
t.store.Delete(source.StreamID())
|
||||
log.Infof(ctx, "thumbnail cleaned up for stream %s", source.StreamID())
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// encodeImageToJPEG encodes an image to JPEG bytes
|
||||
func (t *Thumbnail) encodeImageToJPEG(img image.Image) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
|
||||
// JPEG encode options
|
||||
options := &jpeg.Options{
|
||||
Quality: 85, // Good quality for thumbnails
|
||||
}
|
||||
|
||||
if err := jpeg.Encode(&buf, img, options); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// onVideo processes H264 video data and generates thumbnail
|
||||
func (t *Thumbnail) onVideo(ctx context.Context, h264Video *hub.H264Video, streamID string) {
|
||||
// Decode H264 to AVFrame
|
||||
frames, err := t.decoder.Process(*h264Video)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to decode video for thumbnail")
|
||||
return
|
||||
}
|
||||
|
||||
// Process each decoded frame
|
||||
for _, frame := range frames {
|
||||
if frame != nil {
|
||||
t.generateThumbnail(ctx, frame, streamID)
|
||||
// Only generate one thumbnail per interval
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// generateThumbnail creates thumbnail from AVFrame
|
||||
func (t *Thumbnail) generateThumbnail(ctx context.Context, frame *astiav.Frame, streamID string) {
|
||||
// Get frame data
|
||||
frameData := frame.Data()
|
||||
|
||||
// Guess the image format from pixel format
|
||||
img, err := frameData.GuessImageFormat()
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to guess image format")
|
||||
return
|
||||
}
|
||||
|
||||
// Convert AVFrame to Go image
|
||||
err = frameData.ToImage(img)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to convert frame to image")
|
||||
return
|
||||
}
|
||||
|
||||
// Encode image to JPEG bytes
|
||||
jpegData, err := t.encodeImageToJPEG(img)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to encode image to JPEG")
|
||||
return
|
||||
}
|
||||
|
||||
// Store in memory
|
||||
t.store.Set(streamID, jpegData)
|
||||
|
||||
log.Infof(ctx, "thumbnail updated in memory for stream %s (size: %d bytes, image: %dx%d)",
|
||||
streamID, len(jpegData), img.Bounds().Dx(), img.Bounds().Dy())
|
||||
}
|
||||
|
||||
// Name returns the service name
|
||||
func (t *Thumbnail) Name() string {
|
||||
return "thumbnail"
|
||||
}
|
||||
|
||||
// Stop stops the thumbnail service
|
||||
func (t *Thumbnail) Stop() error {
|
||||
// TODO: implement cleanup logic
|
||||
return nil
|
||||
}
|
@@ -57,7 +57,9 @@ func NewWHIP(args WHIPArgs) *WHIP {
|
||||
|
||||
func (r *WHIP) RegisterRoute() {
|
||||
whipServer := r.echo
|
||||
whipServer.Static("/wv", "static")
|
||||
whipServer.GET("/wv", func(c echo.Context) error {
|
||||
return c.File("static/index.html")
|
||||
})
|
||||
whipServer.POST("/whip", r.whipHandler)
|
||||
whipServer.POST("/whep", r.whepHandler)
|
||||
}
|
||||
|
Reference in New Issue
Block a user