Compare commits

...

5 Commits

Author SHA1 Message Date
yangjiechina
61e152e8ed fix: 国标级联转发失败问题 2025-05-11 18:58:33 +08:00
ydajiang
b254fff76c feat: 支持国标语音广播 2025-05-09 20:56:17 +08:00
ydajiang
9f22072406 Merge branch 'dev' of https://github.com/lkmio/lkm into dev 2025-05-05 17:11:34 +08:00
ydajiang
866dac20e1 fix: sink连接断开后仍然推流问题; 2025-05-05 11:15:54 +08:00
ydajiang
c9881b9549 refactor: ssrc由流媒体服务生成 2025-05-05 10:32:12 +08:00
21 changed files with 1301 additions and 273 deletions

13
api.go
View File

@@ -104,19 +104,18 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
if stream.AppConfig.GB28181.Enable {
apiServer.router.HandleFunc("/api/v1/gb28181/forward", filterRequestBodyParams(apiServer.OnGBSourceForward, &GBForwardParams{})) // 设置级联转发目标停止级联调用sink/close接口级联断开会走on_play_done事件通知
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", filterRequestBodyParams(apiServer.OnGBSourceCreate, &GBSourceParams{})) // 创建国标推流源
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", filterRequestBodyParams(apiServer.OnGBSourceConnect, &GBConnect{})) // 为国标TCP主动推流设置连接地址
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
apiServer.router.HandleFunc("/api/v1/gb28181/offer/create", filterRequestBodyParams(apiServer.OnGBOfferCreate, &SourceSDP{}))
apiServer.router.HandleFunc("/api/v1/gb28181/answer/create", filterRequestBodyParams(apiServer.OnGBAnswerCreate, &GBOffer{}))
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", filterRequestBodyParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // active拉流模式下, 设置对方的地址
}
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
runtime.GC()
writer.WriteHeader(http.StatusOK)
})
apiServer.router.HandleFunc("/rtc.html", func(writer http.ResponseWriter, request *http.Request) {
http.ServeFile(writer, request, "./rtc.html")
})
apiServer.router.PathPrefix("/web/").Handler(http.StripPrefix("/web/", http.FileServer(http.Dir("./web"))))
http.Handle("/", apiServer.router)
srv := &http.Server{

183
api_gb.go
View File

@@ -2,40 +2,48 @@ package main
import (
"fmt"
"github.com/lkmio/avformat/bufio"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/gb28181"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"net"
"net/http"
"strings"
"strconv"
)
type GBForwardParams struct {
Source string `json:"source"` //GetSourceID
Addr string `json:"addr"`
SSRC uint32 `json:"ssrc"`
Setup string `json:"setup"`
const (
InviteTypePlay = "play"
InviteTypePlayback = "playback"
InviteTypeDownload = "download"
InviteTypeBroadcast = "broadcast"
InviteTypeTalk = "talk"
)
type SDP struct {
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
Addr string `json:"addr,omitempty"` // 连接地址
SSRC string `json:"ssrc,omitempty"`
Setup string `json:"setup,omitempty"` // active/passive
Transport string `json:"transport,omitempty"` // tcp/udp
}
type GBSourceParams struct {
Source string `json:"source"` //GetSourceID
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
type SourceSDP struct {
Source string `json:"source"` // GetSourceID
SDP
}
type GBConnect struct {
Source string `json:"source"` //GetSourceID
RemoteAddr string `json:"remote_addr"`
type GBOffer struct {
SourceSDP
AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
}
func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter, r *http.Request) {
func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建国标源: %v", v)
// 返回收流地址
response := &struct {
IP string `json:"ip"`
Port int `json:"port,omitempty"`
SDP
Urls []string `json:"urls"`
}{}
@@ -78,18 +86,28 @@ func (api *ApiServer) OnGBSourceCreate(v *GBSourceParams, w http.ResponseWriter,
}
}
_, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active)
var ssrc string
if v.SessionName == InviteTypeDownload || v.SessionName == InviteTypePlayback {
ssrc = gb28181.GetVodSSRC()
} else {
ssrc = gb28181.GetLiveSSRC()
}
ssrcValue, _ := strconv.Atoi(ssrc)
_, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
if err != nil {
return
}
response.IP = stream.AppConfig.PublicIP
response.Port = port
response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))
response.Urls = stream.GetStreamPlayUrls(v.Source)
response.SSRC = ssrc
log.Sugar.Infof("创建国标源成功, addr: %s, ssrc: %d", response.Addr, ssrcValue)
httpResponseOK(w, response)
}
func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *http.Request) {
func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标主动拉流连接地址: %v", v)
var err error
@@ -113,7 +131,7 @@ func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *
return
}
addr, err := net.ResolveTCPAddr("tcp", v.RemoteAddr)
addr, err := net.ResolveTCPAddr("tcp", v.Addr)
if err != nil {
return
}
@@ -123,62 +141,133 @@ func (api *ApiServer) OnGBSourceConnect(v *GBConnect, w http.ResponseWriter, r *
}
}
func (api *ApiServer) OnGBSourceForward(v *GBForwardParams, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("设置国标级联转发: %v", v)
func (api *ApiServer) OnGBOfferCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
// 预览下级设备
if v.SessionName == "" || v.SessionName == InviteTypePlay ||
v.SessionName == InviteTypePlayback ||
v.SessionName == InviteTypeDownload {
api.OnGBSourceCreate(v, w, r)
} else {
// 向上级转发广播和对讲, 或者是向设备发送invite talk
}
}
func (api *ApiServer) OnGBAnswerCreate(v *GBOffer, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("创建应答 offer: %v", v)
var sink stream.Sink
var err error
// 响应错误消息
defer func() {
if err != nil {
log.Sugar.Errorf("设置级联转发失败 err: %s", err.Error())
log.Sugar.Errorf("创建应答失败 err: %s", err.Error())
httpResponseError(w, err.Error())
if sink != nil {
sink.Close()
}
}
}()
source := stream.SourceManager.Find(v.Source)
if source == nil {
err = fmt.Errorf("%s 源不存在", v.Source)
} else if source.GetType() != stream.SourceType28181 {
log.Sugar.Infof("%s 源不是国标推流类型", v.Source)
return
}
var setup gb28181.SetupType
switch strings.ToLower(v.Setup) {
case "active":
setup = gb28181.SetupActive
break
case "passive":
setup = gb28181.SetupPassive
break
default:
setup = gb28181.SetupUDP
break
}
addr, _ := net.ResolveTCPAddr("tcp", r.RemoteAddr)
sinkId := stream.NetAddr2SinkId(addr)
// 添加随机数
// sinkId添加随机数
if ipv4, ok := sinkId.(uint64); ok {
random := uint64(utils.RandomIntInRange(0x1000, 0xFFFF0000))
sinkId = (ipv4 & 0xFFFFFFFF00000000) | (random << 16) | (ipv4 & 0xFFFF)
}
sink, port, err := gb28181.NewForwardSink(v.SSRC, v.Addr, setup, sinkId, v.Source)
setup := gb28181.SetupTypeFromString(v.Setup)
if v.AnswerSetup != "" {
setup = gb28181.SetupTypeFromString(v.AnswerSetup)
}
var protocol stream.TransStreamProtocol
// 级联转发
if v.SessionName == "" || v.SessionName == InviteTypePlay ||
v.SessionName == InviteTypePlayback ||
v.SessionName == InviteTypeDownload {
protocol = stream.TransStreamGBCascadedForward
} else {
// 对讲广播转发
protocol = stream.TransStreamGBTalkForward
}
var port int
sink, port, err = stream.NewForwardSink(setup.TransportType(), protocol, sinkId, v.Source, v.Addr, gb28181.TransportManger)
if err != nil {
return
}
source.AddSink(sink)
log.Sugar.Infof("设置国标级联转发成功 ID: %s", sink.GetID())
log.Sugar.Infof("创建转发sink成功, sink: %s port: %d transport: %s", sink.GetID(), port, setup.TransportType())
_, state := stream.PreparePlaySink(sink)
if utils.HookStateOK != state {
err = fmt.Errorf("failed to prepare play sink")
return
}
response := struct {
Sink string `json:"sink"` //sink id
IP string `json:"ip"`
Port int `json:"port"`
}{Sink: stream.SinkId2String(sinkId), IP: stream.AppConfig.PublicIP, Port: port}
SDP
}{Sink: stream.SinkId2String(sinkId), SDP: SDP{Addr: net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))}}
httpResponseOK(w, &response)
}
// OnGBTalk 国标广播/对讲流程:
// 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接
// 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求
func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
conn, err := api.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
conn.Close()
return
}
// 获取id
id := r.FormValue("source")
talkSource := gb28181.NewTalkSource(id, conn)
talkSource.Init(stream.TCPReceiveBufferQueueSize)
talkSource.SetUrlValues(r.Form)
_, state := stream.PreparePublishSource(talkSource, true)
if utils.HookStateOK != state {
log.Sugar.Errorf("对讲失败, source: %s", talkSource)
conn.Close()
return
}
log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)
go stream.LoopEvent(talkSource)
for {
_, bytes, err := conn.ReadMessage()
length := len(bytes)
if err != nil {
log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
break
} else if length < 1 {
continue
}
for i := 0; i < length; {
data := stream.UDPReceiveBufferPool.Get().([]byte)
n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
copy(data, bytes[:n])
_ = talkSource.PublishSource.Input(data[:n])
i += n
}
}
talkSource.Close()
}

