Compare commits

...

12 Commits

Author SHA1 Message Date
Alexey Khit
9f0153e2a8 Adds skip SEI frame 2022-09-09 19:32:56 +03:00
Alexey Khit
b2eaf03914 Adds match any clockrate or channels 2022-09-09 19:32:36 +03:00
Alexey Khit
8b54444c89 Adds mp4 module 2022-09-09 19:31:52 +03:00
Alexey Khit
76b352d67f Add exec launch time to debug 2022-09-07 12:21:36 +03:00
Alexey Khit
e8edb65a31 Adds ignoring unnecessary ffmpeg rtsp input tracks 2022-09-07 11:31:15 +03:00
Alexey Khit
88a6208912 Update ffmpeg output param name 2022-09-07 11:29:59 +03:00
Alexey Khit
14b6df68ce Adds support nginx with wrong port 2022-09-06 18:09:44 +03:00
Alexey Khit
77080663ee Add the feature for update to any version 2022-09-06 14:10:08 +03:00
Alexey Khit
d25d27a0ee Fix SDP parsing for noname camera 2022-09-06 12:43:10 +03:00
Alexey Khit
5460e194e8 Update loggers 2022-09-06 07:08:35 +03:00
Alexey Khit
e4f565f343 Fix H264 in RTSP processing 2022-09-06 07:00:22 +03:00
Alexey Khit
6b274f2a37 Adds Hass stream info to log 2022-09-06 06:36:13 +03:00
25 changed files with 667 additions and 375 deletions

View File

@@ -2,6 +2,16 @@
set +e
# add the feature for update to any version
if [ -f "/config/go2rtc.version" ]; then
branch=`cat /config/go2rtc.version`
echo "Update to version $branch"
git clone --depth 1 --branch "$branch" https://github.com/AlexxIT/go2rtc \
&& cd go2rtc \
&& CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath -o /usr/local/bin \
&& rm -r /go2rtc && rm /config/go2rtc.version
fi
# set cwd for go2rtc (for config file, Hass itegration, etc)
cd /config

View File

