feat: webrtc->rtmp h264

This commit is contained in:
langhuihui
2024-05-17 14:50:01 +08:00
parent e1cc2eda38
commit 6ab39296ef
19 changed files with 1357 additions and 621 deletions

106
api.go
View File

@@ -40,18 +40,77 @@ func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoRespon
}
func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) {
// s.Call(func() {
// if pub, ok := s.Streams.Get(req.StreamPath); ok {
// res = &pb.StreamInfoResponse{
// }
// } else {
// err = pkg.ErrNotFound
// }
// })
s.Call(func() {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
tmp, _ := json.Marshal(pub.MetaData)
res = &pb.StreamInfoResponse{
Meta: string(tmp),
}
if t := pub.AudioTrack.AVTrack; t != nil {
res.AudioTrack = &pb.AudioTrackInfo{
Meta: t.GetInfo(),
Bps: uint32(t.BPS),
Delta: pub.AudioTrack.Delta.String(),
}
if t.ICodecCtx != nil {
res.AudioTrack.SampleRate = uint32(t.ICodecCtx.(pkg.IAudioCodecCtx).GetSampleRate())
res.AudioTrack.Channels = uint32(t.ICodecCtx.(pkg.IAudioCodecCtx).GetChannels())
}
}
if t := pub.VideoTrack.AVTrack; t != nil {
res.VideoTrack = &pb.VideoTrackInfo{
Meta: t.GetInfo(),
Bps: uint32(t.BPS),
Delta: pub.VideoTrack.Delta.String(),
Gop: uint32(pub.GOP),
}
if t.ICodecCtx != nil {
res.VideoTrack.Width = uint32(t.ICodecCtx.(pkg.IVideoCodecCtx).GetWidth())
res.VideoTrack.Height = uint32(t.ICodecCtx.(pkg.IVideoCodecCtx).GetHeight())
}
}
} else {
err = pkg.ErrNotFound
}
})
return
}
func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.AudioTrackSnapShotResponse, err error) {
func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest) (res *pb.SubscribersResponse, err error) {
s.Call(func() {
var subscribers []*pb.SubscriberSnapShot
for _, subscriber := range s.Subscribers.Items {
meta, _ := json.Marshal(subscriber.MetaData)
snap := &pb.SubscriberSnapShot{
Id: uint32(subscriber.ID),
StartTime: timestamppb.New(subscriber.StartTime),
Meta: string(meta),
}
if ar := subscriber.AudioReader; ar != nil {
snap.AudioReader = &pb.RingReaderSnapShot{
Sequence: uint32(ar.Value.Sequence),
Timestamp: ar.AbsTime,
Delay: ar.Delay,
State: int32(ar.State),
}
}
if vr := subscriber.VideoReader; vr != nil {
snap.VideoReader = &pb.RingReaderSnapShot{
Sequence: uint32(vr.Value.Sequence),
Timestamp: vr.AbsTime,
Delay: vr.Delay,
State: int32(vr.State),
}
}
subscribers = append(subscribers, snap)
}
res = &pb.SubscribersResponse{
List: subscribers,
Total: int32(s.Subscribers.Length),
}
})
return
}
func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
// s.Call(func() {
// if pub, ok := s.Streams.Get(req.StreamPath); ok {
// res = pub.AudioSnapShot()
@@ -62,16 +121,21 @@ func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
return
}
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.VideoTrackSnapShotResponse, err error) {
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Call(func() {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
res = &pb.VideoTrackSnapShotResponse{}
res = &pb.TrackSnapShotResponse{}
if !pub.VideoTrack.IsEmpty() {
vcc := pub.VideoTrack.AVTrack.ICodecCtx.(pkg.IVideoCodecCtx)
res.Width = uint32(vcc.GetWidth())
res.Height = uint32(vcc.GetHeight())
res.Info = pub.VideoTrack.GetInfo()
pub.VideoTrack.Ring.Next().Do(func(v *pkg.AVFrame) {
// vcc := pub.VideoTrack.AVTrack.ICodecCtx.(pkg.IVideoCodecCtx)
res.Reader = make(map[uint32]uint32)
for sub := range pub.Subscribers {
if sub.VideoReader == nil {
continue
}
res.Reader[uint32(sub.ID)] = sub.VideoReader.Value.Sequence
}
pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) {
if v.TryRLock() {
var snap pb.TrackSnapShot
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
@@ -86,6 +150,8 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
}
}
res.Ring = append(res.Ring, &snap)
v.RUnlock()
}
})
}
} else {
@@ -111,7 +177,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *empt
return empty, err
}
func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest) (res *pb.StopSubscribeResponse, err error) {
func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
s.Call(func() {
if subscriber, ok := s.Subscribers.Get(int(req.Id)); ok {
subscriber.Stop(errors.New("stop by api"))
@@ -119,7 +185,7 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest
err = pkg.ErrNotFound
}
})
return &pb.StopSubscribeResponse{
return &pb.SuccessResponse{
Success: err == nil,
}, err
}
@@ -265,7 +331,7 @@ func (s *Server) GetConfig(_ context.Context, req *pb.GetConfigRequest) (res *pb
return
}
func (s *Server) ModifyConfig(_ context.Context, req *pb.ModifyConfigRequest) (res *pb.ModifyConfigResponse, err error) {
func (s *Server) ModifyConfig(_ context.Context, req *pb.ModifyConfigRequest) (res *pb.SuccessResponse, err error) {
var conf *config.Config
if req.Name == "global" {
conf = &s.Config

View File

@@ -1,5 +1,12 @@
global:
# loglevel: debug
tcp:
listenaddr: :50051
console:
secret: de2c0bb9fd47684adc07a426e139239b
webrtc:
publish:
pubaudio: false
rtmp:
chunksize: 2048
subscribe:

View File

@@ -3,7 +3,9 @@ global:
http:
listenaddr: :8081
listenaddrtls: :8555
disableall: true
rtmp:
enable: true
chunksize: 2048
tcp:
listenaddr:

View File

View File

@@ -7,13 +7,15 @@ import (
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/webrtc"
_ "m7s.live/m7s/v5/plugin/rtmp"
_ "m7s.live/m7s/v5/plugin/console"
)
func main() {
ctx := context.Background()
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
go m7s.Run(ctx, "config1.yaml")
time.Sleep(time.Second * 10)
time.Sleep(time.Second * 20)
m7s.NewServer().Run(ctx, "config2.yaml")
}

File diff suppressed because it is too large Load Diff

View File

@@ -260,6 +260,76 @@ func local_request_Global_StreamInfo_0(ctx context.Context, marshaler runtime.Ma
}
var (
filter_Global_GetSubscribers_0 = &utilities.DoubleArray{Encoding: map[string]int{"streamPath": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}}
)
func request_Global_GetSubscribers_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq SubscribersRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Global_GetSubscribers_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.GetSubscribers(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_GetSubscribers_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq SubscribersRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Global_GetSubscribers_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.GetSubscribers(ctx, &protoReq)
return msg, metadata, err
}
func request_Global_AudioTrackSnap_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamSnapRequest
var metadata runtime.ServerMetadata
@@ -365,7 +435,7 @@ func local_request_Global_VideoTrackSnap_0(ctx context.Context, marshaler runtim
}
func request_Global_StopSubscribe_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StopSubscribeRequest
var protoReq RequestWithId
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
@@ -395,7 +465,7 @@ func request_Global_StopSubscribe_0(ctx context.Context, marshaler runtime.Marsh
}
func local_request_Global_StopSubscribe_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StopSubscribeRequest
var protoReq RequestWithId
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
@@ -744,6 +814,31 @@ func RegisterGlobalHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser
})
mux.Handle("GET", pattern_Global_GetSubscribers_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/GetSubscribers", runtime.WithHTTPPathPattern("/api/subscribers/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_GetSubscribers_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_GetSubscribers_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_AudioTrackSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -1067,6 +1162,28 @@ func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli
})
mux.Handle("GET", pattern_Global_GetSubscribers_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/GetSubscribers", runtime.WithHTTPPathPattern("/api/subscribers/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_GetSubscribers_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_GetSubscribers_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_AudioTrackSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -1215,6 +1332,8 @@ var (
pattern_Global_StreamInfo_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "info", "streamPath"}, ""))
pattern_Global_GetSubscribers_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 3, 0, 4, 1, 5, 2}, []string{"api", "subscribers", "streamPath"}, ""))
pattern_Global_AudioTrackSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "audiotrack", "snap", "streamPath"}, ""))
pattern_Global_VideoTrackSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "videotrack", "snap", "streamPath"}, ""))
@@ -1241,6 +1360,8 @@ var (
forward_Global_StreamInfo_0 = runtime.ForwardResponseMessage
forward_Global_GetSubscribers_0 = runtime.ForwardResponseMessage
forward_Global_AudioTrackSnap_0 = runtime.ForwardResponseMessage
forward_Global_VideoTrackSnap_0 = runtime.ForwardResponseMessage

View File

@@ -37,17 +37,22 @@ service Global {
get: "/api/stream/info/{streamPath=**}"
};
}
rpc AudioTrackSnap (StreamSnapRequest) returns (AudioTrackSnapShotResponse) {
rpc GetSubscribers(SubscribersRequest) returns (SubscribersResponse) {
option (google.api.http) = {
get: "/api/subscribers/{streamPath=**}"
};
}
rpc AudioTrackSnap (StreamSnapRequest) returns (TrackSnapShotResponse) {
option (google.api.http) = {
get: "/api/audiotrack/snap/{streamPath=**}"
};
}
rpc VideoTrackSnap (StreamSnapRequest) returns (VideoTrackSnapShotResponse) {
rpc VideoTrackSnap (StreamSnapRequest) returns (TrackSnapShotResponse) {
option (google.api.http) = {
get: "/api/videotrack/snap/{streamPath=**}"
};
}
rpc StopSubscribe (StopSubscribeRequest) returns (StopSubscribeResponse) {
rpc StopSubscribe (RequestWithId) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/stop/subscribe/{id}"
body: "*"
@@ -63,7 +68,7 @@ service Global {
get: "/api/config/formily/{name}"
};
}
rpc ModifyConfig (ModifyConfigRequest) returns (ModifyConfigResponse) {
rpc ModifyConfig (ModifyConfigRequest) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/config/modify/{name}"
body: "yaml"
@@ -98,10 +103,6 @@ message ModifyConfigRequest {
string yaml = 2;
}
message ModifyConfigResponse {
bool success = 1;
}
message NetWorkInfo {
string name = 1;
uint64 receive = 2;
@@ -160,7 +161,9 @@ message StreamSnapRequest {
}
message StreamInfoResponse {
string meta = 1;
AudioTrackInfo audioTrack = 2;
VideoTrackInfo videoTrack = 3;
}
message Wrap {
@@ -177,28 +180,60 @@ message TrackSnapShot {
repeated Wrap wrap = 5;
}
message AudioTrackSnapShotResponse {
message AudioTrackInfo {
string delta = 1;
string meta = 2;
uint32 bps = 3;
uint32 sampleRate = 4;
uint32 channels = 5;
}
message TrackSnapShotResponse {
repeated TrackSnapShot ring = 1;
uint32 sampleRate = 2;
uint32 channels = 3;
string info = 4;
map<uint32, uint32> reader = 2;
}
message VideoTrackSnapShotResponse {
repeated TrackSnapShot ring = 1;
uint32 width = 2;
uint32 height = 3;
string info = 4;
message VideoTrackInfo {
string delta = 1;
string meta = 2;
uint32 bps = 3;
uint32 width = 4;
uint32 height = 5;
uint32 gop = 6;
}
message StopSubscribeRequest {
uint32 id = 1;
}
message StopSubscribeResponse {
message SuccessResponse {
bool success = 1;
}
message RequestWithId {
uint32 id = 1;
}
message SubscribersRequest {
string streamPath = 1;
int32 pageNum = 2;
int32 pageSize = 3;
}
message RingReaderSnapShot {
uint32 sequence = 1;
uint32 timestamp = 2;
uint32 delay = 3;
int32 state = 4;
}
message SubscriberSnapShot {
uint32 id = 1;
google.protobuf.Timestamp startTime = 2;
RingReaderSnapShot audioReader = 3;
RingReaderSnapShot videoReader = 4;
string meta = 5;
}
message SubscribersResponse {
int32 total = 1;
int32 pageNum = 2;
int32 pageSize = 3;
repeated SubscriberSnapShot list = 4;
}

View File

@@ -29,12 +29,13 @@ type GlobalClient interface {
Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error)
StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error)
StreamInfo(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamInfoResponse, error)
AudioTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*AudioTrackSnapShotResponse, error)
VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*VideoTrackSnapShotResponse, error)
StopSubscribe(ctx context.Context, in *StopSubscribeRequest, opts ...grpc.CallOption) (*StopSubscribeResponse, error)
GetSubscribers(ctx context.Context, in *SubscribersRequest, opts ...grpc.CallOption) (*SubscribersResponse, error)
AudioTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error)
VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error)
StopSubscribe(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*ModifyConfigResponse, error)
ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*SuccessResponse, error)
}
type globalClient struct {
@@ -99,8 +100,17 @@ func (c *globalClient) StreamInfo(ctx context.Context, in *StreamSnapRequest, op
return out, nil
}
func (c *globalClient) AudioTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*AudioTrackSnapShotResponse, error) {
out := new(AudioTrackSnapShotResponse)
func (c *globalClient) GetSubscribers(ctx context.Context, in *SubscribersRequest, opts ...grpc.CallOption) (*SubscribersResponse, error) {
out := new(SubscribersResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/GetSubscribers", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) AudioTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) {
out := new(TrackSnapShotResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/AudioTrackSnap", in, out, opts...)
if err != nil {
return nil, err
@@ -108,8 +118,8 @@ func (c *globalClient) AudioTrackSnap(ctx context.Context, in *StreamSnapRequest
return out, nil
}
func (c *globalClient) VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*VideoTrackSnapShotResponse, error) {
out := new(VideoTrackSnapShotResponse)
func (c *globalClient) VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) {
out := new(TrackSnapShotResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/VideoTrackSnap", in, out, opts...)
if err != nil {
return nil, err
@@ -117,8 +127,8 @@ func (c *globalClient) VideoTrackSnap(ctx context.Context, in *StreamSnapRequest
return out, nil
}
func (c *globalClient) StopSubscribe(ctx context.Context, in *StopSubscribeRequest, opts ...grpc.CallOption) (*StopSubscribeResponse, error) {
out := new(StopSubscribeResponse)
func (c *globalClient) StopSubscribe(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/StopSubscribe", in, out, opts...)
if err != nil {
return nil, err
@@ -144,8 +154,8 @@ func (c *globalClient) GetFormily(ctx context.Context, in *GetConfigRequest, opt
return out, nil
}
func (c *globalClient) ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*ModifyConfigResponse, error) {
out := new(ModifyConfigResponse)
func (c *globalClient) ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/ModifyConfig", in, out, opts...)
if err != nil {
return nil, err
@@ -163,12 +173,13 @@ type GlobalServer interface {
Restart(context.Context, *RequestWithId) (*emptypb.Empty, error)
StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error)
StreamInfo(context.Context, *StreamSnapRequest) (*StreamInfoResponse, error)
AudioTrackSnap(context.Context, *StreamSnapRequest) (*AudioTrackSnapShotResponse, error)
VideoTrackSnap(context.Context, *StreamSnapRequest) (*VideoTrackSnapShotResponse, error)
StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error)
GetSubscribers(context.Context, *SubscribersRequest) (*SubscribersResponse, error)
AudioTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error)
VideoTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error)
StopSubscribe(context.Context, *RequestWithId) (*SuccessResponse, error)
GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
ModifyConfig(context.Context, *ModifyConfigRequest) (*ModifyConfigResponse, error)
ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error)
mustEmbedUnimplementedGlobalServer()
}
@@ -194,13 +205,16 @@ func (UnimplementedGlobalServer) StreamList(context.Context, *StreamListRequest)
func (UnimplementedGlobalServer) StreamInfo(context.Context, *StreamSnapRequest) (*StreamInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StreamInfo not implemented")
}
func (UnimplementedGlobalServer) AudioTrackSnap(context.Context, *StreamSnapRequest) (*AudioTrackSnapShotResponse, error) {
func (UnimplementedGlobalServer) GetSubscribers(context.Context, *SubscribersRequest) (*SubscribersResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetSubscribers not implemented")
}
func (UnimplementedGlobalServer) AudioTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AudioTrackSnap not implemented")
}
func (UnimplementedGlobalServer) VideoTrackSnap(context.Context, *StreamSnapRequest) (*VideoTrackSnapShotResponse, error) {
func (UnimplementedGlobalServer) VideoTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VideoTrackSnap not implemented")
}
func (UnimplementedGlobalServer) StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error) {
func (UnimplementedGlobalServer) StopSubscribe(context.Context, *RequestWithId) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopSubscribe not implemented")
}
func (UnimplementedGlobalServer) GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
@@ -209,7 +223,7 @@ func (UnimplementedGlobalServer) GetConfig(context.Context, *GetConfigRequest) (
func (UnimplementedGlobalServer) GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetFormily not implemented")
}
func (UnimplementedGlobalServer) ModifyConfig(context.Context, *ModifyConfigRequest) (*ModifyConfigResponse, error) {
func (UnimplementedGlobalServer) ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ModifyConfig not implemented")
}
func (UnimplementedGlobalServer) mustEmbedUnimplementedGlobalServer() {}
@@ -333,6 +347,24 @@ func _Global_StreamInfo_Handler(srv interface{}, ctx context.Context, dec func(i
return interceptor(ctx, in, info, handler)
}
func _Global_GetSubscribers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SubscribersRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).GetSubscribers(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/GetSubscribers",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).GetSubscribers(ctx, req.(*SubscribersRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_AudioTrackSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamSnapRequest)
if err := dec(in); err != nil {
@@ -370,7 +402,7 @@ func _Global_VideoTrackSnap_Handler(srv interface{}, ctx context.Context, dec fu
}
func _Global_StopSubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StopSubscribeRequest)
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
@@ -382,7 +414,7 @@ func _Global_StopSubscribe_Handler(srv interface{}, ctx context.Context, dec fun
FullMethod: "/m7s.Global/StopSubscribe",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).StopSubscribe(ctx, req.(*StopSubscribeRequest))
return srv.(GlobalServer).StopSubscribe(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
@@ -472,6 +504,10 @@ var Global_ServiceDesc = grpc.ServiceDesc{
MethodName: "StreamInfo",
Handler: _Global_StreamInfo_Handler,
},
{
MethodName: "GetSubscribers",
Handler: _Global_GetSubscribers_Handler,
},
{
MethodName: "AudioTrackSnap",
Handler: _Global_AudioTrackSnap_Handler,

View File

@@ -32,8 +32,7 @@ type Publish struct {
IdleTimeout time.Duration `desc:"空闲(无订阅)超时"` // 空闲(无订阅)超时
PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时
BufferTime time.Duration `desc:"缓冲长度(单位:秒)0代表取最近关键帧"` // 缓冲长度(单位:秒)0代表取最近关键帧
SpeedLimit time.Duration `default:"500ms" desc:"速度限制最大等待时间,0则不等待"` //速度限制最大等待时间
Speed float64 `default:"1" desc:"倍速"` // 倍速
Speed float64 `default:"0" desc:"倍速"` // 倍速0 为不限速
Key string `desc:"发布鉴权key"` // 发布鉴权key
SecretArgName string `default:"secret" desc:"发布鉴权参数名"` // 发布鉴权参数名
ExpireArgName string `default:"expire" desc:"发布鉴权失效时间参数名"` // 发布鉴权失效时间参数名

View File

@@ -29,7 +29,7 @@ func NewBufReader(reader io.Reader) (r *BufReader) {
}
func (r *BufReader) eat() error {
buf := r.buf.Malloc(r.BufLen)
buf := r.buf.NextN(r.BufLen)
if n, err := r.reader.Read(buf); err != nil {
return err
} else if n < r.BufLen {

View File

@@ -190,6 +190,11 @@ func (buffers *Buffers) WriteTo(w io.Writer) (n int64, err error) {
return buf.WriteTo(w)
}
func (buffers *Buffers) ReadN(n int) (r net.Buffers, actual int) {
actual = buffers.WriteNTo(n, &r)
return
}
func (buffers *Buffers) WriteNTo(n int, result *net.Buffers) (actual int) {
for actual = n; buffers.Length > 0 && n > 0; buffers.skipBuf() {
if buffers.curBufLen > n {

View File

@@ -91,6 +91,9 @@ func NewScalableMemoryAllocator(size int) (ret *ScalableMemoryAllocator) {
}
func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) {
if sma == nil {
return make([]byte, size)
}
memory, _, _, _ = sma.Malloc2(size)
return memory
}
@@ -112,6 +115,9 @@ func (sma *ScalableMemoryAllocator) GetScalableMemoryAllocator() *ScalableMemory
return sma
}
func (sma *ScalableMemoryAllocator) Free(mem []byte) bool {
if sma == nil {
return false
}
ptr := uintptr(unsafe.Pointer(&mem[:1][0]))
for _, child := range *sma {
if start := int(int64(ptr) - child.start); child.Free2(start, start+len(mem)) {
@@ -144,6 +150,20 @@ func (r *RecyclableMemory) Malloc(size int) (memory []byte) {
return ret
}
func (r *RecyclableMemory) Pop() []int {
l := len(r.mem)
if l == 0 {
return nil
}
ret := r.mem[l-3:]
r.mem = r.mem[:l-3]
return ret
}
func (r *RecyclableMemory) Push(args ...int) {
r.mem = append(r.mem, args...)
}
func (r *RecyclableMemory) Recycle() {
for i := 0; i < len(r.mem); i += 3 {
r.Free2(r.mem[i], r.mem[i+1], r.mem[i+2])
@@ -157,6 +177,9 @@ func (r *RecyclableMemory) RecycleBack(n int) {
start := *end - n
r.Free2(r.mem[l-3], start, *end)
*end = start
if start == r.mem[l-2] {
r.mem = r.mem[:l-3]
}
}
type RecyclableBuffers struct {
@@ -164,7 +187,7 @@ type RecyclableBuffers struct {
Buffers
}
func (r *RecyclableBuffers) Malloc(size int) (memory []byte) {
func (r *RecyclableBuffers) NextN(size int) (memory []byte) {
memory = r.ScalableMemoryAllocator.Malloc(size)
r.Buffers.ReadFromBytes(memory)
return

View File

@@ -73,6 +73,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
}
if p.Disabled {
p.Warn("plugin disabled")
return
} else {
p.assign()
}

View File

@@ -79,6 +79,20 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error)
if err = parseSequence(); err != nil {
return
}
} else {
// var naluLen int
// for reader.Length > 0 {
// naluLen, err = reader.ReadBE(4) // naluLenM
// if err != nil {
// return
// }
// _, n := reader.ReadN(naluLen)
// fmt.Println(avcc.Timestamp, n)
// if n != naluLen {
// err = fmt.Errorf("naluLen:%d != n:%d", naluLen, n)
// return
// }
// }
}
}
return
@@ -112,7 +126,10 @@ func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) {
seqFrame.Buffers.ReadFromBytes(b)
t.SequenceFrame = seqFrame.WrapVideo()
if t.Enabled(context.TODO(), TraceLevel) {
t.Trace("decConfig", "codec", t.FourCC().String(), "size", seqFrame.GetSize(), "data", seqFrame.String())
codec := t.FourCC().String()
size := seqFrame.GetSize()
data := seqFrame.String()
t.Trace("decConfig", "codec", codec, "size", size, "data", data)
}
}
@@ -211,18 +228,18 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) {
func (h264 *H264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
var rtmpVideo RTMPVideo
rtmpVideo.Timestamp = uint32(from.Timestamp / time.Millisecond)
rtmpVideo.RecyclableBuffers = &util.RecyclableBuffers{}
rtmpVideo.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
// TODO: rtmpVideo.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator()
nalus := from.Raw.(Nalus)
head := rtmpVideo.Malloc(5)
head := rtmpVideo.NextN(5)
head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(ParseVideoCodec(h264.FourCC()))
head[1] = 1
util.PutBE(head[2:5], (nalus.PTS-nalus.DTS)/90) // cts
rtmpVideo.ReadFromBytes(head)
for _, nalu := range nalus.Nalus {
naluLenM := rtmpVideo.Malloc(4)
binary.BigEndian.PutUint32(naluLenM, uint32(util.LenOfBuffers(nalu)))
rtmpVideo.ReadFromBytes(naluLenM)
naluLenM := rtmpVideo.NextN(4)
naluLen := uint32(util.LenOfBuffers(nalu))
binary.BigEndian.PutUint32(naluLenM, naluLen)
rtmpVideo.ReadFromBytes(nalu...)
}
frame = &rtmpVideo

View File

@@ -50,16 +50,27 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) {
} else {
ctx = &RTPH264Ctx{}
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = ctx
}
raw, err = r.ToRaw(ctx)
if err != nil {
return
}
nalus := raw.(Nalus)
if len(nalus.Nalus) > 0 {
isIDR = nalus.H264Type() == codec.NALU_IDR_Picture
}
for _, nalu := range nalus.Nalus {
switch codec.ParseH264NALUType(nalu[0][0]) {
case codec.NALU_SPS:
ctx = &RTPH264Ctx{}
ctx.SPS = [][]byte{slices.Concat(nalu...)}
ctx.SPSInfo.Unmarshal(ctx.SPS[0])
ctx.RTPCodecParameters = *r.RTPCodecParameters
t.ICodecCtx = ctx
case codec.NALU_PPS:
ctx.PPS = [][]byte{slices.Concat(nalu...)}
case codec.NALU_IDR_Picture:
isIDR = true
}
}
case webrtc.MimeTypeVP9:
// var ctx RTPVP9Ctx
// ctx.FourCC = codec.FourCC_VP9
@@ -136,22 +147,14 @@ func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) {
}
func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
switch ctx := ictx.(type) {
switch ictx.(type) {
case *RTPH264Ctx:
var nalus Nalus
var nalu Nalu
var naluType codec.H264NALUType
gotNalu := func(t codec.H264NALUType) {
gotNalu := func() {
if len(nalu) > 0 {
switch t {
case codec.NALU_SPS:
ctx.SPS = [][]byte{slices.Concat(nalu...)}
ctx.SPSInfo.Unmarshal(ctx.SPS[0])
case codec.NALU_PPS:
ctx.PPS = [][]byte{slices.Concat(nalu...)}
default:
nalus.Nalus = append(nalus.Nalus, nalu)
}
nalu = nil
}
}
@@ -159,9 +162,10 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
packet := r.Packets[i]
nalus.PTS = time.Duration(packet.Timestamp)
nalus.DTS = nalus.PTS
if t := codec.ParseH264NALUType(packet.Payload[0]); t < 24 {
b0 := packet.Payload[0]
if t := codec.ParseH264NALUType(b0); t < 24 {
nalu = [][]byte{packet.Payload}
gotNalu(t)
gotNalu()
} else {
offset := t.Offset()
switch t {
@@ -172,17 +176,17 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalu = [][]byte{buffer.ReadN(nextSize)}
gotNalu(codec.ParseH264NALUType(nalu[0][0]))
gotNalu()
} else {
return nil, fmt.Errorf("invalid nalu size %d", nextSize)
}
}
case codec.NALU_FUA, codec.NALU_FUB:
b1 := packet.Payload[1]
fmt.Printf("%08b\n", b1)
if util.Bit1(b1, 0) {
naluType.Parse(b1)
firstByte := naluType.Or(packet.Payload[0] & 0x60)
nalu = append([][]byte{{firstByte}}, packet.Payload[offset:])
nalu = [][]byte{{naluType.Or(b0 & 0x60)}}
}
if len(nalu) > 0 {
nalu = append(nalu, packet.Payload[offset:])
@@ -190,7 +194,7 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) {
return nil, errors.New("fu have no start")
}
if util.Bit1(b1, 1) {
gotNalu(naluType)
gotNalu()
}
default:
return nil, fmt.Errorf("unsupported nalu type %d", t)

View File

@@ -195,8 +195,10 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) {
if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp {
frame.Packets = append(frame.Packets, &packet)
} else {
m := frame.Pop()
publisher.WriteAudio(frame)
frame = &mrtp.RTPAudio{}
frame.Push(m...)
frame.Packets = []*rtp.Packet{&packet}
frame.RTPCodecParameters = &codecP
frame.ScalableMemoryAllocator = mem
@@ -219,8 +221,7 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) {
var packet rtp.Packet
buf := frame.Malloc(1460)
if n, _, err = track.Read(buf); err == nil {
buf := buf[:n]
err = packet.Unmarshal(buf)
err = packet.Unmarshal(buf[:n])
if n < 1460 {
frame.RecycleBack(1460 - n)
}
@@ -229,16 +230,18 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) {
return
}
if len(packet.Payload) == 0 {
frame.RecycleBack(len(buf))
frame.RecycleBack(n)
continue
}
if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp {
frame.Packets = append(frame.Packets, &packet)
} else {
// t := time.Now()
m := frame.Pop()
publisher.WriteVideo(frame)
// fmt.Println("write video", time.Since(t))
frame = &mrtp.RTPVideo{}
frame.Push(m...)
frame.Packets = []*rtp.Packet{&packet}
frame.RTPCodecParameters = &codecP
frame.ScalableMemoryAllocator = mem

View File

@@ -23,6 +23,7 @@ type SpeedControl struct {
speed float64
beginTime time.Time
beginTimestamp time.Duration
Delta time.Duration
}
func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
@@ -32,15 +33,21 @@ func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
s.beginTimestamp = ts
} else {
elapsed := time.Since(s.beginTime)
if speed == 0 {
s.Delta = ts - elapsed
return
}
should := time.Duration(float64(ts) / speed)
if needSleep := should - elapsed; needSleep > time.Second {
time.Sleep(needSleep)
s.Delta = should - elapsed
if s.Delta > time.Second {
time.Sleep(s.Delta)
}
}
}
type AVTracks struct {
*AVTrack
SpeedControl
util.Collection[reflect.Type, *AVTrack]
}
@@ -59,7 +66,6 @@ type Publisher struct {
PubSubBase
sync.RWMutex `json:"-" yaml:"-"`
config.Publish
SpeedControl
State PublisherState
VideoTrack AVTracks
AudioTrack AVTracks
@@ -100,9 +106,11 @@ func (p *Publisher) checkTimeout() (err error) {
default:
if p.PublishTimeout > 0 {
if !p.VideoTrack.IsEmpty() && !p.VideoTrack.LastValue.WriteTime.IsZero() && time.Since(p.VideoTrack.LastValue.WriteTime) > p.PublishTimeout {
p.Error("video timeout", "writeTime", p.VideoTrack.LastValue.WriteTime)
err = ErrPublishTimeout
}
if !p.AudioTrack.IsEmpty() && !p.AudioTrack.LastValue.WriteTime.IsZero() && time.Since(p.AudioTrack.LastValue.WriteTime) > p.PublishTimeout {
p.Error("audio timeout", "writeTime", p.AudioTrack.LastValue.WriteTime)
err = ErrPublishTimeout
}
}
@@ -156,7 +164,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
if p.Enabled(p, TraceLevel) {
codec := t.FourCC().String()
data := frame.Wraps[0].String()
p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", codec, "size", bytesIn, "data", data)
p.Trace("write", "seq", frame.Sequence, "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data)
}
}
@@ -177,7 +185,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
}
p.Unlock()
}
oldCodecCtx := t.ICodecCtx
isIDR, isSeq, raw, err := data.Parse(t)
codecCtxChanged := oldCodecCtx != t.ICodecCtx
if err != nil || (isSeq && !isIDR) {
p.Error("parse", "err", err)
return err
@@ -189,7 +199,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
if idr != nil {
p.GOP = int(t.Value.Sequence - idr.Value.Sequence)
if hidr == nil {
if l := t.Size - p.GOP; l > 12 && t.Size > 100 {
if l := t.Size - p.GOP; l > 50 {
t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5)
t.Reduce(5) //缩小缓冲环节省内存
}
@@ -226,7 +236,11 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
for i, track := range p.VideoTrack.Items[1:] {
if track.ICodecCtx == nil {
err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx)
for rf := idr; rf != t.Next(); rf = rf.Next() {
if err != nil {
track.Error("DecodeConfig", "err", err)
return
}
for rf := idr; rf != t.Ring; rf = rf.Next() {
if i == 0 && rf.Value.Raw == nil {
rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx)
if err != nil {
@@ -234,29 +248,26 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
return err
}
}
if toFrame, err = track.CreateFrame(&rf.Value); err != nil {
track.Error("from raw", "err", err)
return
}
rf.Value.Wraps = append(rf.Value.Wraps, toFrame)
}
track.Ready.Fulfill(err)
if err != nil {
track.Error("DecodeConfig", "err", err)
return
defer track.Ready.Fulfill(err)
}
} else {
if toFrame, err = track.CreateFrame(&t.Value); err != nil {
track.Error("from raw", "err", err)
return
}
if codecCtxChanged {
toFrame.DecodeConfig(track, t.ICodecCtx)
}
t.Value.Wraps = append(t.Value.Wraps, toFrame)
}
}
}
t.Step()
p.speedControl(p.Publish.Speed, p.lastTs)
p.VideoTrack.speedControl(p.Speed, p.lastTs)
return
}
@@ -284,7 +295,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
}
p.writeAV(t, data)
t.Step()
p.speedControl(p.Publish.Speed, p.lastTs)
p.AudioTrack.speedControl(p.Publish.Speed, p.lastTs)
return
}

View File

@@ -18,6 +18,7 @@ import (
type Owner struct {
Conn net.Conn
File *os.File
MetaData any
}
type PubSubBase struct {
@@ -50,6 +51,8 @@ func (ps *PubSubBase) Init(p *Plugin, streamPath string, options ...any) {
ps.Closer = v
case io.Closer:
ps.Closer = v
default:
ps.MetaData = v
}
}
ps.Context, ps.CancelCauseFunc = context.WithCancelCause(ctx)
@@ -63,6 +66,8 @@ type Subscriber struct {
PubSubBase
config.Subscribe
Publisher *Publisher
AudioReader *AVRingReader
VideoReader *AVRingReader
}
type SubscriberHandler struct {
@@ -94,6 +99,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
if at := s.Publisher.GetAudioTrack(a1); at != nil {
awi = at.WrapIndex
ar = NewAVRingReader(at)
s.AudioReader = ar
ar.Logger = s.Logger.With("reader", a1.String())
ar.Info("start read")
ah = reflect.ValueOf(handler.OnAudio)
@@ -106,6 +112,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
if vt := s.Publisher.GetVideoTrack(v1); vt != nil {
vwi = vt.WrapIndex
vr = NewAVRingReader(vt)
s.VideoReader = vr
vr.Logger = s.Logger.With("reader", v1.String())
vr.Info("start read")
vh = reflect.ValueOf(handler.OnVideo)
@@ -115,11 +122,9 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
createVideoReader()
defer func() {
if lastSentVF != nil {
// lastSentVF.ReaderLeave()
lastSentVF.RUnlock()
}
if lastSentAF != nil {
// lastSentAF.ReaderLeave()
lastSentAF.RUnlock()
}
}()