View File

@@ -0,0 +1,9 @@
package gb28181
import (
"github.com/lkmio/lkm/stream"
)
func CascadedTransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return stream.NewRtpTransStream(stream.TransStreamGBCascadedForward, 1024), nil
}

View File

@@ -1,124 +0,0 @@
package gb28181
import (
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtp"
"github.com/lkmio/transport"
"net"
)
type ForwardSink struct {
stream.BaseSink
setup SetupType
socket transport.Transport
ssrc uint32
}
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("级联连接 conn: %s", conn.RemoteAddr())
f.Conn = conn
f.Conn.(*transport.Conn).EnableAsyncWriteMode(512)
return nil
}
func (f *ForwardSink) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("级联断开连接 conn: %s", conn.RemoteAddr())
f.Close()
}
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
// TCP等待连接后再转发数据
if SetupUDP != f.setup && f.Conn == nil {
return nil
}
// 修改为与上级协商的SSRC
rtp.ModifySSRC(data[0].Get(), f.ssrc)
if SetupUDP == f.setup {
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
} else {
return f.BaseSink.Write(index, data, ts, keyVideo)
}
return nil
}
// Close 关闭国标转发流
func (f *ForwardSink) Close() {
f.BaseSink.Close()
if f.socket != nil {
f.socket.Close()
}
}
// NewForwardSink 创建国标级联转发流Sink
// 返回监听的端口和Sink
func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stream.SinkID, sourceId string) (stream.Sink, int, error) {
sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup}
if SetupUDP == setup {
remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr)
if err != nil {
return nil, 0, err
}
client, err := TransportManger.NewUDPClient(remoteAddr)
if err != nil {
return nil, 0, err
}
sink.socket = client
} else if SetupActive == setup {
server, err := TransportManger.NewTCPServer()
if err != nil {
return nil, 0, err
}
sink.TCPStreaming = true
sink.socket = server
} else if SetupPassive == setup {
client := transport.TCPClient{}
err := TransportManger.AllocPort(true, func(port uint16) error {
localAddr, err := net.ResolveTCPAddr("tcp", stream.ListenAddr(int(port)))
if err != nil {
return err
}
remoteAddr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return err
}
client.SetHandler(sink)
conn, err := client.Connect(localAddr, remoteAddr)
if err != nil {
return err
}
sink.Conn = conn
return nil
})
if err != nil {
return nil, 0, err
}
sink.TCPStreaming = true
sink.socket = &client
} else {
utils.Assert(false)
}
return sink, sink.socket.ListenPort(), nil
}

View File

@@ -1,61 +0,0 @@
package gb28181
import (
"encoding/binary"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
)
// ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么.
type ForwardStream struct {
stream.BaseTransStream
rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]]
}
func (f *ForwardStream) WriteHeader() error {
return nil
}
func (f *ForwardStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
size := 2 + uint16(len(packet.Data))
if size > stream.UDPReceiveBufferSize {
log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(packet.Data), stream.UDPReceiveBufferSize)
return nil, 0, false, nil
}
// 释放rtp包
for f.rtpBuffers.Size() > 0 {
rtp := f.rtpBuffers.Peek(0)
if rtp.UseCount() > 1 {
break
}
f.rtpBuffers.Pop()
// 放回池中
data := rtp.Get()
stream.UDPReceiveBufferPool.Put(data[:cap(data)])
}
bytes := stream.UDPReceiveBufferPool.Get().([]byte)
binary.BigEndian.PutUint16(bytes, size-2)
copy(bytes[2:], packet.Data)
rtp := collections.NewReferenceCounter(bytes[:size])
f.rtpBuffers.Push(rtp)
// 每帧都当关键帧, 直接发给上级
return []*collections.ReferenceCounter[[]byte]{rtp}, -1, true, nil
}
func NewTransStream() (stream.TransStream, error) {
return &ForwardStream{
BaseTransStream: stream.BaseTransStream{Protocol: stream.TransStreamGBStreamForward},
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](1024),
}, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewTransStream()
}

