remove wsp service

This commit is contained in:
notch
2020-12-28 13:42:39 +08:00
parent 63ec15051b
commit b1edf0a3e0
10 changed files with 0 additions and 10591 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,251 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>RTSP player example(based streamedian)</title>
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.6/css/bootstrap.min.css">
<link rel="stylesheet" href="style.css">
</head>
<body>
<div id="sourcesNode"></div>
<div>
<input id="stream_url" size="80" value="rtsp://localhost:1554/test/live1">
<button id="set_new_url">Set</button>
</div>
<div>
<p style="color:#808080">Enter your rtsp link to the stream, for example: "rtsp://localhost:1554/test/live1"</p>
<p style="color:#808080">If need token,for example: "rtsp://localhost:1554/test/live1?token=4df8f5d5d680385cb07c2e354dd0f3f3"</p>
</div>
<div>
<input id="buffer_duration" type="range" min="10" max="200" style="width:40%;">
<span id="buffer_value">120sec.</span>
</div>
<div>
<p style="color:#808080">Change buffer duration</p>
</div>
<video id="test_video" controls autoplay>
<!--<source src="rtsp://192.168.10.205:554/ch01.264" type="application/x-rtsp">-->
<!--<source src="rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov" type="application/x-rtsp">-->
</video>
<div class="controls form">
<div>
Playback rate:&nbsp;
<input id="rate" class="input" type="range" min="0.5" max="5.0" value="1.0" step="0.5">
<output for="rate" id="rate_res">live</output>
</div>
<div>
<button id="to_end" class="btn btn-success">live</button>
</div>
</div>
<p>View HTML5 RTSP video player log</p>
<div id="pllogs" class="logs"></div>
<button class="btn btn-success" onclick="cleanLog(pllogs)">clear</button>
<button class="btn btn-success" onclick="scrollset(pllogs, true)">scroll up</button>
<button class="btn btn-success" onclick="scrollset(pllogs, false)">scroll down</button>
<button id="scrollSetPl" class="btn btn-success" onclick="scrollswitch(pllogs)">Scroll off</button>
<br/><br/>
<script src="free.player.1.8.js"></script> <!-- Path to player js-->
<script>
var scrollStatPl = true;
var scrollStatWs = true;
var pllogs = document.getElementById("pllogs");
var wslogs = document.getElementById("wslogs");
// define a new console
var console=(function(oldConsole){
return {
log: function(){
oldConsole.log(newConsole(arguments, "black", "#A9F5A9"));
},
info: function () {
oldConsole.info(newConsole(arguments, "black", "#A9F5A9"));
},
warn: function () {
oldConsole.warn(newConsole(arguments, "black", "#F3F781"));
},
error: function () {
oldConsole.error(newConsole(arguments, "black", "#F5A9A9"));
}
};
}(window.console));
function newConsole(args, textColor, backColor){
let text = '';
let node = document.createElement("div");
for (let arg in args){
text +=' ' + args[arg];
}
node.appendChild(document.createTextNode(text));
node.style.color = textColor;
node.style.backgroundColor = backColor;
pllogs.appendChild(node);
autoscroll(pllogs);
return text;
}
//Then redefine the old console
window.console = console;
function cleanLog(element){
while (element.firstChild) {
element.removeChild(element.firstChild);
}
}
function autoscroll(element){
if(scrollStatus(element)){
element.scrollTop = element.scrollHeight;
}
if(element.childElementCount > 1000){
element.removeChild(element.firstChild);
}
}
function scrollset(element, state){
if(state){
element.scrollTop = 0;
scrollChange(element, false);
} else {
element.scrollTop = element.scrollHeight;
scrollChange(element, true);
}
}
function scrollswitch(element){
if(scrollStatus(element)){
scrollChange(element, false);
} else {
scrollChange(element, true);
}
}
function scrollChange(element, status){
if(scrollStatus(element)){
scrollStatPl = false;
document.getElementById("scrollSetPl").innerText = "Scroll on";
} else {
scrollStatPl = true;
document.getElementById("scrollSetPl").innerText = "Scroll off";
}
}
function scrollStatus(element){
if(element.id === "pllogs"){
return scrollStatPl;
} else {
return scrollStatWs;
}
}
</script>
<script>
if (window.Streamedian) {
let errHandler = function(err){
alert(err.message);
};
let infHandler = function(inf) {
let sourcesNode = document.getElementById("sourcesNode");
let clients = inf.clients;
sourcesNode.innerHTML = "";
for (let client in clients) {
clients[client].forEach((sources) => {
let nodeButton = document.createElement("button");
nodeButton.setAttribute('data', sources.url + ' ' + client);
nodeButton.appendChild(document.createTextNode(sources.description));
nodeButton.onclick = (event)=> {
setPlayerSource(event.target.getAttribute('data'));
};
sourcesNode.appendChild(nodeButton);
});
}
};
var playerOptions = {
socket: "ws://localhost:1554/ws/test/live1", redirectNativeMediaErrors : true,
bufferDuration: 30,
errorHandler: errHandler,
infoHandler: infHandler
};
var html5Player = document.getElementById("test_video");
var urlButton = document.getElementById("set_new_url");
var urlEdit = document.getElementById("stream_url");
var bufferRange = document.getElementById("buffer_duration");
var bufferValue = document.getElementById("buffer_value");
var player = Streamedian.player('test_video', playerOptions);
var nativePlayer = document.getElementById('test_video');
var range = document.getElementById('rate');
var set_live = document.getElementById('to_end');
var range_out = document.getElementById('rate_res');
range.addEventListener('input', function () {
nativePlayer.playbackRate = range.value;
range_out.innerHTML = `x${range.value}`;
});
set_live.addEventListener('click', function () {
range.value = 1.0;
range_out.innerHTML = `live`;
nativePlayer.playbackRate = 1;
nativePlayer.currentTime = nativePlayer.buffered.end(0);
});
var updateRangeControls = function(){
bufferRange.value = player.bufferDuration;
bufferValue.innerHTML = bufferRange.value + "sec.";
};
bufferRange.addEventListener('input', function(){
var iValue = parseInt(this.value, 10);
player.bufferDuration = iValue;
bufferValue.innerHTML = this.value + "sec.";
});
bufferRange.innerHTML = player.bufferDuration + "sec.";
updateRangeControls();
urlButton.onclick = ()=> {
setPlayerSource(urlEdit.value);
};
function setPlayerSource(newSource) {
player.destroy();
player = null;
html5Player.src = newSource;
// 修改原例子 begn =======>
// 我们直接使用ws来决定播放的路径
// 比如ws://192.168.1.100:1554/ws/test/live1
// 表示要播放服务器上路径为/test/live1
// 如果播放失败,可能是以下情况(1和2发生在升级websocket阶段3发生在rtsp通讯阶段)
// 1. 如果服务器rtsp的验证模式不为NONE则需要登录后获取到token才能访问像这样ws://.../test/live1?token=...
// 2. 可能没有权限,需要联系人对你登录的用户授权
// 3. 可能找不到流媒体
//
// 如果不是使用例子,我们可以这样
// html5Player.src = "rtsp://placehold"
// playerOptions.socket ="ws://localhost:1554/ws/test/live1"
// 知道ws主机后实际上只需要提供流媒体path即可这也更好
//
let rtspUrl = new URL(newSource)
rtspUrl.protocol = "ws"
rtspUrl.pathname = "/ws"+rtspUrl.pathname
playerOptions.socket = rtspUrl.href
// <========= end
player = Streamedian.player("test_video", playerOptions);
updateRangeControls();
}
}
</script>
</body>
</html>

