fix: promise never return

This commit is contained in:
langhuihui
2023-11-16 09:55:22 +08:00
parent 2b658ec786
commit 6c6a28855d
4 changed files with 130 additions and 64 deletions

View File

@@ -512,11 +512,6 @@ func (s *Stream) run() {
case action, ok := <-s.actionChan.C: case action, ok := <-s.actionChan.C:
if !ok { if !ok {
return return
} else if s.State == STATE_CLOSED {
if s.actionChan.Close() { //再次尝试关闭
return
}
continue
} }
timeStart = time.Now() timeStart = time.Now()
switch v := action.(type) { switch v := action.(type) {
@@ -527,6 +522,7 @@ func (s *Stream) run() {
timeOutInfo = zap.String("action", "Publish") timeOutInfo = zap.String("action", "Publish")
if s.IsClosed() { if s.IsClosed() {
v.Reject(ErrStreamIsClosed) v.Reject(ErrStreamIsClosed)
break
} }
puber := v.Value.GetPublisher() puber := v.Value.GetPublisher()
conf := puber.Config conf := puber.Config
@@ -571,6 +567,7 @@ func (s *Stream) run() {
timeOutInfo = zap.String("action", "Subscribe") timeOutInfo = zap.String("action", "Subscribe")
if s.IsClosed() { if s.IsClosed() {
v.Reject(ErrStreamIsClosed) v.Reject(ErrStreamIsClosed)
break
} }
suber := v.Value suber := v.Value
io := suber.GetSubscriber() io := suber.GetSubscriber()
@@ -628,6 +625,9 @@ func (s *Stream) run() {
s.onSuberClose(v) s.onSuberClose(v)
case TrackRemoved: case TrackRemoved:
timeOutInfo = zap.String("action", "TrackRemoved") timeOutInfo = zap.String("action", "TrackRemoved")
if s.IsClosed() {
break
}
name := v.GetName() name := v.GetName()
if t, ok := s.Tracks.LoadAndDelete(name); ok { if t, ok := s.Tracks.LoadAndDelete(name); ok {
s.Info("track -1", zap.String("name", name)) s.Info("track -1", zap.String("name", name))
@@ -636,6 +636,10 @@ func (s *Stream) run() {
} }
case *util.Promise[Track]: case *util.Promise[Track]:
timeOutInfo = zap.String("action", "Track") timeOutInfo = zap.String("action", "Track")
if s.IsClosed() {
v.Reject(ErrStreamIsClosed)
break
}
if s.State == STATE_WAITPUBLISH { if s.State == STATE_WAITPUBLISH {
s.action(ACTION_PUBLISH) s.action(ACTION_PUBLISH)
} }
@@ -673,6 +677,9 @@ func (s *Stream) run() {
timeOutInfo = zap.String("action", "unknown") timeOutInfo = zap.String("action", "unknown")
s.Error("unknown action", timeOutInfo) s.Error("unknown action", timeOutInfo)
} }
if s.IsClosed() && s.actionChan.Close() { //再次尝试关闭
return
}
} }
} }
} }

View File

