支持rtmpserver收流

This commit is contained in:
DESKTOP-COJOJSE\lenovo
2023-11-10 18:07:46 +08:00
commit c468f7388c
21 changed files with 858 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
vendor/
.idea/

40
config.json Normal file
View File

@@ -0,0 +1,40 @@
{
"gop_cache": 0,
"rtmp": {
"enable": true,
"addr": "0.0.0.0:1935"
},
"rtsp": {
"enable": true,
"addr": "0.0.0.0:554",
"password": "123456"
},
"webrtc": {
"port": 8000,
"transport": "UDP"
},
"gb28181": {
"port": "50000-60000",
"transport": "UDP|TCP"
},
"record": {
"format": "mp4",
"path": ""
},
"hook": {
"on_publish": "http://localhost:8080/api/v1/live/publish/auth",
"on_publish_done": "http://localhost:8080/api/v1/live/publishdone",
"on_play" : "http://localhost:8080/api/v1/live/play/auth",
"on_play_done" : "http://localhost:8080/api/v1/live/playdone",
"on_record": "",
"on_idle_timeout": "",
"on_recv_timeout": ""
}
}

6
go.mod Normal file
View File

@@ -0,0 +1,6 @@
module github.com/yangjiechina/live-server
require github.com/yangjiechina/avformat v0.0.0
replace github.com/yangjiechina/avformat => ../avformat
go 1.19

5
main.go Normal file
View File

@@ -0,0 +1,5 @@
package main
func main() {
}

26
rtmp/rtmp_publisher.go Normal file
View File

@@ -0,0 +1,26 @@
package rtmp
import (
"github.com/yangjiechina/avformat/libflv"
"github.com/yangjiechina/live-server/stream"
)
type Publisher struct {
stream.SourceImpl
deMuxer libflv.DeMuxer
}
func NewPublisher(sourceId string) *Publisher {
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}}
muxer := &libflv.DeMuxer{}
muxer.SetHandler(publisher)
return publisher
}
func (p *Publisher) OnVideo(data []byte, ts uint32) {
_ = p.deMuxer.InputVideo(data, ts)
}
func (p *Publisher) OnAudio(data []byte, ts uint32) {
_ = p.deMuxer.InputAudio(data, ts)
}

56
rtmp/rtmp_server.go Normal file
View File

@@ -0,0 +1,56 @@
package rtmp
import (
"github.com/yangjiechina/avformat"
"github.com/yangjiechina/avformat/transport"
"net"
)
type IServer interface {
Start(addr net.Addr) error
Close()
}
type serverImpl struct {
tcp *transport.TCPServer
}
func (s *serverImpl) Start(addr net.Addr) error {
avformat.Assert(s.tcp == nil)
server := &transport.TCPServer{}
server.SetHandler(s)
err := server.Bind(addr.String())
if err != nil {
return err
}
s.tcp = server
return nil
}
func (s *serverImpl) Close() {
}
func (s *serverImpl) OnConnected(conn net.Conn) {
t := conn.(*transport.Conn)
t.Data = NewSession()
}
func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
t := conn.(*transport.Conn)
err := t.Data.(*sessionImpl).Input(conn, data)
if err != nil {
_ = conn.Close()
println("处理rtmp包发生错误:" + err.Error())
}
}
func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
t := conn.(*transport.Conn)
t.Data.(*sessionImpl).Close()
}

23
rtmp/rtmp_server_test.go Normal file
View File

@@ -0,0 +1,23 @@
package rtmp
import (
"net"
"testing"
)
func TestServer(t *testing.T) {
impl := serverImpl{}
addr := "0.0.0.0:1935"
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
panic(err)
}
err = impl.Start(tcpAddr)
if err != nil {
panic(err)
}
println("启动rtmp服务成功:" + addr)
select {}
}

63
rtmp/rtmp_session.go Normal file
View File

@@ -0,0 +1,63 @@
package rtmp
import (
"github.com/yangjiechina/avformat"
"github.com/yangjiechina/avformat/librtmp"
"github.com/yangjiechina/live-server/stream"
"net"
"net/http"
)
type Session interface {
Input(conn net.Conn, data []byte) error
Close()
}
func NewSession() *sessionImpl {
impl := &sessionImpl{}
stack := librtmp.NewStack(impl)
impl.stack = stack
return impl
}
type sessionImpl struct {
stream.SessionImpl
stack *librtmp.Stack
//publisher/sink
handle interface{}
streamId string
}
func (s *sessionImpl) OnPublish(app, stream_ string, response chan avformat.HookState) {
s.streamId = app + "/" + stream_
publisher := NewPublisher(s.streamId)
s.stack.SetOnPublishHandler(publisher)
s.SessionImpl.OnPublish(publisher, nil, func() {
s.handle = publisher
response <- http.StatusOK
}, func(state avformat.HookState) {
response <- state
})
}
func (s *sessionImpl) OnPlay(app, stream string, response chan avformat.HookState) {
s.streamId = app + "/" + stream
sink := &Sink{}
s.SessionImpl.OnPlay(sink, nil, func() {
s.handle = sink
response <- http.StatusOK
}, func(state avformat.HookState) {
response <- state
})
}
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
return s.stack.Input(conn, data)
}
func (s *sessionImpl) Close() {
//TODO implement me
panic("implement me")
}