View File

@@ -1,28 +0,0 @@
body {
max-width: 720px;
margin: 50px auto;
}
#test_video {
width: 720px;
}
.controls {
display: flex;
justify-content: space-around;
align-items: center;
}
input.input, .form-inline .input-group>.form-control {
width: 300px;
}
.logs {
overflow: auto;
width: 720px;
height: 150px;
padding: 5px;
border-top: solid 1px gray;
border-bottom: solid 1px gray;
}
button {
margin: 5px
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/cnotch/ipchub/provider/auth"
"github.com/cnotch/ipchub/provider/route"
"github.com/cnotch/ipchub/service/rtsp"
"github.com/cnotch/ipchub/service/wsp"
"github.com/cnotch/scheduler"
"github.com/cnotch/xlog"
"github.com/emitter-io/address"
@@ -37,7 +36,6 @@ type Service struct {
tlsusing bool
http *http.Server
rtsp *tcp.Server
wsp *tcp.Server
tokens *auth.TokenManager
}
@@ -50,7 +48,6 @@ func NewService(ctx context.Context, l *xlog.Logger) (s *Service, err error) {
logger: l,
http: new(http.Server),
rtsp: new(tcp.Server),
wsp: new(tcp.Server),
tokens: new(auth.TokenManager),
}
@@ -82,9 +79,6 @@ func NewService(ctx context.Context, l *xlog.Logger) (s *Service, err error) {
// 设置 rtsp AcceptHandler
s.rtsp.OnAccept = rtsp.CreateAcceptHandler()
// 设置 wsp AcceptHandler
s.wsp.OnAccept = wsp.CreateAcceptHandler()
// 启动定时存储拉流信息
scheduler.PeriodFunc(time.Minute*5, time.Minute*5, func() {
route.Flush()

View File

@@ -39,12 +39,6 @@ func (s *Service) onWebSocketRequest(w http.ResponseWriter, r *http.Request) {
return
}
if ws.Subprotocol() == "control" || ws.Subprotocol() == "data" {
// 代理访问
s.wsp.OnAccept(ws)
return
}
if ext == ".flv" {
go flv.ConsumeByWebsocket(s.logger, streamPath, r.RemoteAddr, ws)
return

View File

@@ -1,170 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package wsp
import (
"bytes"
"fmt"
"io"
"strings"
"sync"
"unicode"
"github.com/cnotch/ipchub/utils/scan"
"github.com/cnotch/xlog"
)
const (
wspProto = "WSP/1.1" // WSP协议版本
prefixBody = "\r\n\r\n" // Header和Body分割符
)
// WSP 协议命令
const (
CmdInit = "INIT" // 初始化建立通道
CmdJoin = "JOIN" // 数据通道使用
CmdWrap = "WRAP" // 包装其他协议的命令
CmdGetInfo = "GET_INFO" // 获取客户及license信息
)
// WSP 协议字段
const (
FieldProto = "proto" // 初始化的协议 如rtsp
FieldSeq = "seq" // 命令序列
FieldHost = "host" // 需要代理服务访问的远端host
FieldPort = "port" // 需要代理服务访问的远端port
FieldClient = "client" // 客户信息
FieldChannel = "channel" // 数据通道编号相当于一个session
FieldSocket = "socket" // 代替上面的host和port
)
type badStringError struct {
what string
str string
}
func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) }
// Request WSP 协议请求
type Request struct {
Cmd string
Header map[string]string
Body string
}
var (
spacePair = scan.NewPair(' ',
func(r rune) bool {
return unicode.IsSpace(r)
})
validCmds = map[string]bool{
CmdGetInfo: true,
CmdInit: true,
CmdJoin: true,
CmdWrap: true,
}
bspool = &sync.Pool{
New: func() interface{} {
return make([]byte, 8*1024)
},
}
buffers = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024*2))
},
}
)
// DecodeStringRequest 解码字串请求
func DecodeStringRequest(input string) (*Request, error) {
index := strings.Index(input, prefixBody)
if index < 0 {
return nil, &badStringError{"malformed WSP request,missing '\\r\\n\\r\\n'", input}
}
req := &Request{
Body: input[index+4:],
Header: make(map[string]string, 4),
}
scanner := scan.Line
// 先取首行
tailing, substr, ok := scanner.Scan(input[:index])
if !ok {
return nil, &badStringError{"malformed WSP request first line", substr}
}
proto, cmd, ok := spacePair.Scan(substr)
if proto != wspProto {
return nil, &badStringError{"malformed WSP request proto ", proto}
}
if _, ok := validCmds[cmd]; !ok {
return nil, &badStringError{"malformed WSP request command ", cmd}
}
req.Cmd = cmd
// 循环取header
for ok {
tailing, substr, ok = scanner.Scan(tailing)
k, v, found := scan.ColonPair.Scan(substr)
if found {
req.Header[k] = v
}
}
return req, nil
}
// DecodeRequest 解码请求
func DecodeRequest(r io.Reader, logger *xlog.Logger) (*Request, error) {
buf := bspool.Get().([]byte)
defer bspool.Put(buf)
n, err := r.Read(buf)
if n == 0 && err == nil { // 上一个报文结束,再读一次
n, err = r.Read(buf)
}
if err != nil {
return nil, err
}
input := string(buf[:n])
logger.Debugf("wsp <<<=== \r\n%s", input)
return DecodeStringRequest(input)
}
// IsWrap 是否是包装协议如果是可以从Body提取被包装的协议
func (req *Request) IsWrap() bool {
return req.Cmd == CmdWrap
}
// ResponseOK 响应请求成功
func (req *Request) ResponseOK(buf *bytes.Buffer, header map[string]string, payload string) {
req.ResponseTo(buf, 200, "OK", header, payload)
}
// ResponseTo 响应请求到buf
func (req *Request) ResponseTo(buf *bytes.Buffer, statusCode int, statusText string, header map[string]string, payload string) {
// 写首行
buf.WriteString(fmt.Sprintf("%s %d %s\r\n", wspProto, statusCode, statusText))
// 写头
header[FieldSeq] = req.Header[FieldSeq]
for k, v := range header {
buf.WriteString(k)
buf.WriteString(": ")
buf.WriteString(v)
buf.WriteString("\r\n")
}
// 写header和body分割
buf.WriteString("\r\n")
// 写payload
if len(payload) > 0 {
buf.WriteString(payload)
}
}

