Files
lkm/api_gb.go
2024-10-22 11:23:43 +08:00

188 lines
5.1 KiB
Go

package main
import (
"fmt"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"net"
"net/http"
"strings"
)
type GBForwardParams struct {
Source string `json:"source"` //GetSourceID
Addr string `json:"addr"`
SSRC uint32 `json:"ssrc"`
Setup string `json:"setup"`
}
type GBSourceParams struct {
Source string `json:"source"` //GetSourceID
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
}
type GBConnect struct {
Source string `json:"source"` //GetSourceID
RemoteAddr string `json:"remote_addr"`
}
func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建国标源: %v", v)
// 返回收流地址
response := &struct {
IP string `json:"ip"`
Port int `json:"port,omitempty"`
}{}
var err error
// 响应错误消息
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
source := stream.SourceManager.Find(v.Source)
if source != nil {
log.Sugar.Errorf("创建国标源失败, %s已经存在", v.Source)
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("创建国标源失败, %s已经存在", v.Source)}
return
}
tcp := true
var active bool
if v.Setup == "passive" {
} else if v.Setup == "active" {
active = true
} else {
tcp = false
//udp收流
}
if tcp && active {
if !stream.AppConfig.GB28181.IsMultiPort() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, 单端口模式下不能主动拉流"}
} else if !tcp {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, UDP不能主动拉流"}
} else if !stream.AppConfig.GB28181.IsEnableTCP() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建国标源失败, 未开启TCP, UDP不能主动拉流"}
}
if err != nil {
return
}
}
_, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active)
if err != nil {
err = &MalformedRequest{Code: http.StatusInternalServerError, Msg: fmt.Sprintf("创建国标源失败 err:%s", err.Error())}
return
}
response.IP = stream.AppConfig.PublicIP
response.Port = port
httpResponseOK(w, response)
}
func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v)
var err error
defer func() {
if err != nil {
log.Sugar.Errorf(err.Error())
httpResponse2(w, err)
}
}()
source := stream.SourceManager.Find(v.Source)
if source == nil {
log.Sugar.Errorf("设置主动拉流失败, %s源不存在", v.Source)
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gb28181 source 不存在"}
return
}
activeSource, ok := source.(*gb28181.ActiveSource)
if !ok {
log.Sugar.Errorf("设置主动拉流失败, %s源不是Active拉流类型", v.Source)
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gbsource 不能转为active source"}
return
}
addr, err := net.ResolveTCPAddr("tcp", v.RemoteAddr)
if err != nil {
log.Sugar.Errorf("设置主动拉流失败, err: %s", err.Error())
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "解析连接地址失败"}
return
}
err = activeSource.Connect(addr)
if err != nil {
log.Sugar.Errorf("设置主动拉流失败, err: %s", err.Error())
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("连接Server失败 err:%s", err.Error())}
return
}
httpResponseOK(w, nil)
}
func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标级联转发: %v", v)
source := stream.SourceManager.Find(v.Source)
if source == nil {
log.Sugar.Infof("设置国标级联转发失败 %s源不存在", v.Source)
w.WriteHeader(http.StatusNotFound)
} else if source.GetType() != stream.SourceType28181 {
log.Sugar.Infof("设置国标级联转发失败 %s源不是国标推流类型", v.Source)
w.WriteHeader(http.StatusBadRequest)
return
}
var setup gb28181.SetupType
switch strings.ToLower(v.Setup) {
case "active":
setup = gb28181.SetupActive
break
case "passive":
setup = gb28181.SetupPassive
break
default:
setup = gb28181.SetupUDP
break
}
addr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
sinkId := stream.NetAddr2SinkId(addr)
// 添加随机数
if ipv4, ok := sinkId.(uint64); ok {
random := uint64(utils.RandomIntInRange(0x1000, 0xFFFF0000))
sinkId = (ipv4 & 0xFFFFFFFF00000000) | (random << 16) | (ipv4 & 0xFFFF)
}
sink, port, err := gb28181.NewForwardSink(v.SSRC, v.Addr, setup, sinkId, v.Source)
if err != nil {
log.Sugar.Errorf("设置国标级联转发 err: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
source.AddSink(sink)
log.Sugar.Infof("设置国标级联转发成功 ID: %s", sink.GetID())
response := struct {
ID string `json:"id"` //sink id
IP string `json:"ip"`
Port int `json:"port"`
}{ID: stream.SinkId2String(sinkId), IP: stream.AppConfig.PublicIP, Port: port}
httpResponse2(w, &response)
}