适配引擎变更

This commit is contained in:
dexter
2022-02-08 19:32:02 +08:00
parent abb285fe5d
commit e3bed2dacb
3 changed files with 21 additions and 37 deletions

39
main.go
View File

@@ -5,38 +5,22 @@ import (
"log" "log"
. "github.com/Monibuca/engine/v4" . "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/util" "github.com/Monibuca/engine/v4/util"
. "github.com/logrusorgru/aurora" . "github.com/logrusorgru/aurora"
) )
type RTMPConfig struct { type RTMPConfig struct {
Publish PublishConfig config.Publish
Subscribe SubscribeConfig config.Subscribe
TCPConfig config.TCP
ChunkSize int ChunkSize int
context.Context
cancel context.CancelFunc
} }
var config = &RTMPConfig{ func (config *RTMPConfig) Update(override config.Config) {
Publish: DefaultPublishConfig, override.Unmarshal(config)
Subscribe: DefaultSubscribeConfig, util.Print(Green("server rtmp start at"), BrightBlue(config.ListenAddr))
ChunkSize: 4096, err := config.Listen(plugin, config)
TCPConfig: TCPConfig{ListenAddr: ":1935"},
}
func (cfg *RTMPConfig) Update(override Config) {
override.Unmarshal(cfg)
if config.cancel == nil {
util.Print(Green("server rtmp start at"), BrightBlue(config.ListenAddr))
} else if override.Has("ListenAddr") {
config.cancel()
util.Print(Green("server rtmp restart at"), BrightBlue(config.ListenAddr))
} else {
return
}
config.Context, config.cancel = context.WithCancel(Ctx)
err := cfg.Listen(cfg)
if err == context.Canceled { if err == context.Canceled {
log.Println(err) log.Println(err)
} else { } else {
@@ -44,6 +28,7 @@ func (cfg *RTMPConfig) Update(override Config) {
} }
} }
func init() { var plugin = InstallPlugin(&RTMPConfig{
InstallPlugin(config) ChunkSize: 4096,
} TCP: config.TCP{ListenAddr: ":1935"},
})

View File

@@ -7,7 +7,7 @@ import (
"io" "io"
"net" "net"
"github.com/Monibuca/engine/v4" . "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/util" "github.com/Monibuca/engine/v4/util"
) )
@@ -46,7 +46,7 @@ const (
func newConnectResponseMessageData(objectEncoding float64) (amfobj AMFObject) { func newConnectResponseMessageData(objectEncoding float64) (amfobj AMFObject) {
amfobj = make(AMFObject) amfobj = make(AMFObject)
amfobj["fmsVer"] = "monibuca/" + engine.Version amfobj["fmsVer"] = "monibuca/" + Engine.Version
amfobj["capabilities"] = 31 amfobj["capabilities"] = 31
amfobj["mode"] = 1 amfobj["mode"] = 1
amfobj["Author"] = "dexter" amfobj["Author"] = "dexter"
@@ -76,8 +76,8 @@ func newPlayResponseMessageData(streamid uint32, code, level string) (amfobj AMF
} }
type NetConnection struct { type NetConnection struct {
engine.Publisher Publisher
subscribers map[uint32]*engine.Subscriber subscribers map[uint32]*Subscriber
*bufio.Reader *bufio.Reader
*net.TCPConn *net.TCPConn
bandwidth uint32 bandwidth uint32
@@ -208,7 +208,7 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
// } // }
//} //}
pro["fmsVer"] = "monibuca/" + engine.Version pro["fmsVer"] = "monibuca/" + Engine.Version
pro["capabilities"] = 31 pro["capabilities"] = 31
pro["mode"] = 1 pro["mode"] = 1
pro["Author"] = "dexter" pro["Author"] = "dexter"

View File

@@ -14,7 +14,7 @@ import (
var gstreamid = uint32(64) var gstreamid = uint32(64)
func (cfg *RTMPConfig) ServeTCP(conn *net.TCPConn) { func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
nc := NetConnection{ nc := NetConnection{
TCPConn: conn, TCPConn: conn,
Reader: bufio.NewReader(conn), Reader: bufio.NewReader(conn),
@@ -87,8 +87,7 @@ func (cfg *RTMPConfig) ServeTCP(conn *net.TCPConn) {
} }
case "publish": case "publish":
pm := msg.MsgData.(*PublishMessage) pm := msg.MsgData.(*PublishMessage)
nc.Config = config.Publish if nc.Publish(nc.appName+"/"+pm.PublishingName, &nc, config.Publish) {
if nc.Publish(nc.appName+"/"+pm.PublishingName, &nc) {
absTs := make(map[uint32]uint32) absTs := make(map[uint32]uint32)
vt := nc.Stream.NewVideoTrack() vt := nc.Stream.NewVideoTrack()
at := nc.Stream.NewAudioTrack() at := nc.Stream.NewAudioTrack()
@@ -135,7 +134,7 @@ func (cfg *RTMPConfig) ServeTCP(conn *net.TCPConn) {
vt, at := subscriber.WaitVideoTrack(), subscriber.WaitAudioTrack() vt, at := subscriber.WaitVideoTrack(), subscriber.WaitAudioTrack()
if vt != nil { if vt != nil {
frame := vt.DecoderConfiguration frame := vt.DecoderConfiguration
err = nc.sendAVMessage(0, frame.AVCC, false, true) err = nc.sendAVMessage(0, net.Buffers(frame.AVCC), false, true)
subscriber.OnVideo = func(frame *engine.VideoFrame) error { subscriber.OnVideo = func(frame *engine.VideoFrame) error {
return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false) return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false)
} }
@@ -144,7 +143,7 @@ func (cfg *RTMPConfig) ServeTCP(conn *net.TCPConn) {
subscriber.OnAudio = func(frame *engine.AudioFrame) (err error) { subscriber.OnAudio = func(frame *engine.AudioFrame) (err error) {
if at.CodecID == codec.CodecID_AAC { if at.CodecID == codec.CodecID_AAC {
frame := at.DecoderConfiguration frame := at.DecoderConfiguration
err = nc.sendAVMessage(0, frame.AVCC, true, true) err = nc.sendAVMessage(0, net.Buffers{frame.AVCC}, true, true)
} else { } else {
err = nc.sendAVMessage(0, frame.AVCC, true, true) err = nc.sendAVMessage(0, frame.AVCC, true, true)
} }