View File

@@ -1,69 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package wsp
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/cnotch/xlog"
)
func TestDecodeRequest(t *testing.T) {
var reqStr = "WSP/1.1 GET_INFO\r\nproto: rtsp\r\nhost: 192.168.1.1\r\nport: 554\r\nclient: \r\nseq: 1\r\n\r\n"
t.Run("decode", func(t *testing.T) {
r := bytes.NewBufferString(reqStr)
got, err := DecodeRequest(r, xlog.L())
if err != nil {
t.Errorf("DecodeRequest() error = %v", err)
return
}
assert.Equal(t, CmdGetInfo, got.Cmd)
assert.Equal(t, "1", got.Header[FieldSeq])
})
}
func TestRequest_ResponseOK(t *testing.T) {
respStr1 := "WSP/1.1 200 OK\r\nchannel: 334\r\nseq: 1\r\n\r\n"
respStr2 := "WSP/1.1 404 NOT FOUND\r\nchannel: 334\r\nseq: 1\r\n\r\n123"
t.Run("no payload", func(t *testing.T) {
req := &Request{
Header: make(map[string]string),
}
req.Header[FieldSeq] = "1"
buf := &bytes.Buffer{}
header := make(map[string]string)
header[FieldChannel] = "334"
req.ResponseOK(buf, header, "")
resp := buf.String()
assert.Equal(t, respStr1, resp)
})
t.Run("payload", func(t *testing.T) {
req := &Request{
Header: make(map[string]string),
}
req.Header[FieldSeq] = "1"
buf := &bytes.Buffer{}
header := make(map[string]string)
header[FieldChannel] = "334"
req.ResponseTo(buf, 404, "NOT FOUND", header, "123")
resp := buf.String()
assert.Equal(t, respStr2, resp)
})
}
func Benchmark_DecodeRequest(b *testing.B) {
var reqStr = "WSP/1.1 GET_INFO\r\nproto: rtsp\r\nhost: 192.168.1.1\r\nport: 554\r\nclient: \r\nseq: 1\r\n\r\n"
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
got, _ := DecodeStringRequest(reqStr)
_ = got
}
})
}

