Compare commits

...

3 Commits

Author SHA1 Message Date
ydajiang
cac5e91471 chore: update Go version to 1.20 in Dockerfile 2025-08-06 15:58:08 +08:00
ydajiang
b57a9de773 update module mepg to v0.0.3 2025-08-06 14:10:11 +08:00
ydajiang
7806098ad6 fix: 关闭推流失败的source时, 造成相同id的source被错误关闭问题; 2025-08-06 14:05:50 +08:00
4 changed files with 15 additions and 8 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.19-alpine as builder
FROM golang:1.20-alpine as builder
# 设置构建参数
ARG GOOS=linux
@@ -27,8 +27,6 @@ WORKDIR /build/lkm
# 将代码复制到容器中
COPY . .
COPY ./avformat /build/avformat
RUN go mod download && go mod tidy -v && go build -o lkm .
# 运行阶段指定scratch作为基础镜像

2
go.mod
View File

@@ -4,7 +4,7 @@ require (
github.com/lkmio/audio-transcoder v0.2.1
github.com/lkmio/avformat v0.0.1
github.com/lkmio/flv v0.0.2
github.com/lkmio/mpeg v0.0.2
github.com/lkmio/mpeg v0.0.3
github.com/lkmio/rtmp v0.0.2
github.com/lkmio/rtp v0.0.2
github.com/lkmio/transport v0.0.1

View File

@@ -32,6 +32,7 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS
log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().String(), source.GetID(), indent)
source.SetState(SessionStateTransferring)
return response, utils.HookStateOK
}

View File

@@ -105,8 +105,7 @@ type PublishSource struct {
createTime time.Time // source创建时间
statistics *BitrateStatistics // 码流统计
streamLogger avformat.OnUnpackStream2FileHandler
// streamLock sync.RWMutex
streamLock sync.Mutex
streamLock sync.Mutex // 收流、探测超时、关闭等操作互斥锁
timers struct {
receiveTimer *time.Timer // 收流超时计时器
@@ -139,7 +138,7 @@ func (s *PublishSource) SetID(id string) {
}
func (s *PublishSource) Init() {
s.SetState(SessionStateHandshakeSuccess)
s.SetState(SessionStateCreated)
s.statistics = NewBitrateStatistics()
s.streamPublisher = NewTransStreamPublisher(s.ID)
@@ -180,6 +179,7 @@ func (s *PublishSource) SetState(state SessionState) {
func (s *PublishSource) DoClose() {
log.Sugar.Debugf("closing the %s source. id: %s. closed flag: %t", s.Type, s.ID, s.closed.Load())
// 已关闭, 直接返回
if s.closed.Load() {
return
}
@@ -189,6 +189,7 @@ func (s *PublishSource) DoClose() {
closed = s.closed.Swap(true)
})
// 已关闭, 直接返回
if closed {
return
}
@@ -216,9 +217,16 @@ func (s *PublishSource) DoClose() {
s.clearUnusedPackets(track.Packets)
}
// 等传输流发布器关闭结束
// 停止发布输出流
// 同步执行
s.streamPublisher.close()
// 只释放prepare成功的source, 否则在关闭失败的source时, 造成id相同的source被错误释放
if s.state < SessionStateTransferring {
return
}
s.state = SessionStateClosed
// 释放解复用器
// 释放转码器
// 释放每路转协议流, 将所有sink添加到等待队列