@@ -34,9 +34,8 @@ func Init() {
log = app.GetLogger("api")
initStatic(cfg.Mod.StaticDir)
initWS()
HandleFunc("/api/frame.mp4", frameHandler)
HandleFunc("/api/frame.raw", frameHandler)
HandleFunc("/api/streams", streamsHandler)
HandleFunc("/api/ws", apiWS)

View File

@@ -1,40 +0,0 @@
package api
import (
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/keyframe"
"net/http"
"strings"
)
func frameHandler(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
return
}
var ch = make(chan []byte)
cons := new(keyframe.Consumer)
cons.IsMP4 = strings.HasSuffix(r.URL.Path, ".mp4")
cons.Listen(func(msg interface{}) {
switch msg.(type) {
case []byte:
ch <- msg.([]byte)
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Msg("[api.frame] add consumer")
return
}
data := <-ch
stream.RemoveConsumer(cons)
if _, err := w.Write(data); err != nil {
log.Error().Err(err).Msg("[api.frame] write")
}
}

View File

@@ -4,16 +4,42 @@ import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/gorilla/websocket"
"net/http"
"net/url"
"strings"
"sync"
)
type WSHandler func(ctx *Context, msg *streamer.Message)
var apiWsUp = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 512000,
func initWS() {
wsUp = &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 512000,
}
wsUp.CheckOrigin = func(r *http.Request) bool {
origin := r.Header["Origin"]
if len(origin) == 0 {
return true
}
o, err := url.Parse(origin[0])
if err != nil {
return false
}
if o.Host == r.Host {
return true
}
log.Trace().Msgf("[api.ws] origin: %s, host: %s", o.Host, r.Host)
// some users change Nginx external port using Docker port
// so origin will be with a port and host without
if i := strings.IndexByte(o.Host, ':'); i > 0 {
return o.Host[:i] == r.Host
}
return false
}
}
var wsUp *websocket.Upgrader
type WSHandler func(ctx *Context, msg *streamer.Message)
type Context struct {
Conn *websocket.Conn
Request *http.Request
@@ -24,7 +50,7 @@ type Context struct {
}
func (ctx *Context) Upgrade(w http.ResponseWriter, r *http.Request) (err error) {
ctx.Conn, err = apiWsUp.Upgrade(w, r, nil)
ctx.Conn, err = wsUp.Upgrade(w, r, nil)
ctx.Request = r
return
}

View File

@@ -64,6 +64,8 @@ func Handle(url string) (streamer.Producer, error) {
log.Debug().Str("url", url).Msg("[exec] run")
ts := time.Now()
if err := cmd.Start(); err != nil {
log.Error().Err(err).Str("url", url).Msg("[exec]")
return nil, err
@@ -75,6 +77,7 @@ func Handle(url string) (streamer.Producer, error) {
log.Error().Str("url", url).Msg("[exec] timeout")
return nil, errors.New("timeout")
case prod := <-ch:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run")
return prod, nil
}
}

View File

@@ -26,7 +26,7 @@ func Init() {
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}",
// output
"out": "-rtsp_transport tcp -f rtsp {output}",
"output": "-rtsp_transport tcp -f rtsp {output}",
// `-g 30` - group of picture, GOP, keyframe interval
// `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1`
@@ -55,9 +55,16 @@ func Init() {
s = s[7:] // remove `ffmpeg:`
var query url.Values
var queryVideo, queryAudio bool
if i := strings.IndexByte(s, '#'); i > 0 {
query = parseQuery(s[i+1:])
queryVideo = query["video"] != nil
queryAudio = query["audio"] != nil
s = s[:i]
} else {
// by default query both video and audio
queryVideo = true
queryAudio = true
}
var input string
@@ -66,7 +73,18 @@ func Init() {
case "http", "https":
input = strings.Replace(tpl["http"], "{input}", s, 1)
case "rtsp", "rtsps":
input = strings.Replace(tpl["rtsp"], "{input}", s, 1)
// https://ffmpeg.org/ffmpeg-protocols.html#rtsp
// skip unnecessary input tracks
switch {
case queryVideo && queryAudio:
input = "-allowed_media_types video+audio "
case queryVideo:
input = "-allowed_media_types video "
case queryAudio:
input = "-allowed_media_types audio "
}
input += strings.Replace(tpl["rtsp"], "{input}", s, 1)
}
}
@@ -108,17 +126,17 @@ func Init() {
}
}
if query["video"] == nil {
s += " -vn"
}
if query["audio"] == nil {
switch {
case queryVideo && !queryAudio:
s += " -an"
case queryAudio && !queryVideo:
s += " -vn"
}
} else {
s += " -c copy"
}
s += " " + tpl["out"]
s += " " + tpl["output"]
return exec.Handle(s)
})

View File