7
rtmp/rtmp_sink.go Normal file
View File

@@ -0,0 +1,7 @@
package rtmp
import "github.com/yangjiechina/live-server/stream"
type Sink struct {
stream.SinkImpl
}

View File

@@ -0,0 +1,7 @@
package rtmp
import "github.com/yangjiechina/avformat"
type TransDeMuxer struct {
avformat.DeMuxerImpl
}

56
stream/config.go Normal file
View File

@@ -0,0 +1,56 @@
package stream
type RtmpConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
type HookConfig struct {
Enable bool `json:"enable"`
OnPublish string `json:"on_publish"` //推流回调
OnPublishDone string `json:"on_publish_done"` //推流结束回调
OnPlay string `json:"on_play"` //拉流回调
OnPlayDone string `json:"on_play_done"` //拉流结束回调
OnRecord string `json:"on_record"` //录制流回调
OnIdleTimeout string `json:"on_idle_timeout"` //多久没有sink拉流回调
OnRecvTimeout string `json:"on_recv_timeout"` //多久没有推流回调
}
func (hook *HookConfig) EnableOnPublish() bool {
return hook.OnPublish != ""
}
func (hook *HookConfig) EnableOnPublishDone() bool {
return hook.OnPublishDone != ""
}
func (hook *HookConfig) EnableOnPlay() bool {
return hook.OnPlay != ""
}
func (hook *HookConfig) EnableOnPlayDone() bool {
return hook.OnPlayDone != ""
}
func (hook *HookConfig) EnableOnRecord() bool {
return hook.OnRecord != ""
}
func (hook *HookConfig) EnableOnIdleTimeout() bool {
return hook.OnIdleTimeout != ""
}
func (hook *HookConfig) EnableOnRecvTimeout() bool {
return hook.OnRecvTimeout != ""
}
var AppConfig AppConfig_
func init() {
AppConfig = AppConfig_{}
}
type AppConfig_ struct {
Rtmp RtmpConfig
Hook HookConfig
}

89
stream/hook.go Normal file
View File

@@ -0,0 +1,89 @@
package stream
import (
"bytes"
"encoding/json"
"net/http"
)
type HookFunc func(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
type Hook interface {
DoPublish(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoPublishDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoPlay(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoPlayDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoRecord(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoIdleTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
DoRecvTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error
}
type hookImpl struct {
}
// 每个通知的时间都需要携带的字段
type eventInfo struct {
stream string //stream id
protocol string //推拉流协议
remoteAddr string //peer地址
}
func (hook *hookImpl) send(url string, m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
marshal, err := json.Marshal(m)
if err != nil {
return err
}
client := &http.Client{}
request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal))
if err != nil {
return err
}
request.Header.Set("Content-Type", "application/json")
go func() {
response, err := client.Do(request)
if err != nil || response.StatusCode != http.StatusOK {
failure(response, err)
return
}
success(response)
}()
return nil
}
func (hook *hookImpl) DoPublish(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnPublish, m, success, failure)
}
func (hook *hookImpl) DoPublishDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnPublishDone, m, success, failure)
}
func (hook *hookImpl) DoPlay(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnPlay, m, success, failure)
}
func (hook *hookImpl) DoPlayDone(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnPlayDone, m, success, failure)
}
func (hook *hookImpl) DoRecord(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnRecord, m, success, failure)
}
func (hook *hookImpl) DoIdleTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnIdleTimeout, m, success, failure)
}
func (hook *hookImpl) DoRecvTimeout(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hook.send(AppConfig.Hook.OnRecvTimeout, m, success, failure)
}

22
stream/hook_test.go Normal file
View File

@@ -0,0 +1,22 @@
package stream
import (
"net/http"
"testing"
)
func TestHookServer(t *testing.T) {
http.HandleFunc("/api/v1/live/publish/auth", func(writer http.ResponseWriter, request *http.Request) {
if true {
writer.WriteHeader(http.StatusOK)
} else {
writer.WriteHeader(http.StatusNonAuthoritativeInfo)
}
})
err := http.ListenAndServe(":8080", nil)
if err != nil {
panic(err)
}
}

