This commit is contained in:
Diallo Han
2024-08-20 00:50:03 +09:00
commit 4cf49c1236
19 changed files with 1202 additions and 0 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# 디폴트 무시된 파일
/shelf/
/workspace.xml
# 에디터 기반 HTTP 클라이언트 요청
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

35
go.mod Normal file
View File

@@ -0,0 +1,35 @@
module mrw-clone
go 1.21
require (
github.com/bluenviron/gohlslib v1.4.0
github.com/deepch/vdk v0.0.27
github.com/labstack/echo/v4 v4.12.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
github.com/yutopp/go-flv v0.3.1
github.com/yutopp/go-rtmp v0.0.7
)
require (
github.com/abema/go-mp4 v1.2.0 // indirect
github.com/asticode/go-astikit v0.30.0 // indirect
github.com/asticode/go-astits v1.13.0 // indirect
github.com/bluenviron/mediacommon v1.11.1-0.20240525122142-20163863aa75 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/yapingcat/gomedia v0.0.0-20240725163034-902e6befb413 // indirect
github.com/yutopp/go-amf0 v0.1.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
)

100
go.sum Normal file
View File

@@ -0,0 +1,100 @@
github.com/abema/go-mp4 v1.2.0 h1:gi4X8xg/m179N/J15Fn5ugywN9vtI6PLk6iLldHGLAk=
github.com/abema/go-mp4 v1.2.0/go.mod h1:vPl9t5ZK7K0x68jh12/+ECWBCXoWuIDtNgPtU2f04ws=
github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA=
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c=
github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v1.4.0 h1:3a9W1x8eqlxJUKt1sJCunPGtti5ALIY2ik4GU0RVe7E=
github.com/bluenviron/gohlslib v1.4.0/go.mod h1:q5ZElzNw5GRbV1VEI45qkcPbKBco6BP58QEY5HyFsmo=
github.com/bluenviron/mediacommon v1.11.1-0.20240525122142-20163863aa75 h1:5P8Um+ySuwZApuVS9gI6U0MnrIFybTfLrZSqV2ie5lA=
github.com/bluenviron/mediacommon v1.11.1-0.20240525122142-20163863aa75/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deepch/vdk v0.0.27 h1:j/SHaTiZhA47wRpaue8NRp7P9xwOOO/lunxrDJBwcao=
github.com/deepch/vdk v0.0.27/go.mod h1:JlgGyR2ld6+xOIHa7XAxJh+stSDBAkdNvIPkUIdIywk=
github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw+Q=
github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0=
github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e h1:s2RNOM/IGdY0Y6qfTeUKhDawdHDpK9RGBdx80qN4Ttw=
github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e/go.mod h1:nBdnFKj15wFbf94Rwfq4m30eAcyY9V/IyKAGQFtqkW0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/sunfish-shogi/bufseekio v0.0.0-20210207115823-a4185644b365/go.mod h1:dEzdXgvImkQ3WLI+0KQpmEx8T/C/ma9KeS3AfmU899I=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/yapingcat/gomedia v0.0.0-20240725163034-902e6befb413 h1:irprkncs5spukh9UjEBj+Ynm/uV7RMZD7/OxpVLDiFQ=
github.com/yapingcat/gomedia v0.0.0-20240725163034-902e6befb413/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc=
github.com/yutopp/go-amf0 v0.1.0 h1:a3UeBZG7nRF0zfvmPn2iAfNo1RGzUpHz1VyJD2oGrik=
github.com/yutopp/go-amf0 v0.1.0/go.mod h1:QzDOBr9RV6sQh6E5GFEJROZbU0iQKijORBmprkb3FIk=
github.com/yutopp/go-flv v0.3.1 h1:4ILK6OgCJgUNm2WOjaucWM5lUHE0+sLNPdjq3L0Xtjk=
github.com/yutopp/go-flv v0.3.1/go.mod h1:pAlHPSVRMv5aCUKmGOS/dZn/ooTgnc09qOPmiUNMubs=
github.com/yutopp/go-rtmp v0.0.7 h1:sKKm1MVV3ANbJHZlf3Kq8ecq99y5U7XnDUDxSjuK7KU=
github.com/yutopp/go-rtmp v0.0.7/go.mod h1:KSwrC9Xj5Kf18EUlk1g7CScecjXfIqc0J5q+S0u6Irc=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/src-d/go-billy.v4 v4.3.2 h1:0SQA1pRztfTFx2miS8sA97XvooFeNOmvUenF4o0EcVg=
gopkg.in/src-d/go-billy.v4 v4.3.2/go.mod h1:nDjArDMp+XMs1aFAESLRjfGSgfvoYN0hDfzEk0GjC98=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

