diff --git a/api.go b/api.go index 7660089..43eb89e 100644 --- a/api.go +++ b/api.go @@ -58,7 +58,7 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) { http.Error(rw, pkg.ErrNotFound.Error(), http.StatusNotFound) return } - _, err := publisher.VideoTrack.Ready.Await() + err := publisher.VideoTrack.WaitReady() if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return diff --git a/example/default/config.yaml b/example/default/config.yaml index 6c16af1..2531a9c 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -1,5 +1,5 @@ global: - loglevel: trace + loglevel: debug enableauth: true tcp: listenaddr: :50051 @@ -12,8 +12,12 @@ console: secret: de2c0bb9fd47684adc07a426e139239b logrotate: level: debug +rtsp: + subscribe: + subaudio: false rtmp: - chunksize: 2048 + tcp: + listenaddr: :11935 publish: # idletimeout: 10s # closedelaytimeout: 4s diff --git a/pkg/annexb.go b/pkg/annexb.go index 2495fa9..72ab688 100644 --- a/pkg/annexb.go +++ b/pkg/annexb.go @@ -1,7 +1,9 @@ package pkg import ( + "encoding/binary" "fmt" + "io" "time" "m7s.live/m7s/v5/pkg/codec" @@ -14,6 +16,13 @@ type AnnexB struct { util.RecyclableMemory } +func (a *AnnexB) Dump(t byte, w io.Writer) { + m := a.Borrow(4 + a.Size) + binary.BigEndian.PutUint32(m, uint32(a.Size)) + a.CopyTo(m[4:]) + w.Write(m) +} + // DecodeConfig implements pkg.IAVFrame. func (a *AnnexB) DecodeConfig(t *AVTrack, ctx ICodecCtx) error { switch c := ctx.(type) { diff --git a/pkg/avframe.go b/pkg/avframe.go index a3f5745..0005868 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -1,6 +1,7 @@ package pkg import ( + "io" "net" "sync" "time" @@ -38,6 +39,7 @@ type ( GetSize() int Recycle() String() string + Dump(byte, io.Writer) } Nalu = [][]byte diff --git a/pkg/codec/audio.go b/pkg/codec/audio.go index ed62a47..5f4417d 100644 --- a/pkg/codec/audio.go +++ b/pkg/codec/audio.go @@ -11,6 +11,10 @@ type ( OPUSCtx AudioCtx AACCtx struct { AudioCtx + Asc []byte + } + IAACCtx interface { + GetAACCtx() *AACCtx } ) @@ -26,6 +30,10 @@ func (ctx *AudioCtx) GetSampleSize() int { return ctx.SampleSize } +func (ctx *AACCtx) GetAACCtx() *AACCtx { + return ctx +} + func (*PCMUCtx) FourCC() FourCC { return FourCC_ULAW } diff --git a/pkg/codec/video.go b/pkg/codec/video.go index f24d9a6..3656b9d 100644 --- a/pkg/codec/video.go +++ b/pkg/codec/video.go @@ -25,6 +25,30 @@ func (f FourCC) String() string { return string(f[:]) } +func (f FourCC) Name() string { + switch f { + case FourCC_H264: + return "H264" + case FourCC_H265: + return "H265" + case FourCC_AV1: + return "AV1" + case FourCC_VP9: + return "VP9" + case FourCC_VP8: + return "VP8" + case FourCC_MP4A: + return "AAC" + case FourCC_OPUS: + return "OPUS" + case FourCC_ALAW: + return "PCMA" + case FourCC_ULAW: + return "PCMU" + } + return "" +} + func (f FourCC) Uint32() uint32 { return binary.BigEndian.Uint32(f[:]) } @@ -244,4 +268,4 @@ func (info *SPSInfo) Unmarshal(data []byte) (err error) { info.Height = ((2 - frame_mbs_only_flag) * info.MbHeight * 16) - info.CropTop*2 - info.CropBottom*2 return -} \ No newline at end of file +} diff --git a/pkg/config/types.go b/pkg/config/types.go index 000c1f4..1ad593a 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -36,6 +36,7 @@ type Publish struct { Speed float64 `default:"0" desc:"倍速"` // 倍速,0 为不限速 Key string `desc:"发布鉴权key"` // 发布鉴权key RingSize util.Range[int] `default:"20-1024" desc:"RingSize范围"` // 缓冲区大小范围 + Dump bool } func (c *Publish) GetPublishConfig() *Publish { @@ -45,14 +46,14 @@ func (c *Publish) GetPublishConfig() *Publish { type Subscribe struct { SubAudio bool `default:"true" desc:"是否订阅音频"` SubVideo bool `default:"true" desc:"是否订阅视频"` - BufferTime time.Duration `desc:"缓冲时长,submode=2时有效"` - SubMode int `desc:"订阅模式" enum:"0:实时模式,1:首屏后不进行追赶,2:从缓冲时长的关键帧开始播放"` // 0,实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放,也不追赶,需要发布者配置缓存长度 - SyncMode int `desc:"同步模式" enum:"0:采用时间戳同步,1:采用写入时间同步"` // 0,采用时间戳同步,1,采用写入时间同步 - IFrameOnly bool `desc:"只要关键帧"` // 只要关键帧 - WaitTimeout time.Duration `default:"10s" desc:"等待流超时时间"` // 等待流超时 - WriteBufferSize int `desc:"写缓冲大小"` // 写缓冲大小 - Key string `desc:"订阅鉴权key"` // 订阅鉴权key - Internal bool `default:"false" desc:"是否内部订阅"` // 是否内部订阅 + BufferTime time.Duration `desc:"缓冲时长,从缓冲时长的关键帧开始播放"` + SubMode int `desc:"订阅模式" enum:"0:实时模式,1:首屏后不进行追赶"` // 0,实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放,也不追赶,需要发布者配置缓存长度 + SyncMode int `desc:"同步模式" enum:"0:采用时间戳同步,1:采用写入时间同步"` // 0,采用时间戳同步,1,采用写入时间同步 + IFrameOnly bool `desc:"只要关键帧"` // 只要关键帧 + WaitTimeout time.Duration `default:"10s" desc:"等待流超时时间"` // 等待流超时 + WriteBufferSize int `desc:"写缓冲大小"` // 写缓冲大小 + Key string `desc:"订阅鉴权key"` // 订阅鉴权key + Internal bool `default:"false" desc:"是否内部订阅"` // 是否内部订阅 } func (c *Subscribe) GetSubscribeConfig() *Subscribe { diff --git a/pkg/ring-writer.go b/pkg/ring-writer.go index eb903dc..e4ec8a2 100644 --- a/pkg/ring-writer.go +++ b/pkg/ring-writer.go @@ -16,6 +16,7 @@ type RingWriter struct { SizeRange util.Range[int] pool *util.Ring[AVFrame] poolSize int + reduceVol int Size int LastValue *AVFrame SLogger *slog.Logger @@ -94,7 +95,19 @@ func (rb *RingWriter) Dispose() { func (rb *RingWriter) GetIDR() *util.Ring[AVFrame] { rb.RLock() defer rb.RUnlock() - return rb.IDRingList.Back().Value + if latest := rb.IDRingList.Back(); latest != nil { + return latest.Value + } + return nil +} + +func (rb *RingWriter) GetOldestIDR() *util.Ring[AVFrame] { + rb.RLock() + defer rb.RUnlock() + if latest := rb.IDRingList.Front(); latest != nil { + return latest.Value + } + return nil } func (rb *RingWriter) GetHistoryIDR(bufTime time.Duration) *util.Ring[AVFrame] { @@ -116,14 +129,18 @@ func (rb *RingWriter) CurrentBufferTime() time.Duration { return rb.BufferRange[1] } +func (rb *RingWriter) PushIDR() { + rb.Lock() + rb.IDRingList.PushBack(rb.Ring) + rb.Unlock() +} + func (rb *RingWriter) Step() (normal bool) { isIDR := rb.Value.IDR next := rb.Next() if isIDR { - rb.SLogger.Debug("add idr") - rb.Lock() - rb.IDRingList.PushBack(rb.Ring) - rb.Unlock() + rb.SLogger.Log(nil, TraceLevel, "add idr") + rb.PushIDR() } if rb.IDRingList.Len() > 0 { oldIDR := rb.IDRingList.Front() @@ -136,23 +153,31 @@ func (rb *RingWriter) Step() (normal bool) { } } else if next == oldIDR.Value { if nextOld := oldIDR.Next(); nextOld != nil && rb.durationFrom(nextOld.Value) > rb.BufferRange[0] { - rb.SLogger.Debug("remove old idr") + rb.SLogger.Log(nil, TraceLevel, "remove old idr") rb.Lock() rb.IDRingList.Remove(oldIDR) rb.Unlock() } else { - rb.SLogger.Debug("not enough buffer") + rb.SLogger.Log(nil, TraceLevel, "not enough buffer") rb.glow(5) next = rb.Next() } } else if rb.BufferRange[1] > rb.BufferRange[0] { for tmpP, reduceCount := rb.Next(), 0; reduceCount < 5; reduceCount++ { if tmpP == oldIDR.Value { + rb.reduceVol = 0 break } - if tmpP = tmpP.Next(); reduceCount == 4 && rb.Size > rb.SizeRange[0] { - rb.reduce(5) - next = rb.Next() + if tmpP = tmpP.Next(); reduceCount == 4 { + if rb.Size > rb.SizeRange[0]+5 { + if rb.reduceVol++; rb.reduceVol > 50 { + rb.reduce(5) + next = rb.Next() + rb.reduceVol = 0 + } + } else { + rb.reduceVol = 0 + } } } } diff --git a/pkg/track.go b/pkg/track.go index 8dc0738..c16c430 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -13,7 +13,7 @@ import ( type ( Track struct { *slog.Logger - Ready *util.Promise[struct{}] + ready *util.Promise[struct{}] FrameType reflect.Type bytesIn int frameCount int @@ -56,7 +56,7 @@ func NewAVTrack(args ...any) (t *AVTrack) { t.RingWriter.SLogger = t.Logger } } - t.Ready = util.NewPromise(struct{}{}) + t.ready = util.NewPromise(struct{}{}) t.Info("create") return } @@ -77,6 +77,21 @@ func (t *Track) AddBytesIn(n int) { } } +func (t *Track) Ready(err error) { + if !t.IsReady() { + t.ready.Fulfill(err) + } +} + +func (t *Track) IsReady() bool { + return !t.ready.IsPending() +} + +func (t *Track) WaitReady() error { + _, err := t.ready.Await() + return err +} + func (t *Track) Trace(msg string, fields ...any) { t.Log(context.TODO(), TraceLevel, msg, fields...) } diff --git a/pkg/util/allocator.go b/pkg/util/allocator.go index 16c138f..be0ff68 100644 --- a/pkg/util/allocator.go +++ b/pkg/util/allocator.go @@ -164,9 +164,18 @@ func (a *Allocator) Init(size int) { a.offsetTree = root } +func (a *Allocator) Find(size int) (offset int) { + block := a.findAvailableBlock(size) + if block == nil { + return -1 + } + offset = block.Start + return +} + func (a *Allocator) Allocate(size int) (offset int) { //a.history = append(a.history, History{Malloc: true, Size: size}) - block := a.findAvailableBlock(a.sizeTree, size) + block := a.findAvailableBlock(size) if block == nil { return -1 } @@ -182,8 +191,8 @@ func (a *Allocator) Allocate(size int) (offset int) { return } -func (a *Allocator) findAvailableBlock(block *Block, size int) *Block { - var lastAvailableBlock *Block +func (a *Allocator) findAvailableBlock(size int) (lastAvailableBlock *Block) { + block := a.sizeTree for block != nil { if bSize := block.End - block.Start; bSize == size { return block @@ -194,7 +203,7 @@ func (a *Allocator) findAvailableBlock(block *Block, size int) *Block { block = tree.right } } - return lastAvailableBlock + return } func (a *Allocator) getBlock(start, end int) *Block { diff --git a/pkg/util/auth.go b/pkg/util/auth.go new file mode 100644 index 0000000..b99f9e6 --- /dev/null +++ b/pkg/util/auth.go @@ -0,0 +1,136 @@ +package util + +import ( + "crypto/md5" + "encoding/base64" + "encoding/hex" + "fmt" + "net/url" + "strings" +) + +type Auth struct { + Method byte + user string + pass string + header string + h1nonce string +} + +const ( + AuthNone byte = iota + AuthUnknown + AuthBasic + AuthDigest + AuthTPLink // https://drmnsamoliu.github.io/video.html +) + +func NewAuth(user *url.Userinfo) *Auth { + a := new(Auth) + a.user = user.Username() + a.pass, _ = user.Password() + if a.user != "" { + a.Method = AuthUnknown + } + return a +} + +func (a *Auth) Read(res *Response) bool { + auth := res.Header.Get("WWW-Authenticate") + if len(auth) < 6 { + return false + } + + switch auth[:6] { + case "Basic ": + a.header = "Basic " + B64(a.user, a.pass) + a.Method = AuthBasic + return true + case "Digest": + realm := Between(auth, `realm="`, `"`) + nonce := Between(auth, `nonce="`, `"`) + + a.h1nonce = HexMD5(a.user, realm, a.pass) + ":" + nonce + a.header = fmt.Sprintf( + `Digest username="%s", realm="%s", nonce="%s"`, + a.user, realm, nonce, + ) + a.Method = AuthDigest + return true + default: + return false + } +} + +func (a *Auth) Write(req *Request) { + if a == nil { + return + } + + switch a.Method { + case AuthBasic: + req.Header.Set("Authorization", a.header) + case AuthDigest: + // important to use String except RequestURL for RtspServer: + // https://github.com/AlexxIT/go2rtc/issues/244 + uri := req.URL.String() + h2 := HexMD5(req.Method, uri) + response := HexMD5(a.h1nonce, h2) + header := a.header + fmt.Sprintf( + `, uri="%s", response="%s"`, uri, response, + ) + req.Header.Set("Authorization", header) + case AuthTPLink: + req.URL.Host = "127.0.0.1" + } +} + +func (a *Auth) Validate(req *Request) bool { + if a == nil { + return true + } + + header := req.Header.Get("Authorization") + if header == "" { + return false + } + + if a.Method == AuthUnknown { + a.Method = AuthBasic + a.header = "Basic " + B64(a.user, a.pass) + } + + return header == a.header +} + +func (a *Auth) ReadNone(res *Response) bool { + auth := res.Header.Get("WWW-Authenticate") + if strings.Contains(auth, "TP-LINK Streaming Media") { + a.Method = AuthTPLink + return true + } + return false +} + +func Between(s, sub1, sub2 string) string { + i := strings.Index(s, sub1) + if i < 0 { + return "" + } + s = s[i+len(sub1):] + i = strings.Index(s, sub2) + if i < 0 { + return "" + } + return s[:i] +} + +func HexMD5(s ...string) string { + b := md5.Sum([]byte(strings.Join(s, ":"))) + return hex.EncodeToString(b[:]) +} + +func B64(s ...string) string { + b := []byte(strings.Join(s, ":")) + return base64.StdEncoding.EncodeToString(b) +} diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go index 25a9466..cd2c7d7 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf-reader.go @@ -2,6 +2,8 @@ package util import ( "io" + "net/textproto" + "strings" ) const defaultBufSize = 1 << 14 @@ -34,6 +36,10 @@ func (r *BufReader) Recycle() { r.Allocator.Recycle() } +func (r *BufReader) Buffered() int { + return r.buf.Length +} + func (r *BufReader) Peek(n int) (buf []byte, err error) { defer func(snap MemoryReader) { l := r.buf.Length + n @@ -52,19 +58,14 @@ func (r *BufReader) Peek(n int) (buf []byte, err error) { } func (r *BufReader) eat() error { - buf := r.Allocator.Malloc(r.BufLen) - if n, err := r.reader.Read(buf); err != nil { - r.Allocator.Free(buf) + buf, err := r.Allocator.Read(r.reader, r.BufLen) + if err != nil { return err - } else { - if n < r.BufLen { - r.Allocator.Free(buf[n:]) - buf = buf[:n] - } - r.buf.Buffers = append(r.buf.Buffers, buf) - r.buf.Size += n - r.buf.Length += n } + n := len(buf) + r.buf.Buffers = append(r.buf.Buffers, buf) + r.buf.Size += n + r.buf.Length += n return nil } @@ -139,12 +140,14 @@ func (r *BufReader) ReadNto(n int, to []byte) (err error) { l += ll }) } + func (r *BufReader) ReadString(n int) (s string, err error) { err = r.ReadRange(n, func(buf []byte) { s += string(buf) }) return } + func (r *BufReader) ReadBytes(n int) (mem Memory, err error) { err = r.ReadRange(n, func(buf []byte) { mem.Buffers = append(mem.Buffers, buf) @@ -156,3 +159,45 @@ func (r *BufReader) ReadBytes(n int) (mem Memory, err error) { func (r *BufReader) recycleFront() { r.buf.ClipFront(r.Allocator.Free) } + +func (r *BufReader) ReadLine() (line string, err error) { + var lastb, curb byte + snap, i := r.buf, 0 + for { + if curb, err = r.ReadByte(); err != nil { + return "", err + } else { + i++ + if l := r.buf.Length; curb == '\n' { + snap.Length = l + i + r.buf = snap + err = r.ReadRange(i, func(buf []byte) { + line = line + string(buf) + }) + if lastb == '\r' { + line = line[:i-2] + } else { + line = line[:i-1] + } + return + } + lastb = curb + } + } +} + +func (r *BufReader) ReadMIMEHeader() (textproto.MIMEHeader, error) { + result := make(textproto.MIMEHeader) + for { + l, err := r.ReadLine() + if err != nil { + return nil, err + } + if l == "" { + break + } + key, value, _ := strings.Cut(l, ":") + result.Add(key, strings.Trim(value, " ")) + } + return result, nil +} diff --git a/pkg/util/mem.go b/pkg/util/mem.go index 35e0a18..c884301 100644 --- a/pkg/util/mem.go +++ b/pkg/util/mem.go @@ -3,6 +3,7 @@ package util import ( "container/list" "fmt" + "io" "slices" "sync" "unsafe" @@ -63,6 +64,13 @@ func (ma *MemoryAllocator) Recycle() { lock.Unlock() } +func (ma *MemoryAllocator) Find(size int) (memory []byte) { + if offset := ma.allocator.Find(size); offset != -1 { + memory = ma.memory[offset : offset+size] + } + return +} + func (ma *MemoryAllocator) Malloc(size int) (memory []byte) { if offset := ma.allocator.Allocate(size); offset != -1 { memory = ma.memory[offset : offset+size] @@ -78,10 +86,10 @@ func (ma *MemoryAllocator) free(start, size int) (ret bool) { return true } -func (ma *MemoryAllocator) Free(mem []byte) bool { - start := int(int64(uintptr(unsafe.Pointer(&mem[0]))) - ma.start) - return ma.free(start, len(mem)) -} +//func (ma *MemoryAllocator) Free(mem []byte) bool { +// start := int(int64(uintptr(unsafe.Pointer(&mem[0]))) - ma.start) +// return ma.free(start, len(mem)) +//} func (ma *MemoryAllocator) GetBlocks() (blocks []*Block) { return ma.allocator.GetBlocks() @@ -139,10 +147,35 @@ func (sma *ScalableMemoryAllocator) Recycle() { } } -func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) { +// Borrow = Malloc + Free = Find, must use the memory at once +func (sma *ScalableMemoryAllocator) Borrow(size int) (memory []byte) { if sma == nil || size > MaxBlockSize { return } + defer sma.addMallocCount(size) + var child *MemoryAllocator + for _, child = range sma.children { + if memory = child.Find(size); memory != nil { + return + } + } + for sma.childSize < MaxBlockSize { + sma.childSize = sma.childSize << 1 + if sma.childSize >= size { + break + } + } + child = GetMemoryAllocator(sma.childSize) + sma.size += child.Size + memory = child.Find(size) + sma.children = append(sma.children, child) + return +} + +func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) { + if sma == nil || size > MaxBlockSize { + return make([]byte, size) + } if EnableCheckSize { defer sma.checkSize() } @@ -170,6 +203,27 @@ func (sma *ScalableMemoryAllocator) GetScalableMemoryAllocator() *ScalableMemory return sma } +func (sma *ScalableMemoryAllocator) Read(reader io.Reader, n int) (mem []byte, err error) { + mem = sma.Malloc(n) + meml := n + if n, err = reader.Read(mem); err == nil { + if n < meml { + sma.Free(mem[n:]) + mem = mem[:n] + } + } else { + sma.Free(mem) + } + return +} + +func (sma *ScalableMemoryAllocator) FreeRest(mem *[]byte, keep int) { + if keep < len(*mem) { + sma.Free((*mem)[keep:]) + *mem = (*mem)[:keep] + } +} + func (sma *ScalableMemoryAllocator) Free(mem []byte) bool { if sma == nil { return false diff --git a/pkg/util/textproto.go b/pkg/util/textproto.go new file mode 100644 index 0000000..2a161d2 --- /dev/null +++ b/pkg/util/textproto.go @@ -0,0 +1,145 @@ +package util + +import ( + "errors" + "fmt" + "io" + "net/textproto" + "net/url" + "strconv" + "strings" +) + +const EndLine = "\r\n" + +// Response like http.Response, but with any proto +type Response struct { + Status string + StatusCode int + Proto string + Header textproto.MIMEHeader + Body []byte + Request *Request +} + +func (r Response) String() string { + s := r.Proto + " " + r.Status + EndLine + for k, v := range r.Header { + s += k + ": " + v[0] + EndLine + } + s += EndLine + if r.Body != nil { + s += string(r.Body) + } + return s +} + +func (r *Response) Write(w io.Writer) (err error) { + _, err = w.Write([]byte(r.String())) + return +} + +func ReadResponse(r *BufReader) (*Response, error) { + line, err := r.ReadLine() + if err != nil { + return nil, err + } + if line == "" { + return nil, errors.New("empty response on RTSP request") + } + + ss := strings.SplitN(line, " ", 3) + if len(ss) != 3 { + return nil, fmt.Errorf("malformed response: %s", line) + } + + res := &Response{ + Status: ss[1] + " " + ss[2], + Proto: ss[0], + } + + res.StatusCode, err = strconv.Atoi(ss[1]) + if err != nil { + return nil, err + } + + res.Header, err = r.ReadMIMEHeader() + if err != nil { + return nil, err + } + + if val := res.Header.Get("Content-Length"); val != "" { + var i int + i, err = strconv.Atoi(val) + res.Body = make([]byte, i) + if err = r.ReadNto(i, res.Body); err != nil { + return nil, err + } + } + + return res, nil +} + +// Request like http.Request, but with any proto +type Request struct { + Method string + URL *url.URL + Proto string + Header textproto.MIMEHeader + Body []byte +} + +func (r *Request) String() string { + s := r.Method + " " + r.URL.String() + " " + r.Proto + EndLine + for k, v := range r.Header { + s += k + ": " + v[0] + EndLine + } + s += EndLine + if r.Body != nil { + s += string(r.Body) + } + return s +} + +func (r *Request) Write(w io.Writer) (err error) { + _, err = w.Write([]byte(r.String())) + return +} + +func ReadRequest(r *BufReader) (*Request, error) { + line, err := r.ReadLine() + if err != nil { + return nil, err + } + + ss := strings.SplitN(line, " ", 3) + if len(ss) != 3 { + return nil, fmt.Errorf("wrong request: %s", line) + } + + req := &Request{ + Method: ss[0], + Proto: ss[2], + } + + req.URL, err = url.Parse(ss[1]) + if err != nil { + return nil, err + } + + req.Header, err = r.ReadMIMEHeader() + if err != nil { + return nil, err + } + + if val := req.Header.Get("Content-Length"); val != "" { + var i int + i, err = strconv.Atoi(val) + req.Body = make([]byte, i) + if err = r.ReadNto(i, req.Body); err != nil { + return nil, err + } + } + + return req, nil +} diff --git a/plugin.go b/plugin.go index e088fc3..1d3ec29 100644 --- a/plugin.go +++ b/plugin.go @@ -337,6 +337,9 @@ func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subsc } } subscriber.Init(p, streamPath, &subscriber.Subscribe, options...) + if subscriber.Subscribe.BufferTime > 0 { + subscriber.Subscribe.SubMode = SUBMODE_BUFFER + } _, err = p.server.Call(subscriber) return } diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index a9d90ef..a095b29 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -162,7 +162,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) { StreamID: cmd.StreamId, }, } - receiver.Publisher, err = p.Publish(nc.AppName+"/"+cmd.PublishingName, p.Context, conn, connectInfo) + receiver.Publisher, err = p.Publish(nc.AppName+"/"+cmd.PublishingName, conn, connectInfo) if err != nil { delete(receivers, cmd.StreamId) err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error) @@ -182,7 +182,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) { } var suber *m7s.Subscriber // sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID) - suber, err = p.Subscribe(streamPath, p.Context, conn, connectInfo) + suber, err = p.Subscribe(streamPath, conn, connectInfo) if err != nil { err = ns.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error) } else { diff --git a/plugin/rtmp/pkg/audio.go b/plugin/rtmp/pkg/audio.go index 7487491..ccc6dcc 100644 --- a/plugin/rtmp/pkg/audio.go +++ b/plugin/rtmp/pkg/audio.go @@ -51,6 +51,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) } var cloneFrame RTMPAudio cloneFrame.CopyFrom(&avcc.Memory) + ctx.Asc = cloneFrame.Buffers[0] ctx.AudioObjectType = b0 >> 3 ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7) ctx.ChannelConfiguration = (b1 >> 3) & 0x0F diff --git a/plugin/rtmp/pkg/const.go b/plugin/rtmp/pkg/const.go index 2a5a608..caa8570 100644 --- a/plugin/rtmp/pkg/const.go +++ b/plugin/rtmp/pkg/const.go @@ -1,7 +1,9 @@ package rtmp import ( + "encoding/binary" "fmt" + "io" "time" "m7s.live/m7s/v5/pkg/util" @@ -21,6 +23,15 @@ type RTMPData struct { util.RecyclableMemory } +func (avcc *RTMPData) Dump(t byte, w io.Writer) { + m := avcc.Borrow(9 + avcc.Size) + m[0] = t + binary.BigEndian.PutUint32(m[1:], uint32(4+avcc.Size)) + binary.BigEndian.PutUint32(m[5:], avcc.Timestamp) + avcc.CopyTo(m[9:]) + w.Write(m) +} + func (avcc *RTMPData) GetSize() int { return avcc.Size } diff --git a/plugin/rtmp/pkg/net-connection.go b/plugin/rtmp/pkg/net-connection.go index 4c96db4..86e64fd 100644 --- a/plugin/rtmp/pkg/net-connection.go +++ b/plugin/rtmp/pkg/net-connection.go @@ -127,7 +127,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { if err != nil { return nil, errors.New("get chunk stream id error :" + err.Error()) } - // println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType) + //println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType) chunk, ok := conn.incommingChunks[ChunkStreamID] if ChunkType != 3 && ok && chunk.bufLen > 0 { diff --git a/plugin/rtp/pkg/audio.go b/plugin/rtp/pkg/audio.go index b2d2c46..490bf83 100644 --- a/plugin/rtp/pkg/audio.go +++ b/plugin/rtp/pkg/audio.go @@ -1,7 +1,11 @@ package rtp import ( + "encoding/base64" + "encoding/binary" + "encoding/hex" "fmt" + "io" "time" "unsafe" @@ -18,6 +22,21 @@ type RTPData struct { util.RecyclableMemory } +func (r *RTPData) Dump(t byte, w io.Writer) { + m := r.Borrow(3 + len(r.Packets)*2 + r.GetSize()) + m[0] = t + binary.BigEndian.PutUint16(m[1:], uint16(len(r.Packets))) + offset := 3 + for _, p := range r.Packets { + size := p.MarshalSize() + binary.BigEndian.PutUint16(m[offset:], uint16(size)) + offset += 2 + p.MarshalTo(m[offset:]) + offset += size + } + w.Write(m) +} + func (r *RTPData) String() (s string) { for _, p := range r.Packets { s += fmt.Sprintf("t: %d, s: %d, p: %02X %d\n", p.Timestamp, p.SequenceNumber, p.Payload[0:2], len(p.Payload)) @@ -59,16 +78,16 @@ type ( codec.AACCtx } IRTPCtx interface { - GetRTPCodecCapability() webrtc.RTPCodecCapability + GetRTPCodecParameter() webrtc.RTPCodecParameters } ) func (r *RTPCtx) GetInfo() string { - return r.GetRTPCodecCapability().SDPFmtpLine + return r.GetRTPCodecParameter().SDPFmtpLine } -func (r *RTPCtx) GetRTPCodecCapability() webrtc.RTPCodecCapability { - return r.RTPCodecCapability +func (r *RTPCtx) GetRTPCodecParameter() webrtc.RTPCodecParameters { + return r.RTPCodecParameters } func (r *RTPCtx) GetSequenceFrame() IAVFrame { @@ -84,7 +103,7 @@ func (r *RTPData) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { ctx.MimeType = webrtc.MimeTypeH264 ctx.ClockRate = 90000 spsInfo := ctx.SPSInfo - ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc) + ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc) ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) t.ICodecCtx = &ctx case codec.IH265Ctx: @@ -92,9 +111,19 @@ func (r *RTPData) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { ctx.H265Ctx = *c.GetH265Ctx() ctx.PayloadType = 98 ctx.MimeType = webrtc.MimeTypeH265 + ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), base64.StdEncoding.EncodeToString(ctx.VPS[0])) ctx.ClockRate = 90000 ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) t.ICodecCtx = &ctx + case codec.IAACCtx: + var ctx RTPAACCtx + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + ctx.AACCtx = *c.GetAACCtx() + ctx.MimeType = "audio/MPEG4-GENERIC" + ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s", hex.EncodeToString(ctx.AACCtx.Asc)) + ctx.PayloadType = 97 + ctx.ClockRate = uint32(ctx.SampleRate) + t.ICodecCtx = &ctx } return } diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go index 025e157..c8539a7 100644 --- a/plugin/rtp/pkg/video.go +++ b/plugin/rtp/pkg/video.go @@ -44,6 +44,12 @@ var ( spropReg = regexp.MustCompile(`sprop-parameter-sets=(.+),([^;]+)(;|$)`) ) +const ( + startBit = 1 << 7 + endBit = 1 << 6 + MTUSize = 1460 +) + func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { switch r.MimeType { case webrtc.MimeTypeH264: @@ -149,7 +155,7 @@ func (h265 *RTPH265Ctx) GetInfo() string { func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { var r RTPVideo - r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() + //r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() nalus := from.Raw.(Nalus) nalutype := nalus.H264Type() var lastPacket *rtp.Packet @@ -161,7 +167,7 @@ func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { SequenceNumber: h264.SequenceNumber, Timestamp: uint32(nalus.PTS), SSRC: h264.SSRC, - PayloadType: 96, + PayloadType: uint8(h264.PayloadType), }, Payload: payload, } @@ -172,21 +178,18 @@ func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { } for _, nalu := range nalus.Nalus { reader := util.NewReadableBuffersFromBytes(nalu...) - if startIndex := len(r.Packets); reader.Length > 1460 { + if startIndex := len(r.Packets); reader.Length > MTUSize { //fu-a for reader.Length > 0 { - mem := r.NextN(1460) + mem := r.Malloc(MTUSize) n := reader.ReadBytesTo(mem[1:]) mem[0] = codec.NALU_FUA.Or(mem[1] & 0x60) - if n < 1459 { - r.Free(mem[n+1:]) - mem = mem[:n+1] - } - r.UpdateBuffer(-1, mem) + r.FreeRest(&mem, n+1) + r.AddRecycleBytes(mem) r.Packets = append(r.Packets, createPacket(mem)) } - r.Packets[startIndex].Payload[1] |= 1 << 7 // set start bit - lastPacket.Payload[1] |= 1 << 6 // set end bit + r.Packets[startIndex].Payload[1] |= startBit + lastPacket.Payload[1] |= endBit } else { mem := r.NextN(reader.Length) reader.ReadBytesTo(mem) diff --git a/plugin/rtsp/index.go b/plugin/rtsp/index.go index d0fc4b5..e7367d1 100644 --- a/plugin/rtsp/index.go +++ b/plugin/rtsp/index.go @@ -3,8 +3,8 @@ package plugin_rtsp import ( "encoding/binary" "errors" + "fmt" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v4" @@ -13,6 +13,8 @@ import ( mrtp "m7s.live/m7s/v5/plugin/rtp/pkg" . "m7s.live/m7s/v5/plugin/rtsp/pkg" "net" + "net/http" + "reflect" "runtime/debug" "strconv" "strings" @@ -31,6 +33,7 @@ type RTSPPlugin struct { func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { logger := p.Logger.With("remote", conn.RemoteAddr().String()) var receiver *Receiver + var sender *Sender var err error nc := NewNetConnection(conn, logger) defer func() { @@ -43,7 +46,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { receiver.Dispose(err) } }() - var req *tcp.Request + var req *util.Request var timeout time.Duration var sendMode bool mem := util.NewScalableMemoryAllocator(1 << 12) @@ -77,7 +80,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { // Sender: OPTIONS > ANNOUNCE > SETUP... > RECORD > TEARDOWN switch req.Method { case MethodOptions: - res := &tcp.Response{ + res := &util.Response{ Header: map[string][]string{ "Public": {"OPTIONS, SETUP, TEARDOWN, DESCRIBE, PLAY, PAUSE, ANNOUNCE, RECORD"}, }, @@ -99,13 +102,11 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { return } - receiver = &Receiver{ - NetConnection: nc, - } - + receiver = &Receiver{} + receiver.NetConnection = nc if receiver.Publisher, err = p.Publish(strings.TrimPrefix(nc.URL.Path, "/")); err != nil { receiver = nil - err = nc.WriteResponse(&tcp.Response{ + err = nc.WriteResponse(&util.Response{ StatusCode: 500, Status: err.Error(), }) return @@ -141,7 +142,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { timeout = time.Second * 15 - res := &tcp.Response{Request: req} + res := &util.Response{Request: req} if err = nc.WriteResponse(res); err != nil { return } @@ -150,33 +151,73 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { sendMode = true timeout = time.Second * 60 - //if c.Senders == nil { - // res := &tcp.Response{ - // Status: "404 Not Found", - // Request: req, - // } - // return c.WriteResponse(res) - //} - - res := &tcp.Response{ + var subscriber *m7s.Subscriber + subscriber, err = p.Subscribe(strings.TrimPrefix(nc.URL.Path, "/"), conn) + if err != nil { + res := &util.Response{ + StatusCode: http.StatusBadRequest, + Status: err.Error(), + Request: req, + } + _ = nc.WriteResponse(res) + return + } + res := &util.Response{ Header: map[string][]string{ "Content-Type": {"application/sdp"}, }, Request: req, } - + sender = &Sender{ + Subscriber: subscriber, + } + sender.NetConnection = nc // convert tracks to real output medias var medias []*core.Media + if subscriber.SubAudio && subscriber.Publisher.PubAudio { + audioTrack := subscriber.Publisher.GetAudioTrack(reflect.TypeOf((*mrtp.RTPAudio)(nil))) + if err = audioTrack.WaitReady(); err != nil { + return + } + parameter := audioTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter() + media := &core.Media{ + Kind: "audio", + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{{ + Name: parameter.MimeType[6:], + ClockRate: parameter.ClockRate, + Channels: parameter.Channels, + FmtpLine: parameter.SDPFmtpLine, + PayloadType: uint8(parameter.PayloadType), + }}, + ID: fmt.Sprintf("trackID=%d", len(medias)), + } + medias = append(medias, media) + sender.AudioChannelID = 0 + } - //for i, track := range c.Senders { - // media := &core.Media{ - // Kind: core.GetKind(track.Codec.Name), - // Direction: core.DirectionRecvonly, - // Codecs: []*core.Codec{track.Codec}, - // ID: "trackID=" + strconv.Itoa(i), - // } - // medias = append(medias, media) - //} + if subscriber.SubVideo && subscriber.Publisher.PubVideo { + videoTrack := subscriber.Publisher.GetVideoTrack(reflect.TypeOf((*mrtp.RTPVideo)(nil))) + if err = videoTrack.WaitReady(); err != nil { + return + } + parameter := videoTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter() + c := core.Codec{ + Name: parameter.MimeType[6:], + ClockRate: parameter.ClockRate, + Channels: parameter.Channels, + FmtpLine: parameter.SDPFmtpLine, + PayloadType: uint8(parameter.PayloadType), + } + media := &core.Media{ + Kind: "video", + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{&c}, + ID: fmt.Sprintf("trackID=%d", len(medias)), + } + sender.VideoChannelID = byte(len(medias)) << 1 + medias = append(medias, media) + } res.Body, err = core.MarshalSDP(nc.SessionName, medias) if err != nil { @@ -192,7 +233,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { case MethodSetup: tr := req.Header.Get("Transport") - res := &tcp.Response{ + res := &util.Response{ Header: map[string][]string{}, Request: req, } @@ -202,14 +243,12 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { nc.Session = core.RandString(8, 10) if sendMode { - //if i := reqTrackID(req); i >= 0 && i < len(c.Senders) { - // // mark sender as SETUP - // c.Senders[i].Media.ID = MethodSetup - // tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1) - // res.Header.Set("Transport", tr) - //} else { - // res.Status = "400 Bad Request" - //} + if i := reqTrackID(req); i >= 0 { + tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1) + res.Header.Set("Transport", tr) + } else { + res.Status = "400 Bad Request" + } } else { res.Header.Set("Transport", tr[:len(transport)+3]) } @@ -222,23 +261,45 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { } case MethodRecord, MethodPlay: + res := &util.Response{Request: req} + err = nc.WriteResponse(res) + var audioFrame *mrtp.RTPAudio + var videoFrame *mrtp.RTPVideo if sendMode { - // stop unconfigured senders - //for _, track := range c.Senders { - // if track.Media.ID != MethodSetup { - // track.Close() - // } - //} + go func() { + mem := util.NewScalableMemoryAllocator(1 << 11) + sendRTP := func(pack *mrtp.RTPData, channel byte) (err error) { + nc.StartWrite() + defer nc.StopWrite() + for _, packet := range pack.Packets { + size := packet.MarshalSize() + chunk := mem.Borrow(size + 4) + chunk[0], chunk[1], chunk[2], chunk[3] = '$', channel, byte(size>>8), byte(size) + if _, err = packet.MarshalTo(chunk[4:]); err != nil { + return + } + if _, err = nc.Write(chunk); err != nil { + return + } + } + return + } + m7s.PlayBlock(sender.Subscriber, func(audio *mrtp.RTPAudio) error { + return sendRTP(&audio.RTPData, sender.AudioChannelID) + }, func(video *mrtp.RTPVideo) error { + return sendRTP(&video.RTPData, sender.VideoChannelID) + }) + mem.Recycle() + }() + } else { + audioFrame = &mrtp.RTPAudio{} + audioFrame.ScalableMemoryAllocator = mem + audioFrame.RTPCodecParameters = receiver.AudioCodecParameters + videoFrame = &mrtp.RTPVideo{} + videoFrame.ScalableMemoryAllocator = mem + videoFrame.RTPCodecParameters = receiver.VideoCodecParameters } - res := &tcp.Response{Request: req} - err = nc.WriteResponse(res) - audioFrame := &mrtp.RTPAudio{} - audioFrame.ScalableMemoryAllocator = mem - audioFrame.RTPCodecParameters = receiver.AudioCodecParameters - videoFrame := &mrtp.RTPVideo{} - videoFrame.ScalableMemoryAllocator = mem - videoFrame.RTPCodecParameters = receiver.VideoCodecParameters for err == nil { ts := time.Now() @@ -250,8 +311,8 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { // 1. RTP interleaved: `$` + 1B channel number + 2B size // 2. RTSP response: RTSP/1.0 200 OK // 3. RTSP request: OPTIONS ... - magic, err = nc.Peek(4) - if err != nil { + + if magic, err = nc.Peek(4); err != nil { return } @@ -259,10 +320,11 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { var size int var buf []byte if magic[0] != '$' { - logger.Warn("not magic") - switch string(magic) { + magicWord := string(magic) + logger.Warn("not magic", "magic", magicWord) + switch magicWord { case "RTSP": - var res *tcp.Response + var res *util.Response if res, err = nc.ReadResponse(); err != nil { return } @@ -272,16 +334,22 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { continue case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": - var req *tcp.Request + var req *util.Request if req, err = nc.ReadRequest(); err != nil { return } if req.Method == MethodOptions { - res := &tcp.Response{Request: req} + res := &util.Response{Request: req} + if sendMode { + nc.StartWrite() + } if err = nc.WriteResponse(res); err != nil { return } + if sendMode { + nc.StopWrite() + } } continue @@ -320,6 +388,19 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { // return fmt.Errorf("RTSP wrong input") // } //} + for err = nc.Skip(1); err == nil; { + if magic[0], err = nc.ReadByte(); magic[0] == '*' { + channelID, err = nc.ReadByte() + magic[2], err = nc.ReadByte() + magic[3], err = nc.ReadByte() + size = int(binary.BigEndian.Uint16(magic[2:])) + buf = mem.Malloc(size) + if err = nc.ReadNto(size, buf); err != nil { + return + } + break + } + } } } else { // hope that the odd channels are always RTCP @@ -408,7 +489,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { return case MethodTeardown: - res := &tcp.Response{Request: req} + res := &util.Response{Request: req} _ = nc.WriteResponse(res) return @@ -418,7 +499,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) { } } -func reqTrackID(req *tcp.Request) int { +func reqTrackID(req *util.Request) int { var s string if req.URL.RawQuery != "" { s = req.URL.RawQuery diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index 8d2c1e8..c8317ff 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -1,15 +1,15 @@ package rtsp import ( - "bufio" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/tcp" "log/slog" "m7s.live/m7s/v5/pkg/util" "net" "net/url" + "runtime" "strconv" "sync" + "sync/atomic" "time" ) @@ -18,17 +18,15 @@ const Timeout = time.Second * 5 func NewNetConnection(conn net.Conn, logger *slog.Logger) *NetConnection { defer logger.Info("new connection") return &NetConnection{ - conn: conn, - Logger: logger, - BufReader: util.NewBufReader(conn), - textReader: bufio.NewReader(conn), + conn: conn, + Logger: logger, + BufReader: util.NewBufReader(conn), } } type NetConnection struct { *slog.Logger *util.BufReader - textReader *bufio.Reader Backchannel bool Media string PacketSize uint16 @@ -42,18 +40,27 @@ type NetConnection struct { // internal - auth *tcp.Auth + auth *util.Auth conn net.Conn keepalive int - mode core.Mode sequence int Session string sdp string uri string + writing atomic.Bool + state State + stateMu sync.Mutex + SDP string +} - state State - stateMu sync.Mutex - SDP string +func (c *NetConnection) StartWrite() { + for !c.writing.CompareAndSwap(false, true) { + runtime.Gosched() + } +} + +func (c *NetConnection) StopWrite() { + c.writing.Store(false) } func (c *NetConnection) Destroy() { @@ -98,7 +105,7 @@ const ( StatePlay ) -func (c *NetConnection) WriteRequest(req *tcp.Request) error { +func (c *NetConnection) WriteRequest(req *util.Request) error { if req.Proto == "" { req.Proto = ProtoRTSP } @@ -130,11 +137,11 @@ func (c *NetConnection) WriteRequest(req *tcp.Request) error { return req.Write(c.conn) } -func (c *NetConnection) ReadRequest() (req *tcp.Request, err error) { +func (c *NetConnection) ReadRequest() (req *util.Request, err error) { if err = c.conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil { return } - req, err = tcp.ReadRequest(c.textReader) + req, err = util.ReadRequest(c.BufReader) if err != nil { return } @@ -142,7 +149,7 @@ func (c *NetConnection) ReadRequest() (req *tcp.Request, err error) { return } -func (c *NetConnection) WriteResponse(res *tcp.Response) error { +func (c *NetConnection) WriteResponse(res *util.Response) error { if res.Proto == "" { res.Proto = ProtoRTSP } @@ -182,9 +189,16 @@ func (c *NetConnection) WriteResponse(res *tcp.Response) error { return res.Write(c.conn) } -func (c *NetConnection) ReadResponse() (*tcp.Response, error) { +func (c *NetConnection) ReadResponse() (*util.Response, error) { if err := c.conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil { return nil, err } - return tcp.ReadResponse(c.textReader) + return util.ReadResponse(c.BufReader) +} + +func (c *NetConnection) Write(chunk []byte) (int, error) { + if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { + return 0, err + } + return c.conn.Write(chunk) } diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index 4a52407..e75b0c6 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -5,11 +5,19 @@ import ( "m7s.live/m7s/v5" ) +type Stream struct { + *NetConnection + AudioChannelID byte + VideoChannelID byte +} +type Sender struct { + *m7s.Subscriber + Stream +} + type Receiver struct { *m7s.Publisher - *NetConnection + Stream AudioCodecParameters *webrtc.RTPCodecParameters VideoCodecParameters *webrtc.RTPCodecParameters - AudioChannelID byte - VideoChannelID byte } diff --git a/plugin/webrtc/index.go b/plugin/webrtc/index.go index e517703..ed11bcb 100644 --- a/plugin/webrtc/index.go +++ b/plugin/webrtc/index.go @@ -189,29 +189,25 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) { frame.ScalableMemoryAllocator = mem for { var packet rtp.Packet - buf := frame.NextN(1460) + buf := frame.Malloc(mrtp.MTUSize) if n, _, err = track.Read(buf); err == nil { - if n < 1460 { - frame.Free(buf[n:]) - buf = buf[:n] - frame.UpdateBuffer(-1, buf) - } + frame.FreeRest(&buf, n) err = packet.Unmarshal(buf) } if err != nil { return } if len(packet.Payload) == 0 { - frame.Free(frame.RemoveRecycleBytes(-1)) + frame.Free(buf) continue } if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp { + frame.AddRecycleBytes(buf) frame.Packets = append(frame.Packets, &packet) } else { - m := frame.RemoveRecycleBytes(-1) - publisher.WriteAudio(frame) + err = publisher.WriteAudio(frame) frame = &mrtp.RTPAudio{} - frame.AddRecycleBytes(m) + frame.AddRecycleBytes(buf) frame.Packets = []*rtp.Packet{&packet} frame.RTPCodecParameters = &codecP frame.ScalableMemoryAllocator = mem @@ -236,31 +232,27 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) { lastPLISent = time.Now() } var packet rtp.Packet - buf := frame.NextN(1460) + buf := frame.Malloc(mrtp.MTUSize) if n, _, err = track.Read(buf); err == nil { - if n < 1460 { - frame.Free(buf[n:]) - buf = buf[:n] - frame.UpdateBuffer(-1, buf) - } + frame.FreeRest(&buf, n) err = packet.Unmarshal(buf) } if err != nil { return } if len(packet.Payload) == 0 { - frame.Free(frame.RemoveRecycleBytes(-1)) + frame.Free(buf) continue } if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp { + frame.AddRecycleBytes(buf) frame.Packets = append(frame.Packets, &packet) } else { // t := time.Now() - m := frame.RemoveRecycleBytes(-1) - publisher.WriteVideo(frame) + err = publisher.WriteVideo(frame) // fmt.Println("write video", time.Since(t)) frame = &mrtp.RTPVideo{} - frame.AddRecycleBytes(m) + frame.AddRecycleBytes(buf) frame.Packets = []*rtp.Packet{&packet} frame.RTPCodecParameters = &codecP frame.ScalableMemoryAllocator = mem @@ -351,20 +343,20 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) { if vt.FourCC() == codec.FourCC_H265 { useDC = true } else { - var rcc RTPCodecCapability + var rcc RTPCodecParameters if ctx, ok := vt.ICodecCtx.(mrtp.IRTPCtx); ok { - rcc = ctx.GetRTPCodecCapability() + rcc = ctx.GetRTPCodecParameter() } else { var rtpCtx mrtp.RTPData var tmpAVTrack AVTrack err = rtpCtx.DecodeConfig(&tmpAVTrack, vt.ICodecCtx) if err == nil { - rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecCapability() + rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter() } else { return } } - videoTLSRTP, err = NewTrackLocalStaticRTP(rcc, vt.FourCC().String(), suber.StreamPath) + videoTLSRTP, err = NewTrackLocalStaticRTP(rcc.RTPCodecCapability, vt.FourCC().String(), suber.StreamPath) if err != nil { return } diff --git a/publisher.go b/publisher.go index 52443d9..91cf431 100644 --- a/publisher.go +++ b/publisher.go @@ -1,6 +1,8 @@ package m7s import ( + "os" + "path/filepath" "reflect" "slices" "sync" @@ -76,6 +78,7 @@ type Publisher struct { GOP int baseTs time.Duration lastTs time.Duration + dumpFile *os.File } func (p *Publisher) SubscriberRange(yield func(sub *Subscriber) bool) { @@ -129,6 +132,9 @@ func (p *Publisher) RemoveSubscriber(subscriber *Subscriber) { defer p.Unlock() p.Subscribers.Remove(subscriber) p.Info("subscriber -1", "count", p.Subscribers.Length) + if p.Plugin == nil { + return + } if subscriber.BufferTime == p.BufferTime && p.Subscribers.Length > 0 { p.BufferTime = slices.MaxFunc(p.Subscribers.Items, func(a, b *Subscriber) int { return int(a.BufferTime - b.BufferTime) @@ -175,6 +181,15 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) { } } +func (p *Publisher) Start() { + p.Info("publish") + if p.Dump { + f := filepath.Join("./dump", p.StreamPath) + os.MkdirAll(filepath.Dir(f), 0666) + p.dumpFile, _ = os.OpenFile(filepath.Join("./dump", p.StreamPath), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) + } +} + func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { frame := &t.Value frame.Wraps = append(frame.Wraps, data) @@ -199,6 +214,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { data.Recycle() } }() + if p.dumpFile != nil { + data.Dump(1, p.dumpFile) + } if !p.PubVideo || p.IsStopped() { return ErrMuted } @@ -230,17 +248,20 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { idr = t.IDRingList.Back().Value } if t.Value.IDR { - if t.Ready.IsPending() { + if !t.IsReady() { p.Info("ready") - t.Ready.Fulfill(nil) + t.Ready(nil) } else if idr != nil { p.GOP = int(t.Value.Sequence - idr.Value.Sequence) } else { p.GOP = 0 } + if p.AudioTrack.Length > 0 { + p.AudioTrack.PushIDR() + } } p.writeAV(t, data) - if p.VideoTrack.Length > 1 && !p.VideoTrack.AVTrack.Ready.IsPending() { + if p.VideoTrack.Length > 1 && p.VideoTrack.IsReady() { if t.Value.Raw == nil { t.Value.Raw, err = t.Value.Wraps[0].ToRaw(t.ICodecCtx) if err != nil { @@ -256,21 +277,22 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { track.Error("DecodeConfig", "err", err) return } - for rf := idr; rf != t.Ring; rf = rf.Next() { - if i == 0 && rf.Value.Raw == nil { - rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) - if err != nil { - t.Error("to raw", "err", err) - return err + if t.IDRingList.Len() > 0 { + for rf := t.IDRingList.Front().Value; rf != t.Ring; rf = rf.Next() { + if i == 0 && rf.Value.Raw == nil { + rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) + if err != nil { + t.Error("to raw", "err", err) + return err + } } + if toFrame, err = track.CreateFrame(&rf.Value); err != nil { + track.Error("from raw", "err", err) + return + } + rf.Value.Wraps = append(rf.Value.Wraps, toFrame) } - if toFrame, err = track.CreateFrame(&rf.Value); err != nil { - track.Error("from raw", "err", err) - return - } - rf.Value.Wraps = append(rf.Value.Wraps, toFrame) } - defer track.Ready.Fulfill(err) } if toFrame, err = track.CreateFrame(&t.Value); err != nil { track.Error("from raw", "err", err) @@ -280,6 +302,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { toFrame.DecodeConfig(track, t.ICodecCtx) } t.Value.Wraps = append(t.Value.Wraps, toFrame) + if track.ICodecCtx != nil { + track.Ready(err) + } } } t.Step() @@ -293,6 +318,9 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { data.Recycle() } }() + if p.dumpFile != nil { + data.Dump(0, p.dumpFile) + } if !p.PubAudio || p.IsStopped() { return ErrMuted } @@ -309,14 +337,60 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { } p.Unlock() } - _, _, _, err = data.Parse(t) + oldCodecCtx := t.ICodecCtx + _, _, t.Value.Raw, err = data.Parse(t) + codecCtxChanged := oldCodecCtx != t.ICodecCtx if t.ICodecCtx == nil { return ErrUnsupportCodec } - if t.Ready.IsPending() { - t.Ready.Fulfill(err) - } + t.Ready(err) p.writeAV(t, data) + if p.AudioTrack.Length > 1 && p.AudioTrack.IsReady() { + if t.Value.Raw == nil { + t.Value.Raw, err = t.Value.Wraps[0].ToRaw(t.ICodecCtx) + if err != nil { + t.Error("to raw", "err", err) + return err + } + } + var toFrame IAVFrame + for i, track := range p.AudioTrack.Items[1:] { + if track.ICodecCtx == nil { + err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx) + if err != nil { + track.Error("DecodeConfig", "err", err) + return + } + if idr := p.AudioTrack.GetOldestIDR(); idr != nil { + for rf := idr; rf != t.Ring; rf = rf.Next() { + if i == 0 && rf.Value.Raw == nil { + rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) + if err != nil { + t.Error("to raw", "err", err) + return err + } + } + if toFrame, err = track.CreateFrame(&rf.Value); err != nil { + track.Error("from raw", "err", err) + return + } + rf.Value.Wraps = append(rf.Value.Wraps, toFrame) + } + } + } + if toFrame, err = track.CreateFrame(&t.Value); err != nil { + track.Error("from raw", "err", err) + return + } + if codecCtxChanged { + toFrame.DecodeConfig(track, t.ICodecCtx) + } + t.Value.Wraps = append(t.Value.Wraps, toFrame) + if track.ICodecCtx != nil { + track.Ready(err) + } + } + } t.Step() p.AudioTrack.speedControl(p.Publish.Speed, p.lastTs) return @@ -365,6 +439,9 @@ func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) { func (p *Publisher) Dispose(err error) { p.Lock() defer p.Unlock() + if p.dumpFile != nil { + p.dumpFile.Close() + } if p.State == PublisherStateDisposed { return } diff --git a/server.go b/server.go index 813d4ad..5f0105a 100644 --- a/server.go +++ b/server.go @@ -410,7 +410,7 @@ func (s *Server) OnPublish(publisher *Publisher) error { publisher.ID = s.pidG publisher.Logger = p.With("streamPath", publisher.StreamPath, "pubID", publisher.ID) publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout) - publisher.Info("publish") + publisher.Start() if waiting, ok := s.Waiting.Get(publisher.StreamPath); ok { publisher.TakeOver(waiting) s.Waiting.Remove(waiting) diff --git a/subscriber.go b/subscriber.go index 224f4eb..cbf06f7 100644 --- a/subscriber.go +++ b/subscriber.go @@ -69,11 +69,11 @@ func (ps *PubSubBase) Init(p *Plugin, streamPath string, conf any, options ...an for key, value := range ps.Args { if strings.HasSuffix(key, "ArgName") { targetArgName := strings.TrimSuffix(key, "ArgName") - cc[targetArgName] = ps.Args.Get(value[0])[0] + cc[strings.ToLower(targetArgName)] = ps.Args.Get(value[0])[0] ignores[value[0]] = struct{}{} delete(cc, value[0]) } else if _, ok := ignores[key]; !ok { - cc[key] = value[0] + cc[strings.ToLower(key)] = value[0] } } c.ParseModifyFile(cc) @@ -118,7 +118,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } } if at != nil { - if _, err := at.Ready.Await(); err != nil { + if err := at.WaitReady(); err != nil { return } ar = NewAVRingReader(at) @@ -142,7 +142,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } } if vt != nil { - if _, err := vt.Ready.Await(); err != nil { + if err := vt.WaitReady(); err != nil { return } vr = NewAVRingReader(vt)