113
stream/session.go Normal file
View File

@@ -0,0 +1,113 @@
package stream
import (
"github.com/yangjiechina/avformat"
"net/http"
)
// Session 封装推拉流Session 统一管理,统一 hook回调
type Session interface {
OnPublish(source ISource, pra map[string]interface{}, success func(), failure func(state avformat.HookState))
OnPublishDone()
OnPlay(sink ISink, pra map[string]interface{}, success func(), failure func(state avformat.HookState))
OnPlayDone(pra map[string]interface{}, success func(), failure func(state avformat.HookState))
}
type SessionImpl struct {
hookImpl
stream string //stream id
protocol string //推拉流协议
remoteAddr string //peer地址
}
// AddInfoParams 为每个需要通知的时间添加必要的信息
func (s *SessionImpl) AddInfoParams(data map[string]interface{}) {
data["stream"] = s.stream
data["protocol"] = s.protocol
data["remoteAddr"] = s.remoteAddr
}
func (s *SessionImpl) OnPublish(source_ ISource, pra map[string]interface{}, success func(), failure func(state avformat.HookState)) {
//streamId 已经被占用
source := SourceManager.Find(s.stream)
if source != nil {
failure(avformat.HookStateOccupy)
return
}
if !AppConfig.Hook.EnableOnPublish() {
if err := SourceManager.Add(source_); err != nil {
success()
} else {
failure(avformat.HookStateOccupy)
}
return
}
if pra == nil {
pra = make(map[string]interface{}, 5)
}
s.AddInfoParams(pra)
err := s.DoPublish(pra, func(response *http.Response) {
if err := SourceManager.Add(source_); err != nil {
success()
} else {
failure(avformat.HookStateOccupy)
}
}, func(response *http.Response, err error) {
failure(avformat.HookStateFailure)
})
//hook地址连接失败
if err != nil {
failure(avformat.HookStateFailure)
return
}
}
func (s *SessionImpl) OnPublishDone() {
}
func (s *SessionImpl) OnPlay(sink ISink, pra map[string]interface{}, success func(), failure func(state avformat.HookState)) {
f := func() {
source := SourceManager.Find(s.stream)
if source == nil {
AddSinkToWaitingQueue(s.stream, nil)
} else {
source.AddSink(nil)
}
}
if !AppConfig.Hook.EnableOnPlay() {
f()
success()
return
}
if pra == nil {
pra = make(map[string]interface{}, 5)
}
s.AddInfoParams(pra)
err := s.DoPlay(pra, func(response *http.Response) {
f()
success()
}, func(response *http.Response, err error) {
failure(avformat.HookStateFailure)
})
if err != nil {
failure(avformat.HookStateFailure)
return
}
}
func (s *SessionImpl) OnPlayDone(pra map[string]interface{}, success func(), failure func(state avformat.HookState)) {
}

91
stream/sink.go Normal file
View File

@@ -0,0 +1,91 @@
package stream
import "github.com/yangjiechina/avformat/utils"
type ISink interface {
Id() string
Input(data []byte)
Send(buffer utils.ByteBuffer)
SourceId() string
Protocol() int
State() int
SetState(state int)
DisableVideo() bool
SetEnableVideo(enable bool)
Close()
}
func AddSinkToWaitingQueue(streamId string, sink ISink) {
}
func RemoveSinkFromWaitingQueue(streamId, sinkId string) ISink {
return nil
}
func PopWaitingSinks(streamId string) []ISink {
return nil
}
type SinkImpl struct {
}
func (s *SinkImpl) Id() string {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) Input(data []byte) {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) Send(buffer utils.ByteBuffer) {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) SourceId() string {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) Protocol() int {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) State() int {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) SetState(state int) {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) DisableVideo() bool {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) SetEnableVideo(enable bool) {
//TODO implement me
panic("implement me")
}
func (s *SinkImpl) Close() {
//TODO implement me
panic("implement me")
}

154
stream/source.go Normal file
View File