@@ -26,7 +26,7 @@ func Init() {
app.LoadConfig(&conf)
log = app.GetLogger("api")
log = app.GetLogger("hass")
// support https://www.home-assistant.io/integrations/rtsp_to_webrtc/
api.HandleFunc("/static", func(w http.ResponseWriter, r *http.Request) {
@@ -78,6 +78,7 @@ func Init() {
continue
}
log.Info().Str("url", "hass:" + entrie.Title).Msg("[hass] load stream")
//streams.Get("hass:" + entrie.Title)
}
}

129
cmd/mp4/mp4.go Normal file
View File

@@ -0,0 +1,129 @@
package mp4
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/rs/zerolog"
"net/http"
"strconv"
"strings"
)
func Init() {
log = app.GetLogger("mp4")
api.HandleWS(MsgTypeMSE, handlerWS)
api.HandleFunc("/api/frame.mp4", handlerKeyframe)
api.HandleFunc("/api/stream.mp4", handlerMP4)
}
var log zerolog.Logger
func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
if isChromeFirst(w, r) {
return
}
src := r.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
return
}
exit := make(chan []byte)
cons := &mp4.Consumer{}
cons.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case []byte:
exit <- msg
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.keyframe] add consumer")
return
}
w.Header().Set("Content-Type", cons.MimeType())
data := cons.Init()
data = append(data, <-exit...)
stream.RemoveConsumer(cons)
// Apple Safari won't show frame without length
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
if _, err := w.Write(data); err != nil {
log.Error().Err(err).Msg("[api.keyframe] add consumer")
}
}
func handlerMP4(w http.ResponseWriter, r *http.Request) {
if isChromeFirst(w, r) || isSafari(w, r) {
return
}
log.Trace().Msgf("[api.mp4] %+v", r)
src := r.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
return
}
exit := make(chan struct{})
cons := &mp4.Consumer{}
cons.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case []byte:
if _, err := w.Write(msg); err != nil {
exit <- struct{}{}
}
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.mp4] add consumer")
return
}
defer stream.RemoveConsumer(cons)
w.Header().Set("Content-Type", cons.MimeType())
data := cons.Init()
if _, err := w.Write(data); err != nil {
log.Error().Err(err).Msg("[api.mp4] write")
return
}
<-exit
log.Trace().Msg("[api.mp4] close")
}
func isChromeFirst(w http.ResponseWriter, r *http.Request) bool {
// Chrome 105 does two requests: without Range and with `Range: bytes=0-`
if strings.Contains(r.UserAgent(), " Chrome/") {
if r.Header.Values("Range") == nil {
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusOK)
return true
}
}
return false
}
func isSafari(w http.ResponseWriter, r *http.Request) bool {
if r.Header.Get("Range") == "bytes=0-1" {
handlerKeyframe(w, r)
return true
}
return false
}

View File

@@ -1,35 +1,34 @@
package mse
package mp4
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mse"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/rs/zerolog/log"
)
func Init() {
api.HandleWS("mse", handler)
}
const MsgTypeMSE = "mse" // fMP4
func handler(ctx *api.Context, msg *streamer.Message) {
func handlerWS(ctx *api.Context, msg *streamer.Message) {
src := ctx.Request.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
return
}
cons := new(mse.Consumer)
cons := &mp4.Consumer{}
cons.UserAgent = ctx.Request.UserAgent()
cons.RemoteAddr = ctx.Request.RemoteAddr
cons.Listen(func(msg interface{}) {
switch msg.(type) {
case *streamer.Message, []byte:
ctx.Write(msg)
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Msg("[api.mse] Add consumer")
log.Warn().Err(err).Msg("[api.mse] add consumer")
ctx.Error(err)
return
}
@@ -38,5 +37,9 @@ func handler(ctx *api.Context, msg *streamer.Message) {
stream.RemoveConsumer(cons)
})
cons.Init()
ctx.Write(&streamer.Message{
Type: MsgTypeMSE, Value: cons.MimeType(),
})
ctx.Write(cons.Init())
}

View File

@@ -69,7 +69,7 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
return
}
log.Debug().Str("src", src).Msg("[webrtc] new consumer")
log.Debug().Str("url", src).Msg("[webrtc] new consumer")
var err error

View File