@@ -15,6 +15,7 @@ var _ SpesificTrack = (*AV1)(nil)
type AV1 struct { type AV1 struct {
Video Video
decoder rtpav1.Decoder decoder rtpav1.Decoder
encoder rtpav1.Encoder
} }
func NewAV1(stream IStream, stuff ...any) (vt *AV1) { func NewAV1(stream IStream, stuff ...any) (vt *AV1) {
@@ -24,8 +25,11 @@ func NewAV1(stream IStream, stuff ...any) (vt *AV1) {
if vt.BytesPool == nil { if vt.BytesPool == nil {
vt.BytesPool = make(util.BytesPool, 17) vt.BytesPool = make(util.BytesPool, 17)
} }
vt.nalulenSize = 4 vt.nalulenSize = 0
vt.dtsEst = NewDTSEstimator() vt.dtsEst = NewDTSEstimator()
vt.decoder.Init()
vt.encoder.Init()
vt.encoder.PayloadType = vt.PayloadType
return return
} }
@@ -33,7 +37,7 @@ func (vt *AV1) writeSequenceHead(head []byte) (err error) {
vt.WriteSequenceHead(head) vt.WriteSequenceHead(head)
var info codec.AV1CodecConfigurationRecord var info codec.AV1CodecConfigurationRecord
info.Unmarshal(head[5:]) info.Unmarshal(head[5:])
vt.ParamaterSets[0] = info.ConfigOBUs vt.ParamaterSets = [][]byte{info.ConfigOBUs, {info.SeqLevelIdx0, info.SeqProfile, info.SeqTier0}}
return return
} }
@@ -44,32 +48,32 @@ func (vt *AV1) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
} }
b0 := frame.GetByte(0) b0 := frame.GetByte(0)
if isExtHeader := (b0 >> 4) & 0b1000; isExtHeader != 0 { if isExtHeader := (b0 >> 4) & 0b1000; isExtHeader != 0 {
firstBuffer := frame.Next.Value // firstBuffer := frame.Next.Value
packetType := b0 & 0b1111 packetType := b0 & 0b1111
switch packetType { switch packetType {
case codec.PacketTypeSequenceStart: case codec.PacketTypeSequenceStart:
header := frame.ToBytes() header := frame.ToBytes()
header[0] = 0x1d // header[0] = 0x1d
header[1] = 0x00 // header[1] = 0x00
header[2] = 0x00 // header[2] = 0x00
header[3] = 0x00 // header[3] = 0x00
header[4] = 0x00 // header[4] = 0x00
err = vt.writeSequenceHead(header) err = vt.writeSequenceHead(header)
frame.Recycle() frame.Recycle()
return return
case codec.PacketTypeCodedFrames: case codec.PacketTypeCodedFrames:
firstBuffer[0] = b0 & 0b0111_1111 & 0xFD // firstBuffer[0] = b0 & 0b0111_1111 & 0xFD
firstBuffer[1] = 0x01 // firstBuffer[1] = 0x01
copy(firstBuffer[2:], firstBuffer[5:]) // copy(firstBuffer[2:], firstBuffer[5:])
frame.Next.Value = firstBuffer[:firstBuffer.Len()-3] // frame.Next.Value = firstBuffer[:firstBuffer.Len()-3]
frame.ByteLength -= 3 // frame.ByteLength -= 3
return vt.Video.WriteAVCC(ts, frame) return vt.Video.WriteAVCC(ts, frame)
case codec.PacketTypeCodedFramesX: case codec.PacketTypeCodedFramesX:
firstBuffer[0] = b0 & 0b0111_1111 & 0xFD // firstBuffer[0] = b0 & 0b0111_1111 & 0xFD
firstBuffer[1] = 0x01 // firstBuffer[1] = 0x01
firstBuffer[2] = 0 // firstBuffer[2] = 0
firstBuffer[3] = 0 // firstBuffer[3] = 0
firstBuffer[4] = 0 // firstBuffer[4] = 0
return vt.Video.WriteAVCC(ts, frame) return vt.Video.WriteAVCC(ts, frame)
} }
} else { } else {
@@ -108,3 +112,26 @@ func (vt *AV1) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) {
vt.Flush() vt.Flush()
} }
} }
// RTP格式补完
func (vt *AV1) CompleteRTP(value *AVFrame) {
rtps, err := vt.encoder.Encode(vt.Value.AUList.ToBuffers())
if err != nil {
vt.Error("AV1 encoder encode error", zap.Error(err))
return
}
if vt.Value.IFrame {
rtpItem := vt.GetRTPFromPool()
packet := &rtpItem.Value
br := util.LimitBuffer{Buffer: packet.Payload}
packet.Timestamp = uint32(vt.Value.PTS)
packet.Marker = false
br.Write(vt.ParamaterSets[0])
packet.Payload = br.Bytes()
vt.Value.RTP.Push(rtpItem)
}
for _, rtp := range rtps {
vt.Value.RTP.PushValue(RTPFrame{Packet: rtp})
}
}

View File

