fit engine udpate

This commit is contained in:
langhuihui
2023-08-14 10:10:53 +08:00
parent 25185a8041
commit 44522009fb
3 changed files with 30 additions and 24 deletions

View File

@@ -109,7 +109,9 @@ func (pusher *RTMPPusher) Connect() (err error) {
}
return
}
func (pusher *RTMPPusher) Disconnect() {
pusher.NetConnection.Close()
}
func (pusher *RTMPPusher) Push() error {
pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
for {
@@ -168,6 +170,11 @@ func (puller *RTMPPuller) Connect() (err error) {
}
return
}
func (puller *RTMPPuller) Disconnect() {
if puller.NetConnection != nil {
puller.NetConnection.Close()
}
}
func (puller *RTMPPuller) Pull() (err error) {
defer puller.Stop()

17
main.go
View File

@@ -4,7 +4,6 @@ import (
"context"
"net/http"
"strconv"
"time"
"go.uber.org/zap"
. "m7s.live/engine/v4"
@@ -47,9 +46,9 @@ func (c *RTMPConfig) OnEvent(event any) {
RTMPPlugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", url), zap.Error(err))
}
}
case *Stream: //按需拉流
if url, ok := c.PullOnSub[v.Path]; ok {
pull(v.Path, url)
case InvitePublish: //按需拉流
if url, ok := c.PullOnSub[v.Target]; ok {
pull(v.Target, url)
}
}
}
@@ -72,24 +71,24 @@ func filterStreams() (ss []*Stream) {
}
func (*RTMPConfig) API_list(w http.ResponseWriter, r *http.Request) {
util.ReturnJson(filterStreams, time.Second, w, r)
util.ReturnFetchValue(filterStreams, w, r)
}
func (*RTMPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
save, _ := strconv.Atoi(r.URL.Query().Get("save"))
err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), save)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
} else {
rw.Write([]byte("ok"))
util.ReturnOK(rw, r)
}
}
func (*RTMPConfig) API_Push(rw http.ResponseWriter, r *http.Request) {
err := RTMPPlugin.Push(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPusher), r.URL.Query().Has("save"))
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
} else {
rw.Write([]byte("ok"))
util.ReturnOK(rw, r)
}
}

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"net"
"sync/atomic"
"go.uber.org/zap"
"m7s.live/engine/v4"
@@ -20,8 +19,6 @@ func (ns *NetStream) Begin() {
ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID)
}
var gstreamid uint32
type RTMPSubscriber struct {
RTMPSender
}
@@ -35,12 +32,14 @@ func (s *RTMPSubscriber) OnEvent(event any) {
}
func (config *RTMPConfig) ServeTCP(conn net.Conn) {
defer conn.Close()
zapRemote := zap.String("remote", conn.RemoteAddr().String())
logger := RTMPPlugin.Logger.With(zap.String("remote", conn.RemoteAddr().String()))
senders := make(map[uint32]*RTMPSubscriber)
receivers := make(map[uint32]*RTMPReceiver)
var err error
logger.Info("conn")
defer func() {
ze := zap.Error(err)
logger.Info("conn close", ze)
for _, sender := range senders {
sender.Stop(ze)
}
@@ -53,10 +52,11 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) {
defer cancel()
/* Handshake */
if err = nc.Handshake(); err != nil {
RTMPPlugin.Error("handshake", zap.Error(err), zapRemote)
logger.Error("handshake", zap.Error(err))
return
}
var msg *Chunk
var gstreamid uint32
for {
if msg, err = nc.RecvMessage(); err == nil {
if msg.MessageLength <= 0 {
@@ -68,7 +68,7 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) {
break
}
cmd := msg.MsgData.(Commander).GetCommand()
RTMPPlugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID), zapRemote)
logger.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
switch cmd := msg.MsgData.(type) {
case *CallMessage: //connect
app := cmd.Object["app"] // 客户端要连接到的服务应用名
@@ -80,7 +80,7 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) {
nc.objectEncoding = 0
}
nc.appName = app.(string)
RTMPPlugin.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding), zapRemote)
logger.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
nc.writeChunkSize = config.ChunkSize
err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
@@ -105,9 +105,9 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) {
}
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case *CommandMessage: // "createStream"
streamId := atomic.AddUint32(&gstreamid, 1)
RTMPPlugin.Info("createStream:", zap.Uint32("streamId", streamId), zapRemote)
nc.ResponseCreateStream(cmd.TransactionId, streamId)
gstreamid++
logger.Info("createStream:", zap.Uint32("streamId", gstreamid))
nc.ResponseCreateStream(cmd.TransactionId, gstreamid)
case *CURDStreamMessage:
if stream, ok := receivers[cmd.StreamId]; ok {
stream.Stop()
@@ -171,20 +171,20 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) {
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveAudio(msg)
} else {
RTMPPlugin.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID), zapRemote)
logger.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID))
}
case RTMP_MSG_VIDEO:
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveVideo(msg)
} else {
RTMPPlugin.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID), zapRemote)
logger.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID))
}
}
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
RTMPPlugin.Info("rtmp client closed", zapRemote)
logger.Info("rtmp client closed")
return
} else {
RTMPPlugin.Warn("ReadMessage", zap.Error(err), zapRemote)
logger.Warn("ReadMessage", zap.Error(err))
return
}
}