refactor: 语音对讲移植到流媒体服务器

This commit is contained in:
ydajiang
2025-05-09 20:58:22 +08:00
parent 89d3a885a9
commit 891304f852
26 changed files with 1508 additions and 1903 deletions

259
api.go
View File

@@ -2,13 +2,11 @@ package main
import (
"context"
"encoding/binary"
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/rtp"
"math"
"net/http"
"strconv"
@@ -67,16 +65,10 @@ type PlatformChannel struct {
}
type BroadcastParams struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
RoomID string `json:"room_id"`
Type int `json:"type"`
}
type HangupParams struct {
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
RoomID string `json:"room_id"`
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
StreamId StreamID `json:"stream_id"`
Setup *SetupType `json:"setup"`
}
type RecordParams struct {
@@ -163,16 +155,9 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", withDecodedParams(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", withDecodedParams(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联解绑通道
apiServer.router.HandleFunc("/ws/v1/talk", apiServer.OnWSTalk) // 语音广播/对讲, 主讲音频传输链路
apiServer.router.HandleFunc("/api/v1/broadcast/invite", withDecodedParams(apiServer.OnBroadcast, &BroadcastParams{Type: int(BroadcastTypeTCP)})) // 发起语音广播
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", withDecodedParams(apiServer.OnHangup, &HangupParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
apiServer.router.HandleFunc("/broadcast.html", func(writer http.ResponseWriter, request *http.Request) {
http.ServeFile(writer, request, "./broadcast.html")
})
apiServer.router.HandleFunc("/g711.js", func(writer http.ResponseWriter, request *http.Request) {
http.ServeFile(writer, request, "./g711.js")
})
apiServer.router.HandleFunc("/api/v1/broadcast/invite", withDecodedParams(apiServer.OnBroadcast, &BroadcastParams{Setup: &DefaultSetupType})) // 发起语音广播
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", withDecodedParams(apiServer.OnHangup, &BroadcastParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
http.Handle("/", apiServer.router)
@@ -241,11 +226,11 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
var err error
streamType := strings.ToLower(query.Get("stream_type"))
if "playback" == streamType {
code, stream, err = api.DoInvite(InviteTypeLive, inviteParams, false, w, r)
code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
} else if "download" == streamType {
code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r)
} else {
code, stream, err = api.DoInvite(InviteTypeLive, inviteParams, false, w, r)
code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
}
if err != nil {
@@ -261,28 +246,37 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
stream := StreamManager.Find(params.Stream)
if stream == nil {
Sugar.Errorf("处理播放结束事件失败, stream不存在. id: %s", params.Stream)
//stream := StreamManager.Find(params.Stream)
//if stream == nil {
// Sugar.Errorf("处理播放结束事件失败, stream不存在. id: %s", params.Stream)
// return
//}
//if 0 == stream.DecreaseSinkCount() && Config.AutoCloseOnIdle {
// CloseStream(params.Stream, true)
//}
if !strings.HasPrefix(params.Protocol, "gb") {
return
}
if 0 == stream.DecreaseSinkCount() && Config.AutoCloseOnIdle {
CloseStream(params.Stream, true)
sink := RemoveForwardSink(params.Stream, params.Sink)
if sink == nil {
Sugar.Errorf("处理转发结束事件失败, 找不到sink. stream: %s sink: %s", params.Stream, params.Sink)
return
}
// 级联断开连接, 向上级发送Bye请求
if params.Protocol == "gb_stream_forward" {
sink := stream.RemoveForwardStreamSink(params.Sink)
if sink == nil || sink.Dialog == nil {
return
}
if params.Protocol == "gb_cascaded_forward" {
if platform := PlatformManager.FindPlatform(sink.ServerID); platform != nil {
callID, _ := sink.Dialog.CallID()
platform.CloseStream(callID.String(), true, false)
}
} else if params.Protocol == "gb_talk_forward" {
// 对讲设备断开连接
}
sink.Close(true, false)
}
func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) {
@@ -290,7 +284,30 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *
stream := StreamManager.Find(params.Stream)
if stream != nil {
stream.publishEvent <- 0
stream.onPublishCb <- 200
}
// 对讲websocket已连接
// 创建stream
if "gb_talk" == params.Protocol {
Sugar.Infof("对讲websocket已连接, stream: %s", params.Stream)
s := &Stream{
ID: params.Stream,
Protocol: params.Protocol,
CreateTime: time.Now().Unix(),
}
_, ok := StreamManager.Add(s)
if !ok {
Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream)
w.WriteHeader(http.StatusBadRequest)
return
}
if DB != nil {
go DB.SaveStream(s)
}
}
}
@@ -298,6 +315,10 @@ func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter,
Sugar.Infof("推流结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
CloseStream(params.Stream, false)
// 对讲websocket断开连接
if "gb_talk" == params.Protocol {
}
}
func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) {
@@ -336,7 +357,7 @@ func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.R
} else if "download" == action {
code, stream, err = apiServer.DoInvite(InviteTypeDownload, v, true, w, r)
} else if "live" == action {
code, stream, err = apiServer.DoInvite(InviteTypeLive, v, true, w, r)
code, stream, err = apiServer.DoInvite(InviteTypePlay, v, true, w, r)
} else {
w.WriteHeader(http.StatusNotFound)
return
@@ -369,7 +390,7 @@ func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync
// 解析回放或下载的时间范围参数
var startTimeSeconds string
var endTimeSeconds string
if InviteTypeLive != inviteType {
if InviteTypePlay != inviteType {
startTime, err := time.ParseInLocation("2006-01-02t15:04:05", params.StartTime, time.Local)
if err != nil {
return http.StatusBadRequest, nil, err
@@ -385,7 +406,7 @@ func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync
}
if params.streamId == "" {
params.streamId = GenerateStreamId(inviteType, device.GetID(), params.ChannelID, params.StartTime, params.EndTime)
params.streamId = GenerateStreamID(inviteType, device.GetID(), params.ChannelID, params.StartTime, params.EndTime)
}
// 解析回放或下载速度参数
@@ -547,75 +568,36 @@ func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) {
}
func (api *ApiServer) OnWSTalk(w http.ResponseWriter, r *http.Request) {
conn, err := api.upgrader.Upgrade(w, r, nil)
if err != nil {
Sugar.Errorf("websocket头检查失败 err: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("广播挂断 %v", *v)
id := GenerateStreamID(InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "")
if sink := RemoveForwardSinkWithSinkStreamId(id); sink != nil {
sink.Close(true, true)
}
roomId := utils.RandStringBytes(10)
room := BroadcastManager.CreateRoom(roomId)
response := MalformedRequest{200, "ok", map[string]string{
"room_id": roomId,
}}
conn.WriteJSON(response)
packet := make([]byte, 1500)
muxer := rtp.NewMuxer(8, 0, 0xFFFFFFFF)
for {
_, bytes, err := conn.ReadMessage()
n := len(bytes)
if err != nil {
Sugar.Infof("语音断开连接")
break
} else if n < 1 {
continue
}
count := (n-1)/320 + 1
for i := 0; i < count; i++ {
offset := i * 320
min := int(math.Min(float64(n), 320))
muxer.Input(bytes[offset:offset+min], uint32(min), func() []byte {
return packet[2:]
}, func(data []byte) {
binary.BigEndian.PutUint16(packet, uint16(len(data)))
room.DispatchRtpPacket(packet[:2+len(data)])
})
n -= min
}
}
Sugar.Infof("主讲websocket断开连接 room: %s", roomId)
sessions := BroadcastManager.RemoveRoom(roomId)
for _, session := range sessions {
session.Close(true)
}
}
func (api *ApiServer) OnHangup(v *HangupParams, w http.ResponseWriter, r *http.Request) {
if session := BroadcastManager.Remove(GenerateSessionId(v.DeviceID, v.ChannelID)); session != nil {
session.Close(true)
}
httpResponseOK(w, nil)
}
func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("语音广播 %v", *v)
Sugar.Infof("广播邀请 %v", *v)
var sinkStreamId StreamID
var InviteSourceId string
var ok bool
var err error
// 响应错误消息
defer func() {
if err != nil {
Sugar.Errorf("广播失败 err: %s", err.Error())
httpResponseError(w, err.Error())
if InviteSourceId != "" {
BroadcastDialogs.Remove(InviteSourceId)
}
if sinkStreamId != "" {
SinkManager.RemoveWithSinkStreamId(sinkStreamId)
}
}
}()
@@ -625,40 +607,48 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
return
}
broadcastRoom := BroadcastManager.FindRoom(v.RoomID)
if broadcastRoom == nil {
//err := fmt.Errorf("the room with id '%s' is not found", v.RoomID)
err = fmt.Errorf("广播房间找不到. room: %s", v.RoomID)
// 主讲人id
source := StreamManager.Find(v.StreamId)
if source == nil {
//err := fmt.Errorf("the room with id '%s' is not found", v.Source)
err = fmt.Errorf("房间找不到. room: %s", v.StreamId)
return
}
// 生成下级设备Invite请求携带的user
// server用于区分是哪个设备的广播
InviteSourceId = string(v.StreamId) + utils.RandStringBytes(10)
// 每个设备的广播唯一ID
sessionId := GenerateSessionId(v.DeviceID, v.ChannelID)
if BroadcastManager.Find(sessionId) != nil {
err = fmt.Errorf("设备正在广播中. session: %s", sessionId)
return
}
// 生成让下级应答时携带的ID
sourceId := v.RoomID + utils.RandStringBytes(10)
session := &BroadcastSession{
SourceID: sourceId,
DeviceID: v.DeviceID,
ChannelID: v.ChannelID,
RoomId: v.RoomID,
Type: BroadcastType(v.Type),
}
if !BroadcastManager.AddSession(v.RoomID, session) {
err = fmt.Errorf("设备正在广播中. session: %s", sessionId)
sinkStreamId = GenerateStreamID(InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "")
setupType := SetupTypePassive
if v.Setup != nil && *v.Setup >= SetupTypeUDP && *v.Setup <= SetupTypeActive {
setupType = *v.Setup
}
sink := &Sink{
Stream: v.StreamId,
SinkStream: sinkStreamId,
Protocol: "gb_talk_forward",
CreateTime: time.Now().Unix(),
SetupType: setupType,
}
if ok = SinkManager.AddWithSinkStreamId(sink); !ok {
err = fmt.Errorf("设备正在广播中. session: %s", sinkStreamId)
return
} else if _, ok = BroadcastDialogs.Add(InviteSourceId, sink); !ok {
err = fmt.Errorf("source id 冲突. session: %s", InviteSourceId)
return
}
ok = false
cancel := r.Context()
transaction := device.Broadcast(sourceId, v.ChannelID)
transaction := device.Broadcast(InviteSourceId, v.ChannelID)
responses := transaction.Responses()
var ok bool
select {
// 等待message broadcast的应答
case response := <-responses:
if response == nil {
err = fmt.Errorf("信令超时")
@@ -670,31 +660,24 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
break
}
// 不等下级的广播请求, 直接等Invite
timeout, _ := context.WithTimeout(r.Context(), 10*time.Second)
select {
case <-timeout.Done():
err = fmt.Errorf("invite超时. session: %s", session.Id())
break
case code := <-session.Answer:
if http.StatusOK != code {
err = fmt.Errorf("bad status code %d", code)
} else {
ok = true
}
break
// 等待下级设备的Invite请求
code := sink.WaitForPublishEvent(10)
if code == -1 {
err = fmt.Errorf("等待invite超时. session: %s", sinkStreamId)
} else if http.StatusOK != code {
err = fmt.Errorf("bad status code %d", code)
} else {
ok = AddForwardSink(v.StreamId, sink)
}
break
case <-cancel.Done():
// 取消http请求
Sugar.Warnf("广播失败, 取消http请求. session: %s", session.Id())
// http请求取消
Sugar.Warnf("广播失败, http请求取消. session: %s", sinkStreamId)
break
}
if ok {
httpResponseOK(w, nil)
} else {
BroadcastManager.Remove(sessionId)
}
}
@@ -777,6 +760,8 @@ func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
}
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("级联绑定通道 %v", *v)
platform := PlatformManager.FindPlatform(v.ServerID)
if platform == nil {

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"gb-cms/sdp"
"github.com/ghettovoice/gosip/sip"
"github.com/lkmio/transport"
"net"
"net/http"
"strconv"
@@ -30,6 +29,32 @@ const (
"a=rtpmap:8 PCMA/8000\r\n"
)
func findSetup(descriptor *sdp.SDP) SetupType {
var tcp bool
if descriptor.Audio != nil {
tcp = strings.Contains(descriptor.Audio.Proto, "TCP")
}
if !tcp && descriptor.Video != nil {
tcp = strings.Contains(descriptor.Video.Proto, "TCP")
}
setup := SetupTypeUDP
if tcp {
for _, attr := range descriptor.Attrs {
if "setup" == attr[0] {
if SetupTypePassive.String() == attr[1] {
setup = SetupTypePassive
} else if SetupTypeActive.String() == attr[1] {
setup = SetupTypeActive
}
}
}
}
return setup
}
func (d *Device) DoBroadcast(sourceId, channelId string) error {
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
@@ -38,76 +63,61 @@ func (d *Device) DoBroadcast(sourceId, channelId string) error {
return nil
}
// OnInvite 邀请语音广播
// OnInvite 语音广播
func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
session := FindBroadcastSessionWithSourceID(user)
if session == nil {
sink := BroadcastDialogs.Find(user)
if sink == nil {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
body := request.Body()
offer, err := sdp.Parse(body)
if err != nil {
Sugar.Infof("解析sdp失败. session: %s err: %s sdp: %s", session.Id(), err.Error(), body)
session.Answer <- http.StatusBadRequest
Sugar.Infof("广播失败, 解析sdp发生err: %s sink: %s sdp: %s", err.Error(), sink.ID, body)
sink.onPublishCb <- http.StatusBadRequest
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
} else if offer.Audio == nil {
Sugar.Infof("offer中缺少audio字段. session: %s sdp: %s", session.Id(), body)
session.Answer <- http.StatusBadRequest
Sugar.Infof("广播失败, offer中缺少audio字段. sink: %s sdp: %s", sink.ID, body)
sink.onPublishCb <- http.StatusBadRequest
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
// 通知流媒体服务器创建answer
offerSetup := findSetup(offer)
answerSetup := sink.SetupType
finalSetup := offerSetup
if answerSetup != offerSetup {
finalSetup = answerSetup
}
addr := net.JoinHostPort(offer.Addr, strconv.Itoa(int(offer.Audio.Port)))
host, port, sinkId, err := CreateAnswer(string(sink.Stream), addr, offerSetup.String(), answerSetup.String(), "", string(InviteTypeBroadcast))
if err != nil {
Sugar.Errorf("广播失败, 流媒体创建answer发生err: %s sink: %s ", err.Error(), sink.ID)
sink.onPublishCb <- http.StatusInternalServerError
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
var answerSDP string
isTcp := strings.Contains(offer.Audio.Proto, "TCP")
// UDP广播
if !isTcp && BroadcastTypeUDP == session.Type {
var client *transport.UDPClient
err := TransportManager.AllocPort(false, func(port uint16) error {
client = &transport.UDPClient{}
localAddr, _ := net.ResolveUDPAddr("udp", net.JoinHostPort(Config.ListenIP, strconv.Itoa(int(port))))
remoteAddr, _ := net.ResolveUDPAddr("udp", net.JoinHostPort(offer.Addr, strconv.Itoa(int(offer.Audio.Port))))
return client.Connect(localAddr, remoteAddr)
})
if err == nil {
Sugar.Errorf("创建UDP广播端口失败 err:%s", err.Error())
session.Answer <- http.StatusInternalServerError
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
session.RemoteIP = offer.Addr
session.RemotePort = int(offer.Audio.Port)
session.Transport = client
session.Transport.SetHandler(session)
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipId, Config.PublicIP, Config.PublicIP, client.ListenPort(), "RTP/AVP")
if SetupTypeUDP == finalSetup {
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipId, host, host, port, "RTP/AVP")
} else {
// TCP广播
server, err := TransportManager.NewTCPServer()
if err != nil {
Sugar.Errorf("创建TCP广播端口失败 session: %s err:%s", session.Id(), err.Error())
session.Answer <- http.StatusInternalServerError
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
go server.Accept()
session.Transport = server
session.Transport.SetHandler(session)
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipId, Config.PublicIP, Config.PublicIP, server.ListenPort(), "TCP/RTP/AVP")
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipId, host, host, port, "TCP/RTP/AVP")
}
// 创建answer和dialog
response := CreateResponseWithStatusCode(request, http.StatusOK)
setToTag(response)
session.Successful = true
session.ByeRequest = d.CreateDialogRequestFromAnswer(response, true)
id, _ := request.CallID()
BroadcastManager.AddSessionWithCallId(id.Value(), session)
sink.ID = sinkId
sink.Dialog = d.CreateDialogRequestFromAnswer(response, true)
response.SetBody(answerSDP, true)
response.AppendHeader(&SDPMessageType)
response.AppendHeader(GlobalContactAddress.AsContactHeader())
session.Answer <- http.StatusOK
sink.onPublishCb <- http.StatusOK
return response
}

View File

@@ -1,371 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8"/>
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"/>
<meta name="apple-mobile-web-capable" content="yes"/>
<title>语音广播</title>
</head>
<body>
<button id="intercomBegin">开始广播</button>
<button id="intercomEnd">关闭广播</button>
<input style="width: 100px;" id="device_id" type="text" value="34020000001320000001"/>
<input style="width: 100px;" id="channel_id" type="text" value="34020000001320000001"/>
<button id="invite" onclick="invite()">邀请</button>
<button id="hangup" onclick="hangup()">挂断</button>
</body>
<script src="g711.js"></script>
<script type="text/javascript">
var begin = document.getElementById('intercomBegin');
var end = document.getElementById('intercomEnd');
var ws = null; //实现WebSocket
var record = null; //多媒体对象,用来处理音频
var roomId = null;
function init(rec) {
record = rec;
}
function invite() {
let deviceId = document.getElementById("device_id").value;
let channelId = document.getElementById("channel_id").value;
let data = {
device_id: deviceId,
channel_id: channelId,
type: 1,
room_id: roomId
};
fetch("/api/v1/broadcast/invite", {
method: 'POST',
body: JSON.stringify(data),
headers: new Headers({
'Content-Type': 'application/json',
}),
}).then((res) => res.json())
.then((data) => {
})
}
function hangup() {
let deviceId = document.getElementById("device_id").value;
let channelId = document.getElementById("channel_id").value;
let data = {
device_id: deviceId,
channel_id: channelId,
room_id: roomId
};
fetch("/api/v1/broadcast/hangup", {
method: 'POST',
body: JSON.stringify(data),
headers: new Headers({
'Content-Type': 'application/json',
}),
}).then((res) => res.json())
.then((data) => {
})
}
//录音对象
var Recorder = function (stream) {
var sampleBits = 16; //输出采样数位 8, 16
var sampleRate = 8000; //输出采样率
var context = new AudioContext();
var audioInput = context.createMediaStreamSource(stream);
var recorder = context.createScriptProcessor(4096, 1, 1);
var audioData = {
size: 0, //录音文件长度
buffer: [], //录音缓存
inputSampleRate: 48000, //输入采样率
inputSampleBits: 16, //输入采样数位 8, 16
outputSampleRate: sampleRate, //输出采样数位
oututSampleBits: sampleBits, //输出采样率
clear: function () {
this.buffer = [];
this.size = 0;
},
input: function (data) {
this.buffer.push(new Float32Array(data));
this.size += data.length;
},
compress: function () { //合并压缩
//合并
var data = new Float32Array(this.size);
var offset = 0;
for (var i = 0; i < this.buffer.length; i++) {
data.set(this.buffer[i], offset);
offset += this.buffer[i].length;
}
//压缩
var compression = parseInt(this.inputSampleRate / this.outputSampleRate);
var length = data.length / compression;
var result = new Float32Array(length);
var index = 0,
j = 0;
while (index < length) {
result[index] = data[j];
j += compression;
index++;
}
return result;
},
encodePCM: function () { //这里不对采集到的数据进行其他格式处理,如有需要均交给服务器端处理。
var sampleRate = Math.min(this.inputSampleRate, this.outputSampleRate);
var sampleBits = Math.min(this.inputSampleBits, this.oututSampleBits);
var bytes = this.compress();
var dataLength = bytes.length * (sampleBits / 8);
var buffer = new ArrayBuffer(dataLength);
var data = new DataView(buffer);
var offset = 0;
for (var i = 0; i < bytes.length; i++, offset += 2) {
var s = Math.max(-1, Math.min(1, bytes[i]));
data.setInt16(offset, s < 0 ? s * 0x8000 : s * 0x7FFF, true);
}
return new Blob([data]);
}
};
var sendData = function () { //对以获取的数据进行处理(分包)
var reader = new FileReader();
reader.onload = e => {
var outbuffer = e.target.result;
var arr = new Uint16Array(outbuffer);
var dst = new Int8Array(arr.length);
pcm16_to_alaw(arr.byteLength, arr, dst);
ws.send(dst);
};
reader.readAsArrayBuffer(audioData.encodePCM());
audioData.clear();//每次发送完成则清理掉旧数据
};
this.start = function () {
audioInput.connect(recorder);
recorder.connect(context.destination);
}
this.stop = function () {
recorder.disconnect();
}
this.getBlob = function () {
return audioData.encodePCM();
}
this.clear = function () {
audioData.clear();
}
recorder.onaudioprocess = function (e) {
var inputBuffer = e.inputBuffer.getChannelData(0);
audioData.input(inputBuffer);
sendData();
}
}
//录音对象
/* var Recorder = function(stream) {
var sampleBits = 16; //输出采样数位 8, 16
var sampleRate = 8000; //输出采样率
var context = new AudioContext();
var audioInput = context.createMediaStreamSource(stream);
var recorder = context.createScriptProcessor(4096, 1, 1);
var audioData = {
size: 0, //录音文件长度
buffer: [], //录音缓存
inputSampleRate: 48000, //输入采样率
inputSampleBits: 16, //输入采样数位 8, 16
outputSampleRate: sampleRate, //输出采样数位
oututSampleBits: sampleBits, //输出采样率
clear: function() {
this.buffer = [];
this.size = 0;
},
input: function(data) {
this.buffer.push(new Float32Array(data));
this.size += data.length;
},
compress: function() { //合并压缩
//合并
var data = new Float32Array(this.size);
var offset = 0;
for (var i = 0; i < this.buffer.length; i++) {
data.set(this.buffer[i], offset);
offset += this.buffer[i].length;
}
//压缩
var compression = parseInt(this.inputSampleRate / this.outputSampleRate);
var length = data.length / compression;
var result = new Float32Array(length);
var index = 0,
j = 0;
while (index < length) {
result[index] = data[j];
j += compression;
index++;
}
return result;
},
encodePCM: function() { //这里不对采集到的数据进行其他格式处理,如有需要均交给服务器端处理。
var sampleRate = Math.min(this.inputSampleRate, this.outputSampleRate);
var sampleBits = Math.min(this.inputSampleBits, this.oututSampleBits);
var bytes = this.compress();
var dataLength = bytes.length * (sampleBits / 8);
var buffer = new ArrayBuffer(dataLength);
var data = new DataView(buffer);
var offset = 0;
for (var i = 0; i < bytes.length; i++, offset += 2) {
var s = Math.max(-1, Math.min(1, bytes[i]));
data.setInt16(offset, s < 0 ? s * 0x8000 : s * 0x7FFF, true);
}
return new Blob([data]);
}
};
var sendData = function() { //对以获取的数据进行处理(分包)
var reader = new FileReader();
reader.onload = e => {
var outbuffer = e.target.result;
var arr = new Int8Array(outbuffer);
if (arr.length > 0) {
var tmparr = new Int8Array(1024);
var j = 0;
for (var i = 0; i < arr.byteLength; i++) {
tmparr[j++] = arr[i];
if (((i + 1) % 1024) == 0) {
ws.send(tmparr);
if (arr.byteLength - i - 1 >= 1024) {
tmparr = new Int8Array(1024);
} else {
tmparr = new Int8Array(arr.byteLength - i - 1);
}
j = 0;
}
if ((i + 1 == arr.byteLength) && ((i + 1) % 1024) != 0) {
ws.send(tmparr);
}
}
}
};
reader.readAsArrayBuffer(audioData.encodePCM());
audioData.clear();//每次发送完成则清理掉旧数据
};
this.start = function() {
audioInput.connect(recorder);
recorder.connect(context.destination);
}
this.stop = function() {
recorder.disconnect();
}
this.getBlob = function() {
return audioData.encodePCM();
}
this.clear = function() {
audioData.clear();
}
recorder.onaudioprocess = function(e) {
var inputBuffer = e.inputBuffer.getChannelData(0);
audioData.input(inputBuffer);
sendData();
}
}*/
/*
* WebSocket
*/
function connectWS() {
let secure = window.location.protocol === 'https:'
let url = (secure ? "wss:" : "ws:") + "/" + window.location.host + "/ws/v1/talk"
ws = new WebSocket(url);
ws.binaryType = 'arraybuffer'; //传输的是 ArrayBuffer 类型的数据
ws.onopen = function () {
console.log('握手成功');
record.start();
};
ws.onmessage = function (msg) {
const response = JSON.parse(msg.data);
roomId = response["data"]["room_id"];
console.info("room_id:" + roomId)
}
ws.onerror = function (err) {
console.info(err)
record.stop()
}
}
/*
* 开始对讲
*/
begin.onclick = function () {
navigator.getUserMedia = navigator.getUserMedia || navigator.webkitGetUserMedia;
if (!navigator.getUserMedia) {
alert('浏览器不支持音频输入');
} else {
navigator.getUserMedia({
audio: true
},
//获取到音频采集权限回调
function (mediaStream) {
console.log('开始对讲');
//初始化采集器
init(new Recorder(mediaStream));
//连接websocket
connectWS();
},
function (error) {
console.log(error);
switch (error.message || error.name) {
case 'PERMISSION_DENIED':
case 'PermissionDeniedError':
console.info('用户拒绝提供信息。');
break;
case 'NOT_SUPPORTED_ERROR':
case 'NotSupportedError':
console.info('浏览器不支持硬件设备。');
break;
case 'MANDATORY_UNSATISFIED_ERROR':
case 'MandatoryUnsatisfiedError':
console.info('无法发现指定的硬件设备。');
break;
default:
console.info('无法打开麦克风。异常信息:' + (error.code || error.name));
break;
}
}
)
}
}
/*
* 关闭对讲
*/
end.onclick = function () {
if (ws) {
ws.close();
record.stop();
console.log('关闭对讲以及WebSocket');
}
}
</script>
</html>

39
broadcast_dialogs.go Normal file
View File

@@ -0,0 +1,39 @@
package main
import "sync"
// BroadcastDialogs 临时保存广播会话
var BroadcastDialogs = &broadcastDialogs{
dialogs: make(map[string]*Sink),
}
type broadcastDialogs struct {
lock sync.RWMutex
dialogs map[string]*Sink
}
func (b *broadcastDialogs) Add(id string, dialog *Sink) (old *Sink, ok bool) {
b.lock.Lock()
defer b.lock.Unlock()
if old, ok = b.dialogs[id]; ok {
return old, false
}
b.dialogs[id] = dialog
return nil, true
}
func (b *broadcastDialogs) Find(id string) *Sink {
b.lock.RLock()
defer b.lock.RUnlock()
return b.dialogs[id]
}
func (b *broadcastDialogs) Remove(id string) *Sink {
b.lock.Lock()
defer b.lock.Unlock()
dialog := b.dialogs[id]
delete(b.dialogs, id)
return dialog
}

View File

@@ -1,152 +1,77 @@
package main
import (
"sync"
)
import "sync"
var (
BroadcastManager *broadcastManager
)
func init() {
BroadcastManager = &broadcastManager{
rooms: make(map[string]*BroadcastRoom, 12),
sessions: make(map[string]*BroadcastSession, 12),
callIds: make(map[string]*BroadcastSession, 12),
}
}
//var BroadcastManager = &broadcastManager{
// streams: make(map[StreamID]*Sink),
// callIds: make(map[string]*Sink),
//}
type broadcastManager struct {
rooms map[string]*BroadcastRoom //主讲人关联房间
sessions map[string]*BroadcastSession //sessionId关联广播会话
callIds map[string]*BroadcastSession //callId关联广播会话
lock sync.RWMutex
streams map[StreamID]*Sink // device stream id ->sink
callIds map[string]*Sink // invite call id->sink
lock sync.RWMutex
}
func FindBroadcastSessionWithSourceID(user string) *BroadcastSession {
roomId := user[:10]
room := BroadcastManager.FindRoom(roomId)
if room != nil {
return room.Find(user)
}
return nil
}
func (b *broadcastManager) CreateRoom(id string) *BroadcastRoom {
func (b *broadcastManager) Add(id StreamID, sink *Sink) (old *Sink, ok bool) {
b.lock.Lock()
defer b.lock.Unlock()
if _, ok := b.rooms[id]; ok {
panic("system error")
old, ok = b.streams[id]
if ok {
return old, false
}
room := &BroadcastRoom{
members: make(map[string]*BroadcastSession, 12),
}
b.rooms[id] = room
return room
b.streams[id] = sink
return nil, true
}
func (b *broadcastManager) FindRoom(id string) *BroadcastRoom {
func (b *broadcastManager) AddWithCallId(id string, sink *Sink) bool {
b.lock.Lock()
defer b.lock.Unlock()
if _, ok := b.callIds[id]; ok {
return false
}
b.callIds[id] = sink
return true
}
func (b *broadcastManager) Find(id StreamID) *Sink {
b.lock.RLock()
defer b.lock.RUnlock()
session, ok := b.rooms[id]
if !ok {
return nil
}
return session
return b.streams[id]
}
func (b *broadcastManager) RemoveRoom(roomId string) []*BroadcastSession {
b.lock.Lock()
defer b.lock.Unlock()
room, ok := b.rooms[roomId]
if !ok {
return nil
}
delete(b.rooms, roomId)
return room.PopAll()
}
func (b *broadcastManager) Remove(sessionId string) *BroadcastSession {
b.lock.Lock()
defer b.lock.Unlock()
session, ok := b.sessions[sessionId]
if !ok {
return nil
}
b.RemoveSession(session)
return session
}
func (b *broadcastManager) RemoveSession(session *BroadcastSession) {
delete(b.sessions, session.Id())
if session.ByeRequest != nil {
id, _ := session.ByeRequest.CallID()
delete(b.callIds, id.Value())
}
if room, ok := b.rooms[session.RoomId]; ok {
room.Remove(session.SourceID)
}
}
func (b *broadcastManager) RemoveWithCallId(callId string) *BroadcastSession {
b.lock.Lock()
defer b.lock.Unlock()
session, ok := b.callIds[callId]
if !ok {
return nil
}
b.RemoveSession(session)
return session
}
func (b *broadcastManager) Find(sessionId string) *BroadcastSession {
func (b *broadcastManager) FindWithCallId(id string) *Sink {
b.lock.RLock()
defer b.lock.RUnlock()
if session, ok := b.sessions[sessionId]; ok {
return session
}
return nil
return b.callIds[id]
}
func (b *broadcastManager) AddSession(roomId string, session *BroadcastSession) bool {
func (b *broadcastManager) Remove(id StreamID) *Sink {
b.lock.Lock()
defer b.lock.Unlock()
room, ok := b.rooms[roomId]
sink, ok := b.streams[id]
if !ok {
return false
} else if _, ok := b.sessions[session.Id()]; ok {
return false
} else if add := room.Add(session); add {
b.sessions[session.Id()] = session
return true
return nil
}
return false
if sink.Dialog != nil {
callID, _ := sink.Dialog.CallID()
delete(b.callIds, callID.String())
}
delete(b.streams, id)
return sink
}
func (b *broadcastManager) AddSessionWithCallId(callId string, session *BroadcastSession) bool {
func (b *broadcastManager) RemoveWithCallId(id string) *Sink {
b.lock.Lock()
defer b.lock.Unlock()
if _, ok := b.callIds[callId]; !ok {
b.callIds[callId] = session
sink, ok := b.callIds[id]
if !ok {
return nil
}
return false
delete(b.callIds, id)
delete(b.streams, sink.Stream)
return sink
}

View File

@@ -1,69 +0,0 @@
package main
import "sync"
type BroadcastRoom struct {
members map[string]*BroadcastSession
lock sync.RWMutex
}
func (r *BroadcastRoom) Add(session *BroadcastSession) bool {
r.lock.Lock()
defer r.lock.Unlock()
if _, ok := r.members[session.SourceID]; ok {
return false
}
r.members[session.SourceID] = session
return true
}
func (r *BroadcastRoom) Remove(sourceId string) {
r.lock.Lock()
defer r.lock.Unlock()
_, ok := r.members[sourceId]
if !ok {
return
}
delete(r.members, sourceId)
}
func (r *BroadcastRoom) Exist(sourceId string) bool {
r.lock.RLock()
defer r.lock.RUnlock()
_, ok := r.members[sourceId]
return ok
}
func (r *BroadcastRoom) Find(sourceId string) *BroadcastSession {
r.lock.RLock()
defer r.lock.RUnlock()
session, _ := r.members[sourceId]
return session
}
func (r *BroadcastRoom) PopAll() []*BroadcastSession {
r.lock.Lock()
defer r.lock.Unlock()
var members []*BroadcastSession
for _, session := range r.members {
members = append(members, session)
}
return members
}
func (r *BroadcastRoom) DispatchRtpPacket(data []byte) {
r.lock.RLock()
defer r.lock.RUnlock()
for _, session := range r.members {
session.Write(data)
}
}

View File

@@ -1,85 +0,0 @@
package main
import (
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/lkmio/transport"
"net"
)
type BroadcastType int
const (
BroadcastTypeUDP = BroadcastType(0) // server主动向client的udp地址发包
BroadcastTypeTCP = BroadcastType(1) // 等待client连接tcpserver, 用此链接发包
BroadcastTypeTCPStream = BroadcastType(2) // @See BroadcastTypeTCP, 包头不含2字节包长
)
type BroadcastSession struct {
SourceID string // 发送广播消息时, 让设备invite请求携带的Id
DeviceID string
ChannelID string
RoomId string
Transport transport.Transport
Type BroadcastType
RemotePort int
RemoteIP string // udp广播时, 对方的连接地址
Successful bool // 对讲成功
Answer chan int // 处理invite后, 通知http接口
conn net.Conn // tcp广播时, client的链路
ByeRequest sip.Request
}
func GenerateSessionId(did, cid string) string {
return fmt.Sprintf("%s/%s", did, cid)
}
func (s *BroadcastSession) Id() string {
return GenerateSessionId(s.DeviceID, s.ChannelID)
}
func (s *BroadcastSession) OnConnected(conn net.Conn) []byte {
s.conn = conn
sessionId := GenerateSessionId(s.DeviceID, s.ChannelID)
Sugar.Infof("TCP语音广播连接 session:%s", sessionId)
return nil
}
func (s *BroadcastSession) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (s *BroadcastSession) OnDisConnected(conn net.Conn, err error) {
sessionId := GenerateSessionId(s.DeviceID, s.ChannelID)
Sugar.Infof("TCP语音广播断开连接 session:%s", sessionId)
BroadcastManager.Remove(sessionId)
s.Close(true)
}
func (s *BroadcastSession) Close(sendBye bool) {
if s.Transport != nil {
s.Transport.Close()
s.Transport = nil
}
if sendBye && s.ByeRequest != nil {
SipUA.SendRequest(s.ByeRequest)
s.ByeRequest = nil
}
}
func (s *BroadcastSession) Write(data []byte) {
if BroadcastTypeUDP == s.Type {
s.Transport.(*transport.UDPClient).Write(data[2:])
} else if s.conn == nil {
return
}
if BroadcastTypeTCPStream == s.Type {
s.conn.Write(data[2:])
} else {
s.conn.Write(data)
}
}

View File

@@ -1,360 +1,361 @@
package main
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/lkmio/rtp"
"github.com/lkmio/transport"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
)
var (
rtpPackets [][]byte
locks map[uint32]*sync.RWMutex
)
type MediaStream struct {
ssrc uint32
tcp bool
conn net.Conn
transport transport.Transport
cancel context.CancelFunc
dialog sip.Request
ctx context.Context
closedCB func(sendBye bool)
}
func (m *MediaStream) write() {
var index int
length := len(rtpPackets)
for m.ctx.Err() == nil && index < length {
time.Sleep(time.Millisecond * 40)
//一次发送某个时间范围内的所有rtp包
ts := binary.BigEndian.Uint32(rtpPackets[index][2+4:])
mutex := locks[ts]
{
mutex.Lock()
for ; m.ctx.Err() == nil && index < length; index++ {
bytes := rtpPackets[index]
nextTS := binary.BigEndian.Uint32(bytes[2+4:])
if nextTS != ts {
break
}
rtp.ModifySSRC(bytes[2:], m.ssrc)
if m.tcp {
m.conn.Write(bytes)
} else {
m.transport.(*transport.UDPClient).Write(bytes[2:])
}
}
mutex.Unlock()
}
}
println("推流结束")
m.Close(true)
}
func (m *MediaStream) Start() {
m.ctx, m.cancel = context.WithCancel(context.Background())
go m.write()
}
func (m *MediaStream) Close(sendBye bool) {
m.cancel()
if m.closedCB != nil {
m.closedCB(sendBye)
}
}
func (m *MediaStream) OnConnected(conn net.Conn) []byte {
m.conn = conn
fmt.Printf("tcp连接:%s", conn.RemoteAddr())
return nil
}
func (m *MediaStream) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (m *MediaStream) OnDisConnected(conn net.Conn, err error) {
fmt.Printf("tcp断开连接:%s", conn.RemoteAddr())
m.Close(true)
}
type VirtualDevice struct {
*Client
streams map[string]*MediaStream
lock sync.Locker
}
func CreateTransport(ip string, port int, setup string, handler transport.Handler) (transport.Transport, bool, error) {
if "passive" == setup {
tcpClient := &transport.TCPClient{}
tcpClient.SetHandler(handler)
_, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
return tcpClient, true, err
} else if "active" == setup {
tcpServer := &transport.TCPServer{}
tcpServer.SetHandler(handler)
err := tcpServer.Bind(nil)
return tcpServer, true, err
} else {
udp := &transport.UDPClient{}
err := udp.Connect(nil, &net.UDPAddr{IP: net.ParseIP(ip), Port: port})
return udp, false, err
}
}
func (v VirtualDevice) OnInvite(request sip.Request, user string) sip.Response {
if len(rtpPackets) < 1 {
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
offer, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
if err != nil {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
stream := &MediaStream{}
socket, tcp, err := CreateTransport(offer.Addr, int(media.Port), offerSetup, stream)
if err != nil {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
time := strings.Split(offer.Time, " ")
if len(time) < 2 {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
var ip string
var port sip.Port
var contactAddr string
if v.sipClient.NatAddr != "" {
contactAddr = v.sipClient.NatAddr
} else {
contactAddr = v.sipClient.ListenAddr
}
host, p, _ := net.SplitHostPort(contactAddr)
ip = host
atoi, _ := strconv.Atoi(p)
port = sip.Port(atoi)
contactAddress := &sip.Address{
Uri: &sip.SipUri{
FUser: sip.String{Str: user},
FHost: ip,
FPort: &port,
},
}
answer := BuildSDP(user, offer.Session, ip, uint16(socket.ListenPort()), time[0], time[1], answerSetup, speed, ssrc)
response := CreateResponseWithStatusCode(request, http.StatusOK)
response.RemoveHeader("Contact")
response.AppendHeader(contactAddress.AsContactHeader())
response.AppendHeader(&SDPMessageType)
response.SetBody(answer, true)
setToTag(response)
i, _ := strconv.Atoi(ssrc)
stream.ssrc = uint32(i)
stream.tcp = tcp
stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipClient.Domain)
callId, _ := response.CallID()
{
v.lock.Lock()
defer v.lock.Unlock()
v.streams[callId.Value()] = stream
}
// 设置网络断开回调
stream.closedCB = func(sendBye bool) {
if stream.dialog != nil {
id, _ := stream.dialog.CallID()
StreamManager.RemoveWithCallId(id.Value())
{
v.lock.Lock()
delete(v.streams, id.Value())
v.lock.Unlock()
}
if sendBye {
bye := CreateRequestFromDialog(stream.dialog, sip.BYE)
v.sipClient.ua.SendRequest(bye)
}
stream.dialog = nil
}
if stream.transport != nil {
stream.transport.Close()
stream.transport = nil
}
}
stream.transport = socket
stream.Start()
// 绑定到StreamManager, bye请求才会找到设备回调
streamId := GenerateStreamId(InviteTypeLive, v.sipClient.Username, user, "", "")
s := Stream{ID: streamId, Dialog: stream.dialog}
StreamManager.Add(&s)
callID, _ := request.CallID()
StreamManager.AddWithCallId(callID.Value(), &s)
return response
}
func (v VirtualDevice) OnBye(request sip.Request) {
id, _ := request.CallID()
stream, ok := v.streams[id.Value()]
if !ok {
return
}
{
// 此作用域内defer不会生效
v.lock.Lock()
delete(v.streams, id.Value())
v.lock.Unlock()
}
stream.Close(false)
}
func (v VirtualDevice) Offline() {
for _, stream := range v.streams {
stream.Close(true)
}
v.streams = nil
}
type ClientConfig struct {
DeviceIDPrefix string `json:"device_id_prefix"`
ChannelIDPrefix string `json:"channel_id_prefix"`
ServerID string `json:"server_id"`
Domain string `json:"domain"`
Password string `json:"password"`
ListenAddr string `json:"listenAddr"`
Count int `json:"count"`
RawFilePath string `json:"rtp_over_tcp_raw_file_path"` // rtp over tcp源文件
}
func TestGBClient(t *testing.T) {
configData, err := os.ReadFile("./client_benchmark_test_config.json")
if err != nil {
panic(err)
}
clientConfig := &ClientConfig{}
if err = json.Unmarshal(configData, clientConfig); err != nil {
panic(err)
}
rtpData, err := os.ReadFile(clientConfig.RawFilePath)
if err != nil {
println("读取rtp源文件失败 不能推流")
} else {
// 分割rtp包
offset := 2
length := len(rtpData)
locks = make(map[uint32]*sync.RWMutex, 128)
for rtpSize := 0; offset < length; offset += rtpSize + 2 {
rtpSize = int(binary.BigEndian.Uint16(rtpData[offset-2:]))
if length-offset < rtpSize {
break
}
bytes := rtpData[offset : offset+rtpSize]
ts := binary.BigEndian.Uint32(bytes[4:])
// 每个相同时间戳共用一把互斥锁, 只允许同时一路流发送该时间戳内的rtp包, 保护ssrc被不同的流修改
if _, ok := locks[ts]; !ok {
locks[ts] = &sync.RWMutex{}
}
rtpPackets = append(rtpPackets, rtpData[offset-2:offset+rtpSize])
}
}
println("========================================")
println("源码地址: https://github.com/lkmio/gb-cms")
println("视频来源于网络,如有侵权,请联系删除")
println("========================================\r\n")
time.Sleep(3 * time.Second)
// 初始化UA配置, 防止SipServer使用时空指针
Config = &Config_{}
listenIP, listenPort, err := net.SplitHostPort(clientConfig.ListenAddr)
if err != nil {
panic(err)
}
atoi, err := strconv.Atoi(listenPort)
if err != nil {
panic(err)
}
server, err := StartSipServer("", listenIP, listenIP, atoi)
if err != nil {
panic(err)
}
DeviceChannelsManager = &DeviceChannels{
channels: make(map[string][]*Channel, clientConfig.Count),
}
for i := 0; i < clientConfig.Count; i++ {
deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1)
channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1)
client := NewGBClient(deviceId, clientConfig.ServerID, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server)
device := VirtualDevice{client.(*Client), map[string]*MediaStream{}, &sync.Mutex{}}
device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
channel := &Channel{
DeviceID: channelId,
Name: "1",
ParentID: deviceId,
}
DeviceManager.Add(device)
DeviceChannelsManager.AddChannel(deviceId, channel)
device.Start()
device.SetOnRegisterHandler(func() {
fmt.Printf(deviceId + " 注册成功\r\n")
}, func() {
fmt.Printf(deviceId + " 离线\r\n")
device.Offline()
})
}
for {
time.Sleep(time.Second * 3)
}
}
//
//import (
// "context"
// "encoding/binary"
// "encoding/json"
// "fmt"
// "github.com/ghettovoice/gosip/sip"
// "github.com/lkmio/rtp"
// "github.com/lkmio/transport"
// "net"
// "net/http"
// "os"
// "strconv"
// "strings"
// "sync"
// "testing"
// "time"
//)
//
//var (
// rtpPackets [][]byte
// locks map[uint32]*sync.RWMutex
//)
//
//type MediaStream struct {
// ssrc uint32
// tcp bool
// conn net.Conn
// transport transport.Transport
// cancel context.CancelFunc
// dialog sip.Request
// ctx context.Context
//
// closedCB func(sendBye bool)
//}
//
//func (m *MediaStream) write() {
// var index int
// length := len(rtpPackets)
// for m.ctx.Err() == nil && index < length {
// time.Sleep(time.Millisecond * 40)
//
// //一次发送某个时间范围内的所有rtp包
// ts := binary.BigEndian.Uint32(rtpPackets[index][2+4:])
// mutex := locks[ts]
// {
// mutex.Lock()
//
// for ; m.ctx.Err() == nil && index < length; index++ {
// bytes := rtpPackets[index]
// nextTS := binary.BigEndian.Uint32(bytes[2+4:])
// if nextTS != ts {
// break
// }
//
// rtp.ModifySSRC(bytes[2:], m.ssrc)
//
// if m.tcp {
// m.conn.Write(bytes)
// } else {
// m.transport.(*transport.UDPClient).Write(bytes[2:])
// }
// }
//
// mutex.Unlock()
// }
// }
//
// println("推流结束")
// m.Close(true)
//}
//
//func (m *MediaStream) Start() {
// m.ctx, m.cancel = context.WithCancel(context.Background())
// go m.write()
//}
//
//func (m *MediaStream) Close(sendBye bool) {
// m.cancel()
//
// if m.closedCB != nil {
// m.closedCB(sendBye)
// }
//}
//
//func (m *MediaStream) OnConnected(conn net.Conn) []byte {
// m.conn = conn
// fmt.Printf("tcp连接:%s", conn.RemoteAddr())
// return nil
//}
//
//func (m *MediaStream) OnPacket(conn net.Conn, data []byte) []byte {
// return nil
//}
//
//func (m *MediaStream) OnDisConnected(conn net.Conn, err error) {
// fmt.Printf("tcp断开连接:%s", conn.RemoteAddr())
// m.Close(true)
//}
//
//type VirtualDevice struct {
// *Client
// streams map[string]*MediaStream
// lock sync.Locker
//}
//
//func CreateTransport(ip string, port int, setup string, handler transport.Handler) (transport.Transport, bool, error) {
// if "passive" == setup {
// tcpClient := &transport.TCPClient{}
// tcpClient.SetHandler(handler)
//
// _, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
// return tcpClient, true, err
// } else if "active" == setup {
// tcpServer := &transport.TCPServer{}
// tcpServer.SetHandler(handler)
// err := tcpServer.Bind(nil)
//
// return tcpServer, true, err
// } else {
// udp := &transport.UDPClient{}
// err := udp.Connect(nil, &net.UDPAddr{IP: net.ParseIP(ip), Port: port})
// return udp, false, err
// }
//}
//
//func (v VirtualDevice) OnInvite(request sip.Request, user string) sip.Response {
// if len(rtpPackets) < 1 {
// return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
// }
//
// offer, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
// if err != nil {
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
// }
//
// stream := &MediaStream{}
// socket, tcp, err := CreateTransport(offer.Addr, int(media.Port), offerSetup, stream)
// if err != nil {
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
// }
//
// time := strings.Split(offer.Time, " ")
// if len(time) < 2 {
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
// }
//
// var ip string
// var port sip.Port
// var contactAddr string
// if v.sipClient.NatAddr != "" {
// contactAddr = v.sipClient.NatAddr
// } else {
// contactAddr = v.sipClient.ListenAddr
// }
//
// host, p, _ := net.SplitHostPort(contactAddr)
// ip = host
// atoi, _ := strconv.Atoi(p)
// port = sip.Port(atoi)
//
// contactAddress := &sip.Address{
// Uri: &sip.SipUri{
// FUser: sip.String{Str: user},
// FHost: ip,
// FPort: &port,
// },
// }
//
// answer := BuildSDP(user, offer.Session, ip, uint16(socket.ListenPort()), time[0], time[1], answerSetup, speed, ssrc)
// response := CreateResponseWithStatusCode(request, http.StatusOK)
// response.RemoveHeader("Contact")
// response.AppendHeader(contactAddress.AsContactHeader())
// response.AppendHeader(&SDPMessageType)
// response.SetBody(answer, true)
// setToTag(response)
//
// i, _ := strconv.Atoi(ssrc)
// stream.ssrc = uint32(i)
// stream.tcp = tcp
// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipClient.Domain)
// callId, _ := response.CallID()
//
// {
// v.lock.Lock()
// defer v.lock.Unlock()
// v.streams[callId.Value()] = stream
// }
//
// // 设置网络断开回调
// stream.closedCB = func(sendBye bool) {
// if stream.dialog != nil {
// id, _ := stream.dialog.CallID()
// StreamManager.RemoveWithCallId(id.Value())
//
// {
// v.lock.Lock()
// delete(v.streams, id.Value())
// v.lock.Unlock()
// }
//
// if sendBye {
// bye := CreateRequestFromDialog(stream.dialog, sip.BYE)
// v.sipClient.ua.SendRequest(bye)
// }
//
// stream.dialog = nil
// }
//
// if stream.transport != nil {
// stream.transport.Close()
// stream.transport = nil
// }
// }
//
// stream.transport = socket
// stream.Start()
//
// // 绑定到StreamManager, bye请求才会找到设备回调
// streamId := GenerateStreamID(InviteTypePlay, v.sipClient.Username, user, "", "")
// s := Stream{ID: streamId, Dialog: stream.dialog}
// StreamManager.Add(&s)
//
// callID, _ := request.CallID()
// StreamManager.AddWithCallId(callID.Value(), &s)
// return response
//}
//
//func (v VirtualDevice) OnBye(request sip.Request) {
// id, _ := request.CallID()
// stream, ok := v.streams[id.Value()]
// if !ok {
// return
// }
//
// {
// // 此作用域内defer不会生效
// v.lock.Lock()
// delete(v.streams, id.Value())
// v.lock.Unlock()
// }
//
// stream.Close(false)
//}
//
//func (v VirtualDevice) Offline() {
// for _, stream := range v.streams {
// stream.Close(true)
// }
//
// v.streams = nil
//}
//
//type ClientConfig struct {
// DeviceIDPrefix string `json:"device_id_prefix"`
// ChannelIDPrefix string `json:"channel_id_prefix"`
// ServerID string `json:"server_id"`
// Domain string `json:"domain"`
// Password string `json:"password"`
// ListenAddr string `json:"listenAddr"`
// Count int `json:"count"`
// RawFilePath string `json:"rtp_over_tcp_raw_file_path"` // rtp over tcp源文件
//}
//
//func TestGBClient(t *testing.T) {
// configData, err := os.ReadFile("./client_benchmark_test_config.json")
// if err != nil {
// panic(err)
// }
//
// clientConfig := &ClientConfig{}
// if err = json.Unmarshal(configData, clientConfig); err != nil {
// panic(err)
// }
//
// rtpData, err := os.ReadFile(clientConfig.RawFilePath)
// if err != nil {
// println("读取rtp源文件失败 不能推流")
// } else {
// // 分割rtp包
// offset := 2
// length := len(rtpData)
// locks = make(map[uint32]*sync.RWMutex, 128)
// for rtpSize := 0; offset < length; offset += rtpSize + 2 {
// rtpSize = int(binary.BigEndian.Uint16(rtpData[offset-2:]))
// if length-offset < rtpSize {
// break
// }
//
// bytes := rtpData[offset : offset+rtpSize]
// ts := binary.BigEndian.Uint32(bytes[4:])
// // 每个相同时间戳共用一把互斥锁, 只允许同时一路流发送该时间戳内的rtp包, 保护ssrc被不同的流修改
// if _, ok := locks[ts]; !ok {
// locks[ts] = &sync.RWMutex{}
// }
//
// rtpPackets = append(rtpPackets, rtpData[offset-2:offset+rtpSize])
// }
// }
//
// println("========================================")
// println("源码地址: https://github.com/lkmio/gb-cms")
// println("视频来源于网络,如有侵权,请联系删除")
// println("========================================\r\n")
//
// time.Sleep(3 * time.Second)
//
// // 初始化UA配置, 防止SipServer使用时空指针
// Config = &Config_{}
//
// listenIP, listenPort, err := net.SplitHostPort(clientConfig.ListenAddr)
// if err != nil {
// panic(err)
// }
//
// atoi, err := strconv.Atoi(listenPort)
// if err != nil {
// panic(err)
// }
//
// server, err := StartSipServer("", listenIP, listenIP, atoi)
// if err != nil {
// panic(err)
// }
// DeviceChannelsManager = &DeviceChannels{
// channels: make(map[string][]*Channel, clientConfig.Count),
// }
//
// for i := 0; i < clientConfig.Count; i++ {
// deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1)
// channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1)
// client := NewGBClient(deviceId, clientConfig.ServerID, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server)
//
// device := VirtualDevice{client.(*Client), map[string]*MediaStream{}, &sync.Mutex{}}
// device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
//
// channel := &Channel{
// DeviceID: channelId,
// Name: "1",
// ParentID: deviceId,
// }
//
// DeviceManager.Add(device)
// DeviceChannelsManager.AddChannel(deviceId, channel)
//
// device.Start()
//
// device.SetOnRegisterHandler(func() {
// fmt.Printf(deviceId + " 注册成功\r\n")
// }, func() {
// fmt.Printf(deviceId + " 离线\r\n")
// device.Offline()
// })
// }
//
// for {
// time.Sleep(time.Second * 3)
// }
//}

View File

@@ -19,7 +19,6 @@ type Config_ struct {
MobilePositionInterval int `json:"mobile_position_interval"`
MobilePositionExpires int `json:"mobile_position_expires"`
MediaServer string `json:"media_server"`
Port []int `json:"port"` //语音广播/对讲需要的端口
AutoCloseOnIdle bool `json:"auto_close_on_idle"`
Redis struct {

View File

@@ -10,8 +10,6 @@
"mobile_position_interval": 10,
"media_server": "0.0.0.0:8080",
"port": [20030,20050],
"?auto_close_on_idle": "拉流空闲时, 立即关闭流",
"auto_close_on_idle": true,

14
db.go
View File

@@ -49,4 +49,18 @@ type GB28181DB interface {
DeleteStream(time int64) error
//QueryStream(pate int, size int)
// QueryForwardSink 查询转发流Sink
QueryForwardSink(stream StreamID, sink string) (*Sink, error)
QueryForwardSinks(stream StreamID) (map[string]*Sink, error)
// SaveForwardSink 保存转发流Sink
SaveForwardSink(stream StreamID, sink *Sink) error
DeleteForwardSink(stream StreamID, sink string) error
DeleteForwardSinks(stream StreamID) error
Del(key string) error
}

View File

@@ -22,6 +22,9 @@ const (
RedisKeyStreams = "streams" //// 保存所有推流端信息
RedisKeySinks = "sinks" //// 保存所有拉流端信息
RedisKeyStreamSinks = "%s_sinks" //// 某路流下所有的拉流端
RedisKeyDialogs = "streams"
RedisKeyForwardSinks = "forward_%s"
)
type RedisDB struct {
@@ -48,6 +51,10 @@ func DeviceChannelsKey(id string) string {
return fmt.Sprintf(RedisKeyDeviceChannels, id)
}
func ForwardSinksKey(id string) string {
return fmt.Sprintf(RedisKeyForwardSinks, id)
}
// GenerateChannelKey 使用设备号+通道号作为通道的主键,兼容通道号可能重复的情况
func GenerateChannelKey(device, channel string) ChannelKey {
return ChannelKey(fmt.Sprintf(RedisUniqueChannelID, device, channel))
@@ -555,7 +562,8 @@ func (r *RedisDB) SaveStream(stream *Stream) error {
return err
}
return executor.Key(RedisKeyStreams).ZAddWithNotExists(stream.CreateTime, data)
// return executor.Key(RedisKeyStreams).ZAddWithNotExists(stream.CreateTime, data)
return executor.Key(RedisKeyStreams).ZAdd(stream.CreateTime, data)
}
func (r *RedisDB) DeleteStream(time int64) error {
@@ -567,6 +575,89 @@ func (r *RedisDB) DeleteStream(time int64) error {
return executor.Key(RedisKeyStreams).ZDelWithScore(time)
}
func (r *RedisDB) QueryForwardSink(stream StreamID, sinkId string) (*Sink, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
data, err := executor.Key(ForwardSinksKey(string(stream))).HGet(sinkId)
if err != nil {
return nil, err
}
sink := &Sink{}
if err = json.Unmarshal(data, sink); err != nil {
return nil, err
}
return sink, nil
}
func (r *RedisDB) QueryForwardSinks(stream StreamID) (map[string]*Sink, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
entries, err := executor.Key(ForwardSinksKey(string(stream))).HGetAll()
if err != nil {
return nil, err
}
var sinks map[string]*Sink
if len(entries) > 0 {
sinks = make(map[string]*Sink, len(entries))
}
for _, entry := range entries {
sink := &Sink{}
if err = json.Unmarshal(entry, sink); err != nil {
return nil, err
}
sinks[sink.ID] = sink
}
return sinks, nil
}
func (r *RedisDB) SaveForwardSink(stream StreamID, sink *Sink) error {
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
data, err := json.Marshal(sink)
if err != nil {
return err
}
return executor.Key(ForwardSinksKey(string(stream))).HSet(sink.ID, data)
}
func (r *RedisDB) DeleteForwardSink(stream StreamID, sinkId string) error {
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
return executor.Key(ForwardSinksKey(string(stream))).HDel(sinkId)
}
func (r *RedisDB) Del(key string) error {
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
return executor.Key(key).Del()
}
func (r *RedisDB) DeleteForwardSinks(stream StreamID) error {
return r.Del(ForwardSinksKey(string(stream)))
}
// OnExpires Redis设备ID到期回调
func (r *RedisDB) OnExpires(db int, id string) {
Sugar.Infof("设备心跳过期 device: %s", id)

108
dialogs.go Normal file
View File

@@ -0,0 +1,108 @@
package main
import (
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/sip/parser"
"sync"
)
type DialogManager[T any] struct {
lock sync.RWMutex
dialogs map[string]T
callIds map[string]T
}
func (d *DialogManager[T]) Add(id string, dialog T) (T, bool) {
d.lock.Lock()
defer d.lock.Unlock()
var old T
var ok bool
if old, ok = d.dialogs[id]; ok {
return old, false
}
d.dialogs[id] = dialog
return old, true
}
func (d *DialogManager[T]) AddWithCallId(id string, dialog T) bool {
d.lock.Lock()
defer d.lock.Unlock()
if _, ok := d.callIds[id]; ok {
return false
}
d.callIds[id] = dialog
return true
}
func (d *DialogManager[T]) Find(id string) T {
d.lock.RLock()
defer d.lock.RUnlock()
return d.dialogs[id]
}
func (d *DialogManager[T]) FindWithCallId(id string) T {
d.lock.RLock()
defer d.lock.RUnlock()
return d.callIds[id]
}
func (d *DialogManager[T]) Remove(id string) T {
d.lock.Lock()
defer d.lock.Unlock()
dialog := d.dialogs[id]
delete(d.dialogs, id)
return dialog
}
func (d *DialogManager[T]) RemoveWithCallId(id string) T {
d.lock.Lock()
defer d.lock.Unlock()
dialog := d.callIds[id]
delete(d.callIds, id)
return dialog
}
func (d *DialogManager[T]) All() []T {
d.lock.RLock()
defer d.lock.RUnlock()
var result []T
for _, v := range d.dialogs {
result = append(result, v)
}
return result
}
func (d *DialogManager[T]) PopAll() []T {
d.lock.Lock()
defer d.lock.Unlock()
var result []T
for _, v := range d.dialogs {
result = append(result, v)
}
d.dialogs = make(map[string]T)
return result
}
func UnmarshalDialog(dialog string) (sip.Request, error) {
packetParser := parser.NewPacketParser(logger)
message, err := packetParser.ParseMessage([]byte(dialog))
if err != nil {
return nil, err
} else if request := message.(sip.Request); request == nil {
return nil, fmt.Errorf("dialog message is not sip request")
} else {
return request, nil
}
}
func NewDialogManager[T any]() *DialogManager[T] {
return &DialogManager[T]{
dialogs: make(map[string]T),
callIds: make(map[string]T),
}
}

341
g711.js
View File

@@ -1,341 +0,0 @@
var SIGN_BIT = 0x80; /* Sign bit for a A-law byte. */
var QUANT_MASK = 0xf; /* Quantization field mask. */
var NSEGS = 0x8; /* Number of A-law segments. */
var SEG_SHIFT = 0x4; /* Left shift for segment number. */
var SEG_MASK = 0x70; /* Segment field mask. */
var seg_aend = new Int16Array([0x1F, 0x3F, 0x7F, 0xFF,0x1FF, 0x3FF, 0x7FF, 0xFFF]);
var seg_uend = new Int16Array([0x3F, 0x7F, 0xFF, 0x1FF,0x3FF, 0x7FF, 0xFFF, 0x1FFF]);
/* copy from CCITT G.711 specifications */
var _u2a = new Uint8Array([1, 1, 2, 2, 3, 3, 4, 4,
5, 5, 6, 6, 7, 7, 8, 8,
9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24,
25, 27, 29, 31, 33, 34, 35, 36,
37, 38, 39, 40, 41, 42, 43, 44,
46, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 61, 62,
64, 65, 66, 67, 68, 69, 70, 71,
72, 73, 74, 75, 76, 77, 78, 79,
80, 82, 83, 84, 85, 86, 87, 88,
89, 90, 91, 92, 93, 94, 95, 96,
97, 98, 99, 100, 101, 102, 103, 104,
105, 106, 107, 108, 109, 110, 111, 112,
113, 114, 115, 116, 117, 118, 119, 120,
121, 122, 123, 124, 125, 126, 127, 128]);
/* A- to u-law conversions */
var _a2u = new Uint8Array([1, 3, 5, 7, 9, 11, 13, 15,
16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31,
32, 32, 33, 33, 34, 34, 35, 35,
36, 37, 38, 39, 40, 41, 42, 43,
44, 45, 46, 47, 48, 48, 49, 49,
50, 51, 52, 53, 54, 55, 56, 57,
58, 59, 60, 61, 62, 63, 64, 64,
65, 66, 67, 68, 69, 70, 71, 72,
73, 74, 75, 76, 77, 78, 79, 80,
80, 81, 82, 83, 84, 85, 86, 87,
88, 89, 90, 91, 92, 93, 94, 95,
96, 97, 98, 99, 100, 101, 102, 103,
104, 105, 106, 107, 108, 109, 110, 111,
112, 113, 114, 115, 116, 117, 118, 119,
120, 121, 122, 123, 124, 125, 126, 127]);
function search( val, table, size)
{
var i;
for (i = 0; i < size; i++) {
if (val <= table[i])
return (i);
}
return (size);
}
function linear2alaw(pcm_val) /* 2's complement (16-bit range) */
{
var mask;
var seg;
var aval;
pcm_val = pcm_val >> 3;
if (pcm_val >= 0) {
mask = 0xD5; /* sign (7th) bit = 1 */
} else {
mask = 0x55; /* sign bit = 0 */
pcm_val = -pcm_val - 1;
}
/* Convert the scaled magnitude to segment number. */
seg = search(pcm_val, seg_aend, 8);
/* Combine the sign, segment, and quantization bits. */
if (seg >= 8) /* out of range, return maximum value. */
return (0x7F ^ mask);
else {
aval = seg << SEG_SHIFT;
if (seg < 2)
aval |= (pcm_val >> 1) & QUANT_MASK;
else
aval |= (pcm_val >> seg) & QUANT_MASK;
return (aval ^ mask);
}
}
/*
* alaw2linear() - Convert an A-law value to 16-bit linear PCM
*
*/
function alaw2linear(a_val){
var t;
var seg;
a_val ^= 0x55;
t = (a_val & QUANT_MASK) << 4;
seg = (a_val & SEG_MASK) >> SEG_SHIFT;
switch (seg) {
case 0:
t += 8;
break;
case 1:
t += 0x108;
break;
default:
t += 0x108;
t <<= seg - 1;
}
return ((a_val & SIGN_BIT) ? t : -t);
}
// #define BIAS (0x84) /* Bias for linear code. */
// #define CLIP 8159
var BIAS = 0x84;
var CLIP = 8159;
// /*
// * linear2ulaw() - Convert a linear PCM value to u-law
// *
// * In order to simplify the encoding process, the original linear magnitude
// * is biased by adding 33 which shifts the encoding range from (0 - 8158) to
// * (33 - 8191). The result can be seen in the following encoding table:
// *
// * Biased Linear Input Code Compressed Code
// * ------------------------ ---------------
// * 00000001wxyza 000wxyz
// * 0000001wxyzab 001wxyz
// * 000001wxyzabc 010wxyz
// * 00001wxyzabcd 011wxyz
// * 0001wxyzabcde 100wxyz
// * 001wxyzabcdef 101wxyz
// * 01wxyzabcdefg 110wxyz
// * 1wxyzabcdefgh 111wxyz
// *
// * Each biased linear code has a leading 1 which identifies the segment
// * number. The value of the segment number is equal to 7 minus the number
// * of leading 0's. The quantization interval is directly available as the
// * four bits wxyz. * The trailing bits (a - h) are ignored.
// *
// * Ordinarily the complement of the resulting code word is used for
// * transmission, and so the code word is complemented before it is returned.
// *
// * For further information see John C. Bellamy's Digital Telephony, 1982,
// * John Wiley & Sons, pps 98-111 and 472-476.
function linear2ulaw(pcm_val) /* 2's complement (16-bit range) */
{
var mask;
var seg;
var uval;
/* Get the sign and the magnitude of the value. */
pcm_val = pcm_val >> 2;
if (pcm_val < 0) {
pcm_val = -pcm_val;
mask = 0x7F;
} else {
mask = 0xFF;
}
if ( pcm_val > CLIP ) pcm_val = CLIP; /* clip the magnitude */
pcm_val += (BIAS >> 2);
/* Convert the scaled magnitude to segment number. */
seg = search(pcm_val, seg_uend, 8);
/*
* Combine the sign, segment, quantization bits;
* and complement the code word.
*/
if (seg >= 8) /* out of range, return maximum value. */
return (0x7F ^ mask);
else {
uval = (seg << 4) | ((pcm_val >> (seg + 1)) & 0xF);
return (uval ^ mask);
}
}
// /*
// * ulaw2linear() - Convert a u-law value to 16-bit linear PCM
// *
// * First, a biased linear code is derived from the code word. An unbiased
// * output can then be obtained by subtracting 33 from the biased code.
// *
// * Note that this function expects to be passed the complement of the
// * original code word. This is in keeping with ISDN conventions.
// */
function ulaw2linear(u_val)
{
var t;
/* Complement to obtain normal u-law value. */
u_val = ~u_val;
/*
* Extract and bias the quantization bits. Then
* shift up by the segment number and subtract out the bias.
*/
t = ((u_val & QUANT_MASK) << 3) + BIAS;
t <<= (u_val & SEG_MASK) >> SEG_SHIFT;
return ((u_val & SIGN_BIT) ? (BIAS - t) : (t - BIAS));
}
// /* A-law to u-law conversion */
function alaw2ulaw(aval)
{
aval &= 0xff;
return ((aval & 0x80) ? (0xFF ^ _a2u[aval ^ 0xD5]) :
(0x7F ^ _a2u[aval ^ 0x55]));
}
/* u-law to A-law conversion */
function ulaw2alaw(uval)
{
uval &= 0xff;
return ((uval & 0x80) ? (0xD5 ^ (_u2a[0xFF ^ uval] - 1)) :
(0x55 ^ (_u2a[0x7F ^ uval] - 1)));
}
// unsigned char linear_to_alaw[65536];
// unsigned char linear_to_ulaw[65536];
var short_index = new Int16Array(65536);
var linear_to_alaw = new Uint8Array(65536);
var linear_to_ulaw = new Uint8Array(65536);
// /* 16384 entries per table (8 bit) */
// unsigned short alaw_to_linear[256];
// unsigned short ulaw_to_linear[256];
var alaw_to_linear = new Uint8Array(256);
var ulaw_to_linear = new Uint8Array(256);
function build_linear_to_xlaw_table(linear_to_xlaw,linear2xlaw)
{
var i;
for (i=0; i<65536;i++){
var v = linear2xlaw(short_index[i]);
linear_to_xlaw[i] = v;
}
}
function build_xlaw_to_linear_table(xlaw_to_linear,xlaw2linear)
{
var i;
for (i=0; i<256;i++){
xlaw_to_linear[i] = xlaw2linear(i);
}
}
function pcm16_to_xlaw(linear_to_xlaw, src_length,src_samples,dst_samples)
{
var i;
var s_samples;
s_samples = src_samples;
for (i=0; i < src_length / 2; i++)
{
dst_samples[i] = linear_to_xlaw[s_samples[i]];
}
}
function xlaw_to_pcm16(xlaw_to_linear, src_length,src_samples, dst_samples)
{
var i;
var s_samples;
var d_samples;
s_samples = src_samples;
d_samples = dst_samples;
for (i=0; i < src_length; i++)
{
d_samples[i] = xlaw_to_linear[s_samples[i]];
}
}
function pcm16_to_alaw(src_length, src_samples, dst_samples)
{
pcm16_to_xlaw(linear_to_alaw, src_length, src_samples, dst_samples);
}
function pcm16_to_ulaw(src_length, src_samples, dst_samples)
{
pcm16_to_xlaw(linear_to_ulaw, src_length, src_samples, dst_samples);
}
function alaw_to_pcm16(src_length, src_samples, dst_samples)
{
xlaw_to_pcm16(alaw_to_linear, src_length, src_samples, dst_samples);
}
function ulaw_to_pcm16(src_length, src_samples, dst_samples)
{
xlaw_to_pcm16(ulaw_to_linear, src_length, src_samples, dst_samples);
}
function pcm16_alaw_tableinit()
{
build_linear_to_xlaw_table(linear_to_alaw, linear2alaw);
}
function pcm16_ulaw_tableinit()
{
build_linear_to_xlaw_table(linear_to_ulaw, linear2ulaw);
}
function alaw_pcm16_tableinit()
{
build_xlaw_to_linear_table(alaw_to_linear, alaw2linear);
}
function ulaw_pcm16_tableinit()
{
build_xlaw_to_linear_table(ulaw_to_linear, ulaw2linear);
}
for(var i =0; i < 65536;i++){
short_index[i] = i;
}
pcm16_alaw_tableinit();
pcm16_ulaw_tableinit();
alaw_pcm16_tableinit();
ulaw_pcm16_tableinit();

7
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.4.0 // indirect
github.com/lkmio/transport v0.0.0-20250417030743-a4180637cd01 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
@@ -35,12 +36,6 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/lkmio/avformat v0.0.0
github.com/lkmio/rtp v0.0.0
github.com/lkmio/transport v0.0.0
)
replace github.com/lkmio/avformat => ../avformat
replace github.com/lkmio/rtp => ../rtp
replace github.com/lkmio/transport => ../transport

31
live.go
View File

@@ -8,16 +8,19 @@ import (
"github.com/ghettovoice/gosip/sip"
"math"
"net"
"net/http"
"strconv"
"time"
)
type InviteType int
type InviteType string
const (
InviteTypeLive = InviteType(0)
InviteTypePlayback = InviteType(1)
InviteTypeDownload = InviteType(2)
InviteTypePlay = InviteType("play")
InviteTypePlayback = InviteType("playback")
InviteTypeDownload = InviteType("download")
InviteTypeBroadcast = InviteType("broadcast")
InviteTypeTalk = InviteType("talk")
)
func (i *InviteType) SessionName2Type(name string) {
@@ -30,16 +33,15 @@ func (i *InviteType) SessionName2Type(name string) {
break
//case "play":
default:
*i = InviteTypeLive
*i = InviteTypePlay
break
}
}
func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) {
stream := &Stream{
ID: streamId,
ForwardStreamSinks: map[string]*Sink{},
CreateTime: time.Now().UnixMilli(),
ID: streamId,
CreateTime: time.Now().UnixMilli(),
}
// 先添加占位置, 防止重复请求
@@ -59,7 +61,7 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId
// 等待流媒体服务发送推流通知
wait := func() bool {
ok := stream.WaitForPublishEvent(10)
ok := http.StatusOK == stream.WaitForPublishEvent(10)
if !ok {
Sugar.Infof("收流超时 发送bye请求...")
CloseStream(streamId, true)
@@ -76,7 +78,9 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId
stream.urls = urls
// 保存到数据库
go DB.SaveStream(stream)
if DB != nil {
go DB.SaveStream(stream)
}
return stream, nil
}
@@ -93,8 +97,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
}()
// 告知流媒体服务创建国标源, 返回收流地址信息
ssrcValue, _ := strconv.Atoi(ssrc)
ip, port, urls, ssrc, msErr := CreateGBSource(string(streamId), setup, uint32(ssrcValue), int(inviteType))
ip, port, urls, ssrc, msErr := CreateGBSource(string(streamId), setup, "", string(inviteType))
if msErr != nil {
Sugar.Errorf("创建GBSource失败 err: %s", msErr.Error())
return nil, nil, msErr
@@ -172,8 +175,8 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
return dialogRequest, urls, nil
}
func (d *Device) Live(streamId StreamID, channelId, setup string) (sip.Request, []string, error) {
return d.Invite(InviteTypeLive, streamId, channelId, "", "", setup, 0)
func (d *Device) Play(streamId StreamID, channelId, setup string) (sip.Request, []string, error) {
return d.Invite(InviteTypePlay, streamId, channelId, "", "", setup, 0)
}
func (d *Device) Playback(streamId StreamID, channelId, startTime, stopTime, setup string) (sip.Request, []string, error) {

203
main.go
View File

@@ -2,18 +2,15 @@ package main
import (
"encoding/json"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/transport"
"go.uber.org/zap/zapcore"
"net"
"strconv"
)
var (
Config *Config_
SipUA SipServer
TransportManager transport.Manager
DB GB28181DB
Config *Config_
SipUA SipServer
DB GB28181DB
)
func init() {
@@ -29,183 +26,6 @@ func init() {
InitLogger(zapcore.Level(logConfig.Level), logConfig.Name, logConfig.MaxSize, logConfig.MaxBackup, logConfig.MaxAge, logConfig.Compress)
}
func startPlatformDevices() {
platforms, err := DB.LoadPlatforms()
if err != nil {
Sugar.Errorf("查询级联设备失败 err: %s", err.Error())
return
}
streams := StreamManager.All()
for _, record := range platforms {
platform, err := NewGBPlatform(record, SipUA)
// 都入库了不允许失败, 程序有BUG, 及时修复
utils.Assert(err == nil)
utils.Assert(PlatformManager.AddPlatform(platform))
if err := DB.UpdatePlatformStatus(record.SeverID, OFF); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
}
// 恢复级联会话
for _, stream := range streams {
sinks := stream.GetForwardStreamSinks()
for _, sink := range sinks {
if sink.ID != record.SeverID {
continue
}
callId, _ := sink.Dialog.CallID()
channelCallId, _ := stream.Dialog.CallID()
platform.AddStream(callId.Value(), channelCallId.Value())
}
}
platform.Start()
}
}
func recoverStreams() ([]*Stream, []*Sink) {
// 查询数据库中的流记录
// 查询流媒体服务器中的记录
// 合并两份记录, 以流媒体服务器中的为准。如果流记录数量不一致(只会时数据库中的记录数大于或等于流媒体中的记录数), 释放过期的会话.
// source id和stream id目前都是同一个id
streams, err := DB.LoadStreams()
if err != nil {
Sugar.Errorf("恢复推流失败, 查询数据库发生错误. err: %s", err.Error())
return nil, nil
} else if len(streams) < 1 {
return nil, nil
}
sources, err := QuerySourceList()
if err != nil {
// 流媒体服务器崩了, 存在的所有流都无效, 删除全部记录
Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除数据库中的推拉流会话记录. err: %s", err.Error())
for _, stream := range streams {
DB.DeleteStream(stream.CreateTime)
}
return nil, nil
}
sourceSinks := make(map[string][]string, len(sources))
for _, source := range sources {
// 跳过非国标流
if "28181" != source.Protocol {
continue
}
// 查询级联转发sink
sinks, err := QuerySinkList(source.ID)
if err != nil {
Sugar.Warnf("查询拉流列表发生 err: %s", err.Error())
continue
}
stream, ok := streams[source.ID]
utils.Assert(ok)
stream.SinkCount = int32(len(sinks))
var forwardSinks []string
for _, sink := range sinks {
if "gb_stream_forward" == sink.Protocol {
forwardSinks = append(forwardSinks, sink.ID)
}
}
sourceSinks[source.ID] = forwardSinks
}
var closedStreams []*Stream
var closedSinks []*Sink
for _, stream := range streams {
forwardSinks, ok := sourceSinks[string(stream.ID)]
if !ok {
Sugar.Infof("删除过期的推流会话 stream: %s", stream.ID)
closedStreams = append(closedStreams, stream)
continue
}
Sugar.Infof("恢复推流会话 stream: %s", stream.ID)
var invalidDialogs []string
for callId, sink := range stream.ForwardStreamSinks {
var exist bool
for _, id := range forwardSinks {
if id == sink.ID {
exist = true
break
}
}
if !exist {
Sugar.Infof("删除过期的级联转发会话 stream: %s sink: %s callId: %s", stream.ID, sink.ID, callId)
}
invalidDialogs = append(invalidDialogs, callId)
}
for _, id := range invalidDialogs {
sink := stream.RemoveForwardStreamSink(id)
closedSinks = append(closedSinks, sink)
}
StreamManager.Add(stream)
callId, _ := stream.Dialog.CallID()
StreamManager.AddWithCallId(callId.Value(), stream)
}
return closedStreams, closedSinks
}
func updateDevicesStatus() {
onlineDevices, err := DB.LoadOnlineDevices()
if err != nil {
panic(err)
}
devices, err := DB.LoadDevices()
if err != nil {
panic(err)
} else if len(devices) > 0 {
for key, device := range devices {
status := OFF
if _, ok := onlineDevices[key]; ok {
status = ON
}
// 根据通道在线状态,统计通道总数和离线数量
var total int
var online int
channels, _, err := DB.QueryChannels(key, 1, 0xFFFFFFFF)
if err != nil {
Sugar.Errorf("查询通道列表失败 err: %s device: %s", err.Error(), key)
} else {
total = len(channels)
for _, channel := range channels {
if channel.Online() {
online++
}
}
}
device.ChannelsTotal = total
device.ChannelsOnline = online
device.Status = status
if err = DB.SaveDevice(device); err != nil {
Sugar.Errorf("更新设备状态失败 device: %s status: %s", key, status)
continue
}
DeviceManager.Add(device)
}
}
}
func main() {
config, err := ParseConfig("./config.json")
if err != nil {
@@ -216,16 +36,19 @@ func main() {
indent, _ := json.MarshalIndent(Config, "", "\t")
Sugar.Infof("server config:\r\n%s", indent)
// 如果不想依赖数据库, 注释掉, 使用内存管理会话, 重启后会话会丢失
DB = NewRedisDB(Config.Redis.Addr, Config.Redis.Password)
// 查询在线设备, 更新设备在线状态
updateDevicesStatus()
// 从数据库中恢复会话
var streams []*Stream
var sinks []*Sink
if DB != nil {
// 查询在线设备, 更新设备在线状态
updateDevicesStatus()
// 恢复国标推流会话
streams, sinks := recoverStreams()
// 设置语音广播端口
TransportManager = transport.NewTransportManager(Config.ListenIP, uint16(Config.Port[0]), uint16(Config.Port[1]))
// 恢复国标推流会话
streams, sinks = recoverStreams()
}
// 启动sip server
server, err := StartSipServer(config.SipId, config.ListenIP, config.PublicIP, config.SipPort)

View File

@@ -4,7 +4,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"
"time"
)
@@ -26,6 +28,24 @@ type SinkDetails struct {
Tracks []string `json:"tracks"` // 每路流编码器ID
}
type SDP struct {
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
Addr string `json:"addr,omitempty"` // 连接地址
SSRC string `json:"ssrc,omitempty"`
Setup string `json:"setup,omitempty"` // active/passive
Transport string `json:"transport,omitempty"` // tcp/udp
}
type SourceSDP struct {
Source string `json:"source"` // GetSourceID
SDP
}
type GBOffer struct {
SourceSDP
AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
}
func Send(path string, body interface{}) (*http.Response, error) {
url := fmt.Sprintf("http://%s/%s", Config.MediaServer, path)
@@ -47,29 +67,24 @@ func Send(path string, body interface{}) (*http.Response, error) {
return client.Do(request)
}
func CreateGBSource(id, setup string, ssrc uint32, inviteType int) (string, uint16, []string, string, error) {
v := &struct {
Source string `json:"source"`
Setup string `json:"setup"`
SSRC uint32 `json:"ssrc"`
Type int `json:"type"`
}{
func CreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) {
v := &SourceSDP{
Source: id,
Setup: setup,
SSRC: ssrc,
Type: inviteType,
SDP: SDP{
Setup: setup,
SSRC: ssrc,
SessionName: sessionName,
},
}
response, err := Send("api/v1/gb28181/source/create", v)
response, err := Send("api/v1/gb28181/offer/create", v)
if err != nil {
return "", 0, nil, "", err
}
data := &Response[struct {
IP string `json:"ip"`
Port uint16 `json:"port,omitempty"`
SDP
Urls []string `json:"urls"`
SSRC string `json:"ssrc,omitempty"`
}]{}
if err = DecodeJSONBody(response.Body, data); err != nil {
@@ -78,19 +93,24 @@ func CreateGBSource(id, setup string, ssrc uint32, inviteType int) (string, uint
return "", 0, nil, "", fmt.Errorf(data.Msg)
}
return data.Data.IP, data.Data.Port, data.Data.Urls, data.Data.SSRC, nil
host, p, err := net.SplitHostPort(data.Data.Addr)
if err != nil {
return "", 0, nil, "", err
}
port, err := strconv.Atoi(p)
return host, uint16(port), data.Data.Urls, data.Data.SSRC, err
}
func ConnectGBSource(id, addr string) error {
v := &struct {
Source string `json:"source"` //SourceID
RemoteAddr string `json:"remote_addr"`
}{
Source: id,
RemoteAddr: addr,
v := &SourceSDP{
Source: id,
SDP: SDP{
Addr: addr,
},
}
_, err := Send("api/v1/gb28181/source/connect", v)
_, err := Send("api/v1/gb28181/answer/set", v)
return err
}
@@ -105,28 +125,28 @@ func CloseSource(id string) error {
return err
}
func AddForwardStreamSink(id, serverAddr, setup string, ssrc uint32) (ip string, port uint16, sinkId string, err error) {
v := struct {
Source string `json:"source"`
Addr string `json:"addr"`
Setup string `json:"setup"`
SSRC uint32 `json:"ssrc"`
}{
Source: id,
Addr: serverAddr,
Setup: setup,
SSRC: ssrc,
func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (string, uint16, string, error) {
offer := &GBOffer{
SourceSDP: SourceSDP{
Source: id,
SDP: SDP{
Addr: addr,
Setup: offerSetup,
SSRC: ssrc,
SessionName: sessionName,
},
},
AnswerSetup: answerSetup,
}
response, err := Send("api/v1/gb28181/forward", v)
response, err := Send("api/v1/gb28181/answer/create", offer)
if err != nil {
return "", 0, "", err
}
data := &Response[struct {
Sink string `json:"sink"`
IP string `json:"ip"`
Port uint16 `json:"port"`
Addr string `json:"addr"`
}]{}
if err = DecodeJSONBody(response.Body, data); err != nil {
@@ -135,7 +155,13 @@ func AddForwardStreamSink(id, serverAddr, setup string, ssrc uint32) (ip string,
return "", 0, "", fmt.Errorf(data.Msg)
}
return data.Data.IP, data.Data.Port, data.Data.Sink, nil
host, p, err := net.SplitHostPort(data.Data.Addr)
if err != nil {
return "", 0, "", err
}
port, _ := strconv.Atoi(p)
return host, uint16(port), data.Data.Sink, nil
}
func CloseSink(sourceId string, sinkId string) {

View File

@@ -3,8 +3,6 @@ package main
import (
"fmt"
"github.com/ghettovoice/gosip/sip"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/transform"
"net"
"strconv"
"strings"
@@ -64,10 +62,11 @@ func BuildMessageRequest(from, fromRealm, to, toAddr, transport, body string) (s
body = XmlHeaderGBK + body
}
gbkBody, _, err := transform.String(simplifiedchinese.GBK.NewEncoder(), body)
if err != nil {
panic(err)
}
//gbkBody, _, err := transform.String(simplifiedchinese.GBK.NewEncoder(), body)
//if err != nil {
// panic(err)
//}
gbkBody := body
builder := NewRequestBuilder(sip.MESSAGE, from, fromRealm, to, toAddr, transport)
builder.SetContentType(&XmlMessageType)

View File

@@ -6,7 +6,6 @@ import (
"github.com/lkmio/avformat/utils"
"net/http"
"net/netip"
"strconv"
"strings"
"sync"
)
@@ -26,22 +25,22 @@ type GBPlatformRecord struct {
type GBPlatform struct {
*Client
lock sync.Mutex
streams map[string]string // 上级会话的callId关联到实际推流通道的callId
lock sync.Mutex
sinks map[string]StreamID // 保存级联转发的sink, 方便离线的时候关闭sink
}
func (g *GBPlatform) AddStream(callId string, channelCallId string) {
func (g *GBPlatform) addSink(callId string, stream StreamID) {
g.lock.Lock()
defer g.lock.Unlock()
g.streams[callId] = channelCallId
g.sinks[callId] = stream
}
func (g *GBPlatform) removeStream(callId string) string {
func (g *GBPlatform) removeSink(callId string) StreamID {
g.lock.Lock()
defer g.lock.Unlock()
channelCallId := g.streams[callId]
delete(g.streams, callId)
return channelCallId
stream := g.sinks[callId]
delete(g.sinks, callId)
return stream
}
// OnBye 被上级挂断
@@ -52,14 +51,8 @@ func (g *GBPlatform) OnBye(request sip.Request) {
// CloseStream 关闭级联会话
func (g *GBPlatform) CloseStream(callId string, bye, ms bool) {
channelCallId := g.removeStream(callId)
stream := StreamManager.FindWithCallId(channelCallId)
if stream == nil {
Sugar.Errorf("关闭级联转发sink失败, 找不到stream. callid: %s", callId)
return
}
sink := stream.RemoveForwardStreamSink(callId)
_ = g.removeSink(callId)
sink := RemoveForwardSinkWithCallId(callId)
if sink == nil {
Sugar.Errorf("关闭级联转发sink失败, 找不到sink. callid: %s", callId)
return
@@ -73,10 +66,11 @@ func (g *GBPlatform) CloseStreams(bye, ms bool) {
var callIds []string
g.lock.Lock()
for k := range g.streams {
for k := range g.sinks {
callIds = append(callIds, k)
}
g.sinks = make(map[string]StreamID)
g.lock.Unlock()
for _, id := range callIds {
@@ -122,15 +116,15 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
var inviteType InviteType
inviteType.SessionName2Type(strings.ToLower(parse.Session))
switch inviteType {
case InviteTypeLive:
streamId = GenerateStreamId(InviteTypeLive, channel.ParentID, user, "", "")
case InviteTypePlay:
streamId = GenerateStreamID(InviteTypePlay, channel.ParentID, user, "", "")
break
case InviteTypePlayback:
// 级联下载和回放不限制路数,也不共享流
streamId = GenerateStreamId(InviteTypePlayback, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
streamId = GenerateStreamID(InviteTypePlayback, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
break
case InviteTypeDownload:
streamId = GenerateStreamId(InviteTypeDownload, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
streamId = GenerateStreamID(InviteTypeDownload, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
break
}
@@ -144,8 +138,7 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
}
}
ssrcInt, _ := strconv.Atoi(ssrc)
ip, port, sinkID, err := AddForwardStreamSink(string(streamId), addr, offerSetup, uint32(ssrcInt))
ip, port, sinkID, err := CreateAnswer(string(streamId), addr, offerSetup, answerSetup, ssrc, string(inviteType))
if err != nil {
Sugar.Errorf("级联转发失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
@@ -166,14 +159,12 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
setToTag(response)
// 添加级联转发流
callID, _ := request.CallID()
stream.AddForwardStreamSink(callID.Value(), &Sink{
AddForwardSink(streamId, &Sink{
ID: sinkID,
Stream: streamId,
ServerID: g.SeverID,
Dialog: g.CreateDialogRequestFromAnswer(response, true)},
)
Protocol: "gb_cascaded_forward",
Dialog: g.CreateDialogRequestFromAnswer(response, true)})
return response
}
@@ -221,5 +212,5 @@ func NewGBPlatform(record *GBPlatformRecord, ua SipServer) (*GBPlatform, error)
}
gbClient := NewGBClient(record.Username, record.SeverID, record.ServerAddr, record.Transport, record.Password, record.RegisterExpires, record.KeepAliveInterval, ua)
return &GBPlatform{Client: gbClient.(*Client), streams: make(map[string]string, 8)}, nil
return &GBPlatform{Client: gbClient.(*Client), sinks: make(map[string]StreamID, 8)}, nil
}

195
recover.go Normal file
View File

@@ -0,0 +1,195 @@
package main
import "github.com/lkmio/avformat/utils"
// 启动级联设备
func startPlatformDevices() {
platforms, err := DB.LoadPlatforms()
if err != nil {
Sugar.Errorf("查询级联设备失败 err: %s", err.Error())
return
}
//streams := StreamManager.All()
for _, record := range platforms {
platform, err := NewGBPlatform(record, SipUA)
// 都入库了不允许失败, 程序有BUG, 及时修复
utils.Assert(err == nil)
utils.Assert(PlatformManager.AddPlatform(platform))
if err := DB.UpdatePlatformStatus(record.SeverID, OFF); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
}
// 恢复级联会话
// 不删会话能正常通信
//for _, stream := range streams {
// sinks := stream.GetForwardStreamSinks()
// for _, sink := range sinks {
// if sink.ID != record.SeverID {
// continue
// }
//
// callId, _ := sink.Dialog.CallID()
// channelCallId, _ := stream.Dialog.CallID()
// platform.addSink(callId.Value(), channelCallId.Value())
// }
//}
platform.Start()
}
}
func closeStream(stream *Stream) {
DB.DeleteStream(stream.CreateTime)
// 删除转发sink
DB.DeleteForwardSinks(stream.ID)
}
// 返回需要关闭的推流源和转流Sink
func recoverStreams() ([]*Stream, []*Sink) {
// 比较数据库和流媒体服务器中的流会话, 以流媒体服务器中的为准, 释放过期的会话
// source id和stream id目前都是同一个id
dbStreams, err := DB.LoadStreams()
if err != nil {
Sugar.Errorf("恢复推流失败, 查询数据库发生错误. err: %s", err.Error())
return nil, nil
} else if len(dbStreams) < 1 {
return nil, nil
}
var closedStreams []*Stream
var closedSinks []*Sink
// 查询流媒体服务器中的推流源列表
sources, err := QuerySourceList()
if err != nil {
// 流媒体服务器崩了, 存在的所有记录都无效, 全部删除
Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除数据库中的所有记录. err: %s", err.Error())
for _, stream := range dbStreams {
closedStreams = append(closedStreams, stream)
}
return closedStreams, nil
}
// 查询推流源下所有的转发sink列表
msStreamSinks := make(map[string]map[string]string, len(sources))
for _, source := range sources {
// 跳过非国标流
if "28181" != source.Protocol && "gb_talk" != source.Protocol {
continue
}
// 查询转发sink
sinks, err := QuerySinkList(source.ID)
if err != nil {
Sugar.Warnf("查询拉流列表发生 err: %s", err.Error())
continue
}
stream, ok := dbStreams[source.ID]
if !ok {
Sugar.Warnf("流媒体中的流不存在于数据库中 source: %s", source.ID)
continue
}
stream.SinkCount = int32(len(sinks))
forwardSinks := make(map[string]string, len(sinks))
for _, sink := range sinks {
if "gb_cascaded_forward" == sink.Protocol || "gb_talk_forward" == sink.Protocol {
forwardSinks[sink.ID] = ""
}
}
msStreamSinks[source.ID] = forwardSinks
}
// 遍历数据库中的流会话, 比较是否存在于流媒体服务器中, 不存在则删除
for _, stream := range dbStreams {
// 如果stream不存在于流媒体服务器中, 则删除
msSinks, ok := msStreamSinks[string(stream.ID)]
if !ok {
Sugar.Infof("删除过期的推流会话 stream: %s", stream.ID)
closedStreams = append(closedStreams, stream)
continue
}
// 查询stream下的转发sink列表
dbSinks, err := DB.QueryForwardSinks(stream.ID)
if err != nil {
Sugar.Errorf("查询级联转发sink列表失败 err: %s", err.Error())
}
// 遍历数据库中的sink, 如果不存在于流媒体服务器中, 则删除
for _, sink := range dbSinks {
_, ok := msSinks[sink.ID]
if ok {
// 恢复转发sink
AddForwardSink(sink.Stream, sink)
if sink.Protocol == "gb_talk_forward" {
SinkManager.AddWithSinkStreamId(sink)
}
} else {
Sugar.Infof("删除过期的级联转发会话 stream: %s sink: %s", stream.ID, sink.ID)
closedSinks = append(closedSinks, sink)
}
}
Sugar.Infof("恢复推流会话 stream: %s", stream.ID)
StreamManager.Add(stream)
if stream.Dialog != nil {
callId, _ := stream.Dialog.CallID()
StreamManager.AddWithCallId(callId.Value(), stream)
}
}
return closedStreams, closedSinks
}
// 更新设备的在线状态
func updateDevicesStatus() {
onlineDevices, err := DB.LoadOnlineDevices()
if err != nil {
panic(err)
}
devices, err := DB.LoadDevices()
if err != nil {
panic(err)
} else if len(devices) > 0 {
for key, device := range devices {
status := OFF
if _, ok := onlineDevices[key]; ok {
status = ON
}
// 根据通道在线状态,统计通道总数和离线数量
var total int
var online int
channels, _, err := DB.QueryChannels(key, 1, 0xFFFFFFFF)
if err != nil {
Sugar.Errorf("查询通道列表失败 err: %s device: %s", err.Error(), key)
} else {
total = len(channels)
for _, channel := range channels {
if channel.Online() {
online++
}
}
}
device.ChannelsTotal = total
device.ChannelsOnline = online
device.Status = status
if err = DB.SaveDevice(device); err != nil {
Sugar.Errorf("更新设备状态失败 device: %s status: %s", key, status)
continue
}
DeviceManager.Add(device)
}
}
}

14
sink.go
View File

@@ -8,12 +8,16 @@ import (
// Sink 国标级联转发流
type Sink struct {
ID string `json:"id"` // 流媒体服务器中的SinkID
Stream StreamID `json:"stream"` // 所属的stream id
Protocol string `json:"protocol,omitempty"` // 拉流协议, 目前只保存"gb_stream_forward"
Dialog sip.Request `json:"dialog,omitempty"` // 级联时, 与上级的Invite会话
ServerID string `json:"server_id"` // 级联设备的上级ID
ID string `json:"id"` // 流媒体服务器中的sink id
Stream StreamID `json:"stream"` // 推流ID
SinkStream StreamID `json:"sink_stream"` // 广播使用, 每个广播设备的唯一ID
Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded_forward/gb_talk_forward
Dialog sip.Request `json:"dialog,omitempty"`
ServerID string `json:"server_id,omitempty"` // 级联设备的上级ID
CreateTime int64 `json:"create_time"`
SetupType SetupType // 转发类型
StreamWaiting
}
// Close 关闭级联会话. 是否向上级发送bye请求, 是否通知流媒体服务器发送删除sink

274
sink_manager.go Normal file
View File

@@ -0,0 +1,274 @@
package main
import "sync"
var (
SinkManager = NewSinkManager()
)
type sinkManager struct {
lock sync.RWMutex
streamSinks map[StreamID]map[string]*Sink // 推流id->sinks(sinkId->sink)
callIds map[string]*Sink // callId->sink
sinkStreamIds map[StreamID]*Sink // sinkStreamId->sink, 关联广播sink
}
func (s *sinkManager) Add(sink *Sink) bool {
s.lock.Lock()
defer s.lock.Unlock()
streamSinks, ok := s.streamSinks[sink.Stream]
if !ok {
streamSinks = make(map[string]*Sink)
s.streamSinks[sink.Stream] = streamSinks
}
if sink.Dialog == nil {
return false
}
callId, _ := sink.Dialog.CallID()
id := callId.Value()
if _, ok := s.callIds[id]; ok {
return false
} else if _, ok := streamSinks[sink.ID]; ok {
return false
}
s.callIds[id] = sink
s.streamSinks[sink.Stream][sink.ID] = sink
return true
}
func (s *sinkManager) AddWithSinkStreamId(sink *Sink) bool {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.sinkStreamIds[sink.SinkStream]; ok {
return false
}
s.sinkStreamIds[sink.SinkStream] = sink
return true
}
func (s *sinkManager) Remove(stream StreamID, sinkID string) *Sink {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.streamSinks[stream]; !ok {
return nil
}
sink, ok := s.streamSinks[stream][sinkID]
if !ok {
return nil
}
s.removeSink(sink)
return sink
}
func (s *sinkManager) RemoveWithCallId(callId string) *Sink {
s.lock.Lock()
defer s.lock.Unlock()
if sink, ok := s.callIds[callId]; ok {
s.removeSink(sink)
return sink
}
return nil
}
func (s *sinkManager) removeSink(sink *Sink) {
delete(s.streamSinks[sink.Stream], sink.ID)
if sink.Dialog != nil {
callID, _ := sink.Dialog.CallID()
delete(s.callIds, callID.Value())
}
if sink.SinkStream != "" {
delete(s.sinkStreamIds, sink.SinkStream)
}
}
func (s *sinkManager) RemoveWithSinkStreamId(sinkStreamId StreamID) *Sink {
s.lock.Lock()
defer s.lock.Unlock()
if sink, ok := s.sinkStreamIds[sinkStreamId]; ok {
s.removeSink(sink)
return sink
}
return nil
}
func (s *sinkManager) Find(stream StreamID, sinkID string) *Sink {
s.lock.RLock()
defer s.lock.RUnlock()
if _, ok := s.streamSinks[stream]; !ok {
return nil
}
sink, ok := s.streamSinks[stream][sinkID]
if !ok {
return nil
}
return sink
}
func (s *sinkManager) FindWithCallId(callId string) *Sink {
s.lock.RLock()
defer s.lock.RUnlock()
if sink, ok := s.callIds[callId]; ok {
return sink
}
return nil
}
func (s *sinkManager) FindWithSinkStreamId(sinkStreamId StreamID) *Sink {
s.lock.RLock()
defer s.lock.RUnlock()
if sink, ok := s.sinkStreamIds[sinkStreamId]; ok {
return sink
}
return nil
}
func (s *sinkManager) PopSinks(stream StreamID) []*Sink {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.streamSinks[stream]; !ok {
return nil
}
var sinkList []*Sink
for _, sink := range s.streamSinks[stream] {
sinkList = append(sinkList, sink)
}
for _, sink := range sinkList {
s.removeSink(sink)
}
delete(s.streamSinks, stream)
return sinkList
}
func AddForwardSink(StreamID StreamID, sink *Sink) bool {
if !SinkManager.Add(sink) {
Sugar.Errorf("转发Sink添加失败, StreamID: %s SinkID: %s", StreamID, sink.ID)
return false
}
if DB != nil {
err := DB.SaveForwardSink(StreamID, sink)
if err != nil {
Sugar.Errorf("转发Sink保存到数据库失败, err: %s", err.Error())
}
}
return true
}
func RemoveForwardSink(StreamID StreamID, sinkID string) *Sink {
sink := SinkManager.Remove(StreamID, sinkID)
if sink == nil {
return nil
}
releaseSink(sink)
return sink
}
func RemoveForwardSinkWithCallId(callId string) *Sink {
sink := SinkManager.RemoveWithCallId(callId)
if sink == nil {
return nil
}
releaseSink(sink)
return sink
}
func RemoveForwardSinkWithSinkStreamId(sinkStreamId StreamID) *Sink {
sink := SinkManager.RemoveWithSinkStreamId(sinkStreamId)
if sink == nil {
return nil
}
releaseSink(sink)
return sink
}
func releaseSink(sink *Sink) {
if DB != nil {
err := DB.DeleteForwardSink(sink.Stream, sink.ID)
if err != nil {
Sugar.Errorf("删除转发Sink失败, err: %s", err.Error())
}
}
// 减少拉流计数
if stream := StreamManager.Find(sink.Stream); stream != nil {
stream.DecreaseSinkCount()
}
}
func closeSink(sink *Sink, bye, ms bool) {
releaseSink(sink)
var callId string
if sink.Dialog != nil {
callId_, _ := sink.Dialog.CallID()
callId = callId_.Value()
}
platform := PlatformManager.FindPlatform(sink.ServerID)
if platform != nil {
platform.CloseStream(callId, bye, ms)
} else {
sink.Close(bye, ms)
}
}
func CloseStreamSinks(StreamID StreamID, bye, ms bool) []*Sink {
sinks := SinkManager.PopSinks(StreamID)
for _, sink := range sinks {
closeSink(sink, bye, ms)
}
// 查询数据库中的残余sink
if DB != nil {
// 恢复级联转发sink
forwardSinks, _ := DB.QueryForwardSinks(StreamID)
for _, sink := range forwardSinks {
closeSink(sink, bye, ms)
}
}
// 删除整个转发流
if DB != nil {
err := DB.Del(ForwardSinksKey(string(StreamID)))
if err != nil {
Sugar.Errorf("删除转发Sink失败, err: %s", err.Error())
}
}
return sinks
}
func FindSink(StreamID StreamID, sinkID string) *Sink {
return SinkManager.Find(StreamID, sinkID)
}
func NewSinkManager() *sinkManager {
return &sinkManager{
streamSinks: make(map[StreamID]map[string]*Sink),
callIds: make(map[string]*Sink),
sinkStreamIds: make(map[StreamID]*Sink),
}
}

View File

@@ -36,6 +36,7 @@ const (
CmdRecordInfo = "RecordInfo"
CmdMobilePosition = "MobilePosition"
CmdKeepalive = "Keepalive"
CmdBroadcast = "Broadcast"
)
func init() {
@@ -117,24 +118,24 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent
}
}
// OnInvite 上级预览/下级广播
// OnInvite 收到上级预览/下级设备广播请求
func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent bool) {
SendResponse(tx, sip.NewResponseFromRequest("", req, 100, "Trying", ""))
user := req.Recipient().User().String()
if len(user) != 20 {
SendResponseWithStatusCode(req, tx, http.StatusNotFound)
return
}
//if len(user) != 20 {
// SendResponseWithStatusCode(req, tx, http.StatusNotFound)
// return
//}
// 查找对应的设备
var device GBDevice
if parent {
// 级联设备
device = PlatformManager.FindPlatformWithServerAddr(req.Source())
} else if session := FindBroadcastSessionWithSourceID(user); session != nil {
} else if session := BroadcastDialogs.Find(user); session != nil {
// 语音广播设备
device = DeviceManager.Find(session.DeviceID)
device = DeviceManager.Find(session.SinkStream.DeviceID())
} else {
// 根据Subject头域查找设备
headers := req.GetHeaders("Subject")
@@ -148,6 +149,8 @@ func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent b
}
if device == nil {
logger.Error("处理Invite失败, 找不到设备. request: %s", req.String())
SendResponseWithStatusCode(req, tx, http.StatusNotFound)
} else {
response := device.OnInvite(req, user)
@@ -170,10 +173,10 @@ func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool
// 下级设备挂断, 关闭流
deviceId = stream.ID.DeviceID()
stream.Close(false, true)
} else if session := BroadcastManager.RemoveWithCallId(id.Value()); session != nil {
} else if session := StreamManager.RemoveWithCallId(id.Value()); session != nil {
// 广播挂断
deviceId = session.DeviceID
session.Close(false)
deviceId = session.ID.DeviceID()
session.Close(false, true)
}
if parent {
@@ -232,6 +235,11 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
// 查找设备
var device GBDevice
deviceId := message.(BaseMessageGetter).GetDeviceID()
if CmdBroadcast == cmd {
// 广播消息
from, _ := req.From()
deviceId = from.Address.User().String()
}
if parent {
device = PlatformManager.FindPlatformWithServerAddr(req.Source())
} else {
@@ -262,7 +270,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
// 查询出所有通道
if DB != nil {
result, _, err := DB.QueryChannels(client.GetID(), 1, 0xFFFFFFFF)
result, _, err := DB.QueryChannels(client.(*GBPlatform).SeverID, 1, 0xFFFFFFFF)
if err != nil {
Sugar.Errorf("查询设备通道列表失败 err: %s device: %s", err.Error(), client.GetID())
}
@@ -390,6 +398,7 @@ func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, e
fmt.Sprintf("%s.%s", XmlNameResponse, CmdRecordInfo): reflect.TypeOf(QueryRecordInfoResponse{}),
fmt.Sprintf("%s.%s", XmlNameNotify, CmdKeepalive): reflect.TypeOf(BaseMessage{}),
fmt.Sprintf("%s.%s", XmlNameNotify, CmdMobilePosition): reflect.TypeOf(BaseMessage{}),
fmt.Sprintf("%s.%s", XmlNameResponse, CmdBroadcast): reflect.TypeOf(BaseMessage{}),
}}
utils.Assert(ua.OnRequest(sip.REGISTER, filterRequest(server.OnRegister)) == nil)

122
stream.go
View File

@@ -5,24 +5,63 @@ import (
"encoding/json"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/sip/parser"
"sync"
"sync/atomic"
"time"
)
// Stream 国标推流
type SetupType int
const (
SetupTypeUDP SetupType = iota
SetupTypePassive
SetupTypeActive
)
var (
DefaultSetupType = SetupTypePassive
)
func (s SetupType) String() string {
switch s {
case SetupTypeUDP:
return "udp"
case SetupTypePassive:
return "passive"
case SetupTypeActive:
return "active"
}
panic("invalid setup type")
}
type StreamWaiting struct {
onPublishCb chan int // 等待推流hook的管道
cancelFunc func() // 取消等待推流hook的ctx
}
func (s *StreamWaiting) WaitForPublishEvent(seconds int) int {
s.onPublishCb = make(chan int, 0)
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
s.cancelFunc = cancelFunc
select {
case code := <-s.onPublishCb:
return code
case <-timeout.Done():
s.cancelFunc = nil
return -1
}
}
type Stream struct {
ID StreamID `json:"id"` // 流ID
Protocol string `json:"protocol,omitempty"` // 推流协议
Dialog sip.Request `json:"dialog,omitempty"` // 国标推流时, 与推流通道的Invite会话
ID StreamID `json:"id"` // 流ID
Protocol string `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Dialog sip.Request `json:"dialog,omitempty"` // 国标流的SipCall会话
CreateTime int64 `json:"create_time"` // 推流时间
SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发)
SetupType SetupType
lock sync.RWMutex
ForwardStreamSinks map[string]*Sink // 级联转发Sink, Key为与上级的CallID. 不保存所有的拉流端,查询拉流端列表,从流媒体服务器查询或新建数据库查询。 json序列化, 线程安全?
urls []string // 从流媒体服务器返回的拉流地址
publishEvent chan byte // 等待推流hook的管道
cancelFunc func() // 取消等待推流hook的ctx
urls []string // 从流媒体服务器返回的拉流地址
StreamWaiting
}
func (s *Stream) MarshalJSON() ([]byte, error) {
@@ -70,53 +109,6 @@ func (s *Stream) UnmarshalJSON(data []byte) error {
return nil
}
func (s *Stream) AddForwardStreamSink(id string, sink *Sink) {
s.lock.Lock()
defer s.lock.Unlock()
s.ForwardStreamSinks[id] = sink
go DB.SaveStream(s)
}
func (s *Stream) RemoveForwardStreamSink(id string) *Sink {
s.lock.Lock()
defer s.lock.Unlock()
sink, ok := s.ForwardStreamSinks[id]
if ok {
delete(s.ForwardStreamSinks, id)
}
go DB.SaveStream(s)
return sink
}
func (s *Stream) GetForwardStreamSinks() []*Sink {
s.lock.Lock()
defer s.lock.Unlock()
var sinks []*Sink
for _, sink := range s.ForwardStreamSinks {
sinks = append(sinks, sink)
}
return sinks
}
func (s *Stream) WaitForPublishEvent(seconds int) bool {
s.publishEvent = make(chan byte, 0)
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
s.cancelFunc = cancelFunc
select {
case <-s.publishEvent:
return true
case <-timeout.Done():
s.cancelFunc = nil
return false
}
}
func (s *Stream) GetSinkCount() int32 {
return atomic.LoadInt32(&s.SinkCount)
}
@@ -152,21 +144,9 @@ func (s *Stream) Close(bye, ms bool) {
go CloseSource(string(s.ID))
}
// 关闭所有级联会话
sinks := s.GetForwardStreamSinks()
for _, sink := range sinks {
id, _ := sink.Dialog.CallID()
// 关闭所转发会话
CloseStreamSinks(s.ID, bye, ms)
// 如果级联设备存在, 通过级联设备中删除会话
platform := PlatformManager.FindPlatform(sink.ServerID)
if platform == nil {
continue
}
platform.CloseStream(id.Value(), true, true)
}
s.ForwardStreamSinks = map[string]*Sink{}
// 从数据库中删除流记录
DB.DeleteStream(s.CreateTime)
}

View File

@@ -5,7 +5,7 @@ import (
"strings"
)
type StreamID string
type StreamID string // 目前目涉及转码,多路流, 与SourceID相同
func (s StreamID) DeviceID() string {
return strings.Split(string(s), "/")[0]
@@ -15,7 +15,7 @@ func (s StreamID) ChannelID() string {
return strings.Split(strings.Split(string(s), "/")[1], ".")[0]
}
func GenerateStreamId(inviteType InviteType, deviceId, channelId string, startTime, endTime string) StreamID {
func GenerateStreamID(inviteType InviteType, deviceId, channelId string, startTime, endTime string) StreamID {
utils.Assert(channelId != "")
var streamId []string
@@ -28,6 +28,8 @@ func GenerateStreamId(inviteType InviteType, deviceId, channelId string, startTi
return StreamID(strings.Join(streamId, "/") + ".playback" + "." + startTime + "." + endTime)
} else if InviteTypeDownload == inviteType {
return StreamID(strings.Join(streamId, "/") + ".download" + "." + startTime + "." + endTime)
} else if InviteTypeBroadcast == inviteType {
return StreamID(strings.Join(streamId, "/") + ".broadcast")
}
return StreamID(strings.Join(streamId, "/"))