1
handler.go Normal file
View File

@@ -0,0 +1 @@
package main

101
httpsrv/hlshandler.go Normal file
View File

@@ -0,0 +1,101 @@
package httpsrv
import (
"context"
"fmt"
"net/http"
"path"
"path/filepath"
"github.com/bluenviron/gohlslib/pkg/codecparams"
"github.com/bluenviron/gohlslib/pkg/playlist"
"github.com/labstack/echo/v4"
"mrw-clone/log"
"mrw-clone/media/hlshub"
)
const (
cacheControl = "CDN-Cache-Control"
)
type Handler struct {
endpoint *hlshub.HLSHub
}
func NewHandler(hlsEndpoint *hlshub.HLSHub) *Handler {
return &Handler{
endpoint: hlsEndpoint,
}
}
func (h *Handler) HandleMasterM3U8(c echo.Context) error {
fmt.Println("@@@ HandleMasterM3U8")
workID := c.Param("streamID")
muxers, err := h.endpoint.MuxersByWorkID(workID)
if err != nil {
fmt.Println("get muxer failed")
return fmt.Errorf("get muxer failed: %w", err)
}
m3u8Version := 3
pl := &playlist.Multivariant{
Version: func() int {
return m3u8Version
}(),
IndependentSegments: true,
}
var variants []*playlist.MultivariantVariant
for name, muxer := range muxers {
// TODO: muxer.Bandwidth() is not implemented
//_, average, err := muxer.Bandwidth()
//if err != nil {
// continue
//}
average := 33033
variant := &playlist.MultivariantVariant{
Bandwidth: average,
FrameRate: nil,
URI: path.Join(name, "stream.m3u8"),
}
// TODO: muxer.ResolutionString() is not implemented
//resolution, err := muxer.ResolutionString()
//if err == nil {
// variant.Resolution = resolution
//}
variant.Codecs = []string{}
if muxer.VideoTrack != nil {
variant.Codecs = append(variant.Codecs, codecparams.Marshal(muxer.VideoTrack.Codec))
}
if muxer.AudioTrack != nil {
variant.Codecs = append(variant.Codecs, codecparams.Marshal(muxer.AudioTrack.Codec))
}
variants = append(variants, variant)
}
pl.Variants = variants
c.Response().Header().Set(cacheControl, "max-age=1")
masterM3u8Bytes, err := pl.Marshal()
if err != nil {
return err
}
return c.Blob(http.StatusOK, "application/vnd.apple.mpegurl", masterM3u8Bytes)
}
func (h *Handler) HandleM3U8(c echo.Context) error {
workID := c.Param("streamID")
playlistName := c.Param("playlistName")
ctx := context.Background()
muxer, err := h.endpoint.Muxer(workID, playlistName)
if err != nil {
log.Error(ctx, err, "no hls stream")
return c.NoContent(http.StatusNotFound)
}
extension := filepath.Ext(c.Request().URL.String())
switch extension {
case ".m3u8":
c.Response().Header().Set(cacheControl, "max-age=1")
case ".ts", ".mp4":
c.Response().Header().Set(cacheControl, "max-age=3600")
}
muxer.Handle(c.Response(), c.Request())
return nil
}

90
log/log.go Normal file
View File