View File

@@ -1,461 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package wsp
import (
"bufio"
"bytes"
"fmt"
"io"
"net/url"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/media"
"github.com/cnotch/ipchub/network/websocket"
"github.com/cnotch/ipchub/provider/security"
"github.com/cnotch/ipchub/service/rtsp"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/xlog"
"github.com/pixelbender/go-sdp/sdp"
)
const (
statusInit = iota
statusReady
statusPlaying
)
const (
rtspURLPrefix = "rtsp://" // RTSP地址前缀
)
// Session RTSP 会话
type Session struct {
// 创建时设置
svr *Server
channelID string
logger *xlog.Logger
closed bool
paused bool
lsession string // 本地会话标识
timeout time.Duration
conn websocket.Conn
lockW sync.Mutex
dataChannel websocket.Conn
// DESCRIBE 后设置
url *url.URL
path string
rawSdp string
sdp *sdp.Session
aControl string
vControl string
aCodec string
vCodec string
// Setup 后设置
transport rtsp.RTPTransport
// 启动流媒体传输后设置
status int // session状态
source *media.Stream
cid *media.CID
}
func newSession(svr *Server, conn websocket.Conn, channelID string) *Session {
session := &Session{
svr: svr,
channelID: channelID,
lsession: security.NewID().Base64(),
timeout: config.NetTimeout() * time.Duration(2),
conn: conn,
transport: rtsp.RTPTransport{
Mode: rtsp.PlaySession, // 默认为播放
Type: rtsp.RTPUnknownTrans,
},
status: statusInit,
}
for i := 0; i < 4; i++ {
session.transport.Channels[i] = -1
session.transport.ClientPorts[i] = -1
}
session.logger = svr.logger.With(xlog.Fields(
xlog.F("channel", channelID),
xlog.F("path", conn.Path())))
return session
}
// 设置rtp数据通道
func (s *Session) setDataChannel(dc websocket.Conn) {
s.lockW.Lock()
s.dataChannel = dc
s.lockW.Unlock()
}
// Addr Session地址
func (s *Session) Addr() string {
return s.conn.RemoteAddr().String()
}
// Consume 消费媒体包
func (s *Session) Consume(p Pack) {
if s.closed || s.paused {
return
}
buf := buffers.Get().(*bytes.Buffer)
buf.Reset()
defer buffers.Put(buf)
p2 := p.(*rtsp.RTPPack)
p2.Write(buf, s.transport.Channels[:])
var err error
s.lockW.Lock()
if s.dataChannel != nil {
_, err = s.dataChannel.Write(buf.Bytes())
}
s.lockW.Unlock()
if err != nil {
s.logger.Errorf("send pack error = %v , close socket", err)
s.Close()
return
}
}
// Close 关闭会话
func (s *Session) Close() error {
if s.closed {
return nil
}
s.closed = true
s.paused = false
s.conn.Close()
s.lockW.Lock()
if s.dataChannel != nil {
s.dataChannel.Close()
}
s.lockW.Unlock()
return nil
}
func (s *Session) process() {
var err error
defer func() {
if r := recover(); r != nil {
s.logger.Errorf("wsp channel panic, %v \n %s", r, debug.Stack())
}
if err != nil {
if err == io.EOF { // 如果客户端断开提醒
s.logger.Warn("websocket disconnect actively")
} else if !s.closed { // 如果主动关闭,不提示
s.logger.Error(err.Error())
}
}
// 删除通道
s.svr.sessions.Delete(s.channelID)
// 停止消费
if s.cid != nil {
s.source.StopConsume(*s.cid)
s.cid = nil
s.source = nil
}
// 关闭连接
s.Close()
// 重置到初始状态
s.conn = nil
s.dataChannel = nil
s.status = statusInit
stats.WspConns.Release()
s.logger.Info("close wsp channel")
}()
s.logger.Info("open wsp channel")
stats.WspConns.Add() // 增加一个 RTSP 连接计数
for !s.closed {
deadLine := time.Time{}
if s.timeout > 0 {
deadLine = time.Now().Add(s.timeout)
}
if err = s.conn.SetReadDeadline(deadLine); err != nil {
break
}
var req *Request
req, err = DecodeRequest(s.conn, s.logger)
if err != nil {
break
}
if req.Cmd != CmdWrap {
s.logger.Error("must is WRAP command request")
break
}
// 从包装命令中提取 rtsp 请求
var rtspReq *rtsp.Request
rtspReq, err = rtsp.ReadRequest(bufio.NewReader(bytes.NewBufferString(req.Body)))
if err != nil {
break
}
// 处理请求,并获得响应
rtspResp := s.onRequest(rtspReq)
// 发送响应
buf := buffers.Get().(*bytes.Buffer)
buf.Reset()
defer buffers.Put(buf)
req.ResponseOK(buf, map[string]string{FieldChannel: s.channelID}, "")
rtspResp.Write(buf)
_, err = s.conn.Write(buf.Bytes())
if err != nil {
break
}
s.logger.Debugf("wsp ===>>>\r\n%s", buf.String())
// 关闭通道
if rtspReq.Method == rtsp.MethodTeardown {
break
}
}
}
func (s *Session) onRequest(req *rtsp.Request) *rtsp.Response {
resp := s.newResponse(rtsp.StatusOK, req)
// 预处理
continueProcess := s.onPreprocess(resp, req)
if !continueProcess {
return resp
}
switch req.Method {
case rtsp.MethodDescribe:
s.onDescribe(resp, req)
case rtsp.MethodSetup:
s.onSetup(resp, req)
case rtsp.MethodPlay:
s.onPlay(resp, req)
case rtsp.MethodPause:
s.onPause(resp, req)
default:
// 状态不支持的方法
resp.StatusCode = rtsp.StatusMethodNotValidInThisState
}
return resp
}
func (s *Session) onDescribe(resp *rtsp.Response, req *rtsp.Request) {
// TODO: 检查 accept 中的类型是否包含 sdp
s.url = req.URL
s.path = s.conn.Path() // 使用websocket路径
// s.path = utils.CanonicalPath(req.URL.Path)
stream := media.GetOrCreate(s.path)
if stream == nil {
resp.StatusCode = rtsp.StatusNotFound
return
}
// 从流中取 sdp
sdpRaw := stream.Sdp()
if len(sdpRaw) == 0 {
resp.StatusCode = rtsp.StatusNotFound
return
}
err := s.parseSdp(sdpRaw)
if err != nil { // TODO需要更好的处理方式
resp.StatusCode = rtsp.StatusNotFound
return
}
resp.Header.Set(rtsp.FieldContentType, "application/sdp")
resp.Body = s.rawSdp
}
func (s *Session) onSetup(resp *rtsp.Response, req *rtsp.Request) {
// a=control:streamid=1
// a=control:rtsp://192.168.1.165/trackID=1
// a=control:?ctype=video
setupURL := &url.URL{}
*setupURL = *req.URL
if setupURL.Port() == "" {
setupURL.Host = fmt.Sprintf("%s:554", setupURL.Host)
}
setupPath := setupURL.String()
//setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:]
vPath := getControlPath(s.vControl)
if vPath == "" {
resp.StatusCode = rtsp.StatusInternalServerError
resp.Status = "Invalid VControl"
return
}
aPath := getControlPath(s.aControl)
ts := req.Header.Get(rtsp.FieldTransport)
resp.Header.Set(rtsp.FieldTransport, ts) // 先回写transport
// 检查控制路径
chindex := -1
if setupPath == aPath || (aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath)) {
chindex = int(rtsp.ChannelAudio)
} else if setupPath == vPath || (vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath)) {
chindex = int(rtsp.ChannelVideo)
} else { // 找不到被 Setup 的资源
resp.StatusCode = rtsp.StatusInternalServerError
resp.Status = fmt.Sprintf("SETUP Unkown control:%s", setupPath)
return
}
err := s.transport.ParseTransport(chindex, ts)
if err != nil {
resp.StatusCode = rtsp.StatusInvalidParameter
resp.Status = err.Error()
return
}
// 检查必须是play模式
if rtsp.PlaySession != s.transport.Mode {
resp.StatusCode = rtsp.StatusInvalidParameter
resp.Status = "can't setup as record"
return
}
if s.transport.Type != rtsp.RTPTCPUnicast { // 需要修改回复的transport
resp.StatusCode = rtsp.StatusUnsupportedTransport
resp.Status = "websocket only support tcp unicast"
return
}
if s.status < statusReady { // 初始状态切换到Ready
s.status = statusReady
}
}
func (s *Session) onPlay(resp *rtsp.Response, req *rtsp.Request) {
if s.status == statusPlaying {
s.paused = false
return
}
stream := media.GetOrCreate(s.path)
if stream == nil {
resp.StatusCode = rtsp.StatusNotFound
return
}
resp.Header.Set(rtsp.FieldRange, req.Header.Get(rtsp.FieldRange))
if s.cid == nil {
s.source = stream
// cid := stream.StartConsume(s)
cid := stream.StartConsumeNoGopCache(s, media.RTPPacket, "wsp")
s.cid = &cid
}
s.status = statusPlaying
s.paused = false
return
}
func (s *Session) onPause(resp *rtsp.Response, req *rtsp.Request) {
if s.status == statusPlaying {
s.paused = true
}
}
func (s *Session) onPreprocess(resp *rtsp.Response, req *rtsp.Request) (continueProcess bool) {
// Options
if req.Method == rtsp.MethodOptions {
resp.Header.Set(rtsp.FieldPublic, "DESCRIBE, SETUP, TEARDOWN, PLAY, OPTIONS, ANNOUNCE")
return false
}
// 关闭请求
if req.Method == rtsp.MethodTeardown {
return false
}
// 检查状态下的方法
switch s.status {
case statusReady:
continueProcess = req.Method == rtsp.MethodSetup ||
req.Method == rtsp.MethodPlay
case statusPlaying:
continueProcess = (req.Method == rtsp.MethodPlay ||
req.Method == rtsp.MethodPause)
default:
continueProcess = !(req.Method == rtsp.MethodPlay ||
req.Method == rtsp.MethodRecord)
}
if !continueProcess {
resp.StatusCode = rtsp.StatusMethodNotValidInThisState
return false
}
return true
}
func (s *Session) newResponse(code int, req *rtsp.Request) *rtsp.Response {
resp := &rtsp.Response{
StatusCode: code,
Header: make(rtsp.Header),
Request: req,
}
resp.Header.Set(rtsp.FieldCSeq, req.Header.Get(rtsp.FieldCSeq))
resp.Header.Set(rtsp.FieldSession, s.lsession)
return resp
}
func (s *Session) parseSdp(rawSdp string) (err error) {
// 从流中取 sdp
s.rawSdp = rawSdp
// 解析
s.sdp, err = sdp.ParseString(s.rawSdp)
if err != nil {
return
}
for _, media := range s.sdp.Media {
switch media.Type {
case "video":
s.vControl = media.Attributes.Get("control")
s.vCodec = media.Format[0].Name
case "audio":
s.aControl = media.Attributes.Get("control")
s.aCodec = media.Format[0].Name
}
}
return
}
func getControlPath(ctrl string) (path string) {
if len(ctrl) >= len(rtspURLPrefix) && strings.EqualFold(ctrl[:len(rtspURLPrefix)], rtspURLPrefix) {
ctrlURL, err := url.Parse(ctrl)
if err != nil {
return
}
if ctrlURL.Port() == "" {
ctrlURL.Host = fmt.Sprintf("%s:554", ctrlURL.Hostname())
}
return ctrlURL.String()
}
return ctrl
}