@@ -1,7 +1,6 @@
package track package track
import ( import (
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -127,41 +126,64 @@ func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (e error) {
if err != nil { if err != nil {
return err return err
} }
b = (b >> 4) & 0b0111 isExtHeader := (b >> 4) & 0b1000
vt.Value.IFrame = b == 1 || b == 4 frameType := (b >> 4) & 0b0111
r.ReadByte() //sequence frame flag vt.Value.IFrame = frameType == 1 || frameType == 4
cts, err := r.ReadBE(3) packetType := b & 0b1111
var cts uint32
if isExtHeader != 0 {
fourcCC, _ := r.ReadBE(4) //sequence frame flag
switch packetType {
case codec.PacketTypeSequenceStart:
case codec.PacketTypeCodedFrames:
if fourcCC == codec.FourCC_H265_32 {
cts, err = r.ReadBE(3)
}
case codec.PacketTypeCodedFramesX:
}
} else {
r.ReadByte() //sequence frame flag
cts, err = r.ReadBE(3)
if err != nil {
return err
}
}
if err != nil { if err != nil {
return err return err
} }
vt.Value.PTS = time.Duration(ts+cts) * 90 vt.Value.PTS = time.Duration(ts+cts) * 90
vt.Value.DTS = time.Duration(ts) * 90 vt.Value.DTS = time.Duration(ts) * 90
// println(":", vt.Value.Sequence) // println(":", vt.Value.Sequence)
var nalulen uint32 if isExtHeader == 0 {
for nalulen, e = r.ReadBE(vt.nalulenSize); e == nil; nalulen, e = r.ReadBE(vt.nalulenSize) { var nalulen uint32
if remain := frame.ByteLength - r.GetOffset(); remain < int(nalulen) { for nalulen, e = r.ReadBE(vt.nalulenSize); e == nil; nalulen, e = r.ReadBE(vt.nalulenSize) {
vt.Error("read nalu length error", zap.Int("nalulen", int(nalulen)), zap.Int("remain", remain)) if remain := frame.ByteLength - r.GetOffset(); remain < int(nalulen) {
frame.Recycle() vt.Error("read nalu length error", zap.Int("nalulen", int(nalulen)), zap.Int("remain", remain))
vt.Value.Reset() frame.Recycle()
return vt.Value.Reset()
// for bbb.CanRead() { return
// nalulen = bbb.ReadUint32() // for bbb.CanRead() {
// if bbb.CanReadN(int(nalulen)) { // nalulen = bbb.ReadUint32()
// bbb.ReadN(int(nalulen)) // if bbb.CanReadN(int(nalulen)) {
// } else { // bbb.ReadN(int(nalulen))
// panic("read nalu error1") // } else {
// } // panic("read nalu error1")
// }
// }
// panic("read nalu error2")
}
// var au util.BLL
// for _, bb := range r.ReadN(int(nalulen)) {
// au.Push(vt.BytesPool.GetShell(bb))
// } // }
// panic("read nalu error2") // println(":", nalulen, au.ByteLength)
// vt.Value.AUList.PushValue(&au)
vt.AppendAuBytes(r.ReadN(int(nalulen))...)
} }
// var au util.BLL } else {
// for _, bb := range r.ReadN(int(nalulen)) { vt.AppendAuBytes(r.ReadN(frame.ByteLength - 5)...)
// au.Push(vt.BytesPool.GetShell(bb))
// }
// println(":", nalulen, au.ByteLength)
// vt.Value.AUList.PushValue(&au)
vt.AppendAuBytes(r.ReadN(int(nalulen))...)
} }
vt.Value.WriteAVCC(ts, frame) vt.Value.WriteAVCC(ts, frame)
// { // {
// b := util.Buffer(vt.Value.AVCC.ToBytes()[5:]) // b := util.Buffer(vt.Value.AVCC.ToBytes()[5:])

View File

@@ -1,8 +1,11 @@
package util package util
import ( import (
"context"
"errors"
"math" "math"
"sync/atomic" "sync/atomic"
"time"
) )
// SafeChan安全的channel可以防止close后被写入的问题 // SafeChan安全的channel可以防止close后被写入的问题
@@ -46,33 +49,40 @@ func (sc *SafeChan[T]) IsFull() bool {
return atomic.LoadInt32(&sc.senders) > 0 return atomic.LoadInt32(&sc.senders) > 0
} }
var errResolved = errors.New("resolved")
type Promise[S any] struct { type Promise[S any] struct {
context.Context
context.CancelCauseFunc
context.CancelFunc
Value S Value S
c chan error
state int32 // 0 pendding 1 fullfilled -1 rejected
} }
func (r *Promise[S]) Resolve() { func (r *Promise[S]) Resolve() {
if atomic.CompareAndSwapInt32(&r.state, 0, 1) { r.CancelCauseFunc(errResolved)
r.c <- nil
close(r.c)
}
} }
func (r *Promise[S]) Reject(err error) { func (r *Promise[S]) Reject(err error) {
if atomic.CompareAndSwapInt32(&r.state, 0, -1) { r.CancelCauseFunc(err)
r.c <- err
close(r.c)
}
} }
func (p *Promise[S]) Await() error { func (p *Promise[S]) Await() (err error) {
return <-p.c <-p.Done()
err = context.Cause(p.Context)
if err == errResolved {
err = nil
}
p.CancelFunc()
return
} }
func NewPromise[S any](value S) *Promise[S] { func NewPromise[S any](value S) *Promise[S] {
ctx0, cancel0 := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithCancelCause(ctx0)
return &Promise[S]{ return &Promise[S]{
Value: value, Value: value,
c: make(chan error, 1), Context: ctx,
CancelCauseFunc: cancel,
CancelFunc: cancel0,
} }
} }