@@ -0,0 +1,90 @@
package log
import (
"context"
"github.com/sirupsen/logrus"
)
type ctxKey string
const loggerKey ctxKey = "logger"
// 컨텍스트에서 로거를 가져오는 헬퍼 함수
func getLogger(ctx context.Context) *logrus.Entry {
logger, ok := ctx.Value(loggerKey).(*logrus.Entry)
if !ok {
// 기본 로거를 반환하거나 오류 처리
return logrus.NewEntry(logrus.StandardLogger())
}
return logger
}
// 로거에 필드를 추가하는 헬퍼 함수
func WithFields(ctx context.Context, fields map[string]interface{}) context.Context {
logger := getLogger(ctx).WithFields(fields)
return context.WithValue(ctx, loggerKey, logger)
}
func Info(ctx context.Context, args ...interface{}) {
getLogger(ctx).Info(args...)
}
func Infof(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Infof(format, args...)
}
func Debug(ctx context.Context, args ...interface{}) {
getLogger(ctx).Debug(args...)
}
func Debugf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Debugf(format, args...)
}
func Warn(ctx context.Context, args ...interface{}) {
getLogger(ctx).Warn(args...)
}
func Warnf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Warnf(format, args...)
}
func Error(ctx context.Context, args ...interface{}) {
getLogger(ctx).Error(args...)
}
func Errorf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Errorf(format, args...)
}
func Fatal(ctx context.Context, args ...interface{}) {
getLogger(ctx).Fatal(args...)
}
func Fatalf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Fatalf(format, args...)
}
func Panic(ctx context.Context, args ...interface{}) {
getLogger(ctx).Panic(args...)
}
func Panicf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Panicf(format, args...)
}
func Print(ctx context.Context, args ...interface{}) {
getLogger(ctx).Print(args...)
}
func SetLevel(ctx context.Context, level logrus.Level) {
logrus.SetLevel(level)
getLogger(ctx).Logger.SetLevel(level)
}
func SetFormatter(ctx context.Context, formatter logrus.Formatter) {
logrus.SetFormatter(formatter)
getLogger(ctx).Logger.SetFormatter(formatter)
}

58
main.go Normal file
View File

@@ -0,0 +1,58 @@
package main
import (
"context"
"fmt"
"github.com/labstack/echo/v4"
"mrw-clone/httpsrv"
"mrw-clone/media/hlshub"
"mrw-clone/media/hub"
"mrw-clone/media/streamer/hls"
"mrw-clone/media/streamer/record/mp4"
"mrw-clone/media/streamer/rtmp"
)
// RTMP 받으면 자동으로 HLS 서비스 동작, 녹화 서비스까지~?
func main() {
ctx := context.Background()
hub := hub.NewHub()
// ingress
// Egress 서비스는 streamID 알림을 구독하여 처리 시작
go func() {
api := echo.New()
hlsHub := hlshub.NewHLSHub()
hlsHandler := httpsrv.NewHandler(hlsHub)
api.GET("/health", func(c echo.Context) error {
fmt.Println("hello")
return c.String(200, "ok")
})
api.GET("/hls/:streamID/master.m3u8", hlsHandler.HandleMasterM3U8)
api.GET("/hls/:streamID/:playlistName/stream.m3u8", hlsHandler.HandleM3U8)
api.GET("/hls/:streamID/:playlistName/:resourceName", hlsHandler.HandleM3U8)
go func() {
api.Start("0.0.0.0:8044")
}()
for streamID := range hub.SubscribeToStreamID() {
fmt.Printf("New streamID received: %s\n", streamID)
hls := hls.NewHLS(hls.HLSArgs{
Hub: hub,
HLSHub: hlsHub,
})
mp4 := mp4.NewMP4(mp4.MP4Args{
Hub: hub,
})
hls.Start(ctx, streamID)
mp4.Start(ctx, streamID)
}
}()
rtmpServer := rtmp.NewRTMP(rtmp.RTMPArgs{
Hub: hub,
})
rtmpServer.Serve(ctx)
}

65
media/hlshub/hub.go Normal file
View File

