mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-26 19:21:14 +08:00
feat: 支持国标语音广播
This commit is contained in:
13
api.go
13
api.go
@@ -104,19 +104,18 @@ func startApiServer(addr string) {
|
||||
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
|
||||
|
||||
if stream.AppConfig.GB28181.Enable {
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/forward", filterRequestBodyParams(apiServer.OnGBSourceForward, &GBForwardParams{})) // 设置级联转发目标,停止级联调用sink/close接口,级联断开会走on_play_done事件通知
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", filterRequestBodyParams(apiServer.OnGBSourceCreate, &GBSourceParams{})) // 创建国标推流源
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", filterRequestBodyParams(apiServer.OnGBSourceConnect, &GBConnect{})) // 为国标TCP主动推流,设置连接地址
|
||||
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/offer/create", filterRequestBodyParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/answer/create", filterRequestBodyParams(apiServer.OnGBAnswerCreate, &GBOffer{}))
|
||||
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", filterRequestBodyParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址
|
||||
}
|
||||
|
||||
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
|
||||
runtime.GC()
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
apiServer.router.HandleFunc("/rtc.html", func(writer http.ResponseWriter, request *http.Request) {
|
||||
http.ServeFile(writer, request, "./rtc.html")
|
||||
})
|
||||
apiServer.router.PathPrefix("/web/").Handler(http.StripPrefix("/web/", http.FileServer(http.Dir("./web"))))
|
||||
|
||||
http.Handle("/", apiServer.router)
|
||||
|
||||
srv := &http.Server{
|
||||
|
175
api_gb.go
175
api_gb.go
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/lkmio/avformat/bufio"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"github.com/lkmio/lkm/gb28181"
|
||||
"github.com/lkmio/lkm/log"
|
||||
@@ -9,47 +10,41 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
InviteTypeLive = iota
|
||||
InviteTypePlayback
|
||||
InviteTypeDownload
|
||||
InviteTypeBroadcast
|
||||
InviteTypeTalk
|
||||
InviteTypePlay = "play"
|
||||
InviteTypePlayback = "playback"
|
||||
InviteTypeDownload = "download"
|
||||
InviteTypeBroadcast = "broadcast"
|
||||
InviteTypeTalk = "talk"
|
||||
)
|
||||
|
||||
type GBForwardParams struct {
|
||||
Source string `json:"source"` // GetSourceID
|
||||
Addr string `json:"addr"`
|
||||
SSRC uint32 `json:"ssrc"`
|
||||
OfferSetup string `json:"offer_setup"`
|
||||
Setup string `json:"setup"`
|
||||
Type int `json:"type"` // live/download/playback/talk/broadcast
|
||||
type SDP struct {
|
||||
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
|
||||
Addr string `json:"addr,omitempty"` // 连接地址
|
||||
SSRC string `json:"ssrc,omitempty"`
|
||||
Setup string `json:"setup,omitempty"` // active/passive
|
||||
Transport string `json:"transport,omitempty"` // tcp/udp
|
||||
}
|
||||
|
||||
type GBSourceParams struct {
|
||||
type SourceSDP struct {
|
||||
Source string `json:"source"` // GetSourceID
|
||||
Setup string `json:"setup"` // active/passive
|
||||
SSRC uint32 `json:"ssrc,omitempty"`
|
||||
Type int `json:"type"` // live/download/playback/talk/broadcast
|
||||
SDP
|
||||
}
|
||||
|
||||
type GBConnect struct {
|
||||
Source string `json:"source"` // GetSourceID
|
||||
RemoteAddr string `json:"remote_addr"`
|
||||
type GBOffer struct {
|
||||
SourceSDP
|
||||
AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, r *http.Request) {
|
||||
func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("创建国标源: %v", v)
|
||||
|
||||
// 返回收流地址
|
||||
response := &struct {
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port,omitempty"`
|
||||
SDP
|
||||
Urls []string `json:"urls"`
|
||||
SSRC string `json:"ssrc,omitempty"`
|
||||
}{}
|
||||
|
||||
var err error
|
||||
@@ -92,7 +87,7 @@ func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter,
|
||||
}
|
||||
|
||||
var ssrc string
|
||||
if v.Type == InviteTypeDownload || v.Type == InviteTypePlayback {
|
||||
if v.SessionName == InviteTypeDownload || v.SessionName == InviteTypePlayback {
|
||||
ssrc = gb28181.GetVodSSRC()
|
||||
} else {
|
||||
ssrc = gb28181.GetLiveSSRC()
|
||||
@@ -104,14 +99,13 @@ func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter,
|
||||
return
|
||||
}
|
||||
|
||||
response.IP = stream.AppConfig.PublicIP
|
||||
response.Port = port
|
||||
response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))
|
||||
response.Urls = stream.GetStreamPlayUrls(v.Source)
|
||||
response.SSRC = ssrc
|
||||
httpResponseOK(w, response)
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *http.Request) {
|
||||
func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v)
|
||||
|
||||
var err error
|
||||
@@ -135,7 +129,7 @@ func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *
|
||||
return
|
||||
}
|
||||
|
||||
addr, err := net.ResolveTCPAddr("tcp", v.RemoteAddr)
|
||||
addr, err := net.ResolveTCPAddr("tcp", v.Addr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -145,62 +139,133 @@ func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *
|
||||
}
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("设置国标级联转发: %v", v)
|
||||
func (api *ApiServer) OnGBOfferCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
|
||||
// 预览下级设备
|
||||
if v.SessionName == "" || v.SessionName == InviteTypePlay ||
|
||||
v.SessionName == InviteTypePlayback ||
|
||||
v.SessionName == InviteTypeDownload {
|
||||
api.OnGBSourceCreate(v, w, r)
|
||||
} else {
|
||||
// 向上级转发广播和对讲, 或者是向设备发送invite talk
|
||||
}
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnGBAnswerCreate(v *GBOffer, w http.ResponseWriter, r *http.Request) {
|
||||
log.Sugar.Infof("创建应答 offer: %v", v)
|
||||
|
||||
var sink stream.Sink
|
||||
var err error
|
||||
// 响应错误消息
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("设置级联转发失败 err: %s", err.Error())
|
||||
log.Sugar.Errorf("创建应答失败 err: %s", err.Error())
|
||||
httpResponseError(w, err.Error())
|
||||
|
||||
if sink != nil {
|
||||
sink.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
source := stream.SourceManager.Find(v.Source)
|
||||
if source == nil {
|
||||
err = fmt.Errorf("%s 源不存在", v.Source)
|
||||
} else if source.GetType() != stream.SourceType28181 {
|
||||
log.Sugar.Infof("%s 源不是国标推流类型", v.Source)
|
||||
return
|
||||
}
|
||||
|
||||
var setup gb28181.SetupType
|
||||
switch strings.ToLower(v.Setup) {
|
||||
case "active":
|
||||
setup = gb28181.SetupActive
|
||||
break
|
||||
case "passive":
|
||||
setup = gb28181.SetupPassive
|
||||
break
|
||||
default:
|
||||
setup = gb28181.SetupUDP
|
||||
break
|
||||
}
|
||||
|
||||
addr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
|
||||
sinkId := stream.NetAddr2SinkId(addr)
|
||||
|
||||
// 添加随机数
|
||||
// sinkId添加随机数
|
||||
if ipv4, ok := sinkId.(uint64); ok {
|
||||
random := uint64(utils.RandomIntInRange(0x1000, 0xFFFF0000))
|
||||
sinkId = (ipv4 & 0xFFFFFFFF00000000) | (random << 16) | (ipv4 & 0xFFFF)
|
||||
}
|
||||
|
||||
sink, port, err := gb28181.NewForwardSink(v.SSRC, v.Addr, setup, sinkId, v.Source)
|
||||
setup := gb28181.SetupTypeFromString(v.Setup)
|
||||
if v.AnswerSetup != "" {
|
||||
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
|
||||
}
|
||||
|
||||
var protocol stream.TransStreamProtocol
|
||||
// 级联转发
|
||||
if v.SessionName == "" || v.SessionName == InviteTypePlay ||
|
||||
v.SessionName == InviteTypePlayback ||
|
||||
v.SessionName == InviteTypeDownload {
|
||||
protocol = stream.TransStreamGBCascadedForward
|
||||
} else {
|
||||
// 对讲广播转发
|
||||
protocol = stream.TransStreamGBTalkForward
|
||||
}
|
||||
|
||||
var port int
|
||||
sink, port, err = stream.NewForwardSink(setup.TransportType(), protocol, sinkId, v.Source, v.Addr, gb28181.TransportManger)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
source.AddSink(sink)
|
||||
|
||||
log.Sugar.Infof("设置国标级联转发成功 ID: %s", sink.GetID())
|
||||
log.Sugar.Infof("创建转发sink成功, sink: %s port: %d transport: %s", sink.GetID(), port, setup.TransportType())
|
||||
_, state := stream.PreparePlaySink(sink)
|
||||
if utils.HookStateOK != state {
|
||||
err = fmt.Errorf("failed to prepare play sink")
|
||||
return
|
||||
}
|
||||
|
||||
response := struct {
|
||||
Sink string `json:"sink"` //sink id
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
}{Sink: stream.SinkId2String(sinkId), IP: stream.AppConfig.PublicIP, Port: port}
|
||||
SDP
|
||||
}{Sink: stream.SinkId2String(sinkId), SDP: SDP{Addr: net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))}}
|
||||
|
||||
httpResponseOK(w, &response)
|
||||
}
|
||||
|
||||
// OnGBTalk 国标广播/对讲流程:
|
||||
// 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接
|
||||
// 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求
|
||||
func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := api.upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// 获取id
|
||||
id := r.FormValue("source")
|
||||
|
||||
talkSource := gb28181.NewTalkSource(id, conn)
|
||||
talkSource.Init(stream.TCPReceiveBufferQueueSize)
|
||||
talkSource.SetUrlValues(r.Form)
|
||||
|
||||
_, state := stream.PreparePublishSource(talkSource, true)
|
||||
if utils.HookStateOK != state {
|
||||
log.Sugar.Errorf("对讲失败, source: %s", talkSource)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)
|
||||
|
||||
go stream.LoopEvent(talkSource)
|
||||
|
||||
for {
|
||||
_, bytes, err := conn.ReadMessage()
|
||||
length := len(bytes)
|
||||
if err != nil {
|
||||
log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
|
||||
break
|
||||
} else if length < 1 {
|
||||
continue
|
||||
}
|
||||
|
||||
for i := 0; i < length; {
|
||||
data := stream.UDPReceiveBufferPool.Get().([]byte)
|
||||
n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
|
||||
copy(data, bytes[:n])
|
||||
_ = talkSource.PublishSource.Input(data[:n])
|
||||
i += n
|
||||
}
|
||||
}
|
||||
|
||||
talkSource.Close()
|
||||
}
|
||||
|
9
gb28181/cascaded_stream.go
Normal file
9
gb28181/cascaded_stream.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"github.com/lkmio/lkm/stream"
|
||||
)
|
||||
|
||||
func CascadedTransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
|
||||
return stream.NewRtpTransStream(stream.TransStreamGBCascadedForward, 1024), nil
|
||||
}
|
@@ -1,124 +0,0 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"github.com/lkmio/lkm/stream"
|
||||
"github.com/lkmio/rtp"
|
||||
"github.com/lkmio/transport"
|
||||
"net"
|
||||
)
|
||||
|
||||
type ForwardSink struct {
|
||||
stream.BaseSink
|
||||
setup SetupType
|
||||
socket transport.Transport
|
||||
ssrc uint32
|
||||
}
|
||||
|
||||
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
|
||||
log.Sugar.Infof("级联连接 conn: %s", conn.RemoteAddr())
|
||||
|
||||
f.Conn = conn
|
||||
f.Conn.(*transport.Conn).EnableAsyncWriteMode(512)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ForwardSink) OnPacket(conn net.Conn, data []byte) []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
|
||||
log.Sugar.Infof("级联断开连接 conn: %s", conn.RemoteAddr())
|
||||
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
|
||||
// TCP等待连接后再转发数据
|
||||
if SetupUDP != f.setup && f.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 修改为与上级协商的SSRC
|
||||
rtp.ModifySSRC(data[0].Get(), f.ssrc)
|
||||
|
||||
if SetupUDP == f.setup {
|
||||
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
|
||||
} else {
|
||||
return f.BaseSink.Write(index, data, ts, keyVideo)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭国标转发流
|
||||
func (f *ForwardSink) Close() {
|
||||
f.BaseSink.Close()
|
||||
|
||||
if f.socket != nil {
|
||||
f.socket.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// NewForwardSink 创建国标级联转发流Sink
|
||||
// 返回监听的端口和Sink
|
||||
func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stream.SinkID, sourceId string) (stream.Sink, int, error) {
|
||||
sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup}
|
||||
|
||||
if SetupUDP == setup {
|
||||
remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
client, err := TransportManger.NewUDPClient(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sink.socket = client
|
||||
} else if SetupActive == setup {
|
||||
server, err := TransportManger.NewTCPServer()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sink.TCPStreaming = true
|
||||
sink.socket = server
|
||||
} else if SetupPassive == setup {
|
||||
client := transport.TCPClient{}
|
||||
err := TransportManger.AllocPort(true, func(port uint16) error {
|
||||
localAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(int(port)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remoteAddr, err := net.ResolveTCPAddr("tcp", serverAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client.SetHandler(sink)
|
||||
conn, err := client.Connect(localAddr, remoteAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sink.Conn = conn
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sink.TCPStreaming = true
|
||||
sink.socket = &client
|
||||
} else {
|
||||
utils.Assert(false)
|
||||
}
|
||||
|
||||
return sink, sink.socket.ListenPort(), nil
|
||||
}
|
@@ -1,61 +0,0 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"github.com/lkmio/lkm/stream"
|
||||
)
|
||||
|
||||
// ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么.
|
||||
type ForwardStream struct {
|
||||
stream.BaseTransStream
|
||||
rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]]
|
||||
}
|
||||
|
||||
func (f *ForwardStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ForwardStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
size := 2 + uint16(len(packet.Data))
|
||||
if size > stream.UDPReceiveBufferSize {
|
||||
log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(packet.Data), stream.UDPReceiveBufferSize)
|
||||
return nil, 0, false, nil
|
||||
}
|
||||
|
||||
// 释放rtp包
|
||||
for f.rtpBuffers.Size() > 0 {
|
||||
rtp := f.rtpBuffers.Peek(0)
|
||||
if rtp.UseCount() > 1 {
|
||||
break
|
||||
}
|
||||
|
||||
f.rtpBuffers.Pop()
|
||||
|
||||
// 放回池中
|
||||
data := rtp.Get()
|
||||
stream.UDPReceiveBufferPool.Put(data[:cap(data)])
|
||||
}
|
||||
|
||||
bytes := stream.UDPReceiveBufferPool.Get().([]byte)
|
||||
binary.BigEndian.PutUint16(bytes, size-2)
|
||||
copy(bytes[2:], packet.Data)
|
||||
|
||||
rtp := collections.NewReferenceCounter(bytes[:size])
|
||||
f.rtpBuffers.Push(rtp)
|
||||
// 每帧都当关键帧, 直接发给上级
|
||||
return []*collections.ReferenceCounter[[]byte]{rtp}, -1, true, nil
|
||||
}
|
||||
|
||||
func NewTransStream() (stream.TransStream, error) {
|
||||
return &ForwardStream{
|
||||
BaseTransStream: stream.BaseTransStream{Protocol: stream.TransStreamGBStreamForward},
|
||||
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](1024),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
|
||||
return NewTransStream()
|
||||
}
|
@@ -53,13 +53,15 @@ func connectSource(source string, addr string) {
|
||||
|
||||
func createSource(source, setup string, ssrc uint32) (string, uint16, uint32) {
|
||||
v := struct {
|
||||
Source string `json:"source"` //GetSourceID
|
||||
Setup string `json:"setup"` //active/passive
|
||||
SSRC uint32 `json:"ssrc,omitempty"`
|
||||
Source string `json:"source"` //GetSourceID
|
||||
Setup string `json:"setup"` //active/passive
|
||||
SSRC string `json:"ssrc,omitempty"`
|
||||
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
|
||||
}{
|
||||
Source: source,
|
||||
Setup: setup,
|
||||
SSRC: ssrc,
|
||||
Source: source,
|
||||
Setup: setup,
|
||||
SSRC: strconv.Itoa(int(ssrc)),
|
||||
SessionName: "play",
|
||||
}
|
||||
|
||||
marshal, err := json.Marshal(v)
|
||||
@@ -67,7 +69,8 @@ func createSource(source, setup string, ssrc uint32) (string, uint16, uint32) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/source/create", bytes.NewBuffer(marshal))
|
||||
//request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/source/create", bytes.NewBuffer(marshal))
|
||||
request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/offer/create", bytes.NewBuffer(marshal))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -89,8 +92,7 @@ func createSource(source, setup string, ssrc uint32) (string, uint16, uint32) {
|
||||
Code int `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
Data struct {
|
||||
IP string `json:"ip"`
|
||||
Port uint16 `json:"port,omitempty"`
|
||||
Addr string `json:"addr"`
|
||||
SSRC string `json:"ssrc,omitempty"`
|
||||
}
|
||||
}{}
|
||||
@@ -105,7 +107,9 @@ func createSource(source, setup string, ssrc uint32) (string, uint16, uint32) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return connectInfo.Data.IP, connectInfo.Data.Port, uint32(atoi)
|
||||
host, p, err := net.SplitHostPort(connectInfo.Data.Addr)
|
||||
Port, err := strconv.Atoi(p)
|
||||
return host, uint16(Port), uint32(atoi)
|
||||
}
|
||||
|
||||
// 分割rtp包, 返回rtp over tcp包
|
||||
|
@@ -25,6 +25,43 @@ const (
|
||||
JitterBufferSize = 1024 * 1024
|
||||
)
|
||||
|
||||
func (s SetupType) TransportType() stream.TransportType {
|
||||
switch s {
|
||||
case SetupUDP:
|
||||
return stream.TransportTypeUDP
|
||||
case SetupPassive:
|
||||
return stream.TransportTypeTCPServer
|
||||
case SetupActive:
|
||||
return stream.TransportTypeTCPClient
|
||||
default:
|
||||
panic(fmt.Errorf("invalid setup type: %d", s))
|
||||
}
|
||||
}
|
||||
|
||||
func (s SetupType) String() string {
|
||||
switch s {
|
||||
case SetupUDP:
|
||||
return "udp"
|
||||
case SetupPassive:
|
||||
return "passive"
|
||||
case SetupActive:
|
||||
return "active"
|
||||
default:
|
||||
panic(fmt.Errorf("invalid setup type: %d", s))
|
||||
}
|
||||
}
|
||||
|
||||
func SetupTypeFromString(setupType string) SetupType {
|
||||
switch setupType {
|
||||
case "passive":
|
||||
return SetupPassive
|
||||
case "active":
|
||||
return SetupActive
|
||||
default:
|
||||
return SetupUDP
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
TransportManger transport.Manager
|
||||
SharedUDPServer *UDPServer
|
||||
|
94
gb28181/talk_source.go
Normal file
94
gb28181/talk_source.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/bufio"
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"github.com/lkmio/lkm/stream"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Demuxer struct {
|
||||
avformat.BaseDemuxer
|
||||
ts int64
|
||||
firstOfPacket bool
|
||||
}
|
||||
|
||||
func (d *Demuxer) Input(data []byte) (int, error) {
|
||||
length := len(data)
|
||||
|
||||
if !d.firstOfPacket {
|
||||
d.firstOfPacket = true
|
||||
d.OnNewAudioTrack(0, utils.AVCodecIdPCMALAW, 8000, nil, avformat.AudioConfig{
|
||||
HasADTSHeader: false,
|
||||
Channels: 1,
|
||||
SampleRate: 8000,
|
||||
SampleSize: 2,
|
||||
})
|
||||
|
||||
d.ProbeComplete()
|
||||
}
|
||||
|
||||
for i := 0; i < length; {
|
||||
n := bufio.MinInt(length-i, 320)
|
||||
_, _ = d.DataPipeline.Write(data[i:i+n], 0, utils.AVMediaTypeAudio)
|
||||
pkt, _ := d.DataPipeline.Feat(0)
|
||||
d.OnAudioPacket(0, utils.AVCodecIdPCMALAW, pkt, d.ts)
|
||||
d.ts += int64(n)
|
||||
i += n
|
||||
}
|
||||
|
||||
return length, nil
|
||||
}
|
||||
|
||||
type TalkSource struct {
|
||||
stream.PublishSource
|
||||
}
|
||||
|
||||
func (s *TalkSource) Input(data []byte) error {
|
||||
_, err := s.PublishSource.TransDemuxer.Input(data)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *TalkSource) Close() {
|
||||
s.PublishSource.Close()
|
||||
// 关闭所有对讲设备的会话
|
||||
stream.CloseWaitingSinks(s.ID)
|
||||
}
|
||||
|
||||
type WSConn struct {
|
||||
*websocket.Conn
|
||||
}
|
||||
|
||||
func (w WSConn) Read(b []byte) (n int, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (w WSConn) Write(block []byte) (n int, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (w WSConn) SetDeadline(t time.Time) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewTalkSource(id string, conn *websocket.Conn) *TalkSource {
|
||||
s := &TalkSource{
|
||||
PublishSource: stream.PublishSource{
|
||||
ID: id,
|
||||
Type: stream.SourceTypeGBTalk,
|
||||
Conn: &WSConn{conn},
|
||||
TransDemuxer: &Demuxer{
|
||||
BaseDemuxer: avformat.BaseDemuxer{
|
||||
DataPipeline: &avformat.StreamsBuffer{},
|
||||
Name: "gb_talk",
|
||||
AutoFree: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
s.TransDemuxer.SetHandler(s)
|
||||
return s
|
||||
}
|
42
gb28181/talk_stream.go
Normal file
42
gb28181/talk_stream.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/lkm/stream"
|
||||
"github.com/lkmio/rtp"
|
||||
)
|
||||
|
||||
type TalkStream struct {
|
||||
*stream.RtpStream
|
||||
muxer rtp.Muxer
|
||||
packet []byte
|
||||
}
|
||||
|
||||
func (s *TalkStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
var size int
|
||||
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
|
||||
return s.packet
|
||||
}, func(pkt []byte) {
|
||||
size = len(pkt)
|
||||
})
|
||||
|
||||
packet = &avformat.AVPacket{Data: s.packet[:size]}
|
||||
return s.RtpStream.Input(packet)
|
||||
}
|
||||
|
||||
func NewTalkTransStream() (stream.TransStream, error) {
|
||||
return &TalkStream{
|
||||
RtpStream: stream.NewRtpTransStream(stream.TransStreamGBTalkForward, 1024),
|
||||
muxer: rtp.NewMuxer(8, 0, 0xFFFFFFFF),
|
||||
packet: make([]byte, 1500),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TalkTransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
|
||||
return NewTalkTransStream()
|
||||
}
|
3
main.go
3
main.go
@@ -28,7 +28,8 @@ func init() {
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBStreamForward, gb28181.TransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBCascadedForward, gb28181.CascadedTransStreamFactory)
|
||||
stream.RegisterTransStreamFactory(stream.TransStreamGBTalkForward, gb28181.TalkTransStreamFactory)
|
||||
stream.SetRecordStreamFactory(record.NewFLVFileSink)
|
||||
stream.StreamEndInfoBride = NewStreamEndInfo
|
||||
|
||||
|
161
stream/forward_sink.go
Normal file
161
stream/forward_sink.go
Normal file
@@ -0,0 +1,161 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/lkm/log"
|
||||
"github.com/lkmio/transport"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TransportType int
|
||||
|
||||
const (
|
||||
TransportTypeUDP TransportType = iota
|
||||
TransportTypeTCPClient
|
||||
TransportTypeTCPServer
|
||||
)
|
||||
|
||||
func (t TransportType) String() string {
|
||||
switch t {
|
||||
case TransportTypeUDP:
|
||||
return "udp"
|
||||
case TransportTypeTCPClient:
|
||||
return "tcp_client"
|
||||
case TransportTypeTCPServer:
|
||||
return "tcp_server"
|
||||
default:
|
||||
panic("invalid transport type")
|
||||
}
|
||||
}
|
||||
|
||||
type ForwardSink struct {
|
||||
BaseSink
|
||||
socket transport.Transport
|
||||
transportType TransportType
|
||||
receiveTimer *time.Timer
|
||||
}
|
||||
|
||||
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
|
||||
log.Sugar.Infof("%s 连接 conn: %s", f.Protocol, conn.RemoteAddr())
|
||||
|
||||
f.receiveTimer.Stop()
|
||||
|
||||
// 如果f.Conn赋值后, 发送数据先于EnableAsyncWriteMode执行, 可能会panic
|
||||
// 所以保险一点, 放在主协程执行
|
||||
ExecuteSyncEventOnSource(f.SourceID, func() {
|
||||
f.Conn = conn
|
||||
f.BaseSink.EnableAsyncWriteMode(512)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ForwardSink) OnPacket(conn net.Conn, data []byte) []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
|
||||
log.Sugar.Infof("%s 断开连接 conn: %s", f.Protocol, conn.RemoteAddr())
|
||||
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
|
||||
// TCP等待连接后再转发数据
|
||||
if TransportTypeUDP != f.transportType && f.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if TransportTypeUDP == f.transportType {
|
||||
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
|
||||
} else {
|
||||
return f.BaseSink.Write(index, data, ts, keyVideo)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭国标转发流
|
||||
func (f *ForwardSink) Close() {
|
||||
f.BaseSink.Close()
|
||||
|
||||
if f.socket != nil {
|
||||
f.socket.Close()
|
||||
}
|
||||
|
||||
if f.receiveTimer != nil {
|
||||
f.receiveTimer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// StartReceiveTimer 启动tcp sever计时器, 如果计时器触发, 没有连接, 则关闭流
|
||||
func (f *ForwardSink) StartReceiveTimer() {
|
||||
f.receiveTimer = time.AfterFunc(time.Second*10, func() {
|
||||
if f.Conn == nil {
|
||||
log.Sugar.Infof("%s 等待连接超时, 关闭sink", f.Protocol)
|
||||
f.Close()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func NewForwardSink(transportType TransportType, protocol TransStreamProtocol, sinkId SinkID, sourceId string, addr string, manager transport.Manager) (*ForwardSink, int, error) {
|
||||
sink := &ForwardSink{
|
||||
BaseSink: BaseSink{ID: sinkId, SourceID: sourceId, State: SessionStateCreated, Protocol: protocol},
|
||||
transportType: transportType,
|
||||
}
|
||||
|
||||
if transportType == TransportTypeUDP {
|
||||
remoteAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
client, err := manager.NewUDPClient(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sink.socket = client
|
||||
} else if transportType == TransportTypeTCPClient {
|
||||
client := transport.TCPClient{}
|
||||
err := manager.AllocPort(true, func(port uint16) error {
|
||||
localAddr, err := net.ResolveTCPAddr("tcp", ListenAddr(int(port)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remoteAddr, err := net.ResolveTCPAddr("tcp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client.SetHandler(sink)
|
||||
conn, err := client.Connect(localAddr, remoteAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sink.Conn = conn
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sink.socket = &client
|
||||
} else if transportType == TransportTypeTCPServer {
|
||||
tcpServer, err := manager.NewTCPServer()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
tcpServer.SetHandler(sink)
|
||||
tcpServer.Accept()
|
||||
sink.socket = tcpServer
|
||||
sink.StartReceiveTimer()
|
||||
}
|
||||
|
||||
return sink, sink.socket.ListenPort(), nil
|
||||
}
|
56
stream/rtp_stream.go
Normal file
56
stream/rtp_stream.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/lkmio/avformat"
|
||||
"github.com/lkmio/avformat/collections"
|
||||
"github.com/lkmio/lkm/log"
|
||||
)
|
||||
|
||||
type RtpStream struct {
|
||||
BaseTransStream
|
||||
rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]]
|
||||
}
|
||||
|
||||
func (f *RtpStream) WriteHeader() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *RtpStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
||||
size := 2 + uint16(len(packet.Data))
|
||||
if size > UDPReceiveBufferSize {
|
||||
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)
|
||||
return nil, 0, false, nil
|
||||
}
|
||||
|
||||
// 释放rtp包
|
||||
for f.rtpBuffers.Size() > 0 {
|
||||
rtp := f.rtpBuffers.Peek(0)
|
||||
if rtp.UseCount() > 1 {
|
||||
break
|
||||
}
|
||||
|
||||
f.rtpBuffers.Pop()
|
||||
|
||||
// 放回池中
|
||||
data := rtp.Get()
|
||||
UDPReceiveBufferPool.Put(data[:cap(data)])
|
||||
}
|
||||
|
||||
bytes := UDPReceiveBufferPool.Get().([]byte)
|
||||
binary.BigEndian.PutUint16(bytes, size-2)
|
||||
copy(bytes[2:], packet.Data)
|
||||
|
||||
rtp := collections.NewReferenceCounter(bytes[:size])
|
||||
f.rtpBuffers.Push(rtp)
|
||||
|
||||
// 每帧都当关键帧, 直接发给上级
|
||||
return []*collections.ReferenceCounter[[]byte]{rtp}, -1, true, nil
|
||||
}
|
||||
|
||||
func NewRtpTransStream(protocol TransStreamProtocol, capacity int) *RtpStream {
|
||||
return &RtpStream{
|
||||
BaseTransStream: BaseTransStream{Protocol: protocol},
|
||||
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](capacity),
|
||||
}
|
||||
}
|
@@ -53,3 +53,13 @@ func SinkId2String(id SinkID) string {
|
||||
func CreateSinkDisconnectionMessage(sink Sink) string {
|
||||
return fmt.Sprintf("%s sink断开连接. id: %s", sink.GetProtocol(), sink.GetID())
|
||||
}
|
||||
|
||||
func ExecuteSyncEventOnSource(sourceId string, event func()) bool {
|
||||
source := SourceManager.Find(sourceId)
|
||||
if source != nil {
|
||||
source.ExecuteSyncEvent(event)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
@@ -63,6 +63,13 @@ func PopWaitingSinks(sourceId string) []Sink {
|
||||
return sinks
|
||||
}
|
||||
|
||||
func CloseWaitingSinks(sourceId string) {
|
||||
sinks := PopWaitingSinks(sourceId)
|
||||
for _, sink := range sinks {
|
||||
sink.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool {
|
||||
mutex.RLock()
|
||||
defer mutex.RUnlock()
|
||||
|
@@ -288,7 +288,7 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream
|
||||
_ = transStream.WriteHeader()
|
||||
|
||||
// 设置转发流
|
||||
if TransStreamGBStreamForward == transStream.GetProtocol() {
|
||||
if TransStreamGBCascadedForward == transStream.GetProtocol() {
|
||||
s.ForwardTransStream = transStream
|
||||
}
|
||||
|
||||
@@ -612,7 +612,7 @@ func (s *PublishSource) DoClose() {
|
||||
transStreamID := sink.GetTransStreamID()
|
||||
sink.SetTransStreamID(0)
|
||||
if s.recordSink == sink {
|
||||
return
|
||||
continue
|
||||
}
|
||||
|
||||
{
|
||||
|
@@ -20,16 +20,18 @@ type TransStreamProtocol uint32
|
||||
type SessionState uint32
|
||||
|
||||
const (
|
||||
SourceTypeRtmp = SourceType(1)
|
||||
SourceType28181 = SourceType(2)
|
||||
SourceType1078 = SourceType(3)
|
||||
SourceTypeRtmp = SourceType(1)
|
||||
SourceType28181 = SourceType(2)
|
||||
SourceType1078 = SourceType(3)
|
||||
SourceTypeGBTalk = SourceType(4) // 国标广播/对讲
|
||||
|
||||
TransStreamRtmp = TransStreamProtocol(1)
|
||||
TransStreamFlv = TransStreamProtocol(2)
|
||||
TransStreamRtsp = TransStreamProtocol(3)
|
||||
TransStreamHls = TransStreamProtocol(4)
|
||||
TransStreamRtc = TransStreamProtocol(5)
|
||||
TransStreamGBStreamForward = TransStreamProtocol(6) // 国标级联转发
|
||||
TransStreamRtmp = TransStreamProtocol(1)
|
||||
TransStreamFlv = TransStreamProtocol(2)
|
||||
TransStreamRtsp = TransStreamProtocol(3)
|
||||
TransStreamHls = TransStreamProtocol(4)
|
||||
TransStreamRtc = TransStreamProtocol(5)
|
||||
TransStreamGBCascadedForward = TransStreamProtocol(6) // 国标级联转发
|
||||
TransStreamGBTalkForward = TransStreamProtocol(7) // 国标广播/对讲转发
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -49,6 +51,8 @@ func (s SourceType) String() string {
|
||||
return "28181"
|
||||
} else if SourceType1078 == s {
|
||||
return "jt1078"
|
||||
} else if SourceTypeGBTalk == s {
|
||||
return "gb_talk"
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("unknown source type %d", s))
|
||||
@@ -65,8 +69,10 @@ func (p TransStreamProtocol) String() string {
|
||||
return "hls"
|
||||
} else if TransStreamRtc == p {
|
||||
return "rtc"
|
||||
} else if TransStreamGBStreamForward == p {
|
||||
return "gb_stream_forward"
|
||||
} else if TransStreamGBCascadedForward == p {
|
||||
return "gb_cascaded_forward"
|
||||
} else if TransStreamGBTalkForward == p {
|
||||
return "gb_talk_forward"
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("unknown stream protocol %d", p))
|
||||
|
278
web/broadcast.html
Normal file
278
web/broadcast.html
Normal file
@@ -0,0 +1,278 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8"/>
|
||||
<meta name="viewport"
|
||||
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"/>
|
||||
<meta name="apple-mobile-web-capable" content="yes"/>
|
||||
<title>语音广播</title>
|
||||
</head>
|
||||
<body>
|
||||
<button id="intercomBegin">开始广播</button>
|
||||
<button id="intercomEnd">关闭广播</button>
|
||||
|
||||
<input style="width: 100px;" id="device_id" type="text" value="34020000001320000001"/>
|
||||
<input style="width: 100px;" id="channel_id" type="text" value="34020000001320000001"/>
|
||||
|
||||
<button id="invite" onclick="invite()">邀请</button>
|
||||
<button id="hangup" onclick="hangup()">挂断</button>
|
||||
|
||||
</body>
|
||||
<script src="g711.js"></script>
|
||||
<script type="text/javascript">
|
||||
|
||||
var begin = document.getElementById('intercomBegin');
|
||||
var end = document.getElementById('intercomEnd');
|
||||
|
||||
var ws = null; //实现WebSocket
|
||||
var record = null; //多媒体对象,用来处理音频
|
||||
var source = null;
|
||||
|
||||
function init(rec) {
|
||||
record = rec;
|
||||
}
|
||||
|
||||
function invite() {
|
||||
let deviceId = document.getElementById("device_id").value;
|
||||
let channelId = document.getElementById("channel_id").value;
|
||||
|
||||
let data = {
|
||||
device_id: deviceId,
|
||||
channel_id: channelId,
|
||||
type: 1,
|
||||
source: source
|
||||
};
|
||||
|
||||
fetch("http://localhost:9000/api/v1/broadcast/invite", {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(data),
|
||||
headers: new Headers({
|
||||
'Content-Type': 'application/json',
|
||||
}),
|
||||
}).then((res) => res.json())
|
||||
.then((data) => {
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
function hangup() {
|
||||
let deviceId = document.getElementById("device_id").value;
|
||||
let channelId = document.getElementById("channel_id").value;
|
||||
|
||||
let data = {
|
||||
device_id: deviceId,
|
||||
channel_id: channelId,
|
||||
source: source
|
||||
};
|
||||
|
||||
fetch("http://localhost:9000/api/v1/broadcast/hangup", {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(data),
|
||||
headers: new Headers({
|
||||
'Content-Type': 'application/json',
|
||||
}),
|
||||
}).then((res) => res.json())
|
||||
.then((data) => {
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
//录音对象
|
||||
var Recorder = function (stream) {
|
||||
var sampleBits = 16; //输出采样数位 8, 16
|
||||
var sampleRate = 8000; //输出采样率
|
||||
var context = new AudioContext();
|
||||
var audioInput = context.createMediaStreamSource(stream);
|
||||
var recorder = context.createScriptProcessor(4096, 1, 1);
|
||||
var audioData = {
|
||||
size: 0, //录音文件长度
|
||||
buffer: [], //录音缓存
|
||||
inputSampleRate: 48000, //输入采样率
|
||||
inputSampleBits: 16, //输入采样数位 8, 16
|
||||
outputSampleRate: sampleRate, //输出采样数位
|
||||
oututSampleBits: sampleBits, //输出采样率
|
||||
clear: function () {
|
||||
this.buffer = [];
|
||||
this.size = 0;
|
||||
},
|
||||
input: function (data) {
|
||||
this.buffer.push(new Float32Array(data));
|
||||
this.size += data.length;
|
||||
},
|
||||
compress: function () { //合并压缩
|
||||
//合并
|
||||
var data = new Float32Array(this.size);
|
||||
var offset = 0;
|
||||
for (var i = 0; i < this.buffer.length; i++) {
|
||||
data.set(this.buffer[i], offset);
|
||||
offset += this.buffer[i].length;
|
||||
}
|
||||
//压缩
|
||||
var compression = parseInt(this.inputSampleRate / this.outputSampleRate);
|
||||
var length = data.length / compression;
|
||||
var result = new Float32Array(length);
|
||||
var index = 0,
|
||||
j = 0;
|
||||
while (index < length) {
|
||||
result[index] = data[j];
|
||||
j += compression;
|
||||
index++;
|
||||
}
|
||||
return result;
|
||||
},
|
||||
encodePCM: function () { //这里不对采集到的数据进行其他格式处理,如有需要均交给服务器端处理。
|
||||
var sampleRate = Math.min(this.inputSampleRate, this.outputSampleRate);
|
||||
var sampleBits = Math.min(this.inputSampleBits, this.oututSampleBits);
|
||||
var bytes = this.compress();
|
||||
var dataLength = bytes.length * (sampleBits / 8);
|
||||
var buffer = new ArrayBuffer(dataLength);
|
||||
var data = new DataView(buffer);
|
||||
var offset = 0;
|
||||
for (var i = 0; i < bytes.length; i++, offset += 2) {
|
||||
var s = Math.max(-1, Math.min(1, bytes[i]));
|
||||
data.setInt16(offset, s < 0 ? s * 0x8000 : s * 0x7FFF, true);
|
||||
}
|
||||
return new Blob([data]);
|
||||
}
|
||||
};
|
||||
|
||||
var sendData = function () { //对以获取的数据进行处理(分包)
|
||||
var reader = new FileReader();
|
||||
reader.onload = e => {
|
||||
var outbuffer = e.target.result;
|
||||
var arr = new Uint16Array(outbuffer);
|
||||
var dst = new Int8Array(arr.length);
|
||||
pcm16_to_alaw(arr.byteLength, arr, dst);
|
||||
|
||||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(dst);
|
||||
} else {
|
||||
console.log('WebSocket not ready, state:', ws ? ws.readyState : 'null');
|
||||
record.stop(); // 停止录音
|
||||
if (ws) {
|
||||
ws.close(); // 确保关闭连接
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
reader.readAsArrayBuffer(audioData.encodePCM());
|
||||
audioData.clear();//每次发送完成则清理掉旧数据
|
||||
};
|
||||
|
||||
this.start = function () {
|
||||
audioInput.connect(recorder);
|
||||
recorder.connect(context.destination);
|
||||
}
|
||||
|
||||
this.stop = function () {
|
||||
recorder.disconnect();
|
||||
}
|
||||
|
||||
this.getBlob = function () {
|
||||
return audioData.encodePCM();
|
||||
}
|
||||
|
||||
this.clear = function () {
|
||||
audioData.clear();
|
||||
}
|
||||
|
||||
recorder.onaudioprocess = function (e) {
|
||||
var inputBuffer = e.inputBuffer.getChannelData(0);
|
||||
audioData.input(inputBuffer);
|
||||
sendData();
|
||||
}
|
||||
}
|
||||
|
||||
function generateRandomAlphanumeric10() {
|
||||
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
|
||||
let result = '';
|
||||
for (let i = 0; i < 20; i++) {
|
||||
result += chars.charAt(Math.floor(Math.random() * chars.length));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* WebSocket
|
||||
*/
|
||||
function connectWS() {
|
||||
let secure = window.location.protocol === 'https:'
|
||||
//source = generateRandomAlphanumeric10();
|
||||
source = "unique_id";
|
||||
let url = (secure ? "wss:" : "ws:") + "/" + window.location.host + "/ws/v1/gb28181/talk" + "?source=" + source
|
||||
ws = new WebSocket(url);
|
||||
ws.binaryType = 'arraybuffer'; //传输的是 ArrayBuffer 类型的数据
|
||||
|
||||
ws.onopen = function () {
|
||||
console.log('ws连接成功');
|
||||
record.start();
|
||||
};
|
||||
|
||||
ws.onmessage = function (msg) {
|
||||
|
||||
}
|
||||
|
||||
ws.onerror = function (err) {
|
||||
console.log('ws连接断开');
|
||||
console.info(err)
|
||||
record.stop()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* 开始对讲
|
||||
*/
|
||||
begin.onclick = function () {
|
||||
navigator.getUserMedia = navigator.getUserMedia || navigator.webkitGetUserMedia;
|
||||
if (!navigator.getUserMedia) {
|
||||
alert('浏览器不支持音频输入');
|
||||
} else {
|
||||
navigator.getUserMedia({
|
||||
audio: true
|
||||
},
|
||||
|
||||
//获取到音频采集权限回调
|
||||
function (mediaStream) {
|
||||
console.log('开始对讲');
|
||||
//初始化采集器
|
||||
init(new Recorder(mediaStream));
|
||||
//连接websocket
|
||||
connectWS();
|
||||
},
|
||||
|
||||
function (error) {
|
||||
console.log(error);
|
||||
switch (error.message || error.name) {
|
||||
case 'PERMISSION_DENIED':
|
||||
case 'PermissionDeniedError':
|
||||
console.info('用户拒绝提供信息。');
|
||||
break;
|
||||
case 'NOT_SUPPORTED_ERROR':
|
||||
case 'NotSupportedError':
|
||||
console.info('浏览器不支持硬件设备。');
|
||||
break;
|
||||
case 'MANDATORY_UNSATISFIED_ERROR':
|
||||
case 'MandatoryUnsatisfiedError':
|
||||
console.info('无法发现指定的硬件设备。');
|
||||
break;
|
||||
default:
|
||||
console.info('无法打开麦克风。异常信息:' + (error.code || error.name));
|
||||
break;
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* 关闭对讲
|
||||
*/
|
||||
end.onclick = function () {
|
||||
if (ws) {
|
||||
ws.close();
|
||||
record.stop();
|
||||
console.log('关闭对讲以及WebSocket');
|
||||
}
|
||||
}
|
||||
</script>
|
||||
</html>
|
341
web/g711.js
Normal file
341
web/g711.js
Normal file
@@ -0,0 +1,341 @@
|
||||
var SIGN_BIT = 0x80; /* Sign bit for a A-law byte. */
|
||||
var QUANT_MASK = 0xf; /* Quantization field mask. */
|
||||
var NSEGS = 0x8; /* Number of A-law segments. */
|
||||
var SEG_SHIFT = 0x4; /* Left shift for segment number. */
|
||||
var SEG_MASK = 0x70; /* Segment field mask. */
|
||||
|
||||
|
||||
var seg_aend = new Int16Array([0x1F, 0x3F, 0x7F, 0xFF,0x1FF, 0x3FF, 0x7FF, 0xFFF]);
|
||||
|
||||
var seg_uend = new Int16Array([0x3F, 0x7F, 0xFF, 0x1FF,0x3FF, 0x7FF, 0xFFF, 0x1FFF]);
|
||||
|
||||
|
||||
/* copy from CCITT G.711 specifications */
|
||||
var _u2a = new Uint8Array([1, 1, 2, 2, 3, 3, 4, 4,
|
||||
5, 5, 6, 6, 7, 7, 8, 8,
|
||||
9, 10, 11, 12, 13, 14, 15, 16,
|
||||
17, 18, 19, 20, 21, 22, 23, 24,
|
||||
25, 27, 29, 31, 33, 34, 35, 36,
|
||||
37, 38, 39, 40, 41, 42, 43, 44,
|
||||
46, 48, 49, 50, 51, 52, 53, 54,
|
||||
55, 56, 57, 58, 59, 60, 61, 62,
|
||||
64, 65, 66, 67, 68, 69, 70, 71,
|
||||
72, 73, 74, 75, 76, 77, 78, 79,
|
||||
80, 82, 83, 84, 85, 86, 87, 88,
|
||||
89, 90, 91, 92, 93, 94, 95, 96,
|
||||
97, 98, 99, 100, 101, 102, 103, 104,
|
||||
105, 106, 107, 108, 109, 110, 111, 112,
|
||||
113, 114, 115, 116, 117, 118, 119, 120,
|
||||
121, 122, 123, 124, 125, 126, 127, 128]);
|
||||
|
||||
/* A- to u-law conversions */
|
||||
var _a2u = new Uint8Array([1, 3, 5, 7, 9, 11, 13, 15,
|
||||
16, 17, 18, 19, 20, 21, 22, 23,
|
||||
24, 25, 26, 27, 28, 29, 30, 31,
|
||||
32, 32, 33, 33, 34, 34, 35, 35,
|
||||
36, 37, 38, 39, 40, 41, 42, 43,
|
||||
44, 45, 46, 47, 48, 48, 49, 49,
|
||||
50, 51, 52, 53, 54, 55, 56, 57,
|
||||
58, 59, 60, 61, 62, 63, 64, 64,
|
||||
65, 66, 67, 68, 69, 70, 71, 72,
|
||||
73, 74, 75, 76, 77, 78, 79, 80,
|
||||
80, 81, 82, 83, 84, 85, 86, 87,
|
||||
88, 89, 90, 91, 92, 93, 94, 95,
|
||||
96, 97, 98, 99, 100, 101, 102, 103,
|
||||
104, 105, 106, 107, 108, 109, 110, 111,
|
||||
112, 113, 114, 115, 116, 117, 118, 119,
|
||||
120, 121, 122, 123, 124, 125, 126, 127]);
|
||||
|
||||
function search( val, table, size)
|
||||
{
|
||||
var i;
|
||||
|
||||
for (i = 0; i < size; i++) {
|
||||
if (val <= table[i])
|
||||
return (i);
|
||||
}
|
||||
|
||||
return (size);
|
||||
}
|
||||
|
||||
function linear2alaw(pcm_val) /* 2's complement (16-bit range) */
|
||||
{
|
||||
var mask;
|
||||
var seg;
|
||||
var aval;
|
||||
|
||||
pcm_val = pcm_val >> 3;
|
||||
|
||||
if (pcm_val >= 0) {
|
||||
mask = 0xD5; /* sign (7th) bit = 1 */
|
||||
} else {
|
||||
mask = 0x55; /* sign bit = 0 */
|
||||
pcm_val = -pcm_val - 1;
|
||||
}
|
||||
|
||||
/* Convert the scaled magnitude to segment number. */
|
||||
seg = search(pcm_val, seg_aend, 8);
|
||||
|
||||
/* Combine the sign, segment, and quantization bits. */
|
||||
|
||||
if (seg >= 8) /* out of range, return maximum value. */
|
||||
return (0x7F ^ mask);
|
||||
else {
|
||||
aval = seg << SEG_SHIFT;
|
||||
if (seg < 2)
|
||||
aval |= (pcm_val >> 1) & QUANT_MASK;
|
||||
else
|
||||
aval |= (pcm_val >> seg) & QUANT_MASK;
|
||||
return (aval ^ mask);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* alaw2linear() - Convert an A-law value to 16-bit linear PCM
|
||||
*
|
||||
*/
|
||||
|
||||
function alaw2linear(a_val){
|
||||
var t;
|
||||
var seg;
|
||||
|
||||
a_val ^= 0x55;
|
||||
|
||||
t = (a_val & QUANT_MASK) << 4;
|
||||
seg = (a_val & SEG_MASK) >> SEG_SHIFT;
|
||||
switch (seg) {
|
||||
case 0:
|
||||
t += 8;
|
||||
break;
|
||||
case 1:
|
||||
t += 0x108;
|
||||
break;
|
||||
default:
|
||||
t += 0x108;
|
||||
t <<= seg - 1;
|
||||
}
|
||||
return ((a_val & SIGN_BIT) ? t : -t);
|
||||
}
|
||||
|
||||
|
||||
// #define BIAS (0x84) /* Bias for linear code. */
|
||||
// #define CLIP 8159
|
||||
var BIAS = 0x84;
|
||||
|
||||
var CLIP = 8159;
|
||||
|
||||
// /*
|
||||
// * linear2ulaw() - Convert a linear PCM value to u-law
|
||||
// *
|
||||
// * In order to simplify the encoding process, the original linear magnitude
|
||||
// * is biased by adding 33 which shifts the encoding range from (0 - 8158) to
|
||||
// * (33 - 8191). The result can be seen in the following encoding table:
|
||||
// *
|
||||
// * Biased Linear Input Code Compressed Code
|
||||
// * ------------------------ ---------------
|
||||
// * 00000001wxyza 000wxyz
|
||||
// * 0000001wxyzab 001wxyz
|
||||
// * 000001wxyzabc 010wxyz
|
||||
// * 00001wxyzabcd 011wxyz
|
||||
// * 0001wxyzabcde 100wxyz
|
||||
// * 001wxyzabcdef 101wxyz
|
||||
// * 01wxyzabcdefg 110wxyz
|
||||
// * 1wxyzabcdefgh 111wxyz
|
||||
// *
|
||||
// * Each biased linear code has a leading 1 which identifies the segment
|
||||
// * number. The value of the segment number is equal to 7 minus the number
|
||||
// * of leading 0's. The quantization interval is directly available as the
|
||||
// * four bits wxyz. * The trailing bits (a - h) are ignored.
|
||||
// *
|
||||
// * Ordinarily the complement of the resulting code word is used for
|
||||
// * transmission, and so the code word is complemented before it is returned.
|
||||
// *
|
||||
// * For further information see John C. Bellamy's Digital Telephony, 1982,
|
||||
// * John Wiley & Sons, pps 98-111 and 472-476.
|
||||
function linear2ulaw(pcm_val) /* 2's complement (16-bit range) */
|
||||
{
|
||||
var mask;
|
||||
var seg;
|
||||
var uval;
|
||||
|
||||
/* Get the sign and the magnitude of the value. */
|
||||
pcm_val = pcm_val >> 2;
|
||||
if (pcm_val < 0) {
|
||||
pcm_val = -pcm_val;
|
||||
mask = 0x7F;
|
||||
} else {
|
||||
mask = 0xFF;
|
||||
}
|
||||
if ( pcm_val > CLIP ) pcm_val = CLIP; /* clip the magnitude */
|
||||
pcm_val += (BIAS >> 2);
|
||||
|
||||
/* Convert the scaled magnitude to segment number. */
|
||||
seg = search(pcm_val, seg_uend, 8);
|
||||
|
||||
/*
|
||||
* Combine the sign, segment, quantization bits;
|
||||
* and complement the code word.
|
||||
*/
|
||||
if (seg >= 8) /* out of range, return maximum value. */
|
||||
return (0x7F ^ mask);
|
||||
else {
|
||||
uval = (seg << 4) | ((pcm_val >> (seg + 1)) & 0xF);
|
||||
return (uval ^ mask);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// /*
|
||||
// * ulaw2linear() - Convert a u-law value to 16-bit linear PCM
|
||||
// *
|
||||
// * First, a biased linear code is derived from the code word. An unbiased
|
||||
// * output can then be obtained by subtracting 33 from the biased code.
|
||||
// *
|
||||
// * Note that this function expects to be passed the complement of the
|
||||
// * original code word. This is in keeping with ISDN conventions.
|
||||
// */
|
||||
function ulaw2linear(u_val)
|
||||
{
|
||||
var t;
|
||||
|
||||
/* Complement to obtain normal u-law value. */
|
||||
u_val = ~u_val;
|
||||
|
||||
/*
|
||||
* Extract and bias the quantization bits. Then
|
||||
* shift up by the segment number and subtract out the bias.
|
||||
*/
|
||||
t = ((u_val & QUANT_MASK) << 3) + BIAS;
|
||||
t <<= (u_val & SEG_MASK) >> SEG_SHIFT;
|
||||
|
||||
return ((u_val & SIGN_BIT) ? (BIAS - t) : (t - BIAS));
|
||||
}
|
||||
|
||||
// /* A-law to u-law conversion */
|
||||
function alaw2ulaw(aval)
|
||||
{
|
||||
aval &= 0xff;
|
||||
return ((aval & 0x80) ? (0xFF ^ _a2u[aval ^ 0xD5]) :
|
||||
(0x7F ^ _a2u[aval ^ 0x55]));
|
||||
}
|
||||
|
||||
/* u-law to A-law conversion */
|
||||
function ulaw2alaw(uval)
|
||||
{
|
||||
uval &= 0xff;
|
||||
return ((uval & 0x80) ? (0xD5 ^ (_u2a[0xFF ^ uval] - 1)) :
|
||||
(0x55 ^ (_u2a[0x7F ^ uval] - 1)));
|
||||
}
|
||||
|
||||
// unsigned char linear_to_alaw[65536];
|
||||
// unsigned char linear_to_ulaw[65536];
|
||||
|
||||
var short_index = new Int16Array(65536);
|
||||
|
||||
var linear_to_alaw = new Uint8Array(65536);
|
||||
var linear_to_ulaw = new Uint8Array(65536);
|
||||
|
||||
// /* 16384 entries per table (8 bit) */
|
||||
// unsigned short alaw_to_linear[256];
|
||||
// unsigned short ulaw_to_linear[256];
|
||||
|
||||
var alaw_to_linear = new Uint8Array(256);
|
||||
var ulaw_to_linear = new Uint8Array(256);
|
||||
|
||||
function build_linear_to_xlaw_table(linear_to_xlaw,linear2xlaw)
|
||||
{
|
||||
var i;
|
||||
for (i=0; i<65536;i++){
|
||||
var v = linear2xlaw(short_index[i]);
|
||||
linear_to_xlaw[i] = v;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function build_xlaw_to_linear_table(xlaw_to_linear,xlaw2linear)
|
||||
{
|
||||
var i;
|
||||
|
||||
for (i=0; i<256;i++){
|
||||
xlaw_to_linear[i] = xlaw2linear(i);
|
||||
}
|
||||
}
|
||||
|
||||
function pcm16_to_xlaw(linear_to_xlaw, src_length,src_samples,dst_samples)
|
||||
{
|
||||
var i;
|
||||
var s_samples;
|
||||
|
||||
s_samples = src_samples;
|
||||
|
||||
for (i=0; i < src_length / 2; i++)
|
||||
{
|
||||
dst_samples[i] = linear_to_xlaw[s_samples[i]];
|
||||
}
|
||||
}
|
||||
|
||||
function xlaw_to_pcm16(xlaw_to_linear, src_length,src_samples, dst_samples)
|
||||
{
|
||||
var i;
|
||||
var s_samples;
|
||||
var d_samples;
|
||||
|
||||
s_samples = src_samples;
|
||||
d_samples = dst_samples;
|
||||
|
||||
for (i=0; i < src_length; i++)
|
||||
{
|
||||
d_samples[i] = xlaw_to_linear[s_samples[i]];
|
||||
}
|
||||
}
|
||||
|
||||
function pcm16_to_alaw(src_length, src_samples, dst_samples)
|
||||
{
|
||||
pcm16_to_xlaw(linear_to_alaw, src_length, src_samples, dst_samples);
|
||||
}
|
||||
|
||||
function pcm16_to_ulaw(src_length, src_samples, dst_samples)
|
||||
{
|
||||
pcm16_to_xlaw(linear_to_ulaw, src_length, src_samples, dst_samples);
|
||||
}
|
||||
|
||||
function alaw_to_pcm16(src_length, src_samples, dst_samples)
|
||||
{
|
||||
xlaw_to_pcm16(alaw_to_linear, src_length, src_samples, dst_samples);
|
||||
}
|
||||
|
||||
function ulaw_to_pcm16(src_length, src_samples, dst_samples)
|
||||
{
|
||||
xlaw_to_pcm16(ulaw_to_linear, src_length, src_samples, dst_samples);
|
||||
}
|
||||
|
||||
function pcm16_alaw_tableinit()
|
||||
{
|
||||
build_linear_to_xlaw_table(linear_to_alaw, linear2alaw);
|
||||
}
|
||||
|
||||
function pcm16_ulaw_tableinit()
|
||||
{
|
||||
build_linear_to_xlaw_table(linear_to_ulaw, linear2ulaw);
|
||||
}
|
||||
|
||||
function alaw_pcm16_tableinit()
|
||||
{
|
||||
build_xlaw_to_linear_table(alaw_to_linear, alaw2linear);
|
||||
}
|
||||
|
||||
function ulaw_pcm16_tableinit()
|
||||
{
|
||||
build_xlaw_to_linear_table(ulaw_to_linear, ulaw2linear);
|
||||
}
|
||||
|
||||
for(var i =0; i < 65536;i++){
|
||||
short_index[i] = i;
|
||||
}
|
||||
|
||||
pcm16_alaw_tableinit();
|
||||
pcm16_ulaw_tableinit();
|
||||
alaw_pcm16_tableinit();
|
||||
ulaw_pcm16_tableinit();
|
Reference in New Issue
Block a user