@@ -8,7 +8,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/ffmpeg"
"github.com/AlexxIT/go2rtc/cmd/hass"
"github.com/AlexxIT/go2rtc/cmd/homekit"
"github.com/AlexxIT/go2rtc/cmd/mse"
"github.com/AlexxIT/go2rtc/cmd/mp4"
"github.com/AlexxIT/go2rtc/cmd/ngrok"
"github.com/AlexxIT/go2rtc/cmd/rtmp"
"github.com/AlexxIT/go2rtc/cmd/rtsp"
@@ -33,7 +33,7 @@ func main() {
api.Init() // init HTTP API server
webrtc.Init()
mse.Init()
mp4.Init()
srtp.Init()
homekit.Init()

5
pkg/README.md Normal file
View File

@@ -0,0 +1,5 @@
## Useful links
- https://www.wowza.com/blog/streaming-protocols
- https://vimeo.com/blog/post/rtmp-stream/
- https://sanjeev-pandey.medium.com/understanding-the-mpeg-4-moov-atom-pseudo-streaming-in-mp4-93935e1b9e9a

View File

@@ -9,6 +9,7 @@ import (
const (
NALUTypePFrame = 1
NALUTypeIFrame = 5
NALUTypeSEI = 6
NALUTypeSPS = 7
NALUTypePPS = 8
)
@@ -17,6 +18,17 @@ func NALUType(b []byte) byte {
return b[4] & 0x1F
}
func IsKeyframe(b []byte) bool {
return NALUType(b) == NALUTypeIFrame
}
func GetProfileLevelID(fmtp string) string {
if fmtp == "" {
return ""
}
return streamer.Between(fmtp, "profile-level-id=", ";")
}
func GetParameterSet(fmtp string) (sps, pps []byte) {
if fmtp == "" {
return

View File

@@ -37,32 +37,37 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
return nil
}
for {
for len(units) > 0 {
i := int(binary.BigEndian.Uint32(units)) + 4
unitAVC := units[:i]
unit := units[:i] // NAL Unit with AVC header
units = units[i:]
unitType := NALUType(unitAVC)
unitType := NALUType(unit)
//fmt.Printf("[H264] type: %2d, size: %6d\n", unitType, i)
switch unitType {
case NALUTypeSPS:
//println("new SPS")
sps = unitAVC
return nil
sps = unit
continue
case NALUTypePPS:
//println("new PPS")
pps = unitAVC
return nil
pps = unit
continue
case NALUTypeSEI:
// some unnecessary text information
continue
}
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
// and every NALU will be sliced to multiple NALUs
if !packet.Marker {
buffer = append(buffer, unitAVC...)
return nil
buffer = append(buffer, unit...)
continue
}
if buffer != nil {
buffer = append(buffer, unitAVC...)
unitAVC = buffer
buffer = append(buffer, unit...)
unit = buffer
buffer = nil
}
@@ -86,17 +91,13 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
clone = *packet
clone.Version = RTPPacketVersionAVC
clone.Payload = unitAVC
clone.Payload = unit
if err = push(&clone); err != nil {
return err
}
if len(units) == i {
return nil
}
units = units[i:]
}
return nil
}
}
}

View File

@@ -1,72 +0,0 @@
package keyframe
import (
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp"
)
var annexB = []byte{0, 0, 0, 1}
type Consumer struct {
streamer.Element
IsMP4 bool
}
func (k *Consumer) GetMedias() []*streamer.Media {
// support keyframe extraction only for one coded...
codec := streamer.NewCodec(streamer.CodecH264)
return []*streamer.Media{
{
Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly,
Codecs: []*streamer.Codec{codec},
},
}
}
func (k *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
// sps and pps without AVC headers
sps, pps := h264.GetParameterSet(track.Codec.FmtpLine)
push := func(packet *rtp.Packet) error {
// TODO: remove it, unnecessary
if packet.Version != h264.RTPPacketVersionAVC {
panic("wrong packet type")
}
switch h264.NALUType(packet.Payload) {
case h264.NALUTypeSPS:
sps = packet.Payload[4:] // remove AVC header
case h264.NALUTypePPS:
pps = packet.Payload[4:] // remove AVC header
case h264.NALUTypeIFrame:
if sps == nil || pps == nil {
return nil
}
var data []byte
if k.IsMP4 {
data = mp4.MarshalMP4(sps, pps, packet.Payload)
} else {
data = append(data, annexB...)
data = append(data, sps...)
data = append(data, annexB...)
data = append(data, pps...)
data = append(data, annexB...)
data = append(data, packet.Payload[4:]...)
}
k.Fire(data)
}
return nil
}
if !h264.IsAVC(track.Codec) {
wrapper := h264.RTPDepay(track)
push = wrapper(push)
}
return track.Bind(push)
}

94
pkg/mp4/const.go Normal file
View File

@@ -0,0 +1,94 @@
package mp4
import (
"encoding/binary"
"github.com/deepch/vdk/format/mp4/mp4io"
"time"
)
var matrix = [9]int32{0x10000, 0, 0, 0, 0x10000, 0, 0, 0, 0x40000000}
var time0 = time.Date(1904, time.January, 1, 0, 0, 0, 0, time.UTC)
func FTYP() []byte {
b := make([]byte, 0x18)
binary.BigEndian.PutUint32(b, 0x18)
copy(b[0x04:], "ftyp")
copy(b[0x08:], "iso5")
copy(b[0x10:], "iso5")
copy(b[0x14:], "avc1")
return b
}
func MOOV() *mp4io.Movie {
return &mp4io.Movie{
Header: &mp4io.MovieHeader{
PreferredRate: 1,
PreferredVolume: 1,
Matrix: matrix,
NextTrackId: -1,
Duration: 0,
TimeScale: 1000,
CreateTime: time0,
ModifyTime: time0,
PreviewTime: time0,
PreviewDuration: time0,
PosterTime: time0,
SelectionTime: time0,
SelectionDuration: time0,
CurrentTime: time0,
},
MovieExtend: &mp4io.MovieExtend{
Tracks: []*mp4io.TrackExtend{
{
TrackId: 1,
DefaultSampleDescIdx: 1,
DefaultSampleDuration: 40,
},
},
},
}
}
func TRAK() *mp4io.Track {
return &mp4io.Track{
// trak > tkhd
Header: &mp4io.TrackHeader{
TrackId: int32(1), // change me
Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW
Duration: 0, // OK
Matrix: matrix,
CreateTime: time0,
ModifyTime: time0,
},
// trak > mdia
Media: &mp4io.Media{
// trak > mdia > mdhd
Header: &mp4io.MediaHeader{
TimeScale: 1000,
Duration: 0,
Language: 0x55C4,
CreateTime: time0,
ModifyTime: time0,
},
// trak > mdia > minf
Info: &mp4io.MediaInfo{
// trak > mdia > minf > dinf
Data: &mp4io.DataInfo{
Refer: &mp4io.DataRefer{
Url: &mp4io.DataReferUrl{
Flags: 0x000001, // self reference
},
},
},
// trak > mdia > minf > stbl
Sample: &mp4io.SampleTable{
SampleDesc: &mp4io.SampleDesc{},
TimeToSample: &mp4io.TimeToSample{},
SampleToChunk: &mp4io.SampleToChunk{},
SampleSize: &mp4io.SampleSize{},
ChunkOffset: &mp4io.ChunkOffset{},
},
},
},
}
}

107
pkg/mp4/consumer.go Normal file
View File

@@ -0,0 +1,107 @@
package mp4
import (
"encoding/json"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp"
)
type Consumer struct {
streamer.Element
UserAgent string
RemoteAddr string
muxer *Muxer
codecs []*streamer.Codec
start bool
send int
}
func (c *Consumer) GetMedias() []*streamer.Media {
return []*streamer.Media{
{
Kind: streamer.KindVideo,
Direction: streamer.DirectionRecvonly,
Codecs: []*streamer.Codec{
{Name: streamer.CodecH264, ClockRate: 90000},
},
},
//{
// Kind: streamer.KindAudio,
// Direction: streamer.DirectionRecvonly,
// Codecs: []*streamer.Codec{
// {Name: streamer.CodecAAC, ClockRate: 16000},
// },
//},
}
}
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
codec := track.Codec
switch codec.Name {
case streamer.CodecH264:
c.codecs = append(c.codecs, track.Codec)
push := func(packet *rtp.Packet) error {
if packet.Version != h264.RTPPacketVersionAVC {
return nil
}
switch h264.NALUType(packet.Payload) {
case h264.NALUTypeIFrame:
c.start = true
case h264.NALUTypePFrame:
if !c.start {
return nil
}
default:
return nil
}
buf := c.muxer.Marshal(packet)
c.send += len(buf)
c.Fire(buf)
return nil
}
if !h264.IsAVC(codec) {
wrapper := h264.RTPDepay(track)
push = wrapper(push)
}
return track.Bind(push)
}
fmt.Printf("[rtmp] unsupported codec: %+v\n", track.Codec)
return nil
}
func (c *Consumer) MimeType() string {
return c.muxer.MimeType(c.codecs)
}
func (c *Consumer) Init() []byte {
if c.muxer == nil {
c.muxer = &Muxer{}
}
return c.muxer.GetInit(c.codecs)
}
//
func (c *Consumer) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{
"type": "MP4 server consumer",
"send": c.send,
"remote_addr": c.RemoteAddr,
"user_agent": c.UserAgent,
}
return json.Marshal(v)
}

