diff --git a/api.go b/api.go index d8d37d9..dc87038 100644 --- a/api.go +++ b/api.go @@ -83,12 +83,18 @@ func startApiServer(addr string) { // {source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能嵌套一层 apiServer.router.HandleFunc("/{source}.flv", filterSourceID(apiServer.onFlv, ".flv")) apiServer.router.HandleFunc("/{source}/{stream}.flv", filterSourceID(apiServer.onFlv, ".flv")) - apiServer.router.HandleFunc("/{source}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8")) - apiServer.router.HandleFunc("/{source}/{stream}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8")) - apiServer.router.HandleFunc("/{source}.ts", filterSourceID(apiServer.onTS, ".ts")) - apiServer.router.HandleFunc("/{source}/{stream}.ts", filterSourceID(apiServer.onTS, ".ts")) - apiServer.router.HandleFunc("/{source}.rtc", filterSourceID(apiServer.onRtc, ".rtc")) - apiServer.router.HandleFunc("/{source}/{stream}.rtc", filterSourceID(apiServer.onRtc, ".rtc")) + + if stream.AppConfig.Hls.Enable { + apiServer.router.HandleFunc("/{source}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8")) + apiServer.router.HandleFunc("/{source}/{stream}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8")) + apiServer.router.HandleFunc("/{source}.ts", filterSourceID(apiServer.onTS, ".ts")) + apiServer.router.HandleFunc("/{source}/{stream}.ts", filterSourceID(apiServer.onTS, ".ts")) + } + + if stream.AppConfig.WebRtc.Enable { + apiServer.router.HandleFunc("/{source}.rtc", filterSourceID(apiServer.onRtc, ".rtc")) + apiServer.router.HandleFunc("/{source}/{stream}.rtc", filterSourceID(apiServer.onRtc, ".rtc")) + } apiServer.router.HandleFunc("/api/v1/source/list", apiServer.OnSourceList) // 查询所有推流源 apiServer.router.HandleFunc("/api/v1/source/close", filterRequestBodyParams(apiServer.OnSourceClose, &IDS{})) // 关闭推流源 diff --git a/config.json b/config.json index 9fa189d..dd48902 100644 --- a/config.json +++ b/config.json @@ -34,11 +34,13 @@ }, "webrtc": { + "enable": true, "port": 8000, "transport": "UDP" }, "gb28181": { + "enable": true, "port": [50000,60000], "transport": "UDP|TCP" }, @@ -55,7 +57,7 @@ }, "hooks": { - "enable": true, + "enable": false, "timeout": 10, "on_started": "http://localhost:9000/api/v1/hook/on_started", diff --git a/main.go b/main.go index 73a0b99..999ca5d 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "github.com/lkmio/avformat/transport" + "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/flv" "github.com/lkmio/lkm/gb28181" "github.com/lkmio/lkm/hls" @@ -15,12 +16,98 @@ import ( "net" "net/http" _ "net/http/pprof" + "os" "strconv" + "strings" "github.com/lkmio/lkm/rtmp" "github.com/lkmio/lkm/stream" ) +func readRunArgs() (map[string]string, map[string]string) { + args := os.Args + + // 运行参数项优先级高于config.json参数项 + // --disable-rtmp --enable-rtmp=11935 + // --disable-rtsp --enable-rtsp + // --disable-hls --enable-hls + // --disable-webrtc --enable-webrtc=18000 + // --disable-gb28181 --enable-gb28181 + // --disable-jt1078 --enable-jt1078=11078 + // --disable-hooks --enable-hooks + // --disable-record --enable-record + + disableOptions := map[string]string{} + enableOptions := map[string]string{} + for _, arg := range args { + // 参数忽略大小写 + arg = strings.ToLower(arg) + + var option string + var enable bool + if strings.HasPrefix(arg, "--disable-") { + option = arg[len("--disable-"):] + } else if strings.HasPrefix(arg, "--enable-") { + option = arg[len("--enable-"):] + enable = true + } else { + continue + } + + pair := strings.Split(option, "=") + var value string + if len(pair) > 1 { + value = pair[1] + } + + if enable { + enableOptions[pair[0]] = value + } else { + disableOptions[pair[0]] = value + } + } + + // 删除重叠参数, 禁用和开启同时声明时, 以开启为准. + for k := range enableOptions { + if _, ok := disableOptions[k]; ok { + delete(disableOptions, k) + } + } + + return disableOptions, enableOptions +} + +func mergeArgs(options map[string]stream.EnableConfig, disableOptions, enableOptions map[string]string) { + for k := range disableOptions { + option, ok := options[k] + utils.Assert(ok) + + option.SetEnable(false) + } + + for k, v := range enableOptions { + var port int + + if len(v) > 0 { + atoi, err := strconv.Atoi(v) + if err == nil && atoi > 0 { + port = atoi + } + } + + option, ok := options[k] + utils.Assert(ok) + + option.SetEnable(true) + + if port > 0 { + if config, ok := option.(stream.PortConfig); ok { + config.SetPort(port) + } + } + } +} + func init() { stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory) @@ -36,28 +123,49 @@ func init() { } stream.SetDefaultConfig(config) + + options := map[string]stream.EnableConfig{ + "rtmp": &config.Rtmp, + "rtsp": &config.Rtsp, + "hls": &config.Hls, + "webrtc": &config.WebRtc, + "gb28181": &config.GB28181, + "jt1078": &config.JT1078, + "hooks": &config.Hooks, + "record": &config.Record, + } + + disableOptions, enableOptions := readRunArgs() + mergeArgs(options, disableOptions, enableOptions) + stream.AppConfig = *config - stream.InitHookUrls() - // 设置公网IP和端口 - rtc.InitConfig() + + if stream.AppConfig.Hooks.Enable { + stream.InitHookUrls() + } + + if stream.AppConfig.WebRtc.Enable { + // 设置公网IP和端口 + rtc.InitConfig() + } // 初始化日志 log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress) - if stream.AppConfig.GB28181.IsMultiPort() { + if stream.AppConfig.GB28181.Enable && stream.AppConfig.GB28181.IsMultiPort() { gb28181.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.GB28181.Port[0]), uint16(stream.AppConfig.GB28181.Port[1])) } - if stream.AppConfig.Rtsp.IsMultiPort() { + if stream.AppConfig.Rtsp.Enable && stream.AppConfig.Rtsp.IsMultiPort() { rtsp.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.Rtsp.Port[1]), uint16(stream.AppConfig.Rtsp.Port[2])) } + // 打印配置信息 indent, _ := json.MarshalIndent(stream.AppConfig, "", "\t") log.Sugar.Infof("server config:\r\n%s", indent) } func main() { - if stream.AppConfig.Rtmp.Enable { rtmpAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(stream.AppConfig.Rtmp.Port)) if err != nil { @@ -91,9 +199,9 @@ func main() { log.Sugar.Info("启动http服务 addr:", stream.ListenAddr(stream.AppConfig.Http.Port)) go startApiServer(net.JoinHostPort(stream.AppConfig.ListenIP, strconv.Itoa(stream.AppConfig.Http.Port))) - //单端口模式下, 启动时就创建收流端口 - //多端口模式下, 创建GBSource时才创建收流端口 - if !stream.AppConfig.GB28181.IsMultiPort() { + // 单端口模式下, 启动时就创建收流端口 + // 多端口模式下, 创建GBSource时才创建收流端口 + if stream.AppConfig.GB28181.Enable && !stream.AppConfig.GB28181.IsMultiPort() { if stream.AppConfig.GB28181.IsEnableUDP() { server, err := gb28181.NewUDPServer(gb28181.NewSSRCFilter(128)) if err != nil { @@ -136,6 +244,7 @@ func main() { }() } + // 开启pprof调试 err := http.ListenAndServe(":19999", nil) if err != nil { println(err) diff --git a/stream/config.go b/stream/config.go index b901f68..53ec3d2 100644 --- a/stream/config.go +++ b/stream/config.go @@ -21,33 +21,69 @@ type TransportConfig struct { Transport string `json:"transport"` //"UDP|TCP" } -type RtmpConfig struct { +type EnableConfig interface { + IsEnable() bool + + SetEnable(bool) +} + +type enableConfig struct { Enable bool `json:"enable"` - Port int `json:"port"` +} + +func (e *enableConfig) IsEnable() bool { + return e.Enable +} + +func (e *enableConfig) SetEnable(b bool) { + e.Enable = b +} + +type PortConfig interface { + GetPort() int + + SetPort(port int) +} + +type portConfig struct { + Port int `json:"port"` +} + +func (s *portConfig) GetPort() int { + return s.Port +} + +func (s *portConfig) SetPort(port int) { + s.Port = port +} + +type RtmpConfig struct { + enableConfig + portConfig } type HlsConfig struct { - Enable bool `json:"enable"` + enableConfig Dir string `json:"dir"` Duration int `json:"segment_duration"` PlaylistLength int `json:"playlist_length"` } type JT1078Config struct { - Enable bool `json:"enable"` - Port int `json:"port"` + enableConfig + portConfig } type RtspConfig struct { TransportConfig - Enable bool `json:"enable"` + enableConfig Port []int `json:"port"` Password string `json:"password"` } type RecordConfig struct { - Enable bool `json:"enable"` + enableConfig Format string `json:"format"` Dir string `json:"dir"` } @@ -66,13 +102,15 @@ type HttpConfig struct { } type GB28181Config struct { + enableConfig TransportConfig Port []int `json:"port"` } type WebRtcConfig struct { + enableConfig TransportConfig - Port int `json:"port"` + portConfig } func (g TransportConfig) IsEnableTCP() bool { @@ -122,7 +160,7 @@ func (c HlsConfig) TSFormat(sourceId string) string { } type HooksConfig struct { - Enable bool `json:"enable"` + enableConfig Timeout int64 `json:"timeout"` OnStartedUrl string `json:"on_started"` //应用启动后回调 OnPublishUrl string `json:"on_publish"` //推流回调 @@ -166,15 +204,15 @@ func (hook *HooksConfig) IsEnableOnStarted() bool { return hook.Enable && hook.OnStartedUrl != "" } -func GetStreamPlayUrls(sourceId string) []string { +func GetStreamPlayUrls(source string) []string { var urls []string if AppConfig.Rtmp.Enable { - urls = append(urls, fmt.Sprintf("rtmp://%s:%d/%s", AppConfig.PublicIP, AppConfig.Rtmp.Port, sourceId)) + urls = append(urls, fmt.Sprintf("rtmp://%s:%d/%s", AppConfig.PublicIP, AppConfig.Rtmp.Port, source)) } if AppConfig.Rtsp.Enable { - //不拼接userinfo - urls = append(urls, fmt.Sprintf("rtsp://%s:%d/%s", AppConfig.PublicIP, AppConfig.Rtsp.Port[0], sourceId)) + // 不拼接userinfo + urls = append(urls, fmt.Sprintf("rtsp://%s:%d/%s", AppConfig.PublicIP, AppConfig.Rtsp.Port[0], source)) } //if AppConfig.Http.Enable { @@ -182,13 +220,12 @@ func GetStreamPlayUrls(sourceId string) []string { //} if AppConfig.Hls.Enable { - //不拼接userinfo - urls = append(urls, fmt.Sprintf("http://%s:%d/%s.m3u8", AppConfig.PublicIP, AppConfig.Http.Port, sourceId)) + urls = append(urls, fmt.Sprintf("http://%s:%d/%s.m3u8", AppConfig.PublicIP, AppConfig.Http.Port, source)) } - urls = append(urls, fmt.Sprintf("http://%s:%d/%s.flv", AppConfig.PublicIP, AppConfig.Http.Port, sourceId)) - urls = append(urls, fmt.Sprintf("http://%s:%d/%s.rtc", AppConfig.PublicIP, AppConfig.Http.Port, sourceId)) - urls = append(urls, fmt.Sprintf("ws://%s:%d/%s.flv", AppConfig.PublicIP, AppConfig.Http.Port, sourceId)) + urls = append(urls, fmt.Sprintf("http://%s:%d/%s.flv", AppConfig.PublicIP, AppConfig.Http.Port, source)) + urls = append(urls, fmt.Sprintf("http://%s:%d/%s.rtc", AppConfig.PublicIP, AppConfig.Http.Port, source)) + urls = append(urls, fmt.Sprintf("ws://%s:%d/%s.flv", AppConfig.PublicIP, AppConfig.Http.Port, source)) return urls } @@ -264,36 +301,36 @@ func LoadConfigFile(path string) (*AppConfig_, error) { return nil, err } - config_ := AppConfig_{} - if err := json.Unmarshal(file, &config_); err != nil { + config := AppConfig_{} + if err := json.Unmarshal(file, &config); err != nil { return nil, err } - return &config_, err + return &config, err } -func SetDefaultConfig(config_ *AppConfig_) { - if !config_.GOPCache { - config_.GOPCache = true - config_.GOPBufferSize = 8196 * 1024 - config_.MergeWriteLatency = 350 +func SetDefaultConfig(config *AppConfig_) { + if !config.GOPCache { + config.GOPCache = true + config.GOPBufferSize = 8196 * 1024 + config.MergeWriteLatency = 350 log.Sugar.Warnf("强制开启GOP缓存") } - config_.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config_.GOPBufferSize) //最低4M码率 最高160M码率 - config_.MergeWriteLatency = limitInt(350, 2000, config_.MergeWriteLatency) //最低缓存350毫秒数据才发送 最高缓存2秒数据才发送 - config_.ProbeTimeout = limitInt(2000, 5000, config_.MergeWriteLatency) //2-5秒内必须解析完AVStream - config_.WriteTimeout = limitInt(2000, 10000, config_.WriteTimeout) - config_.WriteBufferCapacity = config_.WriteTimeout/config_.MergeWriteLatency + 1 + config.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config.GOPBufferSize) // 最低4M码率 最高160M码率 + config.MergeWriteLatency = limitInt(350, 2000, config.MergeWriteLatency) // 最低缓存350毫秒数据才发送 最高缓存2秒数据才发送 + config.ProbeTimeout = limitInt(2000, 5000, config.MergeWriteLatency) // 2-5秒内必须解析完AVStream + config.WriteTimeout = limitInt(2000, 10000, config.WriteTimeout) + config.WriteBufferCapacity = config.WriteTimeout/config.MergeWriteLatency + 1 - config_.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config_.Log.Level) - config_.Log.MaxSize = limitMin(1, config_.Log.MaxSize) - config_.Log.MaxBackup = limitMin(1, config_.Log.MaxBackup) - config_.Log.MaxAge = limitMin(1, config_.Log.MaxAge) + config.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config.Log.Level) + config.Log.MaxSize = limitMin(1, config.Log.MaxSize) + config.Log.MaxBackup = limitMin(1, config.Log.MaxBackup) + config.Log.MaxAge = limitMin(1, config.Log.MaxAge) - config_.IdleTimeout *= int64(time.Second) - config_.ReceiveTimeout *= int64(time.Second) - config_.Hooks.Timeout *= int64(time.Second) + config.IdleTimeout *= int64(time.Second) + config.ReceiveTimeout *= int64(time.Second) + config.Hooks.Timeout *= int64(time.Second) } func limitMin(min, value int) int {