@@ -0,0 +1,65 @@
package hlshub
import (
"errors"
"sync"
"github.com/bluenviron/gohlslib"
)
var (
errNotFoundStream = errors.New("no HLS stream")
)
type HLSHub struct {
mu sync.RWMutex
// [workID][name(low|pass)]muxer
hlsMuxers map[string]map[string]*gohlslib.Muxer
}
func NewHLSHub() *HLSHub {
return &HLSHub{
mu: sync.RWMutex{},
hlsMuxers: map[string]map[string]*gohlslib.Muxer{},
}
}
func (s *HLSHub) StoreMuxer(workID string, name string, muxer *gohlslib.Muxer) {
s.mu.Lock()
defer s.mu.Unlock()
if s.hlsMuxers[workID] == nil {
s.hlsMuxers[workID] = map[string]*gohlslib.Muxer{}
}
s.hlsMuxers[workID][name] = muxer
}
func (s *HLSHub) DeleteMuxer(workID string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.hlsMuxers, workID)
}
func (s *HLSHub) Muxer(workID string, name string) (*gohlslib.Muxer, error) {
s.mu.RLock()
defer s.mu.RUnlock()
muxers, prs := s.hlsMuxers[workID]
if !prs {
return nil, errNotFoundStream
}
for n, muxer := range muxers {
if n == name {
return muxer, nil
}
}
return nil, errNotFoundStream
}
func (s *HLSHub) MuxersByWorkID(workID string) (map[string]*gohlslib.Muxer, error) {
s.mu.RLock()
defer s.mu.RUnlock()
muxers, prs := s.hlsMuxers[workID]
if !prs {
return nil, errNotFoundStream
}
return muxers, nil
}

60
media/hub/dto.go Normal file
View File

@@ -0,0 +1,60 @@
package hub
type FrameData struct {
H264Video *H264Video
AACAudio *AACAudio
//AudioCodecData *AudioCodecData
//MediaInfo *MediaInfo
}
type H264Video struct {
PTS int64
DTS int64
VideoClockRate uint32
Data []byte
SPS []byte
PPS []byte
SliceType SliceType
CodecData []byte
}
func (h *H264Video) RawTimestamp() int64 {
if h.VideoClockRate == 0 {
return h.PTS
} else {
return h.PTS / int64(h.VideoClockRate)
}
}
type AACAudio struct {
Timestamp uint32
Data []byte
CodecData []byte
AudioClockRate uint32
}
type AudioCodecData struct {
Timestamp uint32
Data []byte
}
type VideoCodecData struct {
Timestamp uint32
Data []byte
}
type MetaData struct {
Timestamp uint32
Data []byte
}
type MediaInfo struct {
VCodec VideoCodecType
}
type VideoCodecType int
const (
H264 VideoCodecType = iota
VP8
// Add other codecs as needed
)

80
media/hub/hub.go Normal file
View File

@@ -0,0 +1,80 @@
package hub
import (
"sync"
)
// Hub 구조체: streamID별로 독립적으로 데이터를 관리하고, Pub/Sub 메커니즘을 지원합니다.
type Hub struct {
streams map[string][]chan FrameData // 각 streamID에 대한 채널을 저장
notifyChan chan string // streamID가 결정되었을 때 노티하는 채널
mu sync.RWMutex // 동시성을 위한 Mutex
}
// NewHub : Hub 생성자
func NewHub() *Hub {
return &Hub{
streams: make(map[string][]chan FrameData),
notifyChan: make(chan string, 1024), // 버퍼 크기를 조절할 수 있습니다.
}
}
func (h *Hub) Notify(streamID string) {
h.notifyChan <- streamID
}
// Publish : 주어진 streamID에 데이터를 Publish합니다.
func (h *Hub) Publish(streamID string, data FrameData) {
h.mu.Lock()
defer h.mu.Unlock()
if _, exists := h.streams[streamID]; !exists {
h.streams[streamID] = make([]chan FrameData, 0)
}
for _, ch := range h.streams[streamID] {
ch <- data
}
}
func (h *Hub) Unpublish(streamID string) {
h.mu.Lock()
defer h.mu.Unlock()
if _, exists := h.streams[streamID]; !exists {
return
}
for _, ch := range h.streams[streamID] {
close(ch)
}
delete(h.streams, streamID)
}
// Subscribe : 주어진 streamID에 대해 구독합니다.
func (h *Hub) Subscribe(streamID string) <-chan FrameData {
h.mu.RLock()
defer h.mu.RUnlock()
ch := make(chan FrameData)
h.streams[streamID] = append(h.streams[streamID], ch)
return ch
}
// SubscribeToStreamID : 스트림 ID가 결정되었을 때 이를 구독하는 채널을 반환합니다.
func (h *Hub) SubscribeToStreamID() <-chan string {
return h.notifyChan
}
// RemoveStream : 사용하지 않는 스트림을 제거하는 함수 (리소스 해제)
func (h *Hub) RemoveStream(streamID string) {
h.mu.Lock()
defer h.mu.Unlock()
if chs, exists := h.streams[streamID]; exists {
for _, ch := range chs {
close(ch)
}
delete(h.streams, streamID)
}
}

