mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-12-24 13:48:04 +08:00
fix: dispose SinglePortReader
This commit is contained in:
@@ -130,11 +130,13 @@ func (Buffer) Reuse() bool {
|
||||
}
|
||||
|
||||
func (b *Buffer) Read(buf []byte) (n int, err error) {
|
||||
if n = b.Len(); n == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
if !b.CanReadN(len(buf)) {
|
||||
copy(buf, *b)
|
||||
n = b.Len()
|
||||
*b = (*b)[n:]
|
||||
return n, io.EOF
|
||||
return
|
||||
}
|
||||
ret := b.ReadN(len(buf))
|
||||
copy(buf, ret)
|
||||
|
||||
@@ -379,11 +379,17 @@ func (d *Dialog) Run() (err error) {
|
||||
case mrtp.StreamModeTCPPassive:
|
||||
if d.gb.tcpPort > 0 {
|
||||
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
|
||||
// 创建一个可取消的上下文
|
||||
reader := &gb28181.SinglePortReader{
|
||||
SSRC: d.SSRC,
|
||||
Mouth: make(chan []byte, 1),
|
||||
SSRC: d.SSRC,
|
||||
Mouth: make(chan []byte, 1),
|
||||
Context: d,
|
||||
}
|
||||
var loaded bool
|
||||
reader, loaded = d.gb.singlePorts.LoadOrStore(reader)
|
||||
if loaded {
|
||||
reader.Context = d
|
||||
}
|
||||
reader, _ = d.gb.singlePorts.LoadOrStore(reader)
|
||||
pub.SinglePort = reader
|
||||
d.OnStop(func() {
|
||||
reader.Close()
|
||||
@@ -395,10 +401,15 @@ func (d *Dialog) Run() (err error) {
|
||||
if d.gb.udpPort > 0 {
|
||||
d.Info("into single port mode, use gb.udpPort", d.gb.udpPort)
|
||||
reader := &gb28181.SinglePortReader{
|
||||
SSRC: d.SSRC,
|
||||
Mouth: make(chan []byte, 100),
|
||||
SSRC: d.SSRC,
|
||||
Mouth: make(chan []byte, 100),
|
||||
Context: d,
|
||||
}
|
||||
var loaded bool
|
||||
reader, loaded = d.gb.singlePorts.LoadOrStore(reader)
|
||||
if loaded {
|
||||
reader.Context = d
|
||||
}
|
||||
reader, _ = d.gb.singlePorts.LoadOrStore(reader)
|
||||
pub.SinglePort = reader
|
||||
d.OnStop(func() {
|
||||
reader.Close()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -15,6 +16,7 @@ type SinglePortReader struct {
|
||||
conn io.ReadCloser
|
||||
buffered util.Buffer
|
||||
Mouth chan []byte
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
func (s *SinglePortReader) GetKey() uint32 {
|
||||
@@ -23,14 +25,18 @@ func (s *SinglePortReader) GetKey() uint32 {
|
||||
|
||||
func (s *SinglePortReader) Read(buf []byte) (n int, err error) {
|
||||
if s.buffered.Len() > 0 {
|
||||
n, _ = s.buffered.Read(buf)
|
||||
return
|
||||
return s.buffered.Read(buf)
|
||||
}
|
||||
if s.conn != nil {
|
||||
return s.conn.Read(buf)
|
||||
}
|
||||
s.buffered = <-s.Mouth
|
||||
return s.Read(buf)
|
||||
// 添加对 Context 的检查,如果上下文已取消则返回 EOF
|
||||
select {
|
||||
case s.buffered = <-s.Mouth:
|
||||
return s.Read(buf)
|
||||
case <-s.Context.Done():
|
||||
return 0, s.Context.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SinglePortReader) Close() error {
|
||||
|
||||
Reference in New Issue
Block a user