View File

@@ -1,47 +0,0 @@
package mp4
import (
"errors"
"io"
)
type MemoryWriter struct {
buf []byte
pos int
}
func (m *MemoryWriter) Write(p []byte) (n int, err error) {
minCap := m.pos + len(p)
if minCap > cap(m.buf) { // Make sure buf has enough capacity:
buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra
copy(buf2, m.buf)
m.buf = buf2
}
if minCap > len(m.buf) {
m.buf = m.buf[:minCap]
}
copy(m.buf[m.pos:], p)
m.pos += len(p)
return len(p), nil
}
func (m *MemoryWriter) Seek(offset int64, whence int) (int64, error) {
newPos, offs := 0, int(offset)
switch whence {
case io.SeekStart:
newPos = offs
case io.SeekCurrent:
newPos = m.pos + offs
case io.SeekEnd:
newPos = len(m.buf) + offs
}
if newPos < 0 {
return 0, errors.New("negative result pos")
}
m.pos = newPos
return int64(newPos), nil
}
func (m *MemoryWriter) Bytes() []byte {
return m.buf
}

View File

@@ -1,37 +1,155 @@
package mp4
import (
"github.com/deepch/vdk/av"
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/mp4"
"time"
"github.com/deepch/vdk/format/fmp4/fmp4io"
"github.com/deepch/vdk/format/mp4/mp4io"
"github.com/deepch/vdk/format/mp4f/mp4fio"
"github.com/pion/rtp"
)
func MarshalMP4(sps, pps, frame []byte) []byte {
writer := &MemoryWriter{}
muxer := mp4.NewMuxer(writer)
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
if err != nil {
panic(err)
}
if err = muxer.WriteHeader([]av.CodecData{stream}); err != nil {
panic(err)
}
pkt := av.Packet{
CompositionTime: time.Millisecond,
IsKeyFrame: true,
Duration: time.Second,
Data: frame,
}
if err = muxer.WritePacket(pkt); err != nil {
panic(err)
}
if err = muxer.WriteTrailer(); err != nil {
panic(err)
}
return writer.buf
type Muxer struct {
fragIndex uint32
dts uint64
pts uint32
data []byte
total int
}
func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
s := `video/mp4; codecs="`
for _, codec := range codecs {
switch codec.Name {
case streamer.CodecH264:
s += "avc1." + h264.GetProfileLevelID(codec.FmtpLine)
}
}
return s + `"`
}
func (m *Muxer) GetInit(codecs []*streamer.Codec) []byte {
moov := MOOV()
for _, codec := range codecs {
switch codec.Name {
case streamer.CodecH264:
sps, pps := h264.GetParameterSet(codec.FmtpLine)
// TODO: remove
codecData, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
if err != nil {
return nil
}
width := codecData.Width()
height := codecData.Height()
trak := TRAK()
trak.Media.Header.TimeScale = int32(codec.ClockRate)
trak.Header.TrackWidth = float64(width)
trak.Header.TrackHeight = float64(height)
trak.Media.Info.Video = &mp4io.VideoMediaInfo{
Flags: 0x000001,
}
trak.Media.Info.Sample.SampleDesc.AVC1Desc = &mp4io.AVC1Desc{
DataRefIdx: 1,
HorizontalResolution: 72,
VorizontalResolution: 72,
Width: int16(width),
Height: int16(height),
FrameCount: 1,
Depth: 24,
ColorTableId: -1,
Conf: &mp4io.AVC1Conf{
Data: codecData.AVCDecoderConfRecordBytes(),
},
}
trak.Media.Handler = &mp4io.HandlerRefer{
SubType: [4]byte{'v', 'i', 'd', 'e'},
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
}
moov.Tracks = append(moov.Tracks, trak)
}
}
data := make([]byte, moov.Len())
moov.Marshal(data)
return append(FTYP(), data...)
}
func (m *Muxer) Rewind() {
m.dts = 0
m.pts = 0
}
func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
trackID := uint8(1)
run := &mp4fio.TrackFragRun{
Flags: 0x000b05,
FirstSampleFlags: uint32(fmp4io.SampleNoDependencies),
DataOffset: 0,
Entries: []mp4io.TrackFragRunEntry{},
}
moof := &mp4fio.MovieFrag{
Header: &mp4fio.MovieFragHeader{
Seqnum: m.fragIndex + 1,
},
Tracks: []*mp4fio.TrackFrag{
{
Header: &mp4fio.TrackFragHeader{
Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID, 0x01, 0x01, 0x00, 0x00},
},
DecodeTime: &mp4fio.TrackFragDecodeTime{
Version: 1,
Flags: 0,
Time: m.dts,
},
Run: run,
},
},
}
entry := mp4io.TrackFragRunEntry{
Duration: 90000,
Size: uint32(len(packet.Payload)),
}
newTime := packet.Timestamp
if m.pts > 0 {
m.dts += uint64(newTime - m.pts)
}
m.pts = newTime
// important before moof.Len()
run.Entries = append(run.Entries, entry)
moofLen := moof.Len()
mdatLen := 8 + len(packet.Payload)
// important after moof.Len()
run.DataOffset = uint32(moofLen + 8)
buf := make([]byte, moofLen+mdatLen)
moof.Marshal(buf)
binary.BigEndian.PutUint32(buf[moofLen:], uint32(mdatLen))
copy(buf[moofLen+4:], "mdat")
copy(buf[moofLen+8:], packet.Payload)
m.fragIndex++
m.total += moofLen + mdatLen
return buf
}

