Merge branch 'release/1.1.0'
Some checks failed
Go / build (push) Has been cancelled

This commit is contained in:
한경수
2025-06-23 00:12:04 +09:00
54 changed files with 6496 additions and 408 deletions

52
.cursor/rules/rule.mdc Normal file
View File

@@ -0,0 +1,52 @@
---
description:
globs:
alwaysApply: true
---
## Constructor Patterns
### Args Structures
- For complex constructors, use `XXXArgs` struct to group parameters
```go
type QUICArgs struct {
TLSConfig *tls.Config
Bootstrap *bootstrap.BootstrapAgent
StreamManager *wrtc.StreamManager
SessionManager *streamsession.SessionManager
RTPRouter *rtprouter.RTPRouter
}
func NewQUICAcceptor(config *conf.Config, args QUICArgs) (*QUICAcceptor, error) {
// implementation
}
```
### Constructor Naming
- Always use `NewXXX` pattern for constructors
- Return pointer and error when validation is needed
- Return pointer only when no validation is required
## Language and Documentation
### Language Requirements
- **ALL comments, documentation, and messages MUST be in English**
- Function documentation must be in English
- Log messages must be in English
- Error messages must be in English
- Variable names and function names in English
### Comment Style
- Write comments only when necessary for clarity or complex logic
- Avoid obvious comments that just repeat the code
- Focus on explaining WHY, not WHAT
- Use concise, clear English
```go
// ValidateArgs checks if struct fields are nil or invalid interfaces
func ValidateArgs(args interface{}) error {
if !fieldVal.CanInterface() {
continue
}
}
```

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
videos/*

View File

@@ -1,6 +1,4 @@
# ubuntu
#FROM ubuntu:latest
FROM golang:1.21-bullseye
FROM golang:1.23-bullseye
RUN apt-get update
RUN apt-get upgrade -y
RUN apt-get install -y build-essential git pkg-config libunistring-dev libaom-dev libdav1d-dev bzip2 nasm wget yasm ca-certificates
@@ -8,14 +6,14 @@ COPY install-ffmpeg.sh /install-ffmpeg.sh
RUN chmod +x /install-ffmpeg.sh && /install-ffmpeg.sh
ENV PKG_CONFIG_PATH=/ffmpeg_build/lib/pkgconfig:${PKG_CONFIG_PATH}
ENV PATH="/usr/local/go/bin:${PATH}"
COPY ./ /app
WORKDIR /app
RUN ls .
RUN go mod download
RUN go build -o /app/bin/liveflow
RUN cp config.toml /app/bin/config.toml
RUN cp index.html /app/bin/index.html
RUN mkdir /app/bin/videos
WORKDIR /app/bin
ENTRYPOINT ["/app/bin/liveflow"]
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download
COPY ./ /app
RUN mkdir -p /app/bin/videos
RUN --mount=type=cache,target=/root/.cache/go-build \
go build -o /app
WORKDIR /app
ENV GOGC=10
ENTRYPOINT ["/app/liveflow"]

View File

@@ -56,6 +56,49 @@ The system architecture can be visualized as follows:
docker-compose up liveflow -d --force-recreate --build
```
## Building and Running
There are two ways to run the project: for production and for development.
### For Production
This method builds the frontend assets and embeds them into a single Go binary.
1. **Build the frontend:**
```bash
cd front
npm install
npm run build
```
2. **Build and run the Go application:**
```bash
cd ..
go build -o liveflow
./liveflow
```
3. Access the application at `http://localhost:8044`.
### For Development
This method runs the Go backend and the Vite frontend development server separately, enabling hot-reloading for frontend changes.
1. **Run the Go backend server:**
Open a terminal and run:
```bash
go build -o liveflow
./liveflow
```
2. **Run the frontend development server:**
Open a second terminal and run:
```bash
cd front
npm install
npm run dev
```
3. Access the application via the address shown by the Vite server (e.g., `http://localhost:5173`).
## **Usage**
Start streaming by choosing from the following broadcast options:
@@ -72,9 +115,10 @@ Start streaming by choosing from the following broadcast options:
- **HLS:**
- URL: `http://127.0.0.1:8044/hls/test/master.m3u8`
- Viewer: `http://127.0.0.1:8044/player/test`
- **WHEP:**
- URL: `http://127.0.0.1:8044/`
- URL: `http://127.0.0.1:8044/wv`
- Bearer Token: `test`
- Click the **Subscribe** button.

View File

@@ -5,4 +5,15 @@ port = 1930
llhls = false
disk_ram = true
[docker]
mode = false
mode=false
[mp4]
record=true
[ebml]
record=false
[thumbnail]
enable=true
output_path="./thumbnails"
interval_seconds=5
width=320
height=180

View File

@@ -2,9 +2,12 @@ package config
// Struct to hold the configuration
type Config struct {
RTMP RTMP `mapstructure:"rtmp"`
Service Service `mapstructure:"service"`
Docker DockerConfig `mapstructure:"docker"`
RTMP RTMP `mapstructure:"rtmp"`
Service Service `mapstructure:"service"`
Docker DockerConfig `mapstructure:"docker"`
MP4 MP4 `mapstructure:"mp4"`
EBML EBML `mapstructure:"ebml"`
Thumbnail Thumbnail `mapstructure:"thumbnail"`
}
type RTMP struct {
@@ -20,3 +23,20 @@ type Service struct {
type DockerConfig struct {
Mode bool `mapstructure:"mode"`
}
type MP4 struct {
Record bool `mapstructure:"record"`
}
type EBML struct {
Record bool `mapstructure:"record"`
}
// Thumbnail configuration for thumbnail generation service
type Thumbnail struct {
Enable bool `mapstructure:"enable"`
OutputPath string `mapstructure:"output_path"`
IntervalSeconds int `mapstructure:"interval_seconds"`
Width int `mapstructure:"width"`
Height int `mapstructure:"height"`
}

View File

@@ -1,16 +1,16 @@
# docker-compose
version: "3.4"
version: "2.1"
services:
liveflow:
image: liveflow_custom:latest
mem_limit: 350m
stdin_open: true # docker run -i
tty: true # docker run -t
volumes:
- "~/.store:/app/bin/videos"
- "~/.store:/app/videos"
ports:
- "8044:8044"
- "1930:1930"
- "30000-31000:30000-31000/udp"
- "40000-40010:40000-40010/udp"
environment:
DOCKER_MODE: "true"
build:

24
front/.gitignore vendored Normal file
View File

@@ -0,0 +1,24 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*
node_modules
dist
dist-ssr
*.local
# Editor directories and files
.vscode/*
!.vscode/extensions.json
.idea
.DS_Store
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?

1
front/README.md Normal file
View File

@@ -0,0 +1 @@
# React + TypeScript + Vite

28
front/eslint.config.js Normal file
View File

@@ -0,0 +1,28 @@
import js from '@eslint/js'
import globals from 'globals'
import reactHooks from 'eslint-plugin-react-hooks'
import reactRefresh from 'eslint-plugin-react-refresh'
import tseslint from 'typescript-eslint'
export default tseslint.config(
{ ignores: ['dist'] },
{
extends: [js.configs.recommended, ...tseslint.configs.recommended],
files: ['**/*.{ts,tsx}'],
languageOptions: {
ecmaVersion: 2020,
globals: globals.browser,
},
plugins: {
'react-hooks': reactHooks,
'react-refresh': reactRefresh,
},
rules: {
...reactHooks.configs.recommended.rules,
'react-refresh/only-export-components': [
'warn',
{ allowConstantExport: true },
],
},
},
)

13
front/index.html Normal file
View File

@@ -0,0 +1,13 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Vite + React + TS</title>
</head>
<body>
<div id="root"></div>
<script type="module" src="/src/main.tsx"></script>
</body>
</html>

3519
front/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

31
front/package.json Normal file
View File

@@ -0,0 +1,31 @@
{
"name": "front",
"private": true,
"version": "0.0.0",
"type": "module",
"scripts": {
"dev": "vite",
"build": "tsc -b && vite build",
"lint": "eslint .",
"preview": "vite preview"
},
"dependencies": {
"@mux/mux-video-react": "^0.25.3",
"react": "^19.1.0",
"react-dom": "^19.1.0",
"react-router-dom": "^7.6.2"
},
"devDependencies": {
"@eslint/js": "^9.25.0",
"@types/react": "^19.1.2",
"@types/react-dom": "^19.1.2",
"@vitejs/plugin-react": "^4.4.1",
"eslint": "^9.25.0",
"eslint-plugin-react-hooks": "^5.2.0",
"eslint-plugin-react-refresh": "^0.4.19",
"globals": "^16.0.0",
"typescript": "~5.8.3",
"typescript-eslint": "^8.30.1",
"vite": "^6.3.5"
}
}

1
front/public/vite.svg Normal file
View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" role="img" class="iconify iconify--logos" width="31.88" height="32" preserveAspectRatio="xMidYMid meet" viewBox="0 0 256 257"><defs><linearGradient id="IconifyId1813088fe1fbc01fb466" x1="-.828%" x2="57.636%" y1="7.652%" y2="78.411%"><stop offset="0%" stop-color="#41D1FF"></stop><stop offset="100%" stop-color="#BD34FE"></stop></linearGradient><linearGradient id="IconifyId1813088fe1fbc01fb467" x1="43.376%" x2="50.316%" y1="2.242%" y2="89.03%"><stop offset="0%" stop-color="#FFEA83"></stop><stop offset="8.333%" stop-color="#FFDD35"></stop><stop offset="100%" stop-color="#FFA800"></stop></linearGradient></defs><path fill="url(#IconifyId1813088fe1fbc01fb466)" d="M255.153 37.938L134.897 252.976c-2.483 4.44-8.862 4.466-11.382.048L.875 37.958c-2.746-4.814 1.371-10.646 6.827-9.67l120.385 21.517a6.537 6.537 0 0 0 2.322-.004l117.867-21.483c5.438-.991 9.574 4.796 6.877 9.62Z"></path><path fill="url(#IconifyId1813088fe1fbc01fb467)" d="M185.432.063L96.44 17.501a3.268 3.268 0 0 0-2.634 3.014l-5.474 92.456a3.268 3.268 0 0 0 3.997 3.378l24.777-5.718c2.318-.535 4.413 1.507 3.936 3.838l-7.361 36.047c-.495 2.426 1.782 4.5 4.151 3.78l15.304-4.649c2.372-.72 4.652 1.36 4.15 3.788l-11.698 56.621c-.732 3.542 3.979 5.473 5.943 2.437l1.313-2.028l72.516-144.72c1.215-2.423-.88-5.186-3.54-4.672l-25.505 4.922c-2.396.462-4.435-1.77-3.759-4.114l16.646-57.705c.677-2.35-1.37-4.583-3.769-4.113Z"></path></svg>

After

Width:  |  Height:  |  Size: 1.5 KiB

42
front/src/App.css Normal file
View File