30
media/hub/slicetype.go Normal file
View File

@@ -0,0 +1,30 @@
package hub
import "fmt"
type SliceType int
func (s SliceType) String() string {
switch s {
case SliceI:
return "I"
case SliceP:
return "P"
case SliceB:
return "B"
case SliceSPS:
return "SPS"
case SlicePPS:
return "PPS"
default:
return fmt.Sprintf("Unknown SliceType: %d", s)
}
}
const (
SliceI SliceType = 0
SliceP SliceType = 1
SliceB SliceType = 2
SliceSPS SliceType = 3
SlicePPS SliceType = 4
)

View File

@@ -0,0 +1,17 @@
package streamer
func ConcatByteSlices(slices ...[]byte) []byte {
// 결과 슬라이스의 길이를 계산합니다.
totalLength := 0
for _, slice := range slices {
totalLength += len(slice)
}
// 결과 슬라이스를 할당하고 가변 인자로 받은 슬라이스들을 연결합니다.
result := make([]byte, 0, totalLength)
for _, slice := range slices {
result = append(result, slice...)
}
return result
}

View File

@@ -0,0 +1,97 @@
package hls
import (
"context"
"errors"
"fmt"
"time"
"github.com/bluenviron/gohlslib"
"github.com/bluenviron/gohlslib/pkg/codecs"
"github.com/deepch/vdk/codec/h264parser"
"mrw-clone/log"
"mrw-clone/media/hlshub"
"mrw-clone/media/hub"
)
type HLS struct {
hub *hub.Hub
hlsHub *hlshub.HLSHub
muxer *gohlslib.Muxer
}
type HLSArgs struct {
Hub *hub.Hub
HLSHub *hlshub.HLSHub
}
func NewHLS(args HLSArgs) *HLS {
return &HLS{
hub: args.Hub,
hlsHub: args.HLSHub,
}
}
func (h *HLS) Start(ctx context.Context, streamID string) {
fmt.Println("@@@ Start StreamID: ", streamID)
sub := h.hub.Subscribe(streamID)
go func() {
for data := range sub {
if data.AACAudio != nil {
if data.AACAudio.CodecData != nil {
muxer, err := h.makeLiveMuxer(data.AACAudio.CodecData)
if err != nil {
log.Error(ctx, err)
}
h.hlsHub.StoreMuxer(streamID, "pass", muxer)
err = muxer.Start()
if err != nil {
log.Error(ctx, err)
}
h.muxer = muxer
}
if h.muxer != nil {
h.muxer.WriteMPEG4Audio(time.Now(), time.Duration(data.AACAudio.Timestamp)*time.Millisecond, [][]byte{data.AACAudio.Data})
}
}
if data.H264Video != nil {
if h.muxer != nil {
au, _ := h264parser.SplitNALUs(data.H264Video.Data)
h.muxer.WriteH264(time.Now(), time.Duration(data.H264Video.RawTimestamp())*time.Millisecond, au)
}
}
}
fmt.Println("@@@ [HLS] end of streamID: ", streamID)
}()
}
func (h *HLS) makeLiveMuxer(extraData []byte) (*gohlslib.Muxer, error) {
var audioTrack *gohlslib.Track
if len(extraData) > 0 {
mpeg4Audio := &codecs.MPEG4Audio{}
err := mpeg4Audio.Unmarshal(extraData)
if err != nil {
return nil, errors.New("failed to unmarshal mpeg4 audio")
}
audioTrack = &gohlslib.Track{
Codec: mpeg4Audio,
}
}
muxer := &gohlslib.Muxer{
VideoTrack: &gohlslib.Track{
Codec: &codecs.H264{},
},
AudioTrack: audioTrack,
}
llHLS := false
if llHLS {
muxer.Variant = gohlslib.MuxerVariantLowLatency
muxer.PartDuration = 500 * time.Millisecond
} else {
muxer.Variant = gohlslib.MuxerVariantMPEGTS
muxer.SegmentDuration = 2 * time.Second
}
return muxer, nil
}