@@ -0,0 +1,154 @@
package stream
import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/transcode"
)
type TransMuxerId uint32
// SourceType Source 推流类型
type SourceType uint32
// Protocol 输出协议
type Protocol uint32
const (
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
SourceType1078 = SourceType(3)
ProtocolRtmp = Protocol(1)
ProtocolFlv = Protocol(2)
ProtocolRtsp = Protocol(3)
ProtocolHls = Protocol(4)
ProtocolRtc = Protocol(5)
)
// SessionState 推拉流Session状态
// 包含, 握手阶段、Hook授权.
type SessionState uint32
const (
SessionStateCreate = SessionState(1)
SessionStateHandshaking = SessionState(2)
SessionStateHandshakeFailure = SessionState(3)
SessionStateHandshakeDone = SessionState(4)
SessionStateTransferring = SessionState(5)
)
type ISource interface {
// Id Source的唯一ID/**
Id() string
// Input 输入推流数据
Input(data []byte)
// CreateTransDeMuxer 创建推流的解服用器
CreateTransDeMuxer() ITransDeMuxer
// CreateTranscoder 创建转码器
CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder
// OriginStreams 返回推流的原始Streams
OriginStreams() []utils.AVStream
// TranscodeStreams 返回转码的Streams
TranscodeStreams() []utils.AVStream
// AddSink 添加Sink, 在此之前Sink已经握手、授权通过. 如果Source还未WriteHeader将Sink添加到等待队列.
// 匹配拉流的编码器, 创建TransMuxer或向存在TransMuxer添加Sink
AddSink(sink ISink) bool
// RemoveSink 删除Sink/**
RemoveSink(tid TransMuxerId, sinkId string) bool
// Close 关闭Source
// 停止一切封装和转发流以及转码工作
// 将Sink添加到等待队列
Close()
}
type onSourceHandler interface {
onDeMuxStream(stream utils.AVStream)
}
type SourceImpl struct {
Id_ string
type_ SourceType
state SessionState
deMuxer ITransDeMuxer
recordSink ISink
audioTranscoders []transcode.ITranscoder
videoTranscoders []transcode.ITranscoder
transcodeStreams []utils.AVStream
//所有的输出协议, 持有Sink
transMuxers map[Protocol]map[TransMuxerId]ITransMuxer
}
func (s *SourceImpl) Id() string {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) Input(data []byte) {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) CreateTransDeMuxer() ITransDeMuxer {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) OriginStreams() []utils.AVStream {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) TranscodeStreams() []utils.AVStream {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) AddSink(sink ISink) bool {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) RemoveSink(tid TransMuxerId, sinkId string) bool {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) Close() {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) OnDeMuxStreamDone() {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) OnDeMuxPacket(index int, packet *utils.AVPacket2) {
//TODO implement me
panic("implement me")
}
func (s *SourceImpl) OnDeMuxDone() {
//TODO implement me
panic("implement me")
}

51
stream/source_manager.go Normal file
View File

@@ -0,0 +1,51 @@
package stream
import (
"fmt"
"sync"
)
type ISourceManager interface {
Add(source ISource) error
Find(id string) ISource
Remove(id string) (ISource, error)
}
var SourceManager ISourceManager
func init() {
SourceManager = &sourceMangerImpl{}
}
type sourceMangerImpl struct {
m sync.Map
}
func (s *sourceMangerImpl) Add(source ISource) error {
_, ok := s.m.LoadOrStore(source.Id(), source)
if ok {
return fmt.Errorf("the source %s has been exist", source.Id())
}
return nil
}
func (s *sourceMangerImpl) Find(id string) ISource {
value, ok := s.m.Load(id)
if ok {
return value.(ISource)
}
return nil
}
func (s *sourceMangerImpl) Remove(id string) (ISource, error) {
value, loaded := s.m.LoadAndDelete(id)
if loaded {
return value.(ISource), nil
}
return nil, fmt.Errorf("source with id %s was not find", id)
}

35
stream/trans_demuxer.go Normal file
View File

@@ -0,0 +1,35 @@
package stream
import "github.com/yangjiechina/avformat/utils"
// OnTransDeMuxerHandler 解复用器回调 /**
type OnTransDeMuxerHandler interface {
OnDeMuxStream(stream utils.AVStream)
OnDeMuxStreamDone()
OnDeMuxPacket(index int, packet utils.AVPacket)
OnDeMuxDone()
}
type ITransDeMuxer interface {
Input(data []byte)
SetHandler(handler OnTransDeMuxerHandler)
Close()
}
type TransDeMuxerImpl struct {
handler OnTransDeMuxerHandler
}
func (impl *TransDeMuxerImpl) Input(data []byte) {
panic("implement me")
}
func (impl *TransDeMuxerImpl) SetHandler(handler OnTransDeMuxerHandler) {
impl.handler = handler
}
func (impl *TransDeMuxerImpl) Close() {
panic("implement me")
}

4
stream/trans_muxer.go Normal file
View File

@@ -0,0 +1,4 @@
package stream
type ITransMuxer interface {
}

4
stream/trans_stream.go Normal file
View File

@@ -0,0 +1,4 @@
package stream
type TransStream struct {
}

4
transcode/transcoder.go Normal file
View File

@@ -0,0 +1,4 @@
package transcode
type ITranscoder interface {
}