mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-09-27 01:15:52 +08:00
fix: listen tcp failed return err
This commit is contained in:
@@ -85,9 +85,14 @@ func (a *AnnexB) Parse(t *AVTrack) (err error) {
|
||||
switch codec.ParseH264NALUType(nalu.Buffers[0][0]) {
|
||||
case codec.NALU_SPS:
|
||||
ctx.RecordInfo.SPS = [][]byte{nalu.ToBytes()}
|
||||
if len(ctx.RecordInfo.PPS) > 0 {
|
||||
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
|
||||
}
|
||||
case codec.NALU_PPS:
|
||||
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
|
||||
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
|
||||
if len(ctx.RecordInfo.SPS) > 0 {
|
||||
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
|
||||
}
|
||||
case codec.NALU_IDR_Picture:
|
||||
t.Value.IDR = true
|
||||
}
|
||||
|
@@ -73,6 +73,7 @@ func (task *ListenTCPWork) Start() (err error) {
|
||||
task.Info("listen tcp")
|
||||
} else {
|
||||
task.Error("failed to listen tcp", "error", err)
|
||||
return err
|
||||
}
|
||||
if task.handler == nil {
|
||||
return nil
|
||||
|
@@ -58,7 +58,10 @@ type GB28181Plugin struct {
|
||||
tcpPorts chan uint16
|
||||
}
|
||||
|
||||
var _ = m7s.InstallPlugin[GB28181Plugin](pb.RegisterApiHandler, &pb.Api_ServiceDesc, func() m7s.IPuller {
|
||||
var _ = m7s.InstallPlugin[GB28181Plugin](pb.RegisterApiHandler, &pb.Api_ServiceDesc, func(conf config.Pull) m7s.IPuller {
|
||||
if util.Exist(conf.URL) {
|
||||
return &gb28181.DumpPuller{}
|
||||
}
|
||||
return new(Dialog)
|
||||
})
|
||||
|
||||
@@ -365,6 +368,11 @@ func (gb *GB28181Plugin) StoreDevice(id string, req *sip.Request) (d *Device) {
|
||||
}
|
||||
|
||||
func (gb *GB28181Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publish) {
|
||||
if util.Exist(conf.URL) {
|
||||
var puller gb28181.DumpPuller
|
||||
puller.GetPullJob().Init(&puller, &gb.Plugin, streamPath, conf, pubConf)
|
||||
return
|
||||
}
|
||||
dialog := Dialog{
|
||||
gb: gb,
|
||||
}
|
||||
|
41
plugin/gb28181/pkg/puller-dump.go
Normal file
41
plugin/gb28181/pkg/puller-dump.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"m7s.live/v5"
|
||||
"m7s.live/v5/pkg/util"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DumpPuller struct {
|
||||
m7s.HTTPFilePuller
|
||||
}
|
||||
|
||||
func (p *DumpPuller) Run() (err error) {
|
||||
pub := p.PullJob.Publisher
|
||||
pub.Type = m7s.PublishTypeReplay
|
||||
puber := NewPSPublisher(pub)
|
||||
puber.Receiver.Logger = p.Logger
|
||||
go puber.Demux()
|
||||
var t uint16
|
||||
defer close(puber.Receiver.FeedChan)
|
||||
for l := make([]byte, 6); pub.State != m7s.PublisherStateDisposed; time.Sleep(time.Millisecond * time.Duration(t)) {
|
||||
_, err = p.Read(l)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
payloadLen := util.ReadBE[int](l[:4])
|
||||
payload := make([]byte, payloadLen)
|
||||
t = util.ReadBE[uint16](l[4:])
|
||||
_, err = p.Read(payload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err = puber.Receiver.ReadRTP(payload); err != nil {
|
||||
p.Error("replayPS", "err", err)
|
||||
}
|
||||
if pub.IsStopped() {
|
||||
return pub.StopReason()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
@@ -100,6 +100,8 @@ func (p *PSPublisher) Demux() {
|
||||
}
|
||||
case StartCodeMAP:
|
||||
p.decProgramStreamMap()
|
||||
case StartCodeSYS, PrivateStreamCode:
|
||||
p.ReadPayload()
|
||||
default:
|
||||
p.ReadPayload()
|
||||
}
|
||||
@@ -137,6 +139,9 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
|
||||
if err = p.Unmarshal(rtp); err != nil {
|
||||
return
|
||||
}
|
||||
if p.Enabled(p, task.TraceLevel) {
|
||||
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC)
|
||||
}
|
||||
copyData := make([]byte, len(p.Payload))
|
||||
copy(copyData, p.Payload)
|
||||
p.FeedChan <- copyData
|
||||
|
@@ -88,6 +88,10 @@ func (p *MP4Plugin) List(ctx context.Context, req *pb.ReqRecordList) (resp *pb.R
|
||||
}
|
||||
|
||||
func (p *MP4Plugin) Catalog(ctx context.Context, req *emptypb.Empty) (resp *pb.ResponseCatalog, err error) {
|
||||
if p.DB == nil {
|
||||
err = pkg.ErrNoDB
|
||||
return
|
||||
}
|
||||
resp = &pb.ResponseCatalog{}
|
||||
var result []struct {
|
||||
StreamPath string
|
||||
@@ -151,6 +155,10 @@ func (p *MP4Plugin) Delete(ctx context.Context, req *pb.ReqRecordDelete) (resp *
|
||||
}
|
||||
|
||||
func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
|
||||
if p.DB == nil {
|
||||
http.Error(w, pkg.ErrNoDB.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "video/mp4")
|
||||
streamPath := r.PathValue("streamPath")
|
||||
|
||||
|
Reference in New Issue
Block a user