View File

@@ -0,0 +1 @@
package streamer

View File

@@ -0,0 +1 @@
package flv

View File

@@ -0,0 +1,150 @@
package mp4
import (
"context"
"errors"
"fmt"
"io"
"os"
"time"
"github.com/deepch/vdk/codec/h264parser"
gomp4 "github.com/yapingcat/gomedia/go-mp4"
"mrw-clone/log"
"mrw-clone/media/hub"
)
type cacheWriterSeeker struct {
buf []byte
offset int
}
func newCacheWriterSeeker(capacity int) *cacheWriterSeeker {
return &cacheWriterSeeker{
buf: make([]byte, 0, capacity),
offset: 0,
}
}
func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) {
fmt.Println("@@@ Write: ", len(p))
if cap(ws.buf)-ws.offset >= len(p) {
if len(ws.buf) < ws.offset+len(p) {
ws.buf = ws.buf[:ws.offset+len(p)]
}
copy(ws.buf[ws.offset:], p)
ws.offset += len(p)
return len(p), nil
}
tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2)
copy(tmp, ws.buf)
if len(ws.buf) < ws.offset+len(p) {
tmp = tmp[:ws.offset+len(p)]
}
copy(tmp[ws.offset:], p)
ws.buf = tmp
ws.offset += len(p)
return len(p), nil
}
func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent {
if ws.offset+int(offset) > len(ws.buf) {
return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset))
}
ws.offset += int(offset)
return int64(ws.offset), nil
} else if whence == io.SeekStart {
if offset > int64(len(ws.buf)) {
return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset))
}
ws.offset = int(offset)
return offset, nil
} else {
return 0, errors.New("unsupport SeekEnd")
}
}
type MP4 struct {
hub *hub.Hub
muxer *gomp4.Movmuxer
tempFile *os.File
videoIndex uint32
audioIndex uint32
}
type MP4Args struct {
Hub *hub.Hub
}
func NewMP4(args MP4Args) *MP4 {
return &MP4{
hub: args.Hub,
}
}
func (h *MP4) Start(ctx context.Context, streamID string) error {
sub := h.hub.Subscribe(streamID)
//h.audioIndex = mp4Muxer.AddAudioTrack(gomp4.MP4_CODEC_AAC)
go func() {
var err error
mp4FileName := fmt.Sprintf("%s_%s.mp4", streamID, time.Now().Format("20060102150405"))
mp4File, err := os.OpenFile(mp4FileName, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Println(err)
return
}
defer mp4File.Close()
fmt.Println(mp4File.Seek(0, io.SeekCurrent))
cws := newCacheWriterSeeker(4096)
muxer, err := gomp4.CreateMp4Muxer(cws)
if err != nil {
fmt.Println(err)
return
}
vtid := muxer.AddVideoTrack(gomp4.MP4_CODEC_H264)
h.muxer = muxer
h.videoIndex = vtid
for data := range sub {
//fmt.Println("@@@ MP4")
if data.H264Video != nil {
//fmt.Printf("MP4: %d, size: %d\n", data.H264Video.Timestamp, len(data.H264Video.Data))
if data.H264Video.SliceType == h264parser.SLICE_I {
err := h.muxer.Write(h.videoIndex, data.H264Video.SPS, uint64(data.H264Video.PTS), uint64(data.H264Video.DTS))
if err != nil {
log.Error(ctx, err, "failed to write video")
}
err = h.muxer.Write(h.videoIndex, data.H264Video.PPS, uint64(data.H264Video.PTS), uint64(data.H264Video.DTS))
if err != nil {
log.Error(ctx, err, "failed to write video")
}
err = h.muxer.Write(h.videoIndex, data.H264Video.Data, uint64(data.H264Video.PTS), uint64(data.H264Video.DTS))
if err != nil {
log.Error(ctx, err, "failed to write video")
}
} else {
err := h.muxer.Write(h.videoIndex, data.H264Video.Data, uint64(data.H264Video.PTS), uint64(data.H264Video.DTS))
if err != nil {
log.Error(ctx, err, "failed to write video")
}
}
}
if data.AACAudio != nil {
//fmt.Printf("MP4: %d\n", data.AACAudio.Timestamp)
}
}
err = muxer.WriteTrailer()
if err != nil {
panic(err)
}
fmt.Println("video len: ", len(cws.buf))
_, err = mp4File.Write(cws.buf)
if err != nil {
panic(err)
}
}()
return nil
}

