mirror of
https://github.com/cnotch/ipchub.git
synced 2025-09-27 03:45:54 +08:00
readd wsp
This commit is contained in:
@@ -189,6 +189,7 @@ func (s *Service) onGetRuntime(w http.ResponseWriter, r *http.Request, pathParam
|
||||
Streams sccc `json:"streams"`
|
||||
Rtsp stats.ConnsSample `json:"rtsp"`
|
||||
Flv stats.ConnsSample `json:"flv"`
|
||||
Wsp stats.ConnsSample `json:"wsp"`
|
||||
Extra *stats.Runtime `json:"extra,omitempty"`
|
||||
}
|
||||
sc, cc := media.Count()
|
||||
@@ -199,6 +200,7 @@ func (s *Service) onGetRuntime(w http.ResponseWriter, r *http.Request, pathParam
|
||||
Streams: sccc{sc, cc},
|
||||
Rtsp: stats.RtspConns.GetSample(),
|
||||
Flv: stats.FlvConns.GetSample(),
|
||||
Wsp: stats.WspConns.GetSample(),
|
||||
}
|
||||
|
||||
params := r.URL.Query()
|
||||
|
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/cnotch/ipchub/provider/auth"
|
||||
"github.com/cnotch/ipchub/provider/route"
|
||||
"github.com/cnotch/ipchub/service/rtsp"
|
||||
"github.com/cnotch/ipchub/service/wsp"
|
||||
"github.com/cnotch/scheduler"
|
||||
"github.com/cnotch/xlog"
|
||||
"github.com/emitter-io/address"
|
||||
@@ -36,6 +37,7 @@ type Service struct {
|
||||
tlsusing bool
|
||||
http *http.Server
|
||||
rtsp *tcp.Server
|
||||
wsp *tcp.Server
|
||||
tokens *auth.TokenManager
|
||||
}
|
||||
|
||||
@@ -48,6 +50,7 @@ func NewService(ctx context.Context, l *xlog.Logger) (s *Service, err error) {
|
||||
logger: l,
|
||||
http: new(http.Server),
|
||||
rtsp: new(tcp.Server),
|
||||
wsp: new(tcp.Server),
|
||||
tokens: new(auth.TokenManager),
|
||||
}
|
||||
|
||||
@@ -78,7 +81,8 @@ func NewService(ctx context.Context, l *xlog.Logger) (s *Service, err error) {
|
||||
|
||||
// 设置 rtsp AcceptHandler
|
||||
s.rtsp.OnAccept = rtsp.CreateAcceptHandler()
|
||||
|
||||
// 设置 wsp AcceptHandler
|
||||
s.wsp.OnAccept = wsp.CreateAcceptHandler()
|
||||
// 启动定时存储拉流信息
|
||||
scheduler.PeriodFunc(time.Minute*5, time.Minute*5, func() {
|
||||
route.Flush()
|
||||
|
@@ -38,6 +38,12 @@ func (s *Service) onWebSocketRequest(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
if ws.Subprotocol() == "control" || ws.Subprotocol() == "data" {
|
||||
// 代理访问
|
||||
s.wsp.OnAccept(ws)
|
||||
return
|
||||
}
|
||||
|
||||
if ext == ".flv" {
|
||||
go flv.ConsumeByWebsocket(s.logger, streamPath, r.RemoteAddr, ws)
|
||||
return
|
||||
|
170
service/wsp/protocol.go
Executable file
170
service/wsp/protocol.go
Executable file
@@ -0,0 +1,170 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wsp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"unicode"
|
||||
|
||||
"github.com/cnotch/ipchub/utils/scan"
|
||||
"github.com/cnotch/xlog"
|
||||
)
|
||||
|
||||
const (
|
||||
wspProto = "WSP/1.1" // WSP协议版本
|
||||
prefixBody = "\r\n\r\n" // Header和Body分割符
|
||||
)
|
||||
|
||||
// WSP 协议命令
|
||||
const (
|
||||
CmdInit = "INIT" // 初始化建立通道
|
||||
CmdJoin = "JOIN" // 数据通道使用
|
||||
CmdWrap = "WRAP" // 包装其他协议的命令
|
||||
CmdGetInfo = "GET_INFO" // 获取客户及license信息
|
||||
)
|
||||
|
||||
// WSP 协议字段
|
||||
const (
|
||||
FieldProto = "proto" // 初始化的协议 如:rtsp
|
||||
FieldSeq = "seq" // 命令序列
|
||||
FieldHost = "host" // 需要代理服务访问的远端host
|
||||
FieldPort = "port" // 需要代理服务访问的远端port
|
||||
FieldClient = "client" // 客户信息
|
||||
FieldChannel = "channel" // 数据通道编号,相当于一个session
|
||||
FieldSocket = "socket" // 代替上面的host和port
|
||||
)
|
||||
|
||||
type badStringError struct {
|
||||
what string
|
||||
str string
|
||||
}
|
||||
|
||||
func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) }
|
||||
|
||||
// Request WSP 协议请求
|
||||
type Request struct {
|
||||
Cmd string
|
||||
Header map[string]string
|
||||
Body string
|
||||
}
|
||||
|
||||
var (
|
||||
spacePair = scan.NewPair(' ',
|
||||
func(r rune) bool {
|
||||
return unicode.IsSpace(r)
|
||||
})
|
||||
|
||||
validCmds = map[string]bool{
|
||||
CmdGetInfo: true,
|
||||
CmdInit: true,
|
||||
CmdJoin: true,
|
||||
CmdWrap: true,
|
||||
}
|
||||
|
||||
bspool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 8*1024)
|
||||
},
|
||||
}
|
||||
buffers = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return bytes.NewBuffer(make([]byte, 0, 1024*2))
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// DecodeStringRequest 解码字串请求
|
||||
func DecodeStringRequest(input string) (*Request, error) {
|
||||
index := strings.Index(input, prefixBody)
|
||||
if index < 0 {
|
||||
return nil, &badStringError{"malformed WSP request,missing '\\r\\n\\r\\n'", input}
|
||||
}
|
||||
|
||||
req := &Request{
|
||||
Body: input[index+4:],
|
||||
Header: make(map[string]string, 4),
|
||||
}
|
||||
|
||||
scanner := scan.Line
|
||||
|
||||
// 先取首行
|
||||
tailing, substr, ok := scanner.Scan(input[:index])
|
||||
if !ok {
|
||||
return nil, &badStringError{"malformed WSP request first line", substr}
|
||||
}
|
||||
proto, cmd, ok := spacePair.Scan(substr)
|
||||
if proto != wspProto {
|
||||
return nil, &badStringError{"malformed WSP request proto ", proto}
|
||||
}
|
||||
if _, ok := validCmds[cmd]; !ok {
|
||||
return nil, &badStringError{"malformed WSP request command ", cmd}
|
||||
}
|
||||
req.Cmd = cmd
|
||||
|
||||
// 循环取header
|
||||
for ok {
|
||||
tailing, substr, ok = scanner.Scan(tailing)
|
||||
k, v, found := scan.ColonPair.Scan(substr)
|
||||
if found {
|
||||
req.Header[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// DecodeRequest 解码请求
|
||||
func DecodeRequest(r io.Reader, logger *xlog.Logger) (*Request, error) {
|
||||
buf := bspool.Get().([]byte)
|
||||
defer bspool.Put(buf)
|
||||
|
||||
n, err := r.Read(buf)
|
||||
if n == 0 && err == nil { // 上一个报文结束,再读一次
|
||||
n, err = r.Read(buf)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
input := string(buf[:n])
|
||||
logger.Debugf("wsp <<<=== \r\n%s", input)
|
||||
|
||||
return DecodeStringRequest(input)
|
||||
}
|
||||
|
||||
// IsWrap 是否是包装协议,如果是,可以从Body提取被包装的协议
|
||||
func (req *Request) IsWrap() bool {
|
||||
return req.Cmd == CmdWrap
|
||||
}
|
||||
|
||||
// ResponseOK 响应请求成功
|
||||
func (req *Request) ResponseOK(buf *bytes.Buffer, header map[string]string, payload string) {
|
||||
req.ResponseTo(buf, 200, "OK", header, payload)
|
||||
}
|
||||
|
||||
// ResponseTo 响应请求到buf
|
||||
func (req *Request) ResponseTo(buf *bytes.Buffer, statusCode int, statusText string, header map[string]string, payload string) {
|
||||
// 写首行
|
||||
buf.WriteString(fmt.Sprintf("%s %d %s\r\n", wspProto, statusCode, statusText))
|
||||
// 写头
|
||||
header[FieldSeq] = req.Header[FieldSeq]
|
||||
for k, v := range header {
|
||||
buf.WriteString(k)
|
||||
buf.WriteString(": ")
|
||||
buf.WriteString(v)
|
||||
buf.WriteString("\r\n")
|
||||
}
|
||||
// 写header和body分割
|
||||
buf.WriteString("\r\n")
|
||||
// 写payload
|
||||
if len(payload) > 0 {
|
||||
buf.WriteString(payload)
|
||||
}
|
||||
}
|
69
service/wsp/protocol_test.go
Executable file
69
service/wsp/protocol_test.go
Executable file
@@ -0,0 +1,69 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wsp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/cnotch/xlog"
|
||||
)
|
||||
|
||||
func TestDecodeRequest(t *testing.T) {
|
||||
var reqStr = "WSP/1.1 GET_INFO\r\nproto: rtsp\r\nhost: 192.168.1.1\r\nport: 554\r\nclient: \r\nseq: 1\r\n\r\n"
|
||||
t.Run("decode", func(t *testing.T) {
|
||||
r := bytes.NewBufferString(reqStr)
|
||||
got, err := DecodeRequest(r, xlog.L())
|
||||
if err != nil {
|
||||
t.Errorf("DecodeRequest() error = %v", err)
|
||||
return
|
||||
}
|
||||
assert.Equal(t, CmdGetInfo, got.Cmd)
|
||||
assert.Equal(t, "1", got.Header[FieldSeq])
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestRequest_ResponseOK(t *testing.T) {
|
||||
respStr1 := "WSP/1.1 200 OK\r\nchannel: 334\r\nseq: 1\r\n\r\n"
|
||||
respStr2 := "WSP/1.1 404 NOT FOUND\r\nchannel: 334\r\nseq: 1\r\n\r\n123"
|
||||
t.Run("no payload", func(t *testing.T) {
|
||||
req := &Request{
|
||||
Header: make(map[string]string),
|
||||
}
|
||||
req.Header[FieldSeq] = "1"
|
||||
buf := &bytes.Buffer{}
|
||||
header := make(map[string]string)
|
||||
header[FieldChannel] = "334"
|
||||
req.ResponseOK(buf, header, "")
|
||||
resp := buf.String()
|
||||
assert.Equal(t, respStr1, resp)
|
||||
})
|
||||
t.Run("payload", func(t *testing.T) {
|
||||
req := &Request{
|
||||
Header: make(map[string]string),
|
||||
}
|
||||
req.Header[FieldSeq] = "1"
|
||||
buf := &bytes.Buffer{}
|
||||
header := make(map[string]string)
|
||||
header[FieldChannel] = "334"
|
||||
req.ResponseTo(buf, 404, "NOT FOUND", header, "123")
|
||||
resp := buf.String()
|
||||
assert.Equal(t, respStr2, resp)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func Benchmark_DecodeRequest(b *testing.B) {
|
||||
var reqStr = "WSP/1.1 GET_INFO\r\nproto: rtsp\r\nhost: 192.168.1.1\r\nport: 554\r\nclient: \r\nseq: 1\r\n\r\n"
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
got, _ := DecodeStringRequest(reqStr)
|
||||
_ = got
|
||||
}
|
||||
})
|
||||
}
|
461
service/wsp/session.go
Executable file
461
service/wsp/session.go
Executable file
@@ -0,0 +1,461 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wsp
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cnotch/ipchub/config"
|
||||
"github.com/cnotch/ipchub/media"
|
||||
"github.com/cnotch/ipchub/network/websocket"
|
||||
"github.com/cnotch/ipchub/provider/security"
|
||||
"github.com/cnotch/ipchub/service/rtsp"
|
||||
"github.com/cnotch/ipchub/stats"
|
||||
"github.com/cnotch/xlog"
|
||||
"github.com/pixelbender/go-sdp/sdp"
|
||||
)
|
||||
|
||||
const (
|
||||
statusInit = iota
|
||||
statusReady
|
||||
statusPlaying
|
||||
)
|
||||
const (
|
||||
rtspURLPrefix = "rtsp://" // RTSP地址前缀
|
||||
)
|
||||
|
||||
// Session RTSP 会话
|
||||
type Session struct {
|
||||
// 创建时设置
|
||||
svr *Server
|
||||
channelID string
|
||||
logger *xlog.Logger
|
||||
closed bool
|
||||
paused bool
|
||||
lsession string // 本地会话标识
|
||||
timeout time.Duration
|
||||
conn websocket.Conn
|
||||
lockW sync.Mutex
|
||||
dataChannel websocket.Conn
|
||||
|
||||
// DESCRIBE 后设置
|
||||
url *url.URL
|
||||
path string
|
||||
rawSdp string
|
||||
sdp *sdp.Session
|
||||
aControl string
|
||||
vControl string
|
||||
aCodec string
|
||||
vCodec string
|
||||
|
||||
// Setup 后设置
|
||||
transport rtsp.RTPTransport
|
||||
|
||||
// 启动流媒体传输后设置
|
||||
status int // session状态
|
||||
source *media.Stream
|
||||
cid *media.CID
|
||||
}
|
||||
|
||||
func newSession(svr *Server, conn websocket.Conn, channelID string) *Session {
|
||||
|
||||
session := &Session{
|
||||
svr: svr,
|
||||
channelID: channelID,
|
||||
lsession: security.NewID().Base64(),
|
||||
timeout: config.NetTimeout() * time.Duration(2),
|
||||
conn: conn,
|
||||
transport: rtsp.RTPTransport{
|
||||
Mode: rtsp.PlaySession, // 默认为播放
|
||||
Type: rtsp.RTPUnknownTrans,
|
||||
},
|
||||
status: statusInit,
|
||||
}
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
session.transport.Channels[i] = -1
|
||||
session.transport.ClientPorts[i] = -1
|
||||
}
|
||||
session.logger = svr.logger.With(xlog.Fields(
|
||||
xlog.F("channel", channelID),
|
||||
xlog.F("path", conn.Path())))
|
||||
|
||||
return session
|
||||
}
|
||||
|
||||
// 设置rtp数据通道
|
||||
func (s *Session) setDataChannel(dc websocket.Conn) {
|
||||
s.lockW.Lock()
|
||||
s.dataChannel = dc
|
||||
s.lockW.Unlock()
|
||||
}
|
||||
|
||||
// Addr Session地址
|
||||
func (s *Session) Addr() string {
|
||||
return s.conn.RemoteAddr().String()
|
||||
}
|
||||
|
||||
// Consume 消费媒体包
|
||||
func (s *Session) Consume(p Pack) {
|
||||
if s.closed || s.paused {
|
||||
return
|
||||
}
|
||||
|
||||
buf := buffers.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
defer buffers.Put(buf)
|
||||
p2 := p.(*rtsp.RTPPack)
|
||||
p2.Write(buf, s.transport.Channels[:])
|
||||
|
||||
var err error
|
||||
s.lockW.Lock()
|
||||
if s.dataChannel != nil {
|
||||
_, err = s.dataChannel.Write(buf.Bytes())
|
||||
}
|
||||
s.lockW.Unlock()
|
||||
|
||||
if err != nil {
|
||||
s.logger.Errorf("send pack error = %v , close socket", err)
|
||||
s.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Close 关闭会话
|
||||
func (s *Session) Close() error {
|
||||
if s.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.closed = true
|
||||
s.paused = false
|
||||
s.conn.Close()
|
||||
s.lockW.Lock()
|
||||
if s.dataChannel != nil {
|
||||
s.dataChannel.Close()
|
||||
}
|
||||
s.lockW.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) process() {
|
||||
var err error
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
s.logger.Errorf("wsp channel panic, %v \n %s", r, debug.Stack())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err == io.EOF { // 如果客户端断开提醒
|
||||
s.logger.Warn("websocket disconnect actively")
|
||||
} else if !s.closed { // 如果主动关闭,不提示
|
||||
s.logger.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// 删除通道
|
||||
s.svr.sessions.Delete(s.channelID)
|
||||
// 停止消费
|
||||
if s.cid != nil {
|
||||
s.source.StopConsume(*s.cid)
|
||||
s.cid = nil
|
||||
s.source = nil
|
||||
}
|
||||
// 关闭连接
|
||||
s.Close()
|
||||
|
||||
// 重置到初始状态
|
||||
s.conn = nil
|
||||
s.dataChannel = nil
|
||||
s.status = statusInit
|
||||
stats.WspConns.Release()
|
||||
s.logger.Info("close wsp channel")
|
||||
}()
|
||||
|
||||
s.logger.Info("open wsp channel")
|
||||
|
||||
stats.WspConns.Add() // 增加一个 RTSP 连接计数
|
||||
|
||||
for !s.closed {
|
||||
deadLine := time.Time{}
|
||||
if s.timeout > 0 {
|
||||
deadLine = time.Now().Add(s.timeout)
|
||||
}
|
||||
if err = s.conn.SetReadDeadline(deadLine); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
var req *Request
|
||||
req, err = DecodeRequest(s.conn, s.logger)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if req.Cmd != CmdWrap {
|
||||
s.logger.Error("must is WRAP command request")
|
||||
break
|
||||
}
|
||||
|
||||
// 从包装命令中提取 rtsp 请求
|
||||
var rtspReq *rtsp.Request
|
||||
rtspReq, err = rtsp.ReadRequest(bufio.NewReader(bytes.NewBufferString(req.Body)))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
// 处理请求,并获得响应
|
||||
rtspResp := s.onRequest(rtspReq)
|
||||
|
||||
// 发送响应
|
||||
buf := buffers.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
defer buffers.Put(buf)
|
||||
req.ResponseOK(buf, map[string]string{FieldChannel: s.channelID}, "")
|
||||
rtspResp.Write(buf)
|
||||
_, err = s.conn.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
s.logger.Debugf("wsp ===>>>\r\n%s", buf.String())
|
||||
|
||||
// 关闭通道
|
||||
if rtspReq.Method == rtsp.MethodTeardown {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) onRequest(req *rtsp.Request) *rtsp.Response {
|
||||
resp := s.newResponse(rtsp.StatusOK, req)
|
||||
// 预处理
|
||||
continueProcess := s.onPreprocess(resp, req)
|
||||
if !continueProcess {
|
||||
return resp
|
||||
}
|
||||
|
||||
switch req.Method {
|
||||
case rtsp.MethodDescribe:
|
||||
s.onDescribe(resp, req)
|
||||
case rtsp.MethodSetup:
|
||||
s.onSetup(resp, req)
|
||||
case rtsp.MethodPlay:
|
||||
s.onPlay(resp, req)
|
||||
case rtsp.MethodPause:
|
||||
s.onPause(resp, req)
|
||||
default:
|
||||
// 状态不支持的方法
|
||||
resp.StatusCode = rtsp.StatusMethodNotValidInThisState
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
func (s *Session) onDescribe(resp *rtsp.Response, req *rtsp.Request) {
|
||||
|
||||
// TODO: 检查 accept 中的类型是否包含 sdp
|
||||
s.url = req.URL
|
||||
s.path = s.conn.Path() // 使用websocket路径
|
||||
// s.path = utils.CanonicalPath(req.URL.Path)
|
||||
stream := media.GetOrCreate(s.path)
|
||||
if stream == nil {
|
||||
resp.StatusCode = rtsp.StatusNotFound
|
||||
return
|
||||
}
|
||||
|
||||
// 从流中取 sdp
|
||||
sdpRaw := stream.Sdp()
|
||||
if len(sdpRaw) == 0 {
|
||||
resp.StatusCode = rtsp.StatusNotFound
|
||||
return
|
||||
}
|
||||
err := s.parseSdp(sdpRaw)
|
||||
if err != nil { // TODO:需要更好的处理方式
|
||||
resp.StatusCode = rtsp.StatusNotFound
|
||||
return
|
||||
}
|
||||
|
||||
resp.Header.Set(rtsp.FieldContentType, "application/sdp")
|
||||
resp.Body = s.rawSdp
|
||||
}
|
||||
|
||||
func (s *Session) onSetup(resp *rtsp.Response, req *rtsp.Request) {
|
||||
// a=control:streamid=1
|
||||
// a=control:rtsp://192.168.1.165/trackID=1
|
||||
// a=control:?ctype=video
|
||||
setupURL := &url.URL{}
|
||||
*setupURL = *req.URL
|
||||
if setupURL.Port() == "" {
|
||||
setupURL.Host = fmt.Sprintf("%s:554", setupURL.Host)
|
||||
}
|
||||
setupPath := setupURL.String()
|
||||
|
||||
//setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:]
|
||||
vPath := getControlPath(s.vControl)
|
||||
if vPath == "" {
|
||||
resp.StatusCode = rtsp.StatusInternalServerError
|
||||
resp.Status = "Invalid VControl"
|
||||
return
|
||||
}
|
||||
|
||||
aPath := getControlPath(s.aControl)
|
||||
|
||||
ts := req.Header.Get(rtsp.FieldTransport)
|
||||
resp.Header.Set(rtsp.FieldTransport, ts) // 先回写transport
|
||||
|
||||
// 检查控制路径
|
||||
chindex := -1
|
||||
if setupPath == aPath || (aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath)) {
|
||||
chindex = int(rtsp.ChannelAudio)
|
||||
} else if setupPath == vPath || (vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath)) {
|
||||
chindex = int(rtsp.ChannelVideo)
|
||||
} else { // 找不到被 Setup 的资源
|
||||
resp.StatusCode = rtsp.StatusInternalServerError
|
||||
resp.Status = fmt.Sprintf("SETUP Unkown control:%s", setupPath)
|
||||
return
|
||||
}
|
||||
|
||||
err := s.transport.ParseTransport(chindex, ts)
|
||||
if err != nil {
|
||||
resp.StatusCode = rtsp.StatusInvalidParameter
|
||||
resp.Status = err.Error()
|
||||
return
|
||||
}
|
||||
|
||||
// 检查必须是play模式
|
||||
if rtsp.PlaySession != s.transport.Mode {
|
||||
resp.StatusCode = rtsp.StatusInvalidParameter
|
||||
resp.Status = "can't setup as record"
|
||||
return
|
||||
}
|
||||
|
||||
if s.transport.Type != rtsp.RTPTCPUnicast { // 需要修改回复的transport
|
||||
resp.StatusCode = rtsp.StatusUnsupportedTransport
|
||||
resp.Status = "websocket only support tcp unicast"
|
||||
return
|
||||
}
|
||||
|
||||
if s.status < statusReady { // 初始状态切换到Ready
|
||||
s.status = statusReady
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) onPlay(resp *rtsp.Response, req *rtsp.Request) {
|
||||
if s.status == statusPlaying {
|
||||
s.paused = false
|
||||
return
|
||||
}
|
||||
|
||||
stream := media.GetOrCreate(s.path)
|
||||
if stream == nil {
|
||||
resp.StatusCode = rtsp.StatusNotFound
|
||||
return
|
||||
}
|
||||
|
||||
resp.Header.Set(rtsp.FieldRange, req.Header.Get(rtsp.FieldRange))
|
||||
if s.cid == nil {
|
||||
s.source = stream
|
||||
// cid := stream.StartConsume(s)
|
||||
cid := stream.StartConsumeNoGopCache(s, media.RTPPacket, "wsp")
|
||||
s.cid = &cid
|
||||
}
|
||||
s.status = statusPlaying
|
||||
s.paused = false
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Session) onPause(resp *rtsp.Response, req *rtsp.Request) {
|
||||
if s.status == statusPlaying {
|
||||
s.paused = true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) onPreprocess(resp *rtsp.Response, req *rtsp.Request) (continueProcess bool) {
|
||||
// Options
|
||||
if req.Method == rtsp.MethodOptions {
|
||||
resp.Header.Set(rtsp.FieldPublic, "DESCRIBE, SETUP, TEARDOWN, PLAY, OPTIONS, ANNOUNCE")
|
||||
return false
|
||||
}
|
||||
|
||||
// 关闭请求
|
||||
if req.Method == rtsp.MethodTeardown {
|
||||
return false
|
||||
}
|
||||
|
||||
// 检查状态下的方法
|
||||
switch s.status {
|
||||
case statusReady:
|
||||
continueProcess = req.Method == rtsp.MethodSetup ||
|
||||
req.Method == rtsp.MethodPlay
|
||||
case statusPlaying:
|
||||
continueProcess = (req.Method == rtsp.MethodPlay ||
|
||||
req.Method == rtsp.MethodPause)
|
||||
default:
|
||||
continueProcess = !(req.Method == rtsp.MethodPlay ||
|
||||
req.Method == rtsp.MethodRecord)
|
||||
}
|
||||
|
||||
if !continueProcess {
|
||||
resp.StatusCode = rtsp.StatusMethodNotValidInThisState
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *Session) newResponse(code int, req *rtsp.Request) *rtsp.Response {
|
||||
resp := &rtsp.Response{
|
||||
StatusCode: code,
|
||||
Header: make(rtsp.Header),
|
||||
Request: req,
|
||||
}
|
||||
|
||||
resp.Header.Set(rtsp.FieldCSeq, req.Header.Get(rtsp.FieldCSeq))
|
||||
resp.Header.Set(rtsp.FieldSession, s.lsession)
|
||||
return resp
|
||||
}
|
||||
|
||||
func (s *Session) parseSdp(rawSdp string) (err error) {
|
||||
// 从流中取 sdp
|
||||
s.rawSdp = rawSdp
|
||||
// 解析
|
||||
s.sdp, err = sdp.ParseString(s.rawSdp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, media := range s.sdp.Media {
|
||||
switch media.Type {
|
||||
case "video":
|
||||
s.vControl = media.Attributes.Get("control")
|
||||
s.vCodec = media.Format[0].Name
|
||||
case "audio":
|
||||
s.aControl = media.Attributes.Get("control")
|
||||
s.aCodec = media.Format[0].Name
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func getControlPath(ctrl string) (path string) {
|
||||
if len(ctrl) >= len(rtspURLPrefix) && strings.EqualFold(ctrl[:len(rtspURLPrefix)], rtspURLPrefix) {
|
||||
ctrlURL, err := url.Parse(ctrl)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if ctrlURL.Port() == "" {
|
||||
ctrlURL.Host = fmt.Sprintf("%s:554", ctrlURL.Hostname())
|
||||
}
|
||||
return ctrlURL.String()
|
||||
}
|
||||
return ctrl
|
||||
}
|
10
service/wsp/types.go
Normal file
10
service/wsp/types.go
Normal file
@@ -0,0 +1,10 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wsp
|
||||
|
||||
import "github.com/cnotch/ipchub/av/format"
|
||||
|
||||
// Pack .
|
||||
type Pack = format.Packet
|
124
service/wsp/wsp.go
Executable file
124
service/wsp/wsp.go
Executable file
@@ -0,0 +1,124 @@
|
||||
// Copyright (c) 2019,CAOHONGJU All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package wsp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/cnotch/ipchub/network/websocket"
|
||||
"github.com/cnotch/ipchub/provider/security"
|
||||
"github.com/cnotch/xlog"
|
||||
"github.com/kelindar/tcp"
|
||||
)
|
||||
|
||||
// Server https://github.com/Streamedian/html5_rtsp_player 客户端配套的服务器
|
||||
type Server struct {
|
||||
logger *xlog.Logger
|
||||
|
||||
sessions sync.Map
|
||||
}
|
||||
|
||||
// CreateAcceptHandler 创建连接接入处理器
|
||||
func CreateAcceptHandler() tcp.OnAccept {
|
||||
svr := &Server{
|
||||
logger: xlog.L(),
|
||||
}
|
||||
return svr.onAcceptConn
|
||||
}
|
||||
|
||||
// onAcceptConn 当新连接接入时触发
|
||||
func (svr *Server) onAcceptConn(c net.Conn) {
|
||||
wsc := c.(websocket.Conn)
|
||||
if wsc.Subprotocol() == "control" {
|
||||
go svr.handshakeControlChannel(wsc)
|
||||
} else {
|
||||
go svr.handshakeDataChannel(wsc)
|
||||
}
|
||||
}
|
||||
|
||||
func (svr *Server) handshakeControlChannel(wsc websocket.Conn) {
|
||||
svr.logger.Info("wsp control channel handshake.")
|
||||
wsc = wsc.TextTransport()
|
||||
for {
|
||||
req, err := DecodeRequest(wsc, svr.logger)
|
||||
if err != nil {
|
||||
svr.logger.Error(err.Error())
|
||||
wsc.Close()
|
||||
break
|
||||
}
|
||||
|
||||
if req.Cmd == CmdGetInfo {
|
||||
continue
|
||||
}
|
||||
|
||||
if req.Cmd != CmdInit {
|
||||
svr.logger.Errorf("wsp control channel handshake failed, malformed WSP request command: %s.", req.Cmd)
|
||||
wsc.Close()
|
||||
break
|
||||
}
|
||||
|
||||
// 初始化
|
||||
channelID := security.NewID().String()
|
||||
buf := buffers.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
defer buffers.Put(buf)
|
||||
req.ResponseOK(buf, map[string]string{FieldChannel: channelID}, "")
|
||||
_, err = wsc.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
svr.logger.Error(err.Error())
|
||||
wsc.Close()
|
||||
break
|
||||
}
|
||||
session := newSession(svr, wsc, channelID)
|
||||
svr.sessions.Store(channelID, session)
|
||||
svr.logger.Debugf("wsp ===>>> \r\n%s", buf.String())
|
||||
go session.process()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func (svr *Server) handshakeDataChannel(wsc websocket.Conn) {
|
||||
tc := wsc.TextTransport()
|
||||
req, err := DecodeRequest(tc, svr.logger)
|
||||
if err != nil {
|
||||
svr.logger.Error(err.Error())
|
||||
tc.Close()
|
||||
return
|
||||
}
|
||||
|
||||
channelID := req.Header[FieldChannel]
|
||||
code := 200
|
||||
text := "OK"
|
||||
var session *Session
|
||||
si, ok := svr.sessions.Load(channelID)
|
||||
if ok {
|
||||
session = si.(*Session)
|
||||
} else {
|
||||
code = 404
|
||||
text = "NOT FOUND"
|
||||
}
|
||||
|
||||
buf := buffers.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
defer buffers.Put(buf)
|
||||
req.ResponseTo(buf, code, text, map[string]string{}, "")
|
||||
_, err = tc.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
svr.logger.Error(err.Error())
|
||||
tc.Close()
|
||||
return
|
||||
}
|
||||
|
||||
svr.logger.Debugf("wsp ===>>> \r\n%s", buf.String())
|
||||
if session == nil {
|
||||
tc.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// 添加到session
|
||||
session.setDataChannel(wsc)
|
||||
}
|
Reference in New Issue
Block a user