View File

@@ -1,10 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package wsp
import "github.com/cnotch/ipchub/protos"
// Pack .
type Pack = protos.Pack

View File

@@ -1,124 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package wsp
import (
"bytes"
"net"
"sync"
"github.com/cnotch/ipchub/network/websocket"
"github.com/cnotch/ipchub/provider/security"
"github.com/cnotch/xlog"
"github.com/kelindar/tcp"
)
// Server https://github.com/Streamedian/html5_rtsp_player 客户端配套的服务器
type Server struct {
logger *xlog.Logger
sessions sync.Map
}
// CreateAcceptHandler 创建连接接入处理器
func CreateAcceptHandler() tcp.OnAccept {
svr := &Server{
logger: xlog.L(),
}
return svr.onAcceptConn
}
// onAcceptConn 当新连接接入时触发
func (svr *Server) onAcceptConn(c net.Conn) {
wsc := c.(websocket.Conn)
if wsc.Subprotocol() == "control" {
go svr.handshakeControlChannel(wsc)
} else {
go svr.handshakeDataChannel(wsc)
}
}
func (svr *Server) handshakeControlChannel(wsc websocket.Conn) {
svr.logger.Info("wsp control channel handshake.")
wsc = wsc.TextTransport()
for {
req, err := DecodeRequest(wsc, svr.logger)
if err != nil {
svr.logger.Error(err.Error())
wsc.Close()
break
}
if req.Cmd == CmdGetInfo {
continue
}
if req.Cmd != CmdInit {
svr.logger.Errorf("wsp control channel handshake failed, malformed WSP request command: %s.", req.Cmd)
wsc.Close()
break
}
// 初始化
channelID := security.NewID().String()
buf := buffers.Get().(*bytes.Buffer)
buf.Reset()
defer buffers.Put(buf)
req.ResponseOK(buf, map[string]string{FieldChannel: channelID}, "")
_, err = wsc.Write(buf.Bytes())
if err != nil {
svr.logger.Error(err.Error())
wsc.Close()
break
}
session := newSession(svr, wsc, channelID)
svr.sessions.Store(channelID, session)
svr.logger.Debugf("wsp ===>>> \r\n%s", buf.String())
go session.process()
break
}
}
func (svr *Server) handshakeDataChannel(wsc websocket.Conn) {
tc := wsc.TextTransport()
req, err := DecodeRequest(tc, svr.logger)
if err != nil {
svr.logger.Error(err.Error())
tc.Close()
return
}
channelID := req.Header[FieldChannel]
code := 200
text := "OK"
var session *Session
si, ok := svr.sessions.Load(channelID)
if ok {
session = si.(*Session)
} else {
code = 404
text = "NOT FOUND"
}
buf := buffers.Get().(*bytes.Buffer)
buf.Reset()
defer buffers.Put(buf)
req.ResponseTo(buf, code, text, map[string]string{}, "")
_, err = tc.Write(buf.Bytes())
if err != nil {
svr.logger.Error(err.Error())
tc.Close()
return
}
svr.logger.Debugf("wsp ===>>> \r\n%s", buf.String())
if session == nil {
tc.Close()
return
}
// 添加到session
session.setDataChannel(wsc)
}