View File

@@ -1,131 +0,0 @@
package mse
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/mp4f"
"github.com/pion/rtp"
"time"
)
const MsgTypeMSE = "mse"
type Consumer struct {
streamer.Element
UserAgent string
RemoteAddr string
muxer *mp4f.Muxer
streams []av.CodecData
start bool
send int
}
func (c *Consumer) GetMedias() []*streamer.Media {
return []*streamer.Media{
{
Kind: streamer.KindVideo,
Direction: streamer.DirectionRecvonly,
Codecs: []*streamer.Codec{
{Name: streamer.CodecH264, ClockRate: 90000},
},
}, {
Kind: streamer.KindAudio,
Direction: streamer.DirectionRecvonly,
Codecs: []*streamer.Codec{
{Name: streamer.CodecAAC, ClockRate: 16000},
},
},
}
}
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
codec := track.Codec
switch codec.Name {
case streamer.CodecH264:
idx := int8(len(c.streams))
sps, pps := h264.GetParameterSet(codec.FmtpLine)
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
if err != nil {
return nil
}
c.streams = append(c.streams, stream)
pkt := av.Packet{Idx: idx, CompositionTime: time.Millisecond}
ts2time := time.Second / time.Duration(codec.ClockRate)
push := func(packet *rtp.Packet) error {
if packet.Version != h264.RTPPacketVersionAVC {
return nil
}
switch h264.NALUType(packet.Payload) {
case h264.NALUTypeIFrame:
c.start = true
pkt.IsKeyFrame = true
case h264.NALUTypePFrame:
if !c.start {
return nil
}
default:
return nil
}
pkt.Data = packet.Payload
newTime := time.Duration(packet.Timestamp) * ts2time
if pkt.Time > 0 {
pkt.Duration = newTime - pkt.Time
}
pkt.Time = newTime
for _, buf := range c.muxer.WritePacketV5(pkt) {
c.send += len(buf)
c.Fire(buf)
}
return nil
}
if !h264.IsAVC(codec) {
wrapper := h264.RTPDepay(track)
push = wrapper(push)
}
return track.Bind(push)
}
panic("unsupported codec")
}
func (c *Consumer) Init() {
c.muxer = mp4f.NewMuxer(nil)
if err := c.muxer.WriteHeader(c.streams); err != nil {
return
}
codecs, buf := c.muxer.GetInit(c.streams)
c.Fire(&streamer.Message{Type: MsgTypeMSE, Value: codecs})
c.send += len(buf)
c.Fire(buf)
}
//
func (c *Consumer) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{
"type": "MSE server consumer",
"send": c.send,
"remote_addr": c.RemoteAddr,
"user_agent": c.UserAgent,
}
return json.Marshal(v)
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/pion/sdp/v3"
"strconv"
"strings"
"unicode"
)
const (
@@ -154,8 +155,8 @@ func (c *Codec) Clone() *Codec {
func (c *Codec) Match(codec *Codec) bool {
return c.Name == codec.Name &&
c.ClockRate == codec.ClockRate &&
c.Channels == codec.Channels
(c.ClockRate == codec.ClockRate || codec.ClockRate == 0) &&
(c.Channels == codec.Channels || codec.Channels == 0)
}
func UnmarshalSDP(rawSDP []byte) ([]*Media, error) {
@@ -242,7 +243,8 @@ func UnmarshalCodec(md *sdp.MediaDescription, payloadType string) *Codec {
ss := strings.Split(attr.Value[i+1:], "/")
c.Name = strings.ToUpper(ss[0])
c.ClockRate = uint32(atoi(ss[1]))
// fix tailing space: `a=rtpmap:96 H264/90000 `
c.ClockRate = uint32(atoi(strings.TrimRightFunc(ss[1], unicode.IsSpace)))
if len(ss) == 3 && ss[2] == "2" {
c.Channels = 2

View File

@@ -51,4 +51,5 @@ pc.ontrack = ev => {
## Useful links
- https://www.webrtc-experiment.com/DetectRTC/
- https://divtable.com/table-styler/
- https://divtable.com/table-styler/
- https://www.chromium.org/audio-video/

View File

@@ -66,7 +66,9 @@
const links = [
'<a href="webrtc.html?src={name}">webrtc</a>',
'<a href="mse.html?src={name}">mse</a>',
'<a href="api/frame.mp4?src={name}">frame.mp4</a>',
// '<a href="video.html?src={name}">video</a>',
'<a href="api/stream.mp4?src={name}">mp4</a>',
'<a href="api/frame.mp4?src={name}">frame</a>',
'<a href="api/streams?src={name}">info</a>',
];

View File

@@ -60,9 +60,7 @@
console.debug("ws.onmessage", data);
if (data.type === "mse") {
sourceBuffer = mediaSource.addSourceBuffer(
`video/mp4; codecs="${data.value}"`
);
sourceBuffer = mediaSource.addSourceBuffer(data.value);
// important: segments supports TrackFragDecodeTime
// sequence supports only TrackFragRunEntry Duration
sourceBuffer.mode = "segments";

53
www/video.html Normal file
View File

@@ -0,0 +1,53 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>go2rtc - WebRTC</title>
<style>
body {
margin: 0;
padding: 0;
}
html, body {
height: 100%;
width: 100%;
}
#video {
/* video "container" size */
width: 100%;
height: 100%;
background: black;
}
</style>
</head>
<body>
<video id="video" autoplay controls playsinline muted></video>
<!--<video id="video" preload="auto" controls playsinline muted></video>-->
<script>
const baseUrl = location.origin + location.pathname.substr(
0, location.pathname.lastIndexOf("/")
);
const video = document.getElementById('video');
video.oncanplay = ev => console.log(ev.type, ev);
video.onplaying = ev => console.log(ev.type, ev);
video.onwaiting = ev => console.log(ev.type, ev);
video.onseeking = ev => console.log(ev.type, ev);
video.onloadeddata = ev => console.log(ev.type, ev);
video.oncanplaythrough = ev => console.log(ev.type, ev);
// video.ondurationchange = ev => console.log(ev.type, ev);
// video.ontimeupdate = ev => console.log(ev.type, ev);
video.onplay = ev => console.log(ev.type, ev);
video.onpause = ev => console.log(ev.type, ev);
video.onsuspended = ev => console.log(ev.type, ev);
video.onemptied = ev => console.log(ev.type, ev);
video.onstalled = ev => console.log(ev.type, ev);
console.log("start");
video.src = baseUrl + "/api/stream.mp4" + location.search;
</script>
</body>
</html>