@@ -0,0 +1,42 @@
#root {
max-width: 1280px;
margin: 0 auto;
padding: 2rem;
text-align: center;
}
.logo {
height: 6em;
padding: 1.5em;
will-change: filter;
transition: filter 300ms;
}
.logo:hover {
filter: drop-shadow(0 0 2em #646cffaa);
}
.logo.react:hover {
filter: drop-shadow(0 0 2em #61dafbaa);
}
@keyframes logo-spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
@media (prefers-reduced-motion: no-preference) {
a:nth-of-type(2) .logo {
animation: logo-spin infinite 20s linear;
}
}
.card {
padding: 2em;
}
.read-the-docs {
color: #888;
}

22
front/src/App.tsx Normal file
View File

@@ -0,0 +1,22 @@
import {
BrowserRouter as Router,
Routes,
Route,
} from "react-router-dom";
import StreamList from "./pages/StreamList.tsx";
import Player from "./pages/Player.tsx";
import './App.css'
import './styles/common.css'
function App() {
return (
<Router>
<Routes>
<Route path="/" element={<StreamList />} />
<Route path="/player/:streamId" element={<Player />} />
</Routes>
</Router>
)
}
export default App

View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" role="img" class="iconify iconify--logos" width="35.93" height="32" preserveAspectRatio="xMidYMid meet" viewBox="0 0 256 228"><path fill="#00D8FF" d="M210.483 73.824a171.49 171.49 0 0 0-8.24-2.597c.465-1.9.893-3.777 1.273-5.621c6.238-30.281 2.16-54.676-11.769-62.708c-13.355-7.7-35.196.329-57.254 19.526a171.23 171.23 0 0 0-6.375 5.848a155.866 155.866 0 0 0-4.241-3.917C100.759 3.829 77.587-4.822 63.673 3.233C50.33 10.957 46.379 33.89 51.995 62.588a170.974 170.974 0 0 0 1.892 8.48c-3.28.932-6.445 1.924-9.474 2.98C17.309 83.498 0 98.307 0 113.668c0 15.865 18.582 31.778 46.812 41.427a145.52 145.52 0 0 0 6.921 2.165a167.467 167.467 0 0 0-2.01 9.138c-5.354 28.2-1.173 50.591 12.134 58.266c13.744 7.926 36.812-.22 59.273-19.855a145.567 145.567 0 0 0 5.342-4.923a168.064 168.064 0 0 0 6.92 6.314c21.758 18.722 43.246 26.282 56.54 18.586c13.731-7.949 18.194-32.003 12.4-61.268a145.016 145.016 0 0 0-1.535-6.842c1.62-.48 3.21-.974 4.76-1.488c29.348-9.723 48.443-25.443 48.443-41.52c0-15.417-17.868-30.326-45.517-39.844Zm-6.365 70.984c-1.4.463-2.836.91-4.3 1.345c-3.24-10.257-7.612-21.163-12.963-32.432c5.106-11 9.31-21.767 12.459-31.957c2.619.758 5.16 1.557 7.61 2.4c23.69 8.156 38.14 20.213 38.14 29.504c0 9.896-15.606 22.743-40.946 31.14Zm-10.514 20.834c2.562 12.94 2.927 24.64 1.23 33.787c-1.524 8.219-4.59 13.698-8.382 15.893c-8.067 4.67-25.32-1.4-43.927-17.412a156.726 156.726 0 0 1-6.437-5.87c7.214-7.889 14.423-17.06 21.459-27.246c12.376-1.098 24.068-2.894 34.671-5.345a134.17 134.17 0 0 1 1.386 6.193ZM87.276 214.515c-7.882 2.783-14.16 2.863-17.955.675c-8.075-4.657-11.432-22.636-6.853-46.752a156.923 156.923 0 0 1 1.869-8.499c10.486 2.32 22.093 3.988 34.498 4.994c7.084 9.967 14.501 19.128 21.976 27.15a134.668 134.668 0 0 1-4.877 4.492c-9.933 8.682-19.886 14.842-28.658 17.94ZM50.35 144.747c-12.483-4.267-22.792-9.812-29.858-15.863c-6.35-5.437-9.555-10.836-9.555-15.216c0-9.322 13.897-21.212 37.076-29.293c2.813-.98 5.757-1.905 8.812-2.773c3.204 10.42 7.406 21.315 12.477 32.332c-5.137 11.18-9.399 22.249-12.634 32.792a134.718 134.718 0 0 1-6.318-1.979Zm12.378-84.26c-4.811-24.587-1.616-43.134 6.425-47.789c8.564-4.958 27.502 2.111 47.463 19.835a144.318 144.318 0 0 1 3.841 3.545c-7.438 7.987-14.787 17.08-21.808 26.988c-12.04 1.116-23.565 2.908-34.161 5.309a160.342 160.342 0 0 1-1.76-7.887Zm110.427 27.268a347.8 347.8 0 0 0-7.785-12.803c8.168 1.033 15.994 2.404 23.343 4.08c-2.206 7.072-4.956 14.465-8.193 22.045a381.151 381.151 0 0 0-7.365-13.322Zm-45.032-43.861c5.044 5.465 10.096 11.566 15.065 18.186a322.04 322.04 0 0 0-30.257-.006c4.974-6.559 10.069-12.652 15.192-18.18ZM82.802 87.83a323.167 323.167 0 0 0-7.227 13.238c-3.184-7.553-5.909-14.98-8.134-22.152c7.304-1.634 15.093-2.97 23.209-3.984a321.524 321.524 0 0 0-7.848 12.897Zm8.081 65.352c-8.385-.936-16.291-2.203-23.593-3.793c2.26-7.3 5.045-14.885 8.298-22.6a321.187 321.187 0 0 0 7.257 13.246c2.594 4.48 5.28 8.868 8.038 13.147Zm37.542 31.03c-5.184-5.592-10.354-11.779-15.403-18.433c4.902.192 9.899.29 14.978.29c5.218 0 10.376-.117 15.453-.343c-4.985 6.774-10.018 12.97-15.028 18.486Zm52.198-57.817c3.422 7.8 6.306 15.345 8.596 22.52c-7.422 1.694-15.436 3.058-23.88 4.071a382.417 382.417 0 0 0 7.859-13.026a347.403 347.403 0 0 0 7.425-13.565Zm-16.898 8.101a358.557 358.557 0 0 1-12.281 19.815a329.4 329.4 0 0 1-23.444.823c-7.967 0-15.716-.248-23.178-.732a310.202 310.202 0 0 1-12.513-19.846h.001a307.41 307.41 0 0 1-10.923-20.627a310.278 310.278 0 0 1 10.89-20.637l-.001.001a307.318 307.318 0 0 1 12.413-19.761c7.613-.576 15.42-.876 23.31-.876H128c7.926 0 15.743.303 23.354.883a329.357 329.357 0 0 1 12.335 19.695a358.489 358.489 0 0 1 11.036 20.54a329.472 329.472 0 0 1-11 20.722Zm22.56-122.124c8.572 4.944 11.906 24.881 6.52 51.026c-.344 1.668-.73 3.367-1.15 5.09c-10.622-2.452-22.155-4.275-34.23-5.408c-7.034-10.017-14.323-19.124-21.64-27.008a160.789 160.789 0 0 1 5.888-5.4c18.9-16.447 36.564-22.941 44.612-18.3ZM128 90.808c12.625 0 22.86 10.235 22.86 22.86s-10.235 22.86-22.86 22.86s-22.86-10.235-22.86-22.86s10.235-22.86 22.86-22.86Z"></path></svg>

After

Width:  |  Height:  |  Size: 4.0 KiB

68
front/src/index.css Normal file
View File

@@ -0,0 +1,68 @@
:root {
font-family: system-ui, Avenir, Helvetica, Arial, sans-serif;
line-height: 1.5;
font-weight: 400;
color-scheme: light dark;
color: rgba(255, 255, 255, 0.87);
background-color: #242424;
font-synthesis: none;
text-rendering: optimizeLegibility;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
a {
font-weight: 500;
color: #646cff;
text-decoration: inherit;
}
a:hover {
color: #535bf2;
}
body {
margin: 0;
display: flex;
place-items: center;
min-width: 320px;
min-height: 100vh;
}
h1 {
font-size: 3.2em;
line-height: 1.1;
}
button {
border-radius: 8px;
border: 1px solid transparent;
padding: 0.6em 1.2em;
font-size: 1em;
font-weight: 500;
font-family: inherit;
background-color: #1a1a1a;
cursor: pointer;
transition: border-color 0.25s;
}
button:hover {
border-color: #646cff;
}
button:focus,
button:focus-visible {
outline: 4px auto -webkit-focus-ring-color;
}
@media (prefers-color-scheme: light) {
:root {
color: #213547;
background-color: #ffffff;
}
a:hover {
color: #747bff;
}
button {
background-color: #f9f9f9;
}
}

10
front/src/main.tsx Normal file
View File

@@ -0,0 +1,10 @@
import { StrictMode } from 'react'
import { createRoot } from 'react-dom/client'
import './index.css'
import App from './App.tsx'
createRoot(document.getElementById('root')!).render(
<StrictMode>
<App />
</StrictMode>,
)

373
front/src/pages/Player.css Normal file
View File

@@ -0,0 +1,373 @@
.player-container {
min-height: 100vh;
background: var(--primary-gradient);
font-family: var(--font-family);
color: var(--text-primary);
}
.player-header {
display: flex;
align-items: center;
gap: 2rem;
padding: 2rem;
border-bottom: 1px solid var(--glass-border);
}
.back-button {
background: var(--glass-bg);
color: var(--text-primary);
border: none;
padding: 0.75rem 1.5rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 500;
cursor: pointer;
transition: all 0.3s ease;
text-decoration: none;
display: inline-flex;
align-items: center;
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.back-button:hover {
background: rgba(255, 255, 255, 0.3);
transform: translateY(-2px);
box-shadow: var(--shadow-light);
}
.stream-meta {
flex: 1;
}
.stream-title {
font-size: 2.5rem;
font-weight: 700;
margin: 0 0 0.5rem 0;
}
.stream-stats {
display: flex;
align-items: center;
gap: 1.5rem;
flex-wrap: wrap;
}
.live-badge {
background: var(--live-gradient);
color: white;
padding: 0.5rem 1rem;
border-radius: var(--border-radius-large);
font-size: 0.9rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.05em;
animation: pulse-live 2s infinite;
box-shadow: 0 4px 12px rgba(239, 68, 68, 0.3);
}
@keyframes pulse-live {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0.8; }
}
.viewer-count {
font-size: 0.9rem;
font-weight: 500;
}
.player-wrapper {
padding: 2rem;
max-width: 1400px;
margin: 0 auto;
}
.video-container {
position: relative;
background: #000;
border-radius: var(--border-radius);
overflow: hidden;
box-shadow: var(--shadow-heavy);
aspect-ratio: 16/9;
}
.video-player {
width: 100%;
height: 100%;
border-radius: var(--border-radius);
}
.video-loading-overlay {
position: absolute;
top: 0;
left: 0;
right: 0;
bottom: 0;
background: rgba(0, 0, 0, 0.8);
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
z-index: 10;
backdrop-filter: blur(5px);
}
.video-loading-overlay p {
margin-top: 1rem;
font-size: 1.1rem;
color: white;
}
.player-loading {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 400px;
}
.player-loading p {
font-size: 1.2rem;
margin: 0;
opacity: 0.9;
}
.player-error {
text-align: center;
padding: 3rem;
background: var(--glass-bg);
border-radius: var(--border-radius);
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.error-icon {
font-size: 4rem;
margin-bottom: 1rem;
}
.player-error h2,
.player-error h3 {
color: #fca5a5;
margin-bottom: 1rem;
font-size: 2rem;
}
.player-error p {
margin-bottom: 2rem;
font-size: 1.1rem;
opacity: 0.9;
line-height: 1.6;
}
.error-actions {
display: flex;
gap: 1rem;
justify-content: center;
flex-wrap: wrap;
}
.retry-button {
background: var(--button-primary);
color: white;
border: none;
padding: 0.75rem 2rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: all 0.3s ease;
}
.retry-button:hover {
transform: translateY(-2px);
box-shadow: 0 8px 16px rgba(59, 130, 246, 0.3);
}
.back-link {
background: var(--glass-bg);
color: var(--text-primary);
text-decoration: none;
padding: 0.75rem 2rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 500;
transition: all 0.3s ease;
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.back-link:hover {
background: rgba(255, 255, 255, 0.3);
transform: translateY(-2px);
}
.player-info {
max-width: 1400px;
margin: 0 auto;
padding: 2rem;
}
.stream-details,
.player-controls-info {
background: var(--glass-bg);
padding: 2rem;
border-radius: var(--border-radius);
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.stream-details h3,
.player-controls-info h3 {
font-size: 1.5rem;
font-weight: 600;
margin: 0 0 1.5rem 0;
color: #e0e7ff;
}
.detail-item {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
padding: 0.75rem 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
}
.detail-item:last-child {
border-bottom: none;
margin-bottom: 0;
}
.detail-label {
font-weight: 500;
opacity: 0.8;
}
.detail-value {
font-weight: 600;
}
.status-live {
color: #fca5a5;
font-weight: 600;
}
.status-offline {
color: #9ca3af;
font-weight: 600;
}
.player-controls-info ul {
list-style: none;
padding: 0;
margin: 0;
}
.player-controls-info li {
padding: 0.75rem 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
display: flex;
align-items: center;
gap: 1rem;
}
.player-controls-info li:last-child {
border-bottom: none;
}
.player-controls-info kbd {
padding: 0.25rem 0.5rem;
border-radius: 6px;
font-size: 0.9rem;
font-weight: 600;
min-width: 3rem;
text-align: center;
}
/* 반응형 디자인 */
@media (max-width: 768px) {
.player-header {
flex-direction: column;
align-items: flex-start;
gap: 1rem;
padding: 1.5rem;
}
.stream-title {
font-size: 2rem;
}
.stream-stats {
gap: 1rem;
}
.player-wrapper {
padding: 1rem;
}
.player-info {
padding: 1rem;
}
.stream-details,
.player-controls-info {
padding: 1.5rem;
}
.error-actions {
flex-direction: column;
align-items: center;
}
}
@media (max-width: 480px) {
.stream-title {
font-size: 1.5rem;
}
.live-badge,
.viewer-count {
font-size: 0.8rem;
padding: 0.4rem 0.8rem;
}
.back-button {
padding: 0.6rem 1.2rem;
font-size: 0.9rem;
}
.detail-item {
flex-direction: column;
align-items: flex-start;
gap: 0.5rem;
}
}
/* 다크 모드 지원 */
@media (prefers-color-scheme: dark) {
.video-container {
box-shadow: var(--shadow-heavy);
}
}
/* 접근성 개선 */
@media (prefers-reduced-motion: reduce) {
.live-badge {
animation: none;
}
.back-button:hover,
.retry-button:hover,
.back-link:hover {
transform: none;
}
}
/* 포커스 스타일 */
.back-button:focus,
.retry-button:focus,
.back-link:focus {
outline: 3px solid #60a5fa;
outline-offset: 2px;
}

162
front/src/pages/Player.tsx Normal file
View File

@@ -0,0 +1,162 @@
import MuxPlayer from "@mux/mux-video-react";
import { useParams, Link, useNavigate } from 'react-router-dom';
import { useState, useEffect } from 'react';
import './Player.css';
interface StreamInfo {
id: string;
title: string;
isLive: boolean;
viewers?: number;
}
function Player() {
const { streamId } = useParams<{ streamId: string }>();
const navigate = useNavigate();
const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [streamInfo, setStreamInfo] = useState<StreamInfo | null>(null);
const [isPlayerReady, setIsPlayerReady] = useState(false);
useEffect(() => {
if (!streamId) {
setError('Stream ID not provided');
setIsLoading(false);
return;
}
// 스트림 정보 설정 (실제로는 API에서 가져올 수 있음)
setStreamInfo({
id: streamId,
title: streamId,
isLive: true,
viewers: Math.floor(Math.random() * 1000) + 1
});
setIsLoading(false);
}, [streamId]);
const handlePlayerLoadStart = () => {
setIsPlayerReady(false);
};
const handlePlayerLoadedData = () => {
setIsPlayerReady(true);
};
const handlePlayerError = () => {
setError('Failed to load stream. The stream may be offline or unavailable.');
};
const handleGoBack = () => {
navigate('/');
};
if (!streamId) {
return (
<div className="player-container">
<div className="error-container">
<h2>Error</h2>
<p>Stream ID not provided</p>
<Link to="/" className="glass-button">
Back to Streams
</Link>
</div>
</div>
);
}
const playbackUrl = `${window.location.protocol}//${window.location.host}/hls/${streamId}/master.m3u8`;
return (
<div className="player-container">
<div className="player-header glass-panel">
<button onClick={handleGoBack} className="glass-button">
Back to Streams
</button>
<div className="stream-meta">
<h1 className="stream-title title-gradient">{streamInfo?.title || streamId}</h1>
<div className="stream-stats">
{streamInfo?.isLive && (
<span className="live-badge"> LIVE</span>
)}
{streamInfo?.viewers && (
<span className="viewer-count">
👥 {streamInfo.viewers.toLocaleString()} viewers
</span>
)}
</div>
</div>
</div>
<div className="player-wrapper">
{isLoading ? (
<div className="player-loading glass-panel">
<div className="loading-spinner"></div>
<p>Loading stream...</p>
</div>
) : error ? (
<div className="error-container">
<div className="error-icon"></div>
<h3>Stream Unavailable</h3>
<p>{error}</p>
<div className="error-actions">
<button onClick={() => window.location.reload()} className="btn-primary">
🔄 Retry
</button>
<Link to="/" className="btn-secondary">
Back to Streams
</Link>
</div>
</div>
) : (
<div className="video-container">
{!isPlayerReady && (
<div className="video-loading-overlay">
<div className="loading-spinner"></div>
<p>Loading video...</p>
</div>
)}
<MuxPlayer
className="video-player"
src={playbackUrl}
autoPlay
muted
controls
preferPlayback="mse"
onLoadStart={handlePlayerLoadStart}
onLoadedData={handlePlayerLoadedData}
onError={handlePlayerError}
/>
</div>
)}
</div>
<div className="player-info grid-responsive grid-2">
<div className="stream-details glass-panel">
<h3>Stream Information</h3>
<div className="detail-item">
<span className="detail-label">Stream ID:</span>
<span className="detail-value">{streamId}</span>
</div>
<div className="detail-item">
<span className="detail-label">Status:</span>
<span className="detail-value">
{streamInfo?.isLive ? (
<span className="status-live">🔴 Live</span>
) : (
<span className="status-offline"> Offline</span>
)}
</span>
</div>
<div className="detail-item">
<span className="detail-label">Quality:</span>
<span className="detail-value">Auto (HLS)</span>
</div>
</div>
</div>
</div>
);
}
export default Player;

View File

@@ -0,0 +1,308 @@
.stream-list-container {
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
min-height: 100vh;
background: var(--primary-gradient);
font-family: var(--font-family);
}
.stream-list-header {
text-align: center;
margin-bottom: 3rem;
color: var(--text-primary);
}
.stream-list-header h1 {
margin: 0 0 0.5rem 0;
}
.stream-count {
opacity: 0.9;
margin: 0;
font-weight: 400;
}
.streams-grid {
margin-bottom: 2rem;
}
.stream-card {
background: white;
border-radius: var(--border-radius);
overflow: hidden;
box-shadow: var(--shadow-light);
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
text-decoration: none;
color: inherit;
position: relative;
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.stream-card:hover {
transform: translateY(-8px) scale(1.02);
box-shadow: var(--shadow-medium);
}
.stream-card:active {
transform: translateY(-4px) scale(1.01);
}
.stream-thumbnail {
width: 100%;
height: 200px;
position: relative;
overflow: hidden;
background: linear-gradient(45deg, #f0f0f0, #e0e0e0);
}
.stream-thumbnail img {
width: 100%;
height: 100%;
object-fit: cover;
transition: transform 0.3s ease;
}
.stream-card:hover .stream-thumbnail img {
transform: scale(1.05);
}
.thumbnail-placeholder {
width: 100%;
height: 100%;
display: flex;
align-items: center;
justify-content: center;
background: var(--primary-gradient);
position: relative;
}
.play-icon {
font-size: 3rem;
color: white;
text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);
animation: pulse 2s infinite;
}
@keyframes pulse {
0%, 100% { transform: scale(1); }
50% { transform: scale(1.05); }
}
.stream-info {
padding: 1.5rem;
}
.stream-title {
font-size: 1.25rem;
font-weight: 600;
margin: 0 0 0.5rem 0;
color: #1f2937;
line-height: 1.4;
}
.stream-status {
display: flex;
align-items: center;
gap: 0.5rem;
}
.live-indicator {
background: var(--live-gradient);
color: white;
padding: 0.25rem 0.75rem;
border-radius: var(--border-radius-large);
font-size: 0.75rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.05em;
animation: blink 2s infinite;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0.7; }
}
.loading {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 60vh;
color: var(--text-primary);
}
.loading-spinner {
width: 50px;
height: 50px;
border: 4px solid rgba(255, 255, 255, 0.3);
border-top: 4px solid white;
border-radius: 50%;
animation: spin 1s linear infinite;
margin-bottom: 1rem;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.loading p {
font-size: 1.2rem;
margin: 0;
opacity: 0.9;
}
.error {
text-align: center;
padding: 3rem;
background: var(--glass-bg);
border-radius: var(--border-radius);
color: var(--text-primary);
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.error h2 {
color: #fca5a5;
margin-bottom: 1rem;
font-size: 2rem;
}
.error p {
margin-bottom: 2rem;
font-size: 1.1rem;
opacity: 0.9;
}
.error button {
background: var(--button-primary);
color: white;
border: none;
padding: 0.75rem 2rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: all 0.3s ease;
}
.error button:hover {
transform: translateY(-2px);
box-shadow: 0 8px 16px rgba(59, 130, 246, 0.3);
}
.empty-state {
text-align: center;
padding: 4rem 2rem;
color: var(--text-primary);
}
.empty-icon {
font-size: 4rem;
margin-bottom: 1.5rem;
opacity: 0.8;
}
.empty-state h2 {
margin-bottom: 1rem;
font-weight: 600;
}
.empty-state p {
font-size: 1.2rem;
margin-bottom: 0.5rem;
opacity: 0.9;
line-height: 1.6;
}
/* 반응형 디자인 */
@media (max-width: 768px) {
.stream-list-container {
padding: 1rem;
}
.stream-list-header h1 {
font-size: 2.5rem;
}
.streams-grid {
grid-template-columns: 1fr;
gap: 1.5rem;
}
.stream-thumbnail {
height: 180px;
}
.empty-state {
padding: 3rem 1rem;
}
.empty-state h2 {
font-size: 2rem;
}
}
@media (max-width: 480px) {
.stream-list-header h1 {
font-size: 2rem;
}
.stream-count {
font-size: 1rem;
}
.stream-thumbnail {
height: 160px;
}
.stream-info {
padding: 1rem;
}
.stream-title {
font-size: 1.1rem;
}
}
/* 다크 모드 지원 */
@media (prefers-color-scheme: dark) {
.stream-card {
background: rgba(255, 255, 255, 0.05);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.stream-title {
color: #f9fafb;
}
}
/* 접근성 개선 */
@media (prefers-reduced-motion: reduce) {
.stream-card,
.stream-thumbnail img,
.play-icon,
.live-indicator,
.loading-spinner {
animation: none;
transition: none;
}
.stream-card:hover {
transform: none;
}
}
/* 포커스 스타일 */
.stream-card:focus {
outline: 3px solid #3b82f6;
outline-offset: 2px;
}
.error button:focus {
outline: 3px solid #60a5fa;
outline-offset: 2px;
}

View File

@@ -0,0 +1,142 @@
import { useState, useEffect } from 'react';
import { Link } from 'react-router-dom';
import './StreamList.css';
interface StreamsResponse {
error_code: number;
message: string;
data: {
streams: string[];
};
}
interface StreamCardProps {
streamId: string;
}
function StreamCard({ streamId }: StreamCardProps) {
// 썸네일 이미지가 로드되지 않을 경우 기본 이미지를 보여주기 위한 상태
const [imageError, setImageError] = useState(false);
const handleImageError = () => {
setImageError(true);
};
return (
<Link to={`/player/${streamId}`} className="stream-card">
<div className="stream-thumbnail">
{!imageError ? (
<img
src={`/thumbnail/${streamId}`}
alt={`${streamId} thumbnail`}
onError={handleImageError}
loading="lazy"
/>
) : (
<div className="thumbnail-placeholder">
<div className="play-icon"></div>
</div>
)}
</div>
<div className="stream-info">
<h3 className="stream-title">{streamId}</h3>
<div className="stream-status">
<span className="live-badge"> LIVE</span>
</div>
</div>
</Link>
);
}
function StreamList() {
const [streams, setStreams] = useState<string[]>([]);
const [error, setError] = useState<string | null>(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
const fetchStreams = async () => {
try {
setLoading(true);
const response = await fetch('/streams');
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result: StreamsResponse = await response.json();
if (result.error_code === 0 && result.data && result.data.streams) {
setStreams(result.data.streams);
} else {
throw new Error(result.message || 'Failed to fetch streams');
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred';
setError(errorMessage);
console.error("Error fetching streams:", error);
} finally {
setLoading(false);
}
};
fetchStreams();
// 30초마다 스트림 목록을 새로고침
const interval = setInterval(fetchStreams, 30000);
return () => clearInterval(interval);
}, []);
if (loading) {
return (
<div className="stream-list-container">
<div className="loading">
<div className="loading-spinner"></div>
<p>Loading streams...</p>
</div>
</div>
);
}
if (error) {
return (
<div className="stream-list-container">
<div className="error-container">
<h2>Error</h2>
<p>{error}</p>
<button onClick={() => window.location.reload()} className="btn-primary">
Retry
</button>
</div>
</div>
);
}
return (
<div className="stream-list-container">
<header className="stream-list-header">
<h1 className="title-gradient text-large">Live Streams</h1>
<p className="stream-count text-normal">
{streams.length > 0
? `${streams.length} active stream${streams.length > 1 ? 's' : ''}`
: 'No active streams'
}
</p>
</header>
{streams.length > 0 ? (
<div className="streams-grid grid-responsive grid-auto">
{streams.map(streamId => (
<StreamCard key={streamId} streamId={streamId} />
))}
</div>
) : (
<div className="empty-state">
<div className="empty-icon">📺</div>
<h2 className="text-medium">No Live Streams</h2>
<p>There are no active streams at the moment.</p>
<p>Check back later or start streaming!</p>
</div>
)}
</div>
);
}
export default StreamList;

246
front/src/styles/common.css Normal file
View File

@@ -0,0 +1,246 @@
/* 공통 변수 */
:root {
--primary-gradient: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
--glass-bg: rgba(255, 255, 255, 0.1);
--glass-border: rgba(255, 255, 255, 0.2);
--text-primary: white;
--text-secondary: rgba(255, 255, 255, 0.9);
--text-muted: rgba(255, 255, 255, 0.7);
--live-gradient: linear-gradient(45deg, #ef4444, #dc2626);
--button-primary: linear-gradient(45deg, #3b82f6, #1d4ed8);
--shadow-light: 0 8px 32px rgba(0, 0, 0, 0.1);
--shadow-medium: 0 20px 40px rgba(0, 0, 0, 0.15);
--shadow-heavy: 0 20px 40px rgba(0, 0, 0, 0.3);
--border-radius: 16px;
--border-radius-small: 12px;
--border-radius-large: 20px;
--font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
}
/* 공통 컨테이너 */
.app-container {
min-height: 100vh;
background: var(--primary-gradient);
font-family: var(--font-family);
color: var(--text-primary);
}
/* 공통 글래스모피즘 스타일 */
.glass-panel {
background: var(--glass-bg);
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
border-radius: var(--border-radius);
}
.glass-button {
background: var(--glass-bg);
color: var(--text-primary);
border: none;
padding: 0.75rem 1.5rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 500;
cursor: pointer;
transition: all 0.3s ease;
text-decoration: none;
display: inline-flex;
align-items: center;
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.glass-button:hover {
background: rgba(255, 255, 255, 0.3);
transform: translateY(-2px);
box-shadow: var(--shadow-light);
}
/* 공통 로딩 스피너 */
.loading-spinner {
width: 50px;
height: 50px;
border: 4px solid rgba(255, 255, 255, 0.3);
border-top: 4px solid white;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* 공통 라이브 배지 */
.live-badge {
background: var(--live-gradient);
color: white;
padding: 0.5rem 1rem;
border-radius: var(--border-radius-large);
font-size: 0.9rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.05em;
animation: pulse-live 2s infinite;
box-shadow: 0 4px 12px rgba(239, 68, 68, 0.3);
}
@keyframes pulse-live {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0.8; }
}
/* 공통 버튼 스타일 */
.btn-primary {
background: var(--button-primary);
color: white;
border: none;
padding: 0.75rem 2rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: all 0.3s ease;
text-decoration: none;
display: inline-flex;
align-items: center;
justify-content: center;
}
.btn-primary:hover {
transform: translateY(-2px);
box-shadow: 0 8px 16px rgba(59, 130, 246, 0.3);
}
.btn-secondary {
background: var(--glass-bg);
color: var(--text-primary);
border: 1px solid var(--glass-border);
padding: 0.75rem 2rem;
border-radius: var(--border-radius-small);
font-size: 1rem;
font-weight: 500;
cursor: pointer;
transition: all 0.3s ease;
text-decoration: none;
display: inline-flex;
align-items: center;
justify-content: center;
backdrop-filter: blur(10px);
}
.btn-secondary:hover {
background: rgba(255, 255, 255, 0.3);
transform: translateY(-2px);
}
/* 공통 타이포그래피 */
.title-gradient {
background: linear-gradient(45deg, #fff, #e0e7ff);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
background-clip: text;
text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);
}
.text-large {
font-size: 3rem;
font-weight: 700;
line-height: 1.2;
}
.text-medium {
font-size: 2rem;
font-weight: 600;
line-height: 1.3;
}
.text-normal {
font-size: 1.2rem;
font-weight: 400;
line-height: 1.5;
}
/* 공통 에러 스타일 */
.error-container {
text-align: center;
padding: 3rem;
background: var(--glass-bg);
border-radius: var(--border-radius);
backdrop-filter: blur(10px);
border: 1px solid var(--glass-border);
}
.error-container h2,
.error-container h3 {
color: #fca5a5;
margin-bottom: 1rem;
}
.error-container p {
margin-bottom: 2rem;
font-size: 1.1rem;
opacity: 0.9;
line-height: 1.6;
}
/* 공통 그리드 */
.grid-responsive {
display: grid;
gap: 2rem;
}
.grid-1 { grid-template-columns: 1fr; }
.grid-2 { grid-template-columns: repeat(2, 1fr); }
.grid-3 { grid-template-columns: repeat(3, 1fr); }
.grid-auto { grid-template-columns: repeat(auto-fill, minmax(320px, 1fr)); }
/* 공통 반응형 */
@media (max-width: 1024px) {
.grid-3 { grid-template-columns: repeat(2, 1fr); }
.grid-2 { grid-template-columns: 1fr; }
}
@media (max-width: 768px) {
.text-large { font-size: 2.5rem; }
.text-medium { font-size: 1.8rem; }
.grid-responsive { gap: 1.5rem; }
.grid-auto { grid-template-columns: 1fr; }
}
@media (max-width: 480px) {
.text-large { font-size: 2rem; }
.text-medium { font-size: 1.5rem; }
.text-normal { font-size: 1rem; }
.grid-responsive { gap: 1rem; }
}
/* 접근성 */
@media (prefers-reduced-motion: reduce) {
.loading-spinner,
.live-badge {
animation: none;
}
.glass-button:hover,
.btn-primary:hover,
.btn-secondary:hover {
transform: none;
}
}
/* 포커스 스타일 */
.glass-button:focus,
.btn-primary:focus,
.btn-secondary:focus {
outline: 3px solid #60a5fa;
outline-offset: 2px;
}
/* 다크 모드 지원 */
@media (prefers-color-scheme: dark) {
:root {
--glass-bg: rgba(255, 255, 255, 0.05);
--glass-border: rgba(255, 255, 255, 0.1);
}
}

1
front/src/vite-env.d.ts vendored Normal file
View File

@@ -0,0 +1 @@
/// <reference types="vite/client" />

27
front/tsconfig.app.json Normal file
View File

@@ -0,0 +1,27 @@
{
"compilerOptions": {
"tsBuildInfoFile": "./node_modules/.tmp/tsconfig.app.tsbuildinfo",
"target": "ES2020",
"useDefineForClassFields": true,
"lib": ["ES2020", "DOM", "DOM.Iterable"],
"module": "ESNext",
"skipLibCheck": true,
/* Bundler mode */
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"verbatimModuleSyntax": true,
"moduleDetection": "force",
"noEmit": true,
"jsx": "react-jsx",
/* Linting */
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"erasableSyntaxOnly": true,
"noFallthroughCasesInSwitch": true,
"noUncheckedSideEffectImports": true
},
"include": ["src"]
}

7
front/tsconfig.json Normal file
View File

@@ -0,0 +1,7 @@
{
"files": [],
"references": [
{ "path": "./tsconfig.app.json" },
{ "path": "./tsconfig.node.json" }
]
}

25
front/tsconfig.node.json Normal file
View File

@@ -0,0 +1,25 @@
{
"compilerOptions": {
"tsBuildInfoFile": "./node_modules/.tmp/tsconfig.node.tsbuildinfo",
"target": "ES2022",
"lib": ["ES2023"],
"module": "ESNext",
"skipLibCheck": true,
/* Bundler mode */
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"verbatimModuleSyntax": true,
"moduleDetection": "force",
"noEmit": true,
/* Linting */
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"erasableSyntaxOnly": true,
"noFallthroughCasesInSwitch": true,
"noUncheckedSideEffectImports": true
},
"include": ["vite.config.ts"]
}

20
front/vite.config.ts Normal file
View File

@@ -0,0 +1,20 @@
import { defineConfig } from 'vite'
import react from '@vitejs/plugin-react'
// https://vitejs.dev/config/
export default defineConfig({
plugins: [react()],
server: {
host: '0.0.0.0',
proxy: {
'/hls': {
target: 'http://localhost:8044',
changeOrigin: true,
},
'/streams': {
target: 'http://localhost:8044',
changeOrigin: true,
},
}
}
})

4
go.mod
View File

@@ -1,6 +1,6 @@
module liveflow
go 1.21
go 1.23
require (
github.com/asticode/go-astiav v0.19.0
@@ -31,6 +31,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -77,6 +78,7 @@ require (
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

4
go.sum
View File

@@ -30,6 +30,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -248,6 +250,8 @@ golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=

View File

@@ -19,6 +19,12 @@ const (
cacheControl = "CDN-Cache-Control"
)
type APIResponse struct {
ErrorCode int `json:"error_code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
type Handler struct {
endpoint *hlshub.HLSHub
}
@@ -29,6 +35,19 @@ func NewHandler(hlsEndpoint *hlshub.HLSHub) *Handler {
}
}
type StreamsResponse struct {
Streams []string `json:"streams"`
}
func (h *Handler) HandleListStreams(c echo.Context) error {
streams := h.endpoint.WorkIDs()
return c.JSON(http.StatusOK, APIResponse{
ErrorCode: 0,
Message: "success",
Data: StreamsResponse{Streams: streams},
})
}
func (h *Handler) HandleMasterM3U8(c echo.Context) error {
ctx := context.Background()
log.Info(ctx, "HandleMasterM3U8")
@@ -94,6 +113,7 @@ func (h *Handler) HandleM3U8(c echo.Context) error {
extension := filepath.Ext(c.Request().URL.String())
switch extension {
case ".m3u8":
c.Response().Header().Set(echo.HeaderContentType, "application/vnd.apple.mpegurl")
c.Response().Header().Set(cacheControl, "max-age=1")
case ".ts", ".mp4":
c.Response().Header().Set(cacheControl, "max-age=3600")

View File

@@ -0,0 +1,81 @@
package httpsrv
import (
"fmt"
"net/http"
"strings"
"github.com/labstack/echo/v4"
"liveflow/media/streamer/egress/thumbnail"
)
// ThumbnailHandler handles thumbnail HTTP requests
type ThumbnailHandler struct {
store *thumbnail.ThumbnailStore
}
// NewThumbnailHandler creates a new thumbnail handler
func NewThumbnailHandler(store *thumbnail.ThumbnailStore) *ThumbnailHandler {
return &ThumbnailHandler{
store: store,
}
}
// HandleThumbnail serves the latest thumbnail for a given stream ID
func (h *ThumbnailHandler) HandleThumbnail(c echo.Context) error {
streamID := c.Param("streamID")
// Fallback: parse URL path directly if Echo param extraction fails
if streamID == "" {
path := c.Request().URL.Path
// Remove /thumbnail/ prefix and extract streamID
if strings.HasPrefix(path, "/thumbnail/") {
remaining := strings.TrimPrefix(path, "/thumbnail/")
// Remove .jpg suffix if present
if strings.HasSuffix(remaining, ".jpg") {
remaining = strings.TrimSuffix(remaining, ".jpg")
}
streamID = remaining
}
}
if streamID == "" {
return c.JSON(http.StatusBadRequest, APIResponse{
ErrorCode: 400,
Message: "stream ID is required",
})
}
// Get thumbnail from memory store
thumbnailData, exists := h.store.Get(streamID)
if !exists {
return c.JSON(http.StatusNotFound, APIResponse{
ErrorCode: 404,
Message: fmt.Sprintf("no thumbnail found for stream %s", streamID),
})
}
// Set appropriate headers
c.Response().Header().Set("Content-Type", "image/jpeg")
c.Response().Header().Set("Cache-Control", "max-age=30") // Cache for 30 seconds
c.Response().Header().Set("Content-Length", fmt.Sprintf("%d", len(thumbnailData.Data)))
// Serve the thumbnail data
return c.Blob(http.StatusOK, "image/jpeg", thumbnailData.Data)
}
// HandleThumbnailWithExtension serves thumbnail with .jpg extension in URL
func (h *ThumbnailHandler) HandleThumbnailWithExtension(c echo.Context) error {
streamIDWithExt := c.Param("streamID")
// Remove .jpg extension if present
streamID := strings.TrimSuffix(streamIDWithExt, ".jpg")
// Set the cleaned stream ID back to the context
c.SetParamNames("streamID")
c.SetParamValues(streamID)
// Call the main handler
return h.HandleThumbnail(c)
}

41
install-ffmpeg-lsan.sh Normal file
View File

@@ -0,0 +1,41 @@
#!/bin/sh
set -ex
# Leak Sanitizer flags
export SANITIZE_FLAGS="-fsanitize=address -g -O1"
mkdir -p /ffmpeg_build
cd /ffmpeg_build
git config --global http.sslVerify false
git clone --depth 1 https://code.videolan.org/videolan/x264.git
cd x264
./configure --prefix="/ffmpeg_build" --enable-static --disable-opencl LDFLAGS="$SANITIZE_FLAGS"
make
make install
cd ..
wget --no-check-certificate -O ffmpeg-7.0.1.tar.bz2 https://ffmpeg.org/releases/ffmpeg-7.0.1.tar.bz2
tar xjf ffmpeg-7.0.1.tar.bz2
cd ffmpeg-7.0.1
PKG_CONFIG_PATH="/ffmpeg_build/lib/pkgconfig" ./configure \
--prefix="/ffmpeg_build" \
--pkg-config-flags="--static" \
--extra-cflags="-I/ffmpeg_build/include" \
--extra-ldflags="$SANITIZE_FLAGS -L/ffmpeg_build/lib" \
--extra-libs="-lpthread -lm" \
--bindir="/usr/local/bin" \
--enable-gpl \
--enable-libx264 \
--enable-nonfree
make -j8
make install
cd ..
# Clean up
rm -rf /ffmpeg_build/src /ffmpeg_build/*.tar.bz2
echo "FFmpeg 7.0.1 with x264 (LSan enabled) has been successfully installed to /ffmpeg_build."

26
lsan.Dockerfile Normal file
View File

@@ -0,0 +1,26 @@
FROM golang:1.23-bullseye
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y build-essential git pkg-config libunistring-dev libaom-dev libdav1d-dev bzip2 nasm wget yasm ca-certificates
COPY install-ffmpeg-lsan.sh /install-ffmpeg-lsan.sh
RUN chmod +x /install-ffmpeg-lsan.sh && /install-ffmpeg-lsan.sh
ENV PKG_CONFIG_PATH=/ffmpeg_build/lib/pkgconfig:${PKG_CONFIG_PATH}
ENV PATH="/usr/local/go/bin:${PATH}"
WORKDIR /app
COPY go.mod go.sum ./
ENV CGO_LDFLAGS='-fsanitize=address'
ENV CGO_CFLAGS='-fsanitize=address'
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download
COPY ./ /app
RUN mkdir -p /app/bin/videos
RUN --mount=type=cache,target=/root/.cache/go-build \
go build -o /app
WORKDIR /app
ENV GOGC 10
ENV ASAN_OPTIONS quarantine_size_mb=32
ENTRYPOINT ["/app/liveflow"]

164
main.go
View File

@@ -3,20 +3,22 @@ package main
import (
"context"
"fmt"
"net/http"
_ "net/http/pprof" // pprof을 사용하기 위한 패키지
"os"
"os/signal"
"strconv"
"syscall"
"liveflow/config"
"liveflow/media/streamer/egress/hls"
"liveflow/media/streamer/egress/record/mp4"
"liveflow/media/streamer/egress/record/webm"
"liveflow/media/streamer/egress/whep"
"liveflow/media/streamer/ingress/whip"
"net/http"
"strconv"
_ "net/http/pprof" // pprof을 사용하기 위한 패키지
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
@@ -24,12 +26,23 @@ import (
"liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub"
"liveflow/media/streamer/egress/thumbnail"
"liveflow/media/streamer/ingress/rtmp"
"liveflow/media/streamer/ingress/whip"
)
/*
#include <stdio.h>
#include <stdlib.h>
void __lsan_do_leak_check(void);
void leak_bit() {
int *p = (int *)malloc(sizeof(int));
}
*/
import "C"
// RTMP 받으면 자동으로 Service 서비스 동작, 녹화 서비스까지~?
func main() {
ctx := context.Background()
viper.SetConfigName("config") // name of config file (without extension)
viper.SetConfigType("toml") // REQUIRED if the config file does not have the extension in the name
viper.AddConfigPath(".") // optionally look for config in the working directory
@@ -45,82 +58,135 @@ func main() {
}
fmt.Printf("Config: %+v\n", conf)
log.Init()
//log.SetCaller(ctx, true)
//log.SetFormatter(ctx, &logrus.JSONFormatter{
// TimestampFormat: "2006-01-02 15:04:05",
//})
ctx, cancel := context.WithCancel(context.Background())
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
log.Info(ctx, "Received signal:", sig)
log.Info(ctx, "Initiating shutdown...")
//C.__lsan_do_leak_check()
cancel()
}()
ctx = log.WithFields(ctx, logrus.Fields{
"app": "liveflow",
})
log.Info(ctx, "liveflow is started")
hub := hub.NewHub()
sourceHub := hub.NewHub()
var tracks map[string][]*webrtc.TrackLocalStaticRTP
tracks = make(map[string][]*webrtc.TrackLocalStaticRTP)
// ingress
// Egress 서비스는 streamID 알림을 구독하여 처리 시작
// Egress is started by streamID notification
hlsHub := hlshub.NewHLSHub()
thumbnailStore := thumbnail.NewThumbnailStore()
go func() {
api := echo.New()
api.HideBanner = true
hlsHub := hlshub.NewHLSHub()
hlsHandler := httpsrv.NewHandler(hlsHub)
api.GET("/prometheus", echo.WrapHandler(promhttp.Handler()))
api.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))
api.GET("/hls/:streamID/master.m3u8", hlsHandler.HandleMasterM3U8)
api.GET("/hls/:streamID/:playlistName/stream.m3u8", hlsHandler.HandleM3U8)
api.GET("/hls/:streamID/:playlistName/:resourceName", hlsHandler.HandleM3U8)
thumbnailHandler := httpsrv.NewThumbnailHandler(thumbnailStore)
api.Use(middleware.Logger())
// 1. API routes
hlsRoute := api.Group("/hls", middleware.CORSWithConfig(middleware.CORSConfig{
AllowOrigins: []string{"*"}, // Adjust origins as necessary
AllowMethods: []string{http.MethodGet, http.MethodHead, http.MethodOptions},
}))
hlsRoute.GET("/:streamID/master.m3u8", hlsHandler.HandleMasterM3U8)
hlsRoute.GET("/:streamID/:playlistName/stream.m3u8", hlsHandler.HandleM3U8)
hlsRoute.GET("/:streamID/:playlistName/:resourceName", hlsHandler.HandleM3U8)
api.GET("/streams", hlsHandler.HandleListStreams)
whipServer := whip.NewWHIP(whip.WHIPArgs{
Hub: hub,
Hub: sourceHub,
Tracks: tracks,
DockerMode: conf.Docker.Mode,
Echo: api,
})
whipServer.RegisterRoute()
// Thumbnail routes - simplified without middleware
api.GET("/thumbnail/:streamID", thumbnailHandler.HandleThumbnail)
api.GET("/thumbnail/:streamID.jpg", thumbnailHandler.HandleThumbnailWithExtension)
// 2. Serve static files for specific paths only - avoid wildcard conflicts
// api.Static("/static", "front/dist")
// Use more specific routes to avoid interfering with API routes
api.GET("/assets/*", func(c echo.Context) error {
return c.File("front/dist" + c.Request().URL.Path)
})
// SPA fallback - serve index.html for all unmatched routes
// This must be registered LAST to act as a catch-all
api.GET("/*", func(c echo.Context) error {
return c.File("front/dist/index.html")
})
go func() {
fmt.Println("----------------", conf.Service.Port)
api.Start("0.0.0.0:" + strconv.Itoa(conf.Service.Port))
}()
type Starter interface {
Start(ctx context.Context, source hub.Source) error
Name() string
}
// ingress 의 rtmp, whip 서비스로부터 streamID를 받아 Service, ContainerMP4, WHEP 서비스 시작
for source := range hub.SubscribeToStreamID() {
for source := range sourceHub.SubscribeToStreamID() {
log.Infof(ctx, "New streamID received: %s", source.StreamID())
mp4 := mp4.NewMP4(mp4.MP4Args{
Hub: hub,
})
err = mp4.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start mp4: %v", err)
var starters []Starter
if conf.MP4.Record {
starters = append(starters, mp4.NewMP4(mp4.MP4Args{
Hub: sourceHub,
SplitIntervalMS: 3000,
}))
}
webmStarter := webm.NewWEBM(webm.WebMArgs{
Hub: hub,
})
err = webmStarter.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start webm: %v", err)
if conf.EBML.Record {
starters = append(starters, webm.NewWEBM(webm.WebMArgs{
Hub: sourceHub,
SplitIntervalMS: 6000,
StreamID: source.StreamID(),
}))
}
hls := hls.NewHLS(hls.HLSArgs{
Hub: hub,
if conf.Thumbnail.Enable {
starters = append(starters, thumbnail.NewThumbnail(thumbnail.ThumbnailArgs{
Hub: sourceHub,
Store: thumbnailStore,
OutputPath: conf.Thumbnail.OutputPath,
IntervalSeconds: conf.Thumbnail.IntervalSeconds,
Width: conf.Thumbnail.Width,
Height: conf.Thumbnail.Height,
}))
}
starters = append(starters, hls.NewHLS(hls.HLSArgs{
Hub: sourceHub,
HLSHub: hlsHub,
Port: conf.Service.Port,
LLHLS: conf.Service.LLHLS,
DiskRam: conf.Service.DiskRam,
})
err := hls.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start hls: %v", err)
}
whep := whep.NewWHEP(whep.WHEPArgs{
}))
starters = append(starters, whep.NewWHEP(whep.WHEPArgs{
Tracks: tracks,
Hub: hub,
})
err = whep.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start whep: %v", err)
Hub: sourceHub,
}))
for _, starter := range starters {
if err := starter.Start(ctx, source); err != nil {
log.Errorf(ctx, "failed to start %s for stream %s: %v", starter.Name(), source.StreamID(), err)
}
}
}
}()
rtmpServer := rtmp.NewRTMP(rtmp.RTMPArgs{
Hub: hub,
Port: conf.RTMP.Port,
Hub: sourceHub,
Port: conf.RTMP.Port,
HLSHub: hlsHub,
})
rtmpServer.Serve(ctx)
}

View File

@@ -63,3 +63,13 @@ func (s *HLSHub) MuxersByWorkID(workID string) (map[string]*gohlslib.Muxer, erro
}
return muxers, nil
}
func (s *HLSHub) WorkIDs() []string {
s.mu.RLock()
defer s.mu.RUnlock()
keys := make([]string, 0, len(s.hlsMuxers))
for k := range s.hlsMuxers {
keys = append(keys, k)
}
return keys
}

View File

@@ -17,7 +17,7 @@ type H264Video struct {
Data []byte
SPS []byte
PPS []byte
SliceType SliceType
SliceTypes []SliceType
CodecData []byte
}

View File

@@ -1,5 +1,10 @@
package hub
// #include <stdio.h>
// #include <stdlib.h>
//
// void __lsan_do_leak_check(void);
import "C"
import (
"context"
"fmt"
@@ -101,7 +106,8 @@ func (h *Hub) Publish(streamID string, data *FrameData) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
for _, ch := range h.streams[streamID] {
channels := h.streams[streamID]
for _, ch := range channels {
select {
case ch <- data:
case <-ctx.Done():
@@ -122,6 +128,7 @@ func (h *Hub) Unpublish(streamID string) {
close(ch)
}
delete(h.streams, streamID)
//checkLeak()
}
// Subscribe : Subscribes to the given streamID.
@@ -129,7 +136,7 @@ func (h *Hub) Subscribe(streamID string) <-chan *FrameData {
h.mu.RLock()
defer h.mu.RUnlock()
ch := make(chan *FrameData)
ch := make(chan *FrameData, 1000)
h.streams[streamID] = append(h.streams[streamID], ch)
return ch
}

View File

@@ -4,9 +4,10 @@ import (
"context"
"errors"
"fmt"
"liveflow/media/streamer/processes"
"time"
"liveflow/media/streamer/processes"
"github.com/asticode/go-astiav"
"github.com/bluenviron/gohlslib"
"github.com/bluenviron/gohlslib/pkg/codecs"
@@ -70,7 +71,8 @@ func (h *HLS) Start(ctx context.Context, source hub.Source) error {
fields.SourceName: source.Name(),
})
log.Info(ctx, "start hls")
log.Info(ctx, "view url: ", fmt.Sprintf("http://127.0.0.1:%d/hls/%s/master.m3u8", h.port, source.StreamID()))
log.Info(ctx, "view url: ",
fmt.Sprintf("http://localhost:8044/player/%s", source.StreamID()))
sub := h.hub.Subscribe(source.StreamID())
go func() {
@@ -98,11 +100,43 @@ func (h *HLS) Start(ctx context.Context, source hub.Source) error {
h.onVideo(ctx, data.H264Video)
}
}
if audioTranscodingProcess != nil {
log.Info(ctx, "draining audio transcoding process for HLS")
packets, err := audioTranscodingProcess.Drain()
if err != nil {
log.Error(ctx, err, "failed to drain audio transcoder for HLS")
}
for _, packet := range packets {
h.onAudio(ctx, source, &hub.AACAudio{
Data: packet.Data,
SequenceHeader: false,
MPEG4AudioConfigBytes: h.mpeg4AudioConfigBytes,
MPEG4AudioConfig: h.mpeg4AudioConfig,
PTS: packet.PTS,
DTS: packet.DTS,
AudioClockRate: uint32(packet.SampleRate),
})
}
log.Info(ctx, "audio transcoding process for HLS drained")
}
log.Info(ctx, "[HLS] end of streamID: ", source.StreamID())
if h.muxer != nil {
h.muxer.Close()
}
}()
return nil
}
func (h *HLS) onVideo(ctx context.Context, h264Video *hub.H264Video) {
if h.muxer != nil {
au, _ := h264parser.SplitNALUs(h264Video.Data)
err := h.muxer.WriteH264(time.Now(), time.Duration(h264Video.RawDTS())*time.Millisecond, au)
if err != nil {
log.Errorf(ctx, "failed to write h264: %v", err)
}
}
}
func (h *HLS) onAudio(ctx context.Context, source hub.Source, aacAudio *hub.AACAudio) {
if len(aacAudio.MPEG4AudioConfigBytes) > 0 {
if h.muxer == nil {
@@ -124,17 +158,6 @@ func (h *HLS) onAudio(ctx context.Context, source hub.Source, aacAudio *hub.AACA
h.muxer.WriteMPEG4Audio(time.Now(), time.Duration(aacAudio.RawDTS())*time.Millisecond, [][]byte{audioData})
}
}
func (h *HLS) onVideo(ctx context.Context, h264Video *hub.H264Video) {
if h.muxer != nil {
au, _ := h264parser.SplitNALUs(h264Video.Data)
err := h.muxer.WriteH264(time.Now(), time.Duration(h264Video.RawDTS())*time.Millisecond, au)
if err != nil {
log.Errorf(ctx, "failed to write h264: %v", err)
}
}
}
func (h *HLS) onOPUSAudio(ctx context.Context, source hub.Source, audioTranscodingProcess *processes.AudioTranscodingProcess, opusAudio *hub.OPUSAudio) {
packets, err := audioTranscodingProcess.Process(&processes.MediaPacket{
Data: opusAudio.Data,
@@ -189,3 +212,7 @@ func (h *HLS) makeMuxer(extraData []byte) (*gohlslib.Muxer, error) {
}
return muxer, nil
}
func (h *HLS) Name() string {
return "hls-server"
}

View File

@@ -5,14 +5,13 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"time"
"liveflow/media/streamer/egress/record"
"liveflow/media/streamer/processes"
"math/rand"
"os"
astiav "github.com/asticode/go-astiav"
"github.com/deepch/vdk/codec/aacparser"
"github.com/sirupsen/logrus"
gomp4 "github.com/yapingcat/gomedia/go-mp4"
@@ -31,56 +30,6 @@ const (
audioSampleRate = 48000
)
type cacheWriterSeeker struct {
buf []byte
offset int
}
func newCacheWriterSeeker(capacity int) *cacheWriterSeeker {
return &cacheWriterSeeker{
buf: make([]byte, 0, capacity),
offset: 0,
}
}
func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) {
if cap(ws.buf)-ws.offset >= len(p) {
if len(ws.buf) < ws.offset+len(p) {
ws.buf = ws.buf[:ws.offset+len(p)]
}
copy(ws.buf[ws.offset:], p)
ws.offset += len(p)
return len(p), nil
}
tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2)
copy(tmp, ws.buf)
if len(ws.buf) < ws.offset+len(p) {
tmp = tmp[:ws.offset+len(p)]
}
copy(tmp[ws.offset:], p)
ws.buf = tmp
ws.offset += len(p)
return len(p), nil
}
func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent {
if ws.offset+int(offset) > len(ws.buf) {
return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset))
}
ws.offset += int(offset)
return int64(ws.offset), nil
} else if whence == io.SeekStart {
if offset > int64(len(ws.buf)) {
return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset))
}
ws.offset = int(offset)
return offset, nil
} else {
return 0, errors.New("unsupport SeekEnd")
}
}
type MP4 struct {
hub *hub.Hub
muxer *gomp4.Movmuxer
@@ -91,15 +40,24 @@ type MP4 struct {
audioIndex uint32
mpeg4AudioConfigBytes []byte
mpeg4AudioConfig *aacparser.MPEG4AudioConfig
streamID string
// New fields for splitting
splitIntervalMS int64
lastSplitTime int64
fileIndex int
splitPending bool // Indicates if a split is pending
}
type MP4Args struct {
Hub *hub.Hub
Hub *hub.Hub
SplitIntervalMS int64
}
func NewMP4(args MP4Args) *MP4 {
return &MP4{
hub: args.Hub,
hub: args.Hub,
splitIntervalMS: args.SplitIntervalMS,
}
}
@@ -110,6 +68,7 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) {
return ErrUnsupportedCodec
}
m.streamID = source.StreamID()
ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(),
@@ -118,26 +77,25 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
sub := m.hub.Subscribe(source.StreamID())
go func() {
var err error
mp4File, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.mp4", rand.Int()))
// Initialize the splitting logic
m.fileIndex = 0
err = m.createNewFile(ctx)
if err != nil {
fmt.Println(err)
log.Error(ctx, err, "failed to create mp4 file")
return
}
defer func() {
err := mp4File.Close()
if err != nil {
log.Error(ctx, err, "failed to close mp4 file")
}
}()
muxer, err := gomp4.CreateMp4Muxer(mp4File)
if err != nil {
fmt.Println(err)
return
}
m.muxer = muxer
defer m.closeFile(ctx)
var audioTranscodingProcess *processes.AudioTranscodingProcess
for data := range sub {
// Check if we need to initiate a split
if data.H264Video != nil {
if !m.splitPending && data.H264Video.RawDTS()-m.lastSplitTime >= m.splitIntervalMS {
m.splitPending = true
}
}
if data.H264Video != nil {
m.onVideo(ctx, data.H264Video)
}
@@ -160,22 +118,113 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
}
}
}
err = muxer.WriteTrailer()
if err != nil {
log.Error(ctx, err, "failed to write trailer")
if audioTranscodingProcess != nil {
log.Info(ctx, "draining audio transcoding process")
// Drain the remaining frames from the transcoder
packets, err := audioTranscodingProcess.Drain()
if err != nil {
log.Error(ctx, err, "failed to drain audio transcoder")
}
for _, packet := range packets {
m.onAudio(ctx, &hub.AACAudio{
Data: packet.Data,
SequenceHeader: false,
MPEG4AudioConfigBytes: m.mpeg4AudioConfigBytes,
MPEG4AudioConfig: m.mpeg4AudioConfig,
PTS: packet.PTS,
DTS: packet.DTS,
AudioClockRate: uint32(packet.SampleRate),
})
}
log.Info(ctx, "audio transcoding process drained")
} else {
log.Info(ctx, "no audio transcoding process to drain")
}
}()
return nil
}
// createNewFile creates a new MP4 file and initializes the muxer
func (m *MP4) createNewFile(ctx context.Context) error {
var err error
m.closeFile(ctx) // Close previous file if any
timestamp := time.Now().Format("2006-01-02-15-04-05")
fileName := fmt.Sprintf("videos/%s_%s.mp4", m.streamID, timestamp)
m.tempFile, err = record.CreateFileInDir(fileName)
if err != nil {
return err
}
m.muxer, err = gomp4.CreateMp4Muxer(m.tempFile)
if err != nil {
return err
}
m.hasVideo = false
m.hasAudio = false
m.videoIndex = 0
m.audioIndex = 0
m.lastSplitTime = 0
m.fileIndex++
return nil
}
// closeFile closes the current MP4 file and muxer
func (m *MP4) closeFile(ctx context.Context) {
if m.muxer != nil {
log.Info(ctx, "writing mp4 trailer")
err := m.muxer.WriteTrailer()
if err != nil {
log.Error(ctx, err, "failed to write trailer")
}
m.muxer = nil
}
if m.tempFile != nil {
err := m.tempFile.Close()
if err != nil {
log.Error(ctx, err, "failed to close mp4 file")
}
log.Info(ctx, "mp4 file closed")
m.tempFile = nil
}
}
// splitFile handles the logic to split the MP4 file
func (m *MP4) splitFile(ctx context.Context) error {
// Close current file
m.closeFile(ctx)
// Create a new file
return m.createNewFile(ctx)
}
func (m *MP4) onVideo(ctx context.Context, h264Video *hub.H264Video) {
// Check if this is a keyframe
isKeyFrame := false
for _, sliceType := range h264Video.SliceTypes {
if sliceType == hub.SliceI {
isKeyFrame = true
break
}
}
// If a split is pending and we have a keyframe, perform the split
if m.splitPending && isKeyFrame {
err := m.splitFile(ctx)
if err != nil {
log.Error(ctx, err, "failed to split mp4 file")
return
}
m.lastSplitTime = h264Video.RawDTS()
m.splitPending = false // Reset the split pending flag
}
if !m.hasVideo {
m.hasVideo = true
m.videoIndex = m.muxer.AddVideoTrack(gomp4.MP4_CODEC_H264)
}
videoData := make([]byte, len(h264Video.Data))
copy(videoData, h264Video.Data)
err := m.muxer.Write(m.videoIndex, videoData, uint64(h264Video.RawPTS()), uint64(h264Video.RawDTS()))
err := m.muxer.Write(m.videoIndex, videoData, uint64(h264Video.RawPTS()-m.lastSplitTime), uint64(h264Video.RawDTS()-m.lastSplitTime))
if err != nil {
log.Error(ctx, err, "failed to write video")
}
@@ -201,7 +250,7 @@ func (m *MP4) onAudio(ctx context.Context, aacAudio *hub.AACAudio) {
adtsHeader := make([]byte, adtsHeaderSize)
aacparser.FillADTSHeader(adtsHeader, *m.mpeg4AudioConfig, aacSamples, len(aacAudio.Data))
audioData = append(adtsHeader, aacAudio.Data...)
err := m.muxer.Write(m.audioIndex, audioData, uint64(aacAudio.RawPTS()), uint64(aacAudio.RawDTS()))
err := m.muxer.Write(m.audioIndex, audioData, uint64(aacAudio.RawPTS()-m.lastSplitTime), uint64(aacAudio.RawDTS()-m.lastSplitTime))
if err != nil {
log.Error(ctx, err, "failed to write audio")
}
@@ -229,3 +278,7 @@ func (m *MP4) onOPUSAudio(ctx context.Context, audioTranscodingProcess *processe
})
}
}
func (m *MP4) Name() string {
return "mp4-recorder"
}

View File

@@ -0,0 +1,57 @@
package mp4
import (
"errors"
"fmt"
"io"
)
type cacheWriterSeeker struct {
buf []byte
offset int
}
func newCacheWriterSeeker(capacity int) *cacheWriterSeeker {
return &cacheWriterSeeker{
buf: make([]byte, 0, capacity),
offset: 0,
}
}
func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) {
if cap(ws.buf)-ws.offset >= len(p) {
if len(ws.buf) < ws.offset+len(p) {
ws.buf = ws.buf[:ws.offset+len(p)]
}
copy(ws.buf[ws.offset:], p)
ws.offset += len(p)
return len(p), nil
}
tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2)
copy(tmp, ws.buf)
if len(ws.buf) < ws.offset+len(p) {
tmp = tmp[:ws.offset+len(p)]
}
copy(tmp[ws.offset:], p)
ws.buf = tmp
ws.offset += len(p)
return len(p), nil
}
func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent {
if ws.offset+int(offset) > len(ws.buf) {
return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset))
}
ws.offset += int(offset)
return int64(ws.offset), nil
} else if whence == io.SeekStart {
if offset > int64(len(ws.buf)) {
return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset))
}
ws.offset = int(offset)
return offset, nil
} else {
return 0, errors.New("unsupport SeekEnd")
}
}

View File

@@ -6,12 +6,13 @@ import (
"fmt"
"liveflow/log"
"liveflow/media/hub"
"liveflow/media/streamer/egress/record"
"liveflow/media/streamer/fields"
"liveflow/media/streamer/processes"
"time"
"github.com/asticode/go-astiav"
"github.com/deepch/vdk/codec/aacparser"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
)
@@ -24,19 +25,28 @@ const (
)
type WebMArgs struct {
Tracks map[string][]*webrtc.TrackLocalStaticRTP
Hub *hub.Hub
Hub *hub.Hub
SplitIntervalMS int64 // Add SplitIntervalMS to arguments
StreamID string // Add StreamID
}
type WebM struct {
hub *hub.Hub
webmMuxer *EBMLMuxer
samples int
hub *hub.Hub
webmMuxer *EBMLMuxer
samples int
splitIntervalMS int64
lastSplitTime int64
splitPending bool
streamID string
audioTranscodingProcess *processes.AudioTranscodingProcess
mediaSpecs []hub.MediaSpec
}
func NewWEBM(args WebMArgs) *WebM {
return &WebM{
hub: args.Hub,
hub: args.Hub,
splitIntervalMS: args.SplitIntervalMS,
streamID: args.StreamID,
}
}
@@ -51,59 +61,148 @@ func (w *WebM) Start(ctx context.Context, source hub.Source) error {
if err != nil {
return err
}
w.mediaSpecs = source.MediaSpecs()
ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(),
})
muxer := NewEBMLMuxer(int(audioClockRate), 2, ContainerMKV)
err = muxer.Init(ctx)
if err != nil {
return err
}
log.Info(ctx, "start webm")
sub := w.hub.Subscribe(source.StreamID())
go func() {
var audioTranscodingProcess *processes.AudioTranscodingProcess
// Initialize splitting logic
err := w.createNewMuxer(ctx, int(audioClockRate))
if err != nil {
log.Error(ctx, err, "failed to create webm muxer")
return
}
// Initialize audio transcoding process if needed
if hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeAAC) {
w.audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate)
w.audioTranscodingProcess.Init()
defer w.audioTranscodingProcess.Close()
}
for data := range sub {
// Check if we need to initiate a split
if data.H264Video != nil {
w.onVideo(ctx, muxer, data.H264Video)
if !w.splitPending && data.H264Video.RawDTS()-w.lastSplitTime >= w.splitIntervalMS {
w.splitPending = true
}
w.onVideo(ctx, data.H264Video)
}
if data.AACAudio != nil {
if audioTranscodingProcess == nil {
audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate)
audioTranscodingProcess.Init()
defer audioTranscodingProcess.Close()
}
w.onAACAudio(ctx, muxer, data.AACAudio, audioTranscodingProcess)
w.onAACAudio(ctx, data.AACAudio)
} else if data.OPUSAudio != nil {
w.onAudio(ctx, muxer, data.OPUSAudio)
w.onAudio(ctx, data.OPUSAudio)
}
}
err = muxer.Finalize(ctx)
if err != nil {
log.Error(ctx, err, "failed to finalize")
if w.audioTranscodingProcess != nil {
log.Info(ctx, "draining audio transcoding process for WebM")
packets, err := w.audioTranscodingProcess.Drain()
if err != nil {
log.Error(ctx, err, "failed to drain audio transcoder for WebM")
}
for _, packet := range packets {
w.onAudio(ctx, &hub.OPUSAudio{
Data: packet.Data,
PTS: packet.PTS,
DTS: packet.DTS,
AudioClockRate: uint32(packet.SampleRate),
})
}
log.Info(ctx, "audio transcoding process for WebM drained")
}
// Ensure the muxer is finalized
w.closeMuxer(ctx)
}()
return nil
}
func (w *WebM) onVideo(ctx context.Context, muxer *EBMLMuxer, data *hub.H264Video) {
keyFrame := data.SliceType == hub.SliceI
err := muxer.WriteVideo(data.Data, keyFrame, uint64(data.RawPTS()), uint64(data.RawDTS()))
// createNewMuxer initializes a new EBMLMuxer
func (w *WebM) createNewMuxer(ctx context.Context, audioClockRate int) error {
// Initialize new muxer
w.webmMuxer = NewEBMLMuxer(audioClockRate, 2, ContainerMKV)
err := w.webmMuxer.Init(ctx)
if err != nil {
return err
}
return nil
}
// closeMuxer finalizes the current muxer and writes to the output file
func (w *WebM) closeMuxer(ctx context.Context) {
if w.webmMuxer != nil {
// Create output file with timestamp
timestamp := time.Now().Format("2006-01-02-15-04-05")
fileName := fmt.Sprintf("videos/%s_%s.mkv", w.streamID, timestamp)
outputFile, err := record.CreateFileInDir(fileName)
if err != nil {
log.Error(ctx, err, "failed to create output file")
return
}
defer outputFile.Close()
// Finalize muxer with output file
err = w.webmMuxer.Finalize(ctx, outputFile)
if err != nil {
log.Error(ctx, err, "failed to finalize muxer")
}
w.webmMuxer = nil
}
}
// splitMuxer handles the logic to split the WebM file
func (w *WebM) splitMuxer(ctx context.Context) error {
// Close current muxer
w.closeMuxer(ctx)
// Create a new muxer
audioClockRate, err := hub.AudioClockRate(w.mediaSpecs)
if err != nil {
return err
}
return w.createNewMuxer(ctx, int(audioClockRate))
}
func (w *WebM) onVideo(ctx context.Context, data *hub.H264Video) {
keyFrame := false
for _, sliceType := range data.SliceTypes {
if sliceType == hub.SliceI {
keyFrame = true
break
}
}
// If a split is pending and we have a keyframe, perform the split
if w.splitPending && keyFrame {
err := w.splitMuxer(ctx)
if err != nil {
log.Error(ctx, err, "failed to split webm file")
return
}
w.lastSplitTime = data.RawDTS()
w.splitPending = false // Reset the split pending flag
}
err := w.webmMuxer.WriteVideo(data.Data, keyFrame, uint64(data.RawPTS()-w.lastSplitTime), uint64(data.RawDTS()-w.lastSplitTime))
if err != nil {
log.Error(ctx, err, "failed to write video")
}
}
func (w *WebM) onAudio(ctx context.Context, muxer *EBMLMuxer, data *hub.OPUSAudio) {
err := muxer.WriteAudio(data.Data, false, uint64(data.RawPTS()), uint64(data.RawDTS()))
func (w *WebM) onAudio(ctx context.Context, data *hub.OPUSAudio) {
fmt.Println("dts: ", data.RawDTS())
err := w.webmMuxer.WriteAudio(data.Data, false, uint64(data.RawPTS()-w.lastSplitTime), uint64(data.RawDTS()-w.lastSplitTime))
if err != nil {
log.Error(ctx, err, "failed to write audio")
}
}
func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAudio, transcodingProcess *processes.AudioTranscodingProcess) {
func (w *WebM) onAACAudio(ctx context.Context, aac *hub.AACAudio) {
if len(aac.Data) == 0 {
log.Warn(ctx, "no data")
return
@@ -120,7 +219,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud
aacparser.FillADTSHeader(adtsHeader, *aac.MPEG4AudioConfig, aacSamples, len(aac.Data))
audioDataWithADTS := append(adtsHeader, aac.Data...)
packets, err := transcodingProcess.Process(&processes.MediaPacket{
packets, err := w.audioTranscodingProcess.Process(&processes.MediaPacket{
Data: audioDataWithADTS,
PTS: aac.PTS,
DTS: aac.DTS,
@@ -129,7 +228,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud
fmt.Println(err)
}
for _, packet := range packets {
w.onAudio(ctx, muxer, &hub.OPUSAudio{
w.onAudio(ctx, &hub.OPUSAudio{
Data: packet.Data,
PTS: packet.PTS,
DTS: packet.DTS,
@@ -137,3 +236,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud
})
}
}
func (w *WebM) Name() string {
return "webm-recorder"
}

View File

@@ -5,10 +5,9 @@ import (
"encoding/binary"
"errors"
"fmt"
"liveflow/log"
"liveflow/media/streamer/egress/record"
"io"
"io/ioutil"
"math"
"math/rand"
"os"
"github.com/at-wat/ebml-go"
@@ -44,11 +43,11 @@ const (
type EBMLMuxer struct {
writers []mkvcore.BlockWriteCloser
tempFile *os.File
container Name
tempFileName string
audioSampleRate float64
audioChannels uint64
durationPos uint64
durationPos int64
duration int64
audioStreamIndex int
videoStreamIndex int
@@ -57,7 +56,6 @@ type EBMLMuxer struct {
func NewEBMLMuxer(sampleRate int, channels int, container Name) *EBMLMuxer {
return &EBMLMuxer{
writers: nil,
tempFileName: "",
audioSampleRate: float64(sampleRate),
audioChannels: uint64(channels),
durationPos: 0,
@@ -71,12 +69,9 @@ func (w *EBMLMuxer) makeWebmWriters() ([]mkvcore.BlockWriteCloser, error) {
trackTypeVideo = 1
trackTypeAudio = 2
)
tempFile, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.webm", rand.Int()))
if err != nil {
return nil, err
}
w.audioStreamIndex = 0
w.videoStreamIndex = 1
trackEntries := []webm.TrackEntry{
{
Name: trackNameAudio,
@@ -101,31 +96,30 @@ func (w *EBMLMuxer) makeWebmWriters() ([]mkvcore.BlockWriteCloser, error) {
},
},
}
writers, err := webm.NewSimpleBlockWriter(tempFile, trackEntries,
var err error
w.tempFile, err = ioutil.TempFile("", "ebmlmuxer-*.webm")
if err != nil {
return nil, err
}
writers, err := webm.NewSimpleBlockWriter(w.tempFile, trackEntries,
mkvcore.WithSeekHead(true),
mkvcore.WithOnErrorHandler(func(err error) {
log.Error(context.Background(), err, "failed to construct webm writer (error)")
}),
mkvcore.WithOnFatalHandler(func(err error) {
log.Error(context.Background(), err, "failed to construct webm writer (fatal)")
}),
mkvcore.WithSegmentInfo(&webm.Info{
TimecodeScale: defaultTimecode, // 1ms
MuxingApp: "mrw-v4.ebml-go.webm",
WritingApp: "mrw-v4.ebml-go.webm",
Duration: defaultDuration, // Arbitrarily set to default videoSplitIntervalMs, final value is adjusted in writeTrailer.
MuxingApp: "your_app_name",
WritingApp: "your_app_name",
Duration: defaultDuration, // Placeholder duration; final value is adjusted in overwritePTS.
}),
mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) {
switch e.Name {
case "Duration":
w.durationPos = e.Position + 4 // Duration header size = 3, SegmentInfo header size delta = 1
if e.Name == "Duration" {
w.durationPos = int64(e.Position + 4) // Adjust position to overwrite duration later.
}
})),
)
if err != nil {
return nil, err
}
w.tempFileName = tempFile.Name()
var mkvWriters []mkvcore.BlockWriteCloser
for _, writer := range writers {
mkvWriters = append(mkvWriters, writer)
@@ -138,67 +132,62 @@ func (w *EBMLMuxer) makeMKVWriters() ([]mkvcore.BlockWriteCloser, error) {
trackTypeVideo = 1
trackTypeAudio = 2
)
tempFile, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.mkv", rand.Int()))
w.audioStreamIndex = 0
w.videoStreamIndex = 1
mkvTrackDesc := []mkvcore.TrackDescription{
{
TrackNumber: 1,
TrackEntry: webm.TrackEntry{
Name: trackNameAudio,
TrackNumber: 1,
TrackUID: 1,
CodecID: codecIDOPUS,
TrackType: trackTypeAudio,
Audio: &webm.Audio{
SamplingFrequency: w.audioSampleRate,
Channels: w.audioChannels,
},
},
},
{
TrackNumber: webmVideoTrackNumber,
TrackEntry: webm.TrackEntry{
TrackNumber: webmVideoTrackNumber,
TrackUID: webmVideoTrackNumber,
TrackType: trackTypeVideo,
Name: trackNameVideo,
CodecID: codecIDH264,
DefaultDuration: 0,
},
},
}
var err error
w.tempFile, err = ioutil.TempFile("/tmp", "ebmlmuxer-*.mkv")
if err != nil {
return nil, err
}
var mkvTrackDesc []mkvcore.TrackDescription
w.audioStreamIndex = 0
w.videoStreamIndex = 1
mkvTrackDesc = append(mkvTrackDesc, mkvcore.TrackDescription{
TrackNumber: 1,
TrackEntry: webm.TrackEntry{
Name: trackNameAudio,
TrackNumber: 1,
TrackUID: 1,
CodecID: codecIDOPUS,
TrackType: trackTypeAudio,
Audio: &webm.Audio{
SamplingFrequency: w.audioSampleRate,
Channels: 2,
},
},
})
mkvTrackDesc = append(mkvTrackDesc, mkvcore.TrackDescription{
TrackNumber: webmVideoTrackNumber,
TrackEntry: webm.TrackEntry{
TrackNumber: webmVideoTrackNumber,
TrackUID: webmVideoTrackNumber,
TrackType: trackTypeVideo,
DefaultDuration: 0,
Name: trackNameVideo,
CodecID: codecIDH264,
SeekPreRoll: 0,
// TODO: The resolution may need to be written later, but it works fine without it for now.
//Video: &webm.Video{
// PixelWidth: 1280,
// PixelHeight: 720,
//},
},
})
var mkvWriters []mkvcore.BlockWriteCloser
mkvWriters, err = mkvcore.NewSimpleBlockWriter(tempFile, mkvTrackDesc,
writers, err := mkvcore.NewSimpleBlockWriter(w.tempFile, mkvTrackDesc,
mkvcore.WithSeekHead(true),
mkvcore.WithEBMLHeader(mkv.DefaultEBMLHeader),
mkvcore.WithSegmentInfo(&webm.Info{
TimecodeScale: defaultTimecode, // 1ms
MuxingApp: "mrw-v4.ebml-go.mkv",
WritingApp: "mrw-v4.ebml-go.mkv",
Duration: defaultDuration, // Arbitrarily set to default videoSplitIntervalMs, final value is adjusted in writeTrailer.
TimecodeScale: defaultTimecode,
MuxingApp: "your_app_name",
WritingApp: "your_app_name",
Duration: defaultDuration,
}),
mkvcore.WithBlockInterceptor(webm.DefaultBlockInterceptor),
mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) {
switch e.Name {
case "Duration":
w.durationPos = e.Position + 4 // Duration header size = 3, SegmentInfo header size delta = 1
if e.Name == "Duration" {
w.durationPos = int64(e.Position + 4)
}
})),
)
if err != nil {
return nil, err
}
w.tempFileName = tempFile.Name()
return mkvWriters, nil
return writers, nil
}
func (w *EBMLMuxer) Init(_ context.Context) error {
@@ -238,53 +227,52 @@ func (w *EBMLMuxer) WriteAudio(data []byte, keyframe bool, pts uint64, _ uint64)
return nil
}
func (w *EBMLMuxer) Finalize(ctx context.Context) error {
defer func() {
w.cleanup()
}()
log.Info(ctx, "finalize webm muxer")
fileName := w.tempFileName
func (w *EBMLMuxer) Finalize(ctx context.Context, output io.Writer) error {
defer w.cleanup()
if err := w.overwritePTS(); err != nil {
return fmt.Errorf("overwrite PTS error: %w", err)
}
// Copy the data from the temporary file to the output writer
if _, err := w.tempFile.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("seek error: %w", err)
}
if _, err := io.Copy(output, w.tempFile); err != nil {
return fmt.Errorf("copy error: %w", err)
}
for _, writer := range w.writers {
if err := writer.Close(); err != nil {
log.Error(ctx, err, "failed to close writer")
return fmt.Errorf("writer close error: %w", err)
}
}
if err := w.overwritePTS(ctx, fileName); err != nil {
return err
}
return nil
}
func (w *EBMLMuxer) ContainerName() string {
return string(w.container)
}
func (w *EBMLMuxer) overwritePTS(ctx context.Context, fileName string) error {
tempFile, err := os.OpenFile(fileName, os.O_RDWR, 0o600)
if err != nil {
func (w *EBMLMuxer) overwritePTS() error {
ptsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(ptsBytes, math.Float64bits(float64(w.duration)))
if _, err := w.tempFile.Seek(w.durationPos, io.SeekStart); err != nil {
return err
}
defer func() {
if err := tempFile.Close(); err != nil {
log.Error(ctx, err, "failed to close temp file")
}
}()
ptsBytes, _ := EncodeFloat64(float64(w.duration))
if _, err := tempFile.WriteAt(ptsBytes, int64(w.durationPos)); err != nil {
if _, err := w.tempFile.Write(ptsBytes); err != nil {
return err
}
return nil
}
func (w *EBMLMuxer) cleanup() {
if w.tempFile != nil {
w.tempFile.Close()
//os.Remove(w.tempFile.Name())
w.tempFile = nil
}
w.writers = nil
w.tempFileName = ""
w.duration = 0
w.durationPos = 0
}
func EncodeFloat64(i float64) ([]byte, error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, math.Float64bits(i))
return b, nil
func (w *EBMLMuxer) ContainerName() string {
return string(w.container)
}

View File

@@ -0,0 +1,229 @@
package thumbnail
import (
"bytes"
"context"
"errors"
"image"
"image/jpeg"
"sync"
"time"
"liveflow/log"
"liveflow/media/hub"
"liveflow/media/streamer/fields"
"liveflow/media/streamer/processes"
"github.com/asticode/go-astiav"
"github.com/sirupsen/logrus"
)
var (
ErrUnsupportedCodec = errors.New("unsupported codec")
)
// ThumbnailData represents thumbnail data in memory
type ThumbnailData struct {
Data []byte
Timestamp time.Time
}
// ThumbnailStore manages thumbnails in memory
type ThumbnailStore struct {
mu sync.RWMutex
thumbnails map[string]*ThumbnailData
}
// NewThumbnailStore creates a new thumbnail store
func NewThumbnailStore() *ThumbnailStore {
return &ThumbnailStore{
thumbnails: make(map[string]*ThumbnailData),
}
}
// Set stores thumbnail data for a stream
func (ts *ThumbnailStore) Set(streamID string, data []byte) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.thumbnails[streamID] = &ThumbnailData{
Data: data,
Timestamp: time.Now(),
}
}
// Get retrieves thumbnail data for a stream
func (ts *ThumbnailStore) Get(streamID string) (*ThumbnailData, bool) {
ts.mu.RLock()
defer ts.mu.RUnlock()
data, exists := ts.thumbnails[streamID]
return data, exists
}
// Delete removes thumbnail data for a stream
func (ts *ThumbnailStore) Delete(streamID string) {
ts.mu.Lock()
defer ts.mu.Unlock()
delete(ts.thumbnails, streamID)
}
// ThumbnailArgs contains arguments for initializing thumbnail service
type ThumbnailArgs struct {
Hub *hub.Hub
Store *ThumbnailStore
OutputPath string // Not used anymore, kept for compatibility
IntervalSeconds int // Thumbnail generation interval in seconds
Width int // Thumbnail width
Height int // Thumbnail height
}
// Thumbnail represents thumbnail generation service
type Thumbnail struct {
hub *hub.Hub
intervalSeconds int
width int
height int
decoder *processes.VideoDecodingProcess
lastThumbnailTime int64
store *ThumbnailStore
}
// NewThumbnail creates a new thumbnail service instance
func NewThumbnail(args ThumbnailArgs) *Thumbnail {
return &Thumbnail{
hub: args.Hub,
intervalSeconds: args.IntervalSeconds,
width: args.Width,
height: args.Height,
store: args.Store,
}
}
// Start starts the thumbnail service
func (t *Thumbnail) Start(ctx context.Context, source hub.Source) error {
if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) {
return ErrUnsupportedCodec
}
ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(),
})
log.Info(ctx, "start thumbnail")
// Initialize video decoder
t.decoder = processes.NewVideoDecodingProcess(astiav.CodecIDH264)
if err := t.decoder.Init(); err != nil {
return err
}
sub := t.hub.Subscribe(source.StreamID())
go func() {
intervalMS := int64(t.intervalSeconds * 1000)
for data := range sub {
if data.H264Video != nil {
// Check if enough time has passed for next thumbnail
if data.H264Video.RawDTS()-t.lastThumbnailTime >= intervalMS {
// Check if this is a keyframe for better thumbnail quality
isKeyFrame := false
for _, sliceType := range data.H264Video.SliceTypes {
if sliceType == hub.SliceI {
isKeyFrame = true
break
}
}
if isKeyFrame {
t.onVideo(ctx, data.H264Video, source.StreamID())
t.lastThumbnailTime = data.H264Video.RawDTS()
}
}
}
}
// Clean up thumbnail when stream ends
t.store.Delete(source.StreamID())
log.Infof(ctx, "thumbnail cleaned up for stream %s", source.StreamID())
}()
return nil
}
// encodeImageToJPEG encodes an image to JPEG bytes
func (t *Thumbnail) encodeImageToJPEG(img image.Image) ([]byte, error) {
var buf bytes.Buffer
// JPEG encode options
options := &jpeg.Options{
Quality: 85, // Good quality for thumbnails
}
if err := jpeg.Encode(&buf, img, options); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// onVideo processes H264 video data and generates thumbnail
func (t *Thumbnail) onVideo(ctx context.Context, h264Video *hub.H264Video, streamID string) {
// Decode H264 to AVFrame
frames, err := t.decoder.Process(*h264Video)
if err != nil {
log.Error(ctx, err, "failed to decode video for thumbnail")
return
}
// Process each decoded frame
for _, frame := range frames {
if frame != nil {
t.generateThumbnail(ctx, frame, streamID)
// Only generate one thumbnail per interval
break
}
}
}
// generateThumbnail creates thumbnail from AVFrame
func (t *Thumbnail) generateThumbnail(ctx context.Context, frame *astiav.Frame, streamID string) {
// Get frame data
frameData := frame.Data()
// Guess the image format from pixel format
img, err := frameData.GuessImageFormat()
if err != nil {
log.Error(ctx, err, "failed to guess image format")
return
}
// Convert AVFrame to Go image
err = frameData.ToImage(img)
if err != nil {
log.Error(ctx, err, "failed to convert frame to image")
return
}
// Encode image to JPEG bytes
jpegData, err := t.encodeImageToJPEG(img)
if err != nil {
log.Error(ctx, err, "failed to encode image to JPEG")
return
}
// Store in memory
t.store.Set(streamID, jpegData)
log.Infof(ctx, "thumbnail updated in memory for stream %s (size: %d bytes, image: %dx%d)",
streamID, len(jpegData), img.Bounds().Dx(), img.Bounds().Dy())
}
// Name returns the service name
func (t *Thumbnail) Name() string {
return "thumbnail"
}
// Stop stops the thumbnail service
func (t *Thumbnail) Stop() error {
// TODO: implement cleanup logic
return nil
}

View File

@@ -1,11 +1,17 @@
package whep
// #include <stdio.h>
// #include <stdlib.h>
//
// void __lsan_do_leak_check(void);
import "C"
import (
"context"
"errors"
"liveflow/media/streamer/processes"
astiav "github.com/asticode/go-astiav"
"github.com/asticode/go-astiav"
"liveflow/media/streamer/processes"
"github.com/deepch/vdk/codec/aacparser"
"github.com/pion/rtp"
@@ -83,40 +89,85 @@ func (w *WHEP) Start(ctx context.Context, source hub.Source) error {
if audioTranscodingProcess == nil {
audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate)
audioTranscodingProcess.Init()
defer audioTranscodingProcess.Close()
}
w.addAudioTrack(ctx, source.StreamID())
err := w.onAACAudio(ctx, source, data.AACAudio, audioTranscodingProcess)
if err != nil {
log.Error(ctx, err, "failed to process AAC audio")
}
} else {
if data.OPUSAudio != nil {
err := w.onAudio(source, data.OPUSAudio)
if err != nil {
log.Error(ctx, err, "failed to process OPUS audio")
}
} else if data.OPUSAudio != nil {
w.addAudioTrack(ctx, source.StreamID())
err := w.onAudio(source, data.OPUSAudio)
if err != nil {
log.Error(ctx, err, "failed to process OPUS audio")
}
}
}
if audioTranscodingProcess != nil {
log.Info(ctx, "draining audio transcoding process for WHEP")
packets, err := audioTranscodingProcess.Drain()
if err != nil {
log.Error(ctx, err, "failed to drain audio transcoder for WHEP")
}
for _, packet := range packets {
err := w.onAudio(source, &hub.OPUSAudio{
Data: packet.Data,
PTS: packet.PTS,
DTS: packet.DTS,
AudioClockRate: uint32(packet.SampleRate),
})
if err != nil {
log.Error(ctx, err, "failed to process drained OPUS audio for WHEP")
}
}
audioTranscodingProcess.Close()
log.Info(ctx, "audio transcoding process for WHEP drained and closed")
}
log.Info(ctx, "end whep")
//C.__lsan_do_leak_check()
}()
return nil
}
func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error {
func (w *WHEP) addVideoTrack(streamID string, videoClockRate uint32) error {
if w.videoTrack == nil {
var err error
w.videoTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
if err != nil {
return err
}
w.tracks[source.StreamID()] = append(w.tracks[source.StreamID()], w.videoTrack)
w.tracks[streamID] = append(w.tracks[streamID], w.videoTrack)
ssrc := uint32(110)
const (
h264PayloadType = 96
mtu = 1400
)
w.videoPacketizer = rtp.NewPacketizer(mtu, h264PayloadType, ssrc, &codecs.H264Payloader{}, rtp.NewRandomSequencer(), h264Video.VideoClockRate)
w.videoPacketizer = rtp.NewPacketizer(mtu, h264PayloadType, ssrc, &codecs.H264Payloader{}, rtp.NewRandomSequencer(), videoClockRate)
}
return nil
}
func (w *WHEP) addAudioTrack(ctx context.Context, streamID string) error {
if w.audioTrack == nil {
var err error
w.audioTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
if err != nil {
log.Error(ctx, err, "failed to create audio track")
}
w.tracks[streamID] = append(w.tracks[streamID], w.audioTrack)
ssrc := uint32(111)
const (
opusPayloadType = 111
mtu = 1400
)
w.audioPacketizer = rtp.NewPacketizer(mtu, opusPayloadType, ssrc, &codecs.OpusPayloader{}, rtp.NewRandomSequencer(), 48000)
}
return nil
}
func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error {
w.addVideoTrack(source.StreamID(), h264Video.VideoClockRate)
videoDuration := h264Video.DTS - w.lastVideoTimestamp
videoPackets := w.videoPacketizer.Packetize(h264Video.Data, uint32(videoDuration))
@@ -131,21 +182,6 @@ func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error {
}
func (w *WHEP) onAudio(source hub.Source, opusAudio *hub.OPUSAudio) error {
if w.audioTrack == nil {
var err error
w.audioTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
if err != nil {
return err
}
w.tracks[source.StreamID()] = append(w.tracks[source.StreamID()], w.audioTrack)
ssrc := uint32(111)
const (
opusPayloadType = 111
mtu = 1400
)
w.audioPacketizer = rtp.NewPacketizer(mtu, opusPayloadType, ssrc, &codecs.OpusPayloader{}, rtp.NewRandomSequencer(), opusAudio.AudioClockRate)
}
audioDuration := opusAudio.DTS - w.lastAudioTimestamp
audioPackets := w.audioPacketizer.Packetize(opusAudio.Data, uint32(audioDuration))
@@ -161,34 +197,6 @@ func (w *WHEP) onAudio(source hub.Source, opusAudio *hub.OPUSAudio) error {
return nil
}
func (w *WHEP) syncAndSendPackets() error {
for len(w.videoBuffer) > 0 && len(w.audioBuffer) > 0 {
videoPacket := w.videoBuffer[0]
audioPacket := w.audioBuffer[0]
// Remove lagging packet from buffer
if videoPacket.timestamp <= audioPacket.timestamp {
// If audio is ahead, remove video from buffer
w.videoBuffer = w.videoBuffer[1:]
if err := w.videoTrack.WriteRTP(videoPacket.packet); err != nil {
return err
}
} else {
// If video is ahead, remove audio from buffer
w.audioBuffer = w.audioBuffer[1:]
if err := w.audioTrack.WriteRTP(audioPacket.packet); err != nil {
return err
}
}
}
return nil
}
func abs(x int64) int64 {
if x < 0 {
return -x
}
return x
}
func (w *WHEP) onAACAudio(ctx context.Context, source hub.Source, aac *hub.AACAudio, transcodingProcess *processes.AudioTranscodingProcess) error {
if len(aac.Data) == 0 {
log.Warn(ctx, "no data")
@@ -224,3 +232,31 @@ func (w *WHEP) onAACAudio(ctx context.Context, source hub.Source, aac *hub.AACAu
}
return nil
}
func (w *WHEP) syncAndSendPackets() error {
for len(w.videoBuffer) > 0 && len(w.audioBuffer) > 0 {
videoPacket := w.videoBuffer[0]
audioPacket := w.audioBuffer[0]
// Remove lagging packet from buffer
if videoPacket.timestamp <= audioPacket.timestamp {
// If audio is ahead, remove video from buffer
w.videoBuffer[0] = nil
w.videoBuffer = w.videoBuffer[1:]
if err := w.videoTrack.WriteRTP(videoPacket.packet); err != nil {
return err
}
} else {
// If video is ahead, remove audio from buffer
w.audioBuffer[0] = nil
w.audioBuffer = w.audioBuffer[1:]
if err := w.audioTrack.WriteRTP(audioPacket.packet); err != nil {
return err
}
}
}
return nil
}
func (w *WHEP) Name() string {
return "whep-server"
}

View File

@@ -0,0 +1,35 @@
package ingress
import (
"liveflow/media/hub"
"github.com/deepch/vdk/codec/h264parser"
)
func SliceTypes(payload []byte) []hub.SliceType {
nalus, _ := h264parser.SplitNALUs(payload)
slices := make([]hub.SliceType, 0)
for _, nalu := range nalus {
if len(nalu) < 1 {
continue
}
nalUnitType := nalu[0] & 0x1f
switch nalUnitType {
case h264parser.NALU_SPS:
slices = append(slices, hub.SliceSPS)
case h264parser.NALU_PPS:
slices = append(slices, hub.SlicePPS)
default:
sliceType, _ := h264parser.ParseSliceHeaderFromNALU(nalu)
switch sliceType {
case h264parser.SLICE_I:
slices = append(slices, hub.SliceI)
case h264parser.SLICE_P:
slices = append(slices, hub.SliceP)
case h264parser.SLICE_B:
slices = append(slices, hub.SliceB)
}
}
}
return slices
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"liveflow/media/streamer/ingress"
"os"
"path/filepath"
@@ -17,11 +18,13 @@ import (
rtmpmsg "github.com/yutopp/go-rtmp/message"
"liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub"
)
type Handler struct {
hub *hub.Hub
HLSHub *hlshub.HLSHub
streamID string
rtmp.DefaultHandler
flvFile *os.File
@@ -335,6 +338,7 @@ func (h *Handler) publishVideoData(timestamp uint32, compositionTime int32, vide
dts := int64(timestamp)
pts := int64(compositionTime) + dts
sliceTypes := ingress.SliceTypes(videoDataToSend)
h.hub.Publish(h.streamID, &hub.FrameData{
H264Video: &hub.H264Video{
VideoClockRate: 90000,
@@ -344,6 +348,7 @@ func (h *Handler) publishVideoData(timestamp uint32, compositionTime int32, vide
SPS: h.sps,
PPS: h.pps,
CodecData: nil,
SliceTypes: sliceTypes,
},
})
}
@@ -355,6 +360,7 @@ func (h *Handler) OnClose() {
_ = h.flvFile.Close()
}
h.hub.Unpublish(h.streamID)
h.HLSHub.DeleteMuxer(h.streamID)
}
func flvSampleRate(soundRate flvtag.SoundRate) uint32 {

View File

@@ -9,6 +9,7 @@ import (
"github.com/yutopp/go-rtmp"
"liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub"
)
@@ -20,12 +21,14 @@ type RTMP struct {
serverConfig *rtmp.ServerConfig
hub *hub.Hub
port int
args RTMPArgs
}
type RTMPArgs struct {
ServerConfig *rtmp.ServerConfig
Hub *hub.Hub
Port int
HLSHub *hlshub.HLSHub
}
func NewRTMP(args RTMPArgs) *RTMP {
@@ -33,6 +36,7 @@ func NewRTMP(args RTMPArgs) *RTMP {
//serverConfig: args.ServerConfig,
hub: args.Hub,
port: args.Port,
args: args,
}
}
@@ -48,7 +52,8 @@ func (r *RTMP) Serve(ctx context.Context) error {
srv := rtmp.NewServer(&rtmp.ServerConfig{
OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) {
h := &Handler{
hub: r.hub,
hub: r.hub,
HLSHub: r.args.HLSHub,
}
return conn, &rtmp.ConnConfig{
Handler: h,
@@ -67,6 +72,10 @@ func (r *RTMP) Serve(ctx context.Context) error {
},
})
log.Info(ctx, "RTMP server started")
go func() {
<-ctx.Done()
srv.Close()
}()
if err := srv.Serve(listener); err != nil {
log.Errorf(ctx, "Failed: %+v", err)
}

View File

@@ -8,7 +8,8 @@ import (
"strings"
"time"
"github.com/deepch/vdk/codec/h264parser"
"liveflow/media/streamer/ingress"
"github.com/labstack/echo/v4"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
@@ -227,30 +228,7 @@ func (w *WebRTCHandler) onVideo(ctx context.Context, packets []*rtp.Packet) erro
return nil
}
pts := w.videoTimestampGen.Generate(int64(packets[0].Timestamp))
nalus, _ := h264parser.SplitNALUs(payload)
var slice hub.SliceType
for _, nalu := range nalus {
if len(nalu) < 1 {
continue
}
nalUnitType := nalu[0] & 0x1f
switch nalUnitType {
case h264parser.NALU_SPS:
slice = hub.SliceSPS
case h264parser.NALU_PPS:
slice = hub.SlicePPS
default:
sliceType, _ := h264parser.ParseSliceHeaderFromNALU(nalu)
switch sliceType {
case h264parser.SLICE_I:
slice = hub.SliceI
case h264parser.SLICE_P:
slice = hub.SliceP
case h264parser.SLICE_B:
slice = hub.SliceB
}
}
}
sliceTypes := ingress.SliceTypes(payload)
w.hub.Publish(w.streamID, &hub.FrameData{
H264Video: &hub.H264Video{
PTS: pts,
@@ -259,7 +237,7 @@ func (w *WebRTCHandler) onVideo(ctx context.Context, packets []*rtp.Packet) erro
Data: payload,
SPS: nil,
PPS: nil,
SliceType: slice,
SliceTypes: sliceTypes,
CodecData: nil,
},
AACAudio: nil,
@@ -305,6 +283,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
}
streamKey, err := r.bearerToken(c)
if err != nil {
log.Error(context.Background(), err, "failed to get stream key")
return c.JSON(http.StatusInternalServerError, err.Error())
}
@@ -312,11 +291,12 @@ func (r *WHIP) whepHandler(c echo.Context) error {
m := &webrtc.MediaEngine{}
err = registerCodec(m)
if err != nil {
log.Error(context.Background(), err, "failed to register codec")
return c.JSON(http.StatusInternalServerError, err.Error())
}
se := webrtc.SettingEngine{}
se.SetEphemeralUDPPortRange(30000, 30500)
se.SetEphemeralUDPPortRange(40000, 40010)
if r.dockerMode {
se.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost)
}
@@ -324,6 +304,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithSettingEngine(se))
peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration)
if err != nil {
log.Error(context.Background(), err, "failed to create peer connection")
return c.JSON(http.StatusInternalServerError, err.Error())
}
@@ -332,6 +313,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
for _, track := range r.tracks[streamKey] {
sender, err := peerConnection.AddTrack(track)
if err != nil {
log.Error(context.Background(), err, "failed to add track")
return c.JSON(http.StatusInternalServerError, err.Error())
}
rtpSenders = append(rtpSenders, sender)
@@ -353,6 +335,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateFailed {
delete(r.tracks, streamKey)
_ = peerConnection.Close()
}
})

View File

@@ -4,10 +4,11 @@ import (
"context"
"fmt"
"io"
"liveflow/log"
"net/http"
"strings"
"liveflow/log"
"github.com/labstack/echo/v4"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/intervalpli"
@@ -56,7 +57,9 @@ func NewWHIP(args WHIPArgs) *WHIP {
func (r *WHIP) RegisterRoute() {
whipServer := r.echo
whipServer.Static("/", ".")
whipServer.GET("/wv", func(c echo.Context) error {
return c.File("static/index.html")
})
whipServer.POST("/whip", r.whipHandler)
whipServer.POST("/whep", r.whepHandler)
}
@@ -125,7 +128,7 @@ func (r *WHIP) whipHandler(c echo.Context) error {
// Create the API object with the MediaEngine
se := webrtc.SettingEngine{}
se.SetEphemeralUDPPortRange(30000, 30500)
se.SetEphemeralUDPPortRange(40000, 40010)
if r.dockerMode {
se.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost)
se.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeUDP4})

View File

@@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"liveflow/log"
"liveflow/media/streamer/pipe"
astiav "github.com/asticode/go-astiav"
"liveflow/log"
)
type MediaPacket struct {
@@ -17,7 +17,7 @@ type MediaPacket struct {
SampleRate int
}
type AudioTranscodingProcess struct {
pipe.BaseProcess[*MediaPacket, []*MediaPacket]
//pipe.BaseProcess[*MediaPacket, []*MediaPacket]
decCodecID astiav.CodecID
encCodecID astiav.CodecID
decCodec *astiav.Codec
@@ -68,6 +68,7 @@ func (t *AudioTranscodingProcess) Init() error {
if t.encCodecContext == nil {
return errors.New("codec context is nil")
}
if t.decCodecContext.MediaType() == astiav.MediaTypeAudio {
t.encCodecContext.SetChannelLayout(astiav.ChannelLayoutStereo)
t.encCodecContext.SetSampleRate(t.encSampleRate)
@@ -108,28 +109,32 @@ func (t *AudioTranscodingProcess) Close() {
func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, error) {
ctx := context.Background()
frame := astiav.AllocFrame()
defer frame.Free()
packet := astiav.AllocPacket()
defer packet.Free()
err := packet.FromData(data.Data)
if err != nil {
if err := packet.FromData(data.Data); err != nil {
log.Error(ctx, err, "failed to create packet")
}
packet.SetPts(data.PTS)
packet.SetDts(data.DTS)
err = t.decCodecContext.SendPacket(packet)
if err != nil {
if err := t.decCodecContext.SendPacket(packet); err != nil {
log.Error(ctx, err, "failed to send packet")
}
frameToSend := astiav.AllocFrame()
defer frameToSend.Free()
if t.audioFifo == nil {
t.audioFifo = astiav.AllocAudioFifo(
t.encCodecContext.SampleFormat(),
t.encCodecContext.ChannelLayout().Channels(),
t.encCodecContext.SampleRate())
t.encCodecContext.SampleRate(),
)
}
var opusAudio []*MediaPacket
for {
frame := astiav.AllocFrame()
defer frame.Free()
err := t.decCodecContext.ReceiveFrame(frame)
if errors.Is(err, astiav.ErrEof) {
fmt.Println("EOF: ", err.Error())
@@ -138,21 +143,21 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er
break
}
t.audioFifo.Write(frame)
nbSamples := 0
// check whether we have enough samples to encode
for t.audioFifo.Size() >= t.encCodecContext.FrameSize() {
frameToSend := astiav.AllocFrame()
defer frameToSend.Free()
// initialize frameToSend before reusing it
frameToSend.Unref()
frameToSend.SetNbSamples(t.encCodecContext.FrameSize())
frameToSend.SetChannelLayout(t.encCodecContext.ChannelLayout()) // t.encCodecContext.ChannelLayout())
frameToSend.SetChannelLayout(t.encCodecContext.ChannelLayout())
frameToSend.SetSampleFormat(t.encCodecContext.SampleFormat())
frameToSend.SetSampleRate(t.encCodecContext.SampleRate())
frameToSend.SetPts(t.lastPts + int64(t.encCodecContext.FrameSize()))
t.lastPts = frameToSend.Pts()
nbSamples += frame.NbSamples()
err := frameToSend.AllocBuffer(0)
if err != nil {
if err := frameToSend.AllocBuffer(0); err != nil {
log.Error(ctx, err, "failed to alloc buffer")
}
t.lastPts = frameToSend.Pts()
read, err := t.audioFifo.Read(frameToSend)
if err != nil {
log.Error(ctx, err, "failed to read fifo")
@@ -160,14 +165,13 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er
if read < frameToSend.NbSamples() {
log.Error(ctx, err, "failed to read fifo")
}
// Encode the frame
err = t.encCodecContext.SendFrame(frameToSend)
if err != nil {
if err := t.encCodecContext.SendFrame(frameToSend); err != nil {
log.Error(ctx, err, "failed to send frame")
}
pkt := astiav.AllocPacket()
for {
pkt := astiav.AllocPacket()
defer pkt.Free()
err := t.encCodecContext.ReceivePacket(pkt)
if errors.Is(err, astiav.ErrEof) {
fmt.Println("EOF: ", err.Error())
@@ -181,12 +185,72 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er
DTS: pkt.Dts(),
SampleRate: t.encCodecContext.SampleRate(),
})
pkt.Unref()
}
pkt.Free()
}
frame.Unref()
}
select {
case t.ResultChan() <- opusAudio:
default:
}
return opusAudio, nil
}
func (t *AudioTranscodingProcess) Drain() ([]*MediaPacket, error) {
var packets []*MediaPacket
ctx := context.Background()
// 1. Flush Decoder
if err := t.decCodecContext.SendPacket(nil); err != nil {
log.Error(ctx, err, "failed to send nil packet to decoder for draining")
// Continue to encoder flushing
}
frame := astiav.AllocFrame()
defer frame.Free()
for {
err := t.decCodecContext.ReceiveFrame(frame)
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
if err != nil {
log.Error(ctx, err, "failed to receive frame from decoder during draining")
continue
}
if err := t.encCodecContext.SendFrame(frame); err != nil {
log.Error(ctx, err, "failed to send flushed frame to encoder")
}
frame.Unref()
}
// 2. Flush Encoder
if err := t.encCodecContext.SendFrame(nil); err != nil {
log.Error(ctx, err, "failed to send nil frame to encoder for draining")
return packets, err
}
pkt := astiav.AllocPacket()
defer pkt.Free()
for {
err := t.encCodecContext.ReceivePacket(pkt)
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
if err != nil {
log.Error(ctx, err, "failed to receive packet from encoder during draining")
continue
}
packets = append(packets, &MediaPacket{
Data: append([]byte{}, pkt.Data()...),
PTS: pkt.Pts(),
DTS: pkt.Dts(),
SampleRate: t.encCodecContext.SampleRate(),
})
pkt.Unref()
}
return packets, nil
}

41
static/m3u8player.html Normal file
View File

@@ -0,0 +1,41 @@
<!DOCTYPE html>
<html lang="en">
<head>
<script src="https://cdn.jsdelivr.net/npm/@mux/mux-video@0"></script>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>M3U8 Player</title>
</head>
<body>
<div id="root"></div>
<mux-video
id="hls-player"
prefer-playback="mse"
controls
autoplay
muted
>
</mux-video>
<script>
// Function to get the query parameters from the URL
function getQueryParam(param) {
const urlParams = new URLSearchParams(window.location.search);
return urlParams.get(param);
}
// Get streamid from the URL query parameters
const streamId = getQueryParam('streamid');
if (streamId) {
// Update the src dynamically
const host = window.location.protocol + '//' + window.location.host;
const player = document.getElementById('hls-player');
player.src = `${host}/hls/${streamId}/master.m3u8`;
} else {
console.error('Stream ID not provided in the URL');
}
</script>
</body>
</html>