View File

@@ -15,6 +15,7 @@ import (
"net/http"
"os"
"sort"
"strconv"
"testing"
"time"
)
@@ -50,15 +51,17 @@ func connectSource(source string, addr string) {
}
}
func createSource(source, setup string, ssrc uint32) (string, uint16) {
func createSource(source, setup string, ssrc uint32) (string, uint16, uint32) {
v := struct {
Source string `json:"source"` //GetSourceID
Setup string `json:"setup"` //active/passive
SSRC uint32 `json:"ssrc,omitempty"`
Source string `json:"source"` //GetSourceID
Setup string `json:"setup"` //active/passive
SSRC string `json:"ssrc,omitempty"`
SessionName string `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
}{
Source: source,
Setup: setup,
SSRC: ssrc,
Source: source,
Setup: setup,
SSRC: strconv.Itoa(int(ssrc)),
SessionName: "play",
}
marshal, err := json.Marshal(v)
@@ -66,7 +69,8 @@ func createSource(source, setup string, ssrc uint32) (string, uint16) {
panic(err)
}
request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/source/create", bytes.NewBuffer(marshal))
//request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/source/create", bytes.NewBuffer(marshal))
request, err := http.NewRequest("POST", "http://localhost:8080/api/v1/gb28181/offer/create", bytes.NewBuffer(marshal))
if err != nil {
panic(err)
}
@@ -88,8 +92,8 @@ func createSource(source, setup string, ssrc uint32) (string, uint16) {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
IP string `json:"ip"`
Port uint16 `json:"port,omitempty"`
Addr string `json:"addr"`
SSRC string `json:"ssrc,omitempty"`
}
}{}
@@ -98,7 +102,14 @@ func createSource(source, setup string, ssrc uint32) (string, uint16) {
panic(err)
}
return connectInfo.Data.IP, connectInfo.Data.Port
atoi, err := strconv.Atoi(connectInfo.Data.SSRC)
if err != nil {
panic(err)
}
host, p, err := net.SplitHostPort(connectInfo.Data.Addr)
Port, err := strconv.Atoi(p)
return host, uint16(Port), uint32(atoi)
}
// 分割rtp包, 返回rtp over tcp包
@@ -171,11 +182,24 @@ func ctrDelay(data []byte) {
ts = int64(packet.Timestamp)
}
func modifySSRC(data []byte, ssrc uint32) {
packet := rtp.Packet{}
err := packet.Unmarshal(data)
if err != nil {
panic(err)
}
packet.SSRC = ssrc
bytes, err := packet.Marshal()
utils.Assert(len(bytes) == len(data))
copy(data, bytes)
}
// 使用wireshark直接导出的rtp流
// 根据ssrc来查找每个rtp包, rtp不要带扩展字段
func TestPublish(t *testing.T) {
path := "../../source_files/gb28181_h264.rtp"
var ssrc uint32 = 0xBEBC201
var rawSsrc uint32 = 0xBEBC201
localAddr := "0.0.0.0:20001"
id := "hls_mystream"
@@ -185,7 +209,7 @@ func TestPublish(t *testing.T) {
}
var packets [][]byte
packets, ssrc = splitPackets(data, ssrc)
packets, rawSsrc = splitPackets(data, rawSsrc)
utils.Assert(len(packets) > 0)
sort.Slice(packets, func(i, j int) bool {
@@ -230,7 +254,7 @@ func TestPublish(t *testing.T) {
})
t.Run("udp", func(t *testing.T) {
ip, port := createSource(id, "udp", ssrc)
ip, port, ssrc := createSource(id, "udp", rawSsrc)
addr, _ := net.ResolveUDPAddr("udp", localAddr)
remoteAddr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ip, port))
@@ -242,13 +266,14 @@ func TestPublish(t *testing.T) {
}
for _, packet := range packets {
modifySSRC(packet[2:], ssrc)
client.Write(packet[2:])
ctrDelay(packet[2:])
}
})
t.Run("passive", func(t *testing.T) {
ip, port := createSource(id, "passive", ssrc)
ip, port, ssrc := createSource(id, "passive", rawSsrc)
addr, _ := net.ResolveTCPAddr("tcp", localAddr)
remoteAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
@@ -261,19 +286,21 @@ func TestPublish(t *testing.T) {
}
for _, packet := range packets {
modifySSRC(packet[2:], ssrc)
client.Write(packet)
ctrDelay(packet[2:])
}
})
t.Run("active", func(t *testing.T) {
ip, port := createSource(id, "active", ssrc)
ip, port, ssrc := createSource(id, "active", rawSsrc)
addr, _ := net.ResolveTCPAddr("tcp", localAddr)
server := transport.TCPServer{}
server.SetHandler2(func(conn net.Conn) []byte {
for _, packet := range packets {
modifySSRC(packet[2:], ssrc)
conn.Write(packet)
ctrDelay(packet[2:])
}

View File

@@ -25,6 +25,43 @@ const (
JitterBufferSize = 1024 * 1024
)
func (s SetupType) TransportType() stream.TransportType {
switch s {
case SetupUDP:
return stream.TransportTypeUDP
case SetupPassive:
return stream.TransportTypeTCPServer
case SetupActive:
return stream.TransportTypeTCPClient
default:
panic(fmt.Errorf("invalid setup type: %d", s))
}
}
func (s SetupType) String() string {
switch s {
case SetupUDP:
return "udp"
case SetupPassive:
return "passive"
case SetupActive:
return "active"
default:
panic(fmt.Errorf("invalid setup type: %d", s))
}
}
func SetupTypeFromString(setupType string) SetupType {
switch setupType {
case "passive":
return SetupPassive
case "active":
return SetupActive
default:
return SetupUDP
}
}
var (
TransportManger transport.Manager
SharedUDPServer *UDPServer

48
gb28181/ssrc_manager.go Normal file
View File

@@ -0,0 +1,48 @@
package gb28181
import (
"fmt"
"strconv"
"sync"
)
const (
MaxSsrcValue = 999999999
)
var (
ssrcCount uint32
lock sync.Mutex
SSRCFilters []Filter
)
func NextSSRC() uint32 {
lock.Lock()
defer lock.Unlock()
ssrcCount = (ssrcCount + 1) % MaxSsrcValue
return ssrcCount
}
func getUniqueSSRC(ssrc string, get func() string) string {
atoi, err := strconv.Atoi(ssrc)
if err != nil {
panic(err)
}
v := uint32(atoi)
for _, filter := range SSRCFilters {
if filter.FindSource(v) != nil {
ssrc = get()
}
}
return ssrc
}
func GetLiveSSRC() string {
return getUniqueSSRC(fmt.Sprintf("0%09d", NextSSRC()), GetLiveSSRC)
}
func GetVodSSRC() string {
return getUniqueSSRC(fmt.Sprintf("%d", 1000000000+NextSSRC()), GetVodSSRC)
}

94
gb28181/talk_source.go Normal file
View File

@@ -0,0 +1,94 @@
package gb28181
import (
"github.com/gorilla/websocket"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/bufio"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
"time"
)
type Demuxer struct {
avformat.BaseDemuxer
ts int64
firstOfPacket bool
}
func (d *Demuxer) Input(data []byte) (int, error) {
length := len(data)
if !d.firstOfPacket {
d.firstOfPacket = true
d.OnNewAudioTrack(0, utils.AVCodecIdPCMALAW, 8000, nil, avformat.AudioConfig{
HasADTSHeader: false,
Channels: 1,
SampleRate: 8000,
SampleSize: 2,
})
d.ProbeComplete()
}
for i := 0; i < length; {
n := bufio.MinInt(length-i, 320)
_, _ = d.DataPipeline.Write(data[i:i+n], 0, utils.AVMediaTypeAudio)
pkt, _ := d.DataPipeline.Feat(0)
d.OnAudioPacket(0, utils.AVCodecIdPCMALAW, pkt, d.ts)
d.ts += int64(n)
i += n
}
return length, nil
}
type TalkSource struct {
stream.PublishSource
}
func (s *TalkSource) Input(data []byte) error {
_, err := s.PublishSource.TransDemuxer.Input(data)
return err
}
func (s *TalkSource) Close() {
s.PublishSource.Close()
// 关闭所有对讲设备的会话
stream.CloseWaitingSinks(s.ID)
}
type WSConn struct {
*websocket.Conn
}
func (w WSConn) Read(b []byte) (n int, err error) {
panic("implement me")
}
func (w WSConn) Write(block []byte) (n int, err error) {
panic("implement me")
}
func (w WSConn) SetDeadline(t time.Time) error {
panic("implement me")
}
func NewTalkSource(id string, conn *websocket.Conn) *TalkSource {
s := &TalkSource{
PublishSource: stream.PublishSource{
ID: id,
Type: stream.SourceTypeGBTalk,
Conn: &WSConn{conn},
TransDemuxer: &Demuxer{
BaseDemuxer: avformat.BaseDemuxer{
DataPipeline: &avformat.StreamsBuffer{},
Name: "gb_talk",
AutoFree: false,
},
},
},
}
s.TransDemuxer.SetHandler(s)
return s
}

42
gb28181/talk_stream.go Normal file
View File

@@ -0,0 +1,42 @@
package gb28181
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/rtp"
)
type TalkStream struct {
*stream.RtpStream
muxer rtp.Muxer
packet []byte
}
func (s *TalkStream) WriteHeader() error {
return nil
}
func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
var size int
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
return s.packet
}, func(pkt []byte) {
size = len(pkt)
})
packet = &avformat.AVPacket{Data: s.packet[:size]}
return s.RtpStream.Input(packet)
}
func NewTalkTransStream() (stream.TransStream, error) {
return &TalkStream{
RtpStream: stream.NewRtpTransStream(stream.TransStreamGBTalkForward, 1024),
muxer: rtp.NewMuxer(8, 0, 0xFFFFFFFF),
packet: make([]byte, 1500),
}, nil
}
func TalkTransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) {
return NewTalkTransStream()
}

11
main.go
View File

@@ -28,7 +28,8 @@ func init() {
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBStreamForward, gb28181.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBCascadedForward, gb28181.CascadedTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBTalkForward, gb28181.TalkTransStreamFactory)
stream.SetRecordStreamFactory(record.NewFLVFileSink)
stream.StreamEndInfoBride = NewStreamEndInfo
@@ -126,23 +127,27 @@ func main() {
// 多端口模式下, 创建GBSource时才创建收流端口
if stream.AppConfig.GB28181.Enable && !stream.AppConfig.GB28181.IsMultiPort() {
if stream.AppConfig.GB28181.IsEnableUDP() {
server, err := gb28181.NewUDPServer(gb28181.NewSSRCFilter(128))
filter := gb28181.NewSSRCFilter(128)
server, err := gb28181.NewUDPServer(filter)
if err != nil {
panic(err)
}
gb28181.SharedUDPServer = server
log.Sugar.Info("启动GB28181 udp收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
gb28181.SSRCFilters = append(gb28181.SSRCFilters, filter)
}
if stream.AppConfig.GB28181.IsEnableTCP() {
server, err := gb28181.NewTCPServer(gb28181.NewSSRCFilter(128))
filter := gb28181.NewSSRCFilter(128)
server, err := gb28181.NewTCPServer(filter)
if err != nil {
panic(err)
}
gb28181.SharedTCPServer = server
log.Sugar.Info("启动GB28181 tcp收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
gb28181.SSRCFilters = append(gb28181.SSRCFilters, filter)
}
}

161
stream/forward_sink.go Normal file
View File

@@ -0,0 +1,161 @@
package stream
import (
"github.com/lkmio/avformat/collections"
"github.com/lkmio/lkm/log"
"github.com/lkmio/transport"
"net"
"time"
)
type TransportType int
const (
TransportTypeUDP TransportType = iota
TransportTypeTCPClient
TransportTypeTCPServer
)
func (t TransportType) String() string {
switch t {
case TransportTypeUDP:
return "udp"
case TransportTypeTCPClient:
return "tcp_client"
case TransportTypeTCPServer:
return "tcp_server"
default:
panic("invalid transport type")
}
}
type ForwardSink struct {
BaseSink
socket transport.Transport
transportType TransportType
receiveTimer *time.Timer
}
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("%s 连接 conn: %s", f.Protocol, conn.RemoteAddr())
f.receiveTimer.Stop()
// 如果f.Conn赋值后, 发送数据先于EnableAsyncWriteMode执行, 可能会panic
// 所以保险一点, 放在主协程执行
ExecuteSyncEventOnSource(f.SourceID, func() {
f.Conn = conn
f.BaseSink.EnableAsyncWriteMode(512)
})
return nil
}
func (f *ForwardSink) OnPacket(conn net.Conn, data []byte) []byte {
return nil
}
func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("%s 断开连接 conn: %s", f.Protocol, conn.RemoteAddr())
f.Close()
}
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
// TCP等待连接后再转发数据
if TransportTypeUDP != f.transportType && f.Conn == nil {
return nil
}
if TransportTypeUDP == f.transportType {
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
} else {
return f.BaseSink.Write(index, data, ts, keyVideo)
}
return nil
}
// Close 关闭国标转发流
func (f *ForwardSink) Close() {
f.BaseSink.Close()
if f.socket != nil {
f.socket.Close()
}
if f.receiveTimer != nil {
f.receiveTimer.Stop()
}
}
// StartReceiveTimer 启动tcp sever计时器, 如果计时器触发, 没有连接, 则关闭流
func (f *ForwardSink) StartReceiveTimer() {
f.receiveTimer = time.AfterFunc(time.Second*10, func() {
if f.Conn == nil {
log.Sugar.Infof("%s 等待连接超时, 关闭sink", f.Protocol)
f.Close()
}
})
}
func NewForwardSink(transportType TransportType, protocol TransStreamProtocol, sinkId SinkID, sourceId string, addr string, manager transport.Manager) (*ForwardSink, int, error) {
sink := &ForwardSink{
BaseSink: BaseSink{ID: sinkId, SourceID: sourceId, State: SessionStateCreated, Protocol: protocol},
transportType: transportType,
}
if transportType == TransportTypeUDP {
remoteAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, 0, err
}
client, err := manager.NewUDPClient(remoteAddr)
if err != nil {
return nil, 0, err
}
sink.socket = client
} else if transportType == TransportTypeTCPClient {
client := transport.TCPClient{}
err := manager.AllocPort(true, func(port uint16) error {
localAddr, err := net.ResolveTCPAddr("tcp", ListenAddr(int(port)))
if err != nil {
return err
}
remoteAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return err
}
client.SetHandler(sink)
conn, err := client.Connect(localAddr, remoteAddr)
if err != nil {
return err
}
sink.Conn = conn
return nil
})
if err != nil {
return nil, 0, err
}
sink.socket = &client
} else if transportType == TransportTypeTCPServer {
tcpServer, err := manager.NewTCPServer()
if err != nil {
return nil, 0, err
}
tcpServer.SetHandler(sink)
tcpServer.Accept()
sink.socket = tcpServer
sink.StartReceiveTimer()
}
return sink, sink.socket.ListenPort(), nil
}

56
stream/rtp_stream.go Normal file
View File

@@ -0,0 +1,56 @@
package stream
import (
"encoding/binary"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/lkm/log"
)
type RtpStream struct {
BaseTransStream
rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]]
}
func (f *RtpStream) WriteHeader() error {
return nil
}
func (f *RtpStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
size := 2 + uint16(len(packet.Data))
if size > UDPReceiveBufferSize {
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)
return nil, 0, false, nil
}
// 释放rtp包
for f.rtpBuffers.Size() > 0 {
rtp := f.rtpBuffers.Peek(0)
if rtp.UseCount() > 1 {
break
}
f.rtpBuffers.Pop()
// 放回池中
data := rtp.Get()
UDPReceiveBufferPool.Put(data[:cap(data)])
}
bytes := UDPReceiveBufferPool.Get().([]byte)
binary.BigEndian.PutUint16(bytes, size-2)
copy(bytes[2:], packet.Data)
rtp := collections.NewReferenceCounter(bytes[:size])
f.rtpBuffers.Push(rtp)
// 每帧都当关键帧, 直接发给上级
return []*collections.ReferenceCounter[[]byte]{rtp}, -1, true, nil
}
func NewRtpTransStream(protocol TransStreamProtocol, capacity int) *RtpStream {
return &RtpStream{
BaseTransStream: BaseTransStream{Protocol: protocol},
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](capacity),
}
}

View File

@@ -206,6 +206,8 @@ func (s *BaseSink) doAsyncWrite() {
duration := time.Now().UnixMilli() - l
if err != nil {
log.Sugar.Errorf(err.Error())
<-s.cancelCtx.Done()
return
}
data.Release()

View File

@@ -53,3 +53,13 @@ func SinkId2String(id SinkID) string {
func CreateSinkDisconnectionMessage(sink Sink) string {
return fmt.Sprintf("%s sink断开连接. id: %s", sink.GetProtocol(), sink.GetID())
}
func ExecuteSyncEventOnSource(sourceId string, event func()) bool {
source := SourceManager.Find(sourceId)
if source != nil {
source.ExecuteSyncEvent(event)
return true
}
return false
}

View File

@@ -63,6 +63,13 @@ func PopWaitingSinks(sourceId string) []Sink {
return sinks
}
func CloseWaitingSinks(sourceId string) {
sinks := PopWaitingSinks(sourceId)
for _, sink := range sinks {
sink.Close()
}
}
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool {
mutex.RLock()
defer mutex.RUnlock()

View File

@@ -288,7 +288,7 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream
_ = transStream.WriteHeader()
// 设置转发流
if TransStreamGBStreamForward == transStream.GetProtocol() {
if TransStreamGBCascadedForward == transStream.GetProtocol() {
s.ForwardTransStream = transStream
}
@@ -472,7 +472,7 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool {
}
// 新建传输流,发送已经缓存的音视频帧
if !exist && AppConfig.GOPCache && s.existVideo {
if !exist && AppConfig.GOPCache && s.existVideo && TransStreamGBCascadedForward != transStream.GetProtocol() {
s.DispatchGOPBuffer(transStream)
}
@@ -612,7 +612,7 @@ func (s *PublishSource) DoClose() {
transStreamID := sink.GetTransStreamID()
sink.SetTransStreamID(0)
if s.recordSink == sink {
return
continue
}
{
@@ -828,7 +828,9 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) {
// 分发给各个传输流
for _, transStream := range s.TransStreams {
s.DispatchPacket(transStream, packet)
if TransStreamGBCascadedForward != transStream.GetProtocol() {
s.DispatchPacket(transStream, packet)
}
}
}

View File

@@ -20,16 +20,18 @@ type TransStreamProtocol uint32
type SessionState uint32
const (
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
SourceType1078 = SourceType(3)
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
SourceType1078 = SourceType(3)
SourceTypeGBTalk = SourceType(4) // 国标广播/对讲
TransStreamRtmp = TransStreamProtocol(1)
TransStreamFlv = TransStreamProtocol(2)
TransStreamRtsp = TransStreamProtocol(3)
TransStreamHls = TransStreamProtocol(4)
TransStreamRtc = TransStreamProtocol(5)
TransStreamGBStreamForward = TransStreamProtocol(6) // 国标级联转发
TransStreamRtmp = TransStreamProtocol(1)
TransStreamFlv = TransStreamProtocol(2)
TransStreamRtsp = TransStreamProtocol(3)
TransStreamHls = TransStreamProtocol(4)
TransStreamRtc = TransStreamProtocol(5)
TransStreamGBCascadedForward = TransStreamProtocol(6) // 国标级联转发
TransStreamGBTalkForward = TransStreamProtocol(7) // 国标广播/对讲转发
)
const (
@@ -49,6 +51,8 @@ func (s SourceType) String() string {
return "28181"
} else if SourceType1078 == s {
return "jt1078"
} else if SourceTypeGBTalk == s {
return "gb_talk"
}
panic(fmt.Sprintf("unknown source type %d", s))
@@ -65,8 +69,10 @@ func (p TransStreamProtocol) String() string {
return "hls"
} else if TransStreamRtc == p {
return "rtc"
} else if TransStreamGBStreamForward == p {
return "gb_stream_forward"
} else if TransStreamGBCascadedForward == p {
return "gb_cascaded_forward"
} else if TransStreamGBTalkForward == p {
return "gb_talk_forward"
}
panic(fmt.Sprintf("unknown stream protocol %d", p))

278
web/broadcast.html Normal file
View File

@@ -0,0 +1,278 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8"/>
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"/>
<meta name="apple-mobile-web-capable" content="yes"/>
<title>语音广播</title>
</head>
<body>
<button id="intercomBegin">开始广播</button>
<button id="intercomEnd">关闭广播</button>
<input style="width: 100px;" id="device_id" type="text" value="34020000001320000001"/>
<input style="width: 100px;" id="channel_id" type="text" value="34020000001320000001"/>
<button id="invite" onclick="invite()">邀请</button>
<button id="hangup" onclick="hangup()">挂断</button>
</body>
<script src="g711.js"></script>
<script type="text/javascript">
var begin = document.getElementById('intercomBegin');
var end = document.getElementById('intercomEnd');
var ws = null; //实现WebSocket
var record = null; //多媒体对象,用来处理音频
var source = null;
function init(rec) {
record = rec;
}
function invite() {
let deviceId = document.getElementById("device_id").value;
let channelId = document.getElementById("channel_id").value;
let data = {
device_id: deviceId,
channel_id: channelId,
type: 1,
source: source
};
fetch("http://localhost:9000/api/v1/broadcast/invite", {
method: 'POST',
body: JSON.stringify(data),
headers: new Headers({
'Content-Type': 'application/json',
}),
}).then((res) => res.json())
.then((data) => {
})
}
function hangup() {
let deviceId = document.getElementById("device_id").value;
let channelId = document.getElementById("channel_id").value;
let data = {
device_id: deviceId,
channel_id: channelId,
source: source
};
fetch("http://localhost:9000/api/v1/broadcast/hangup", {
method: 'POST',
body: JSON.stringify(data),
headers: new Headers({
'Content-Type': 'application/json',
}),
}).then((res) => res.json())
.then((data) => {
})
}
//录音对象
var Recorder = function (stream) {
var sampleBits = 16; //输出采样数位 8, 16
var sampleRate = 8000; //输出采样率
var context = new AudioContext();
var audioInput = context.createMediaStreamSource(stream);
var recorder = context.createScriptProcessor(4096, 1, 1);
var audioData = {
size: 0, //录音文件长度
buffer: [], //录音缓存
inputSampleRate: 48000, //输入采样率
inputSampleBits: 16, //输入采样数位 8, 16
outputSampleRate: sampleRate, //输出采样数位
oututSampleBits: sampleBits, //输出采样率
clear: function () {
this.buffer = [];
this.size = 0;
},
input: function (data) {
this.buffer.push(new Float32Array(data));
this.size += data.length;
},
compress: function () { //合并压缩
//合并
var data = new Float32Array(this.size);
var offset = 0;
for (var i = 0; i < this.buffer.length; i++) {
data.set(this.buffer[i], offset);
offset += this.buffer[i].length;
}
//压缩
var compression = parseInt(this.inputSampleRate / this.outputSampleRate);
var length = data.length / compression;
var result = new Float32Array(length);
var index = 0,
j = 0;
while (index < length) {
result[index] = data[j];
j += compression;
index++;
}
return result;
},
encodePCM: function () { //这里不对采集到的数据进行其他格式处理,如有需要均交给服务器端处理。
var sampleRate = Math.min(this.inputSampleRate, this.outputSampleRate);
var sampleBits = Math.min(this.inputSampleBits, this.oututSampleBits);
var bytes = this.compress();
var dataLength = bytes.length * (sampleBits / 8);
var buffer = new ArrayBuffer(dataLength);
var data = new DataView(buffer);
var offset = 0;
for (var i = 0; i < bytes.length; i++, offset += 2) {
var s = Math.max(-1, Math.min(1, bytes[i]));
data.setInt16(offset, s < 0 ? s * 0x8000 : s * 0x7FFF, true);
}
return new Blob([data]);
}
};
var sendData = function () { //对以获取的数据进行处理(分包)
var reader = new FileReader();
reader.onload = e => {
var outbuffer = e.target.result;
var arr = new Uint16Array(outbuffer);
var dst = new Int8Array(arr.length);
pcm16_to_alaw(arr.byteLength, arr, dst);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(dst);
} else {
console.log('WebSocket not ready, state:', ws ? ws.readyState : 'null');
record.stop(); // 停止录音
if (ws) {
ws.close(); // 确保关闭连接
}
}
};
reader.readAsArrayBuffer(audioData.encodePCM());
audioData.clear();//每次发送完成则清理掉旧数据
};
this.start = function () {
audioInput.connect(recorder);
recorder.connect(context.destination);
}
this.stop = function () {
recorder.disconnect();
}
this.getBlob = function () {
return audioData.encodePCM();
}
this.clear = function () {
audioData.clear();
}
recorder.onaudioprocess = function (e) {
var inputBuffer = e.inputBuffer.getChannelData(0);
audioData.input(inputBuffer);
sendData();
}
}
function generateRandomAlphanumeric10() {
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let result = '';
for (let i = 0; i < 20; i++) {
result += chars.charAt(Math.floor(Math.random() * chars.length));
}
return result;
}
/*
* WebSocket
*/
function connectWS() {
let secure = window.location.protocol === 'https:'
//source = generateRandomAlphanumeric10();
source = "unique_id";
let url = (secure ? "wss:" : "ws:") + "/" + window.location.host + "/ws/v1/gb28181/talk" + "?source=" + source
ws = new WebSocket(url);
ws.binaryType = 'arraybuffer'; //传输的是 ArrayBuffer 类型的数据
ws.onopen = function () {
console.log('ws连接成功');
record.start();
};
ws.onmessage = function (msg) {
}
ws.onerror = function (err) {
console.log('ws连接断开');
console.info(err)
record.stop()
}
}
/*
* 开始对讲
*/
begin.onclick = function () {
navigator.getUserMedia = navigator.getUserMedia || navigator.webkitGetUserMedia;
if (!navigator.getUserMedia) {
alert('浏览器不支持音频输入');
} else {
navigator.getUserMedia({
audio: true
},
//获取到音频采集权限回调
function (mediaStream) {
console.log('开始对讲');
//初始化采集器
init(new Recorder(mediaStream));
//连接websocket
connectWS();
},
function (error) {
console.log(error);
switch (error.message || error.name) {
case 'PERMISSION_DENIED':
case 'PermissionDeniedError':
console.info('用户拒绝提供信息。');
break;
case 'NOT_SUPPORTED_ERROR':
case 'NotSupportedError':
console.info('浏览器不支持硬件设备。');
break;
case 'MANDATORY_UNSATISFIED_ERROR':
case 'MandatoryUnsatisfiedError':
console.info('无法发现指定的硬件设备。');
break;
default:
console.info('无法打开麦克风。异常信息:' + (error.code || error.name));
break;
}
}
)
}
}
/*
* 关闭对讲
*/
end.onclick = function () {
if (ws) {
ws.close();
record.stop();
console.log('关闭对讲以及WebSocket');
}
}
</script>
</html>

341
web/g711.js Normal file
View File

@@ -0,0 +1,341 @@
var SIGN_BIT = 0x80; /* Sign bit for a A-law byte. */
var QUANT_MASK = 0xf; /* Quantization field mask. */
var NSEGS = 0x8; /* Number of A-law segments. */
var SEG_SHIFT = 0x4; /* Left shift for segment number. */
var SEG_MASK = 0x70; /* Segment field mask. */
var seg_aend = new Int16Array([0x1F, 0x3F, 0x7F, 0xFF,0x1FF, 0x3FF, 0x7FF, 0xFFF]);
var seg_uend = new Int16Array([0x3F, 0x7F, 0xFF, 0x1FF,0x3FF, 0x7FF, 0xFFF, 0x1FFF]);
/* copy from CCITT G.711 specifications */
var _u2a = new Uint8Array([1, 1, 2, 2, 3, 3, 4, 4,
5, 5, 6, 6, 7, 7, 8, 8,
9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24,
25, 27, 29, 31, 33, 34, 35, 36,
37, 38, 39, 40, 41, 42, 43, 44,
46, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 61, 62,
64, 65, 66, 67, 68, 69, 70, 71,
72, 73, 74, 75, 76, 77, 78, 79,
80, 82, 83, 84, 85, 86, 87, 88,
89, 90, 91, 92, 93, 94, 95, 96,
97, 98, 99, 100, 101, 102, 103, 104,
105, 106, 107, 108, 109, 110, 111, 112,
113, 114, 115, 116, 117, 118, 119, 120,
121, 122, 123, 124, 125, 126, 127, 128]);
/* A- to u-law conversions */
var _a2u = new Uint8Array([1, 3, 5, 7, 9, 11, 13, 15,
16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31,
32, 32, 33, 33, 34, 34, 35, 35,
36, 37, 38, 39, 40, 41, 42, 43,
44, 45, 46, 47, 48, 48, 49, 49,
50, 51, 52, 53, 54, 55, 56, 57,
58, 59, 60, 61, 62, 63, 64, 64,
65, 66, 67, 68, 69, 70, 71, 72,
73, 74, 75, 76, 77, 78, 79, 80,
80, 81, 82, 83, 84, 85, 86, 87,
88, 89, 90, 91, 92, 93, 94, 95,
96, 97, 98, 99, 100, 101, 102, 103,
104, 105, 106, 107, 108, 109, 110, 111,
112, 113, 114, 115, 116, 117, 118, 119,
120, 121, 122, 123, 124, 125, 126, 127]);
function search( val, table, size)
{
var i;
for (i = 0; i < size; i++) {
if (val <= table[i])
return (i);
}
return (size);
}
function linear2alaw(pcm_val) /* 2's complement (16-bit range) */
{
var mask;
var seg;
var aval;
pcm_val = pcm_val >> 3;
if (pcm_val >= 0) {
mask = 0xD5; /* sign (7th) bit = 1 */
} else {
mask = 0x55; /* sign bit = 0 */
pcm_val = -pcm_val - 1;
}
/* Convert the scaled magnitude to segment number. */
seg = search(pcm_val, seg_aend, 8);
/* Combine the sign, segment, and quantization bits. */
if (seg >= 8) /* out of range, return maximum value. */
return (0x7F ^ mask);
else {
aval = seg << SEG_SHIFT;
if (seg < 2)
aval |= (pcm_val >> 1) & QUANT_MASK;
else
aval |= (pcm_val >> seg) & QUANT_MASK;
return (aval ^ mask);
}
}
/*
* alaw2linear() - Convert an A-law value to 16-bit linear PCM
*
*/
function alaw2linear(a_val){
var t;
var seg;
a_val ^= 0x55;
t = (a_val & QUANT_MASK) << 4;
seg = (a_val & SEG_MASK) >> SEG_SHIFT;
switch (seg) {
case 0:
t += 8;
break;
case 1:
t += 0x108;
break;
default:
t += 0x108;
t <<= seg - 1;
}
return ((a_val & SIGN_BIT) ? t : -t);
}
// #define BIAS (0x84) /* Bias for linear code. */
// #define CLIP 8159
var BIAS = 0x84;
var CLIP = 8159;
// /*
// * linear2ulaw() - Convert a linear PCM value to u-law
// *
// * In order to simplify the encoding process, the original linear magnitude
// * is biased by adding 33 which shifts the encoding range from (0 - 8158) to
// * (33 - 8191). The result can be seen in the following encoding table:
// *
// * Biased Linear Input Code Compressed Code
// * ------------------------ ---------------
// * 00000001wxyza 000wxyz
// * 0000001wxyzab 001wxyz
// * 000001wxyzabc 010wxyz
// * 00001wxyzabcd 011wxyz
// * 0001wxyzabcde 100wxyz
// * 001wxyzabcdef 101wxyz
// * 01wxyzabcdefg 110wxyz
// * 1wxyzabcdefgh 111wxyz
// *
// * Each biased linear code has a leading 1 which identifies the segment
// * number. The value of the segment number is equal to 7 minus the number
// * of leading 0's. The quantization interval is directly available as the
// * four bits wxyz. * The trailing bits (a - h) are ignored.
// *
// * Ordinarily the complement of the resulting code word is used for
// * transmission, and so the code word is complemented before it is returned.
// *
// * For further information see John C. Bellamy's Digital Telephony, 1982,
// * John Wiley & Sons, pps 98-111 and 472-476.
function linear2ulaw(pcm_val) /* 2's complement (16-bit range) */
{
var mask;
var seg;
var uval;
/* Get the sign and the magnitude of the value. */
pcm_val = pcm_val >> 2;
if (pcm_val < 0) {
pcm_val = -pcm_val;
mask = 0x7F;
} else {
mask = 0xFF;
}
if ( pcm_val > CLIP ) pcm_val = CLIP; /* clip the magnitude */
pcm_val += (BIAS >> 2);
/* Convert the scaled magnitude to segment number. */
seg = search(pcm_val, seg_uend, 8);
/*
* Combine the sign, segment, quantization bits;
* and complement the code word.
*/
if (seg >= 8) /* out of range, return maximum value. */
return (0x7F ^ mask);
else {
uval = (seg << 4) | ((pcm_val >> (seg + 1)) & 0xF);
return (uval ^ mask);
}
}
// /*
// * ulaw2linear() - Convert a u-law value to 16-bit linear PCM
// *
// * First, a biased linear code is derived from the code word. An unbiased
// * output can then be obtained by subtracting 33 from the biased code.
// *
// * Note that this function expects to be passed the complement of the
// * original code word. This is in keeping with ISDN conventions.
// */
function ulaw2linear(u_val)
{
var t;
/* Complement to obtain normal u-law value. */
u_val = ~u_val;
/*
* Extract and bias the quantization bits. Then
* shift up by the segment number and subtract out the bias.
*/
t = ((u_val & QUANT_MASK) << 3) + BIAS;
t <<= (u_val & SEG_MASK) >> SEG_SHIFT;
return ((u_val & SIGN_BIT) ? (BIAS - t) : (t - BIAS));
}
// /* A-law to u-law conversion */
function alaw2ulaw(aval)
{
aval &= 0xff;
return ((aval & 0x80) ? (0xFF ^ _a2u[aval ^ 0xD5]) :
(0x7F ^ _a2u[aval ^ 0x55]));
}
/* u-law to A-law conversion */
function ulaw2alaw(uval)
{
uval &= 0xff;
return ((uval & 0x80) ? (0xD5 ^ (_u2a[0xFF ^ uval] - 1)) :
(0x55 ^ (_u2a[0x7F ^ uval] - 1)));
}
// unsigned char linear_to_alaw[65536];
// unsigned char linear_to_ulaw[65536];
var short_index = new Int16Array(65536);
var linear_to_alaw = new Uint8Array(65536);
var linear_to_ulaw = new Uint8Array(65536);
// /* 16384 entries per table (8 bit) */
// unsigned short alaw_to_linear[256];
// unsigned short ulaw_to_linear[256];
var alaw_to_linear = new Uint8Array(256);
var ulaw_to_linear = new Uint8Array(256);
function build_linear_to_xlaw_table(linear_to_xlaw,linear2xlaw)
{
var i;
for (i=0; i<65536;i++){
var v = linear2xlaw(short_index[i]);
linear_to_xlaw[i] = v;
}
}
function build_xlaw_to_linear_table(xlaw_to_linear,xlaw2linear)
{
var i;
for (i=0; i<256;i++){
xlaw_to_linear[i] = xlaw2linear(i);
}
}
function pcm16_to_xlaw(linear_to_xlaw, src_length,src_samples,dst_samples)
{
var i;
var s_samples;
s_samples = src_samples;
for (i=0; i < src_length / 2; i++)
{
dst_samples[i] = linear_to_xlaw[s_samples[i]];
}
}
function xlaw_to_pcm16(xlaw_to_linear, src_length,src_samples, dst_samples)
{
var i;
var s_samples;
var d_samples;
s_samples = src_samples;
d_samples = dst_samples;
for (i=0; i < src_length; i++)
{
d_samples[i] = xlaw_to_linear[s_samples[i]];
}
}
function pcm16_to_alaw(src_length, src_samples, dst_samples)
{
pcm16_to_xlaw(linear_to_alaw, src_length, src_samples, dst_samples);
}
function pcm16_to_ulaw(src_length, src_samples, dst_samples)
{
pcm16_to_xlaw(linear_to_ulaw, src_length, src_samples, dst_samples);
}
function alaw_to_pcm16(src_length, src_samples, dst_samples)
{
xlaw_to_pcm16(alaw_to_linear, src_length, src_samples, dst_samples);
}
function ulaw_to_pcm16(src_length, src_samples, dst_samples)
{
xlaw_to_pcm16(ulaw_to_linear, src_length, src_samples, dst_samples);
}
function pcm16_alaw_tableinit()
{
build_linear_to_xlaw_table(linear_to_alaw, linear2alaw);
}
function pcm16_ulaw_tableinit()
{
build_linear_to_xlaw_table(linear_to_ulaw, linear2ulaw);
}
function alaw_pcm16_tableinit()
{
build_xlaw_to_linear_table(alaw_to_linear, alaw2linear);
}
function ulaw_pcm16_tableinit()
{
build_xlaw_to_linear_table(ulaw_to_linear, ulaw2linear);
}
for(var i =0; i < 65536;i++){
short_index[i] = i;
}
pcm16_alaw_tableinit();
pcm16_ulaw_tableinit();
alaw_pcm16_tableinit();
ulaw_pcm16_tableinit();