View File

@@ -0,0 +1,235 @@
package rtmp
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"github.com/deepch/vdk/codec/h264parser"
"github.com/pkg/errors"
"github.com/yutopp/go-flv"
flvtag "github.com/yutopp/go-flv/tag"
"github.com/yutopp/go-rtmp"
rtmpmsg "github.com/yutopp/go-rtmp/message"
"mrw-clone/log"
"mrw-clone/media/hub"
"mrw-clone/media/streamer"
)
type Handler struct {
hub *hub.Hub
streamID string
rtmp.DefaultHandler
flvFile *os.File
flvEnc *flv.Encoder
width int
height int
sps []byte
pps []byte
hasSPS bool
}
func (h *Handler) OnServe(conn *rtmp.Conn) {
}
func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error {
log.Infof(context.Background(), "OnConnect: %#v", cmd)
return nil
}
func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error {
log.Infof(context.Background(), "OnCreateStream: %#v", cmd)
return nil
}
func (h *Handler) OnPublish(_ *rtmp.StreamContext, timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error {
log.Infof(context.Background(), "OnPublish: %#v", cmd)
// (example) Reject a connection when PublishingName is empty
if cmd.PublishingName == "" {
return errors.New("PublishingName is empty")
}
// Record streams as FLV!
p := filepath.Join(
os.TempDir(),
filepath.Clean(filepath.Join("/", fmt.Sprintf("%s.flv", cmd.PublishingName))),
)
f, err := os.OpenFile(p, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return errors.Wrap(err, "Failed to create flv file")
}
h.flvFile = f
enc, err := flv.NewEncoder(f, flv.FlagsAudio|flv.FlagsVideo)
if err != nil {
_ = f.Close()
return errors.Wrap(err, "Failed to create flv encoder")
}
h.flvEnc = enc
h.streamID = cmd.PublishingName
h.hub.Notify(cmd.PublishingName)
return nil
}
func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error {
r := bytes.NewReader(data.Payload)
var script flvtag.ScriptData
if err := flvtag.DecodeScriptData(r, &script); err != nil {
log.Infof(context.Background(), "Failed to decode script data: Err = %+v", err)
return nil // ignore
}
log.Infof(context.Background(), "SetDataFrame: Script = %#v", script)
if err := h.flvEnc.Encode(&flvtag.FlvTag{
TagType: flvtag.TagTypeScriptData,
Timestamp: timestamp,
Data: &script,
}); err != nil {
log.Infof(context.Background(), "Failed to write script data: Err = %+v", err)
}
return nil
}
func (h *Handler) OnAudio(timestamp uint32, payload io.Reader) error {
ctx := context.Background()
var buf bytes.Buffer
_, err := io.Copy(&buf, payload)
if err != nil {
log.Error(ctx, err, "failed to read audio")
return err
}
var audio flvtag.AudioData
if err := flvtag.DecodeAudioData(bytes.NewBuffer(buf.Bytes()), &audio); err != nil {
return err
}
flvBody := new(bytes.Buffer)
if _, err := io.Copy(flvBody, audio.Data); err != nil {
return err
}
audio.Data = flvBody
frameData := hub.FrameData{
AACAudio: &hub.AACAudio{
AudioClockRate: flvSampleRate(audio.SoundRate),
},
}
switch audio.AACPacketType {
case flvtag.AACPacketTypeSequenceHeader:
frameData.AACAudio.CodecData = flvBody.Bytes()
log.Infof(ctx, "AACAudio Sequence Header: %s", hex.Dump(flvBody.Bytes()))
case flvtag.AACPacketTypeRaw:
frameData.AACAudio = &hub.AACAudio{
Timestamp: timestamp,
Data: flvBody.Bytes(),
}
}
h.hub.Publish(h.streamID, frameData)
return nil
}
func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error {
ctx := context.Background()
var buf bytes.Buffer
_, err := io.Copy(&buf, payload)
if err != nil {
log.Error(ctx, err, "failed to read audio")
return err
}
var video flvtag.VideoData
if err := flvtag.DecodeVideoData(bytes.NewBuffer(buf.Bytes()), &video); err != nil {
return err
}
flvBody := new(bytes.Buffer)
if _, err := io.Copy(flvBody, video.Data); err != nil {
return err
}
video.Data = flvBody
switch video.AVCPacketType {
case flvtag.AVCPacketTypeSequenceHeader:
log.Info(ctx, "Received AVCPacketTypeSequenceHeader")
seqHeader, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(flvBody.Bytes())
if err != nil {
log.Error(ctx, err, "Failed to NewCodecDataFromAVCDecoderConfRecord")
} else {
h.width = seqHeader.Width()
h.height = seqHeader.Height()
h.sps = make([]byte, len(seqHeader.SPS()))
copy(h.sps, seqHeader.SPS())
h.pps = make([]byte, len(seqHeader.PPS()))
copy(h.pps, seqHeader.PPS())
}
h.hasSPS = true
return nil
case flvtag.AVCPacketTypeNALU:
annexB := []byte{0, 0, 0, 1}
nals, _ := h264parser.SplitNALUs(flvBody.Bytes())
for _, n := range nals {
sliceType, _ := h264parser.ParseSliceHeaderFromNALU(n)
dts := int64(timestamp)
pts := int64(video.CompositionTime) + dts
var hubSliceType hub.SliceType
switch sliceType {
case h264parser.SLICE_I:
hubSliceType = hub.SliceI
case h264parser.SLICE_P:
hubSliceType = hub.SliceP
case h264parser.SLICE_B:
hubSliceType = hub.SliceB
}
h.hub.Publish(h.streamID, hub.FrameData{
H264Video: &hub.H264Video{
VideoClockRate: 90000,
DTS: dts,
PTS: pts,
Data: streamer.ConcatByteSlices(annexB, n),
SPS: h.sps,
PPS: h.pps,
SliceType: hubSliceType,
CodecData: nil,
},
})
}
//sliceTypes := parsers.ParseH264Payload(annexBs)
// chunkmessage 의 timestamp 는 dts 임
}
//////////////////////////
return nil
}
func (h *Handler) OnClose() {
log.Infof(context.Background(), "OnClose")
if h.flvFile != nil {
_ = h.flvFile.Close()
}
h.hub.Unpublish(h.streamID)
}
func flvSampleRate(soundRate flvtag.SoundRate) uint32 {
switch soundRate {
case flvtag.SoundRate5_5kHz:
return 5500
case flvtag.SoundRate11kHz:
return 11000
case flvtag.SoundRate22kHz:
return 22000
case flvtag.SoundRate44kHz:
return 44000
default:
return aacDefaultSampleRate
}
}

View File

@@ -0,0 +1,62 @@
package rtmp
import (
"context"
"io"
"net"
"github.com/yutopp/go-rtmp"
"mrw-clone/log"
"mrw-clone/media/hub"
)
const (
aacDefaultSampleRate = 44100
)
type RTMP struct {
serverConfig *rtmp.ServerConfig
hub *hub.Hub
}
type RTMPArgs struct {
ServerConfig *rtmp.ServerConfig
Hub *hub.Hub
}
func NewRTMP(args RTMPArgs) *RTMP {
return &RTMP{
//serverConfig: args.ServerConfig,
hub: args.Hub,
}
}
func (r *RTMP) Serve(ctx context.Context) error {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1930")
if err != nil {
log.Errorf(ctx, "Failed: %+v", err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Errorf(ctx, "Failed: %+v", err)
}
srv := rtmp.NewServer(&rtmp.ServerConfig{
OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) {
h := &Handler{
hub: r.hub,
}
return conn, &rtmp.ConnConfig{
Handler: h,
ControlState: rtmp.StreamControlStateConfig{
DefaultBandwidthWindowSize: 6 * 1024 * 1024 / 8,
},
Logger: nil,
}
},
})
if err := srv.Serve(listener); err != nil {
log.Errorf(ctx, "Failed: %+v", err)
}
return nil
}

View File

@@ -0,0 +1,11 @@
package streamer
import (
"context"
"mrw-clone/media/hub"
)
type Streamer interface {
Start(ctx context.Context, hub hub.Hub) error
}