mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-12-24 13:48:04 +08:00
fix: rtsp pull
This commit is contained in:
@@ -247,129 +247,146 @@ func (c *NetConnection) ReadResponse() (res *util.Response, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *NetConnection) Receive(sendMode bool) (channelID byte, buf []byte, err error) {
|
||||
ts := time.Now()
|
||||
if err = c.conn.SetReadDeadline(ts.Add(util.Conditional(sendMode, time.Second*60, time.Second*15))); err != nil {
|
||||
return
|
||||
}
|
||||
var magic []byte
|
||||
// we can read:
|
||||
// 1. RTP interleaved: `$` + 1B channel number + 2B size
|
||||
// 2. RTSP response: RTSP/1.0 200 OK
|
||||
// 3. RTSP request: OPTIONS ...
|
||||
|
||||
if magic, err = c.Peek(4); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var size int
|
||||
if magic[0] != '$' {
|
||||
magicWord := string(magic)
|
||||
//c.Warn("not magic", "magic", magicWord)
|
||||
switch magicWord {
|
||||
case "RTSP":
|
||||
var res *util.Response
|
||||
if res, err = c.ReadResponse(); err != nil {
|
||||
return
|
||||
}
|
||||
c.Warn(string(res.Body))
|
||||
// for playing backchannel only after OK response on play
|
||||
|
||||
func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte), onRTCP func(byte, []byte)) (err error) {
|
||||
for err == nil {
|
||||
ts := time.Now()
|
||||
if err = c.conn.SetReadDeadline(ts.Add(util.Conditional(sendMode, time.Second*60, time.Second*15))); err != nil {
|
||||
return
|
||||
}
|
||||
var magic []byte
|
||||
// we can read:
|
||||
// 1. RTP interleaved: `$` + 1B channel number + 2B size
|
||||
// 2. RTSP response: RTSP/1.0 200 OK
|
||||
// 3. RTSP request: OPTIONS ...
|
||||
|
||||
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
|
||||
var req *util.Request
|
||||
if req, err = c.ReadRequest(); err != nil {
|
||||
return
|
||||
}
|
||||
if magic, err = c.Peek(4); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if req.Method == MethodOptions {
|
||||
res := &util.Response{Request: req}
|
||||
if sendMode {
|
||||
c.StartWrite()
|
||||
}
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
var size int
|
||||
if magic[0] != '$' {
|
||||
magicWord := string(magic)
|
||||
c.Warn("not magic", "magic", magicWord)
|
||||
switch magicWord {
|
||||
case "RTSP":
|
||||
var res *util.Response
|
||||
if res, err = c.ReadResponse(); err != nil {
|
||||
return
|
||||
}
|
||||
if sendMode {
|
||||
c.StopWrite()
|
||||
}
|
||||
}
|
||||
return
|
||||
c.Warn("response", "res", res.String())
|
||||
// for playing backchannel only after OK response on play
|
||||
|
||||
default:
|
||||
c.Error("wrong input")
|
||||
//c.Fire("RTSP wrong input")
|
||||
//
|
||||
//for i := 0; ; i++ {
|
||||
// // search next start symbol
|
||||
// if _, err = c.reader.ReadBytes('$'); err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// if channelID, err = c.reader.ReadByte(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// // TODO: better check maximum good channel ID
|
||||
// if channelID >= 20 {
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// buf4 = make([]byte, 2)
|
||||
// if _, err = io.ReadFull(c.reader, buf4); err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// // check if size good for RTP
|
||||
// size = binary.BigEndian.Uint16(buf4)
|
||||
// if size <= 1500 {
|
||||
// break
|
||||
// }
|
||||
//
|
||||
// // 10 tries to find good packet
|
||||
// if i >= 10 {
|
||||
// return fmt.Errorf("RTSP wrong input")
|
||||
// }
|
||||
//}
|
||||
for err = c.Skip(1); err == nil; {
|
||||
if magic[0], err = c.ReadByte(); magic[0] == '*' {
|
||||
channelID, err = c.ReadByte()
|
||||
magic[2], err = c.ReadByte()
|
||||
magic[3], err = c.ReadByte()
|
||||
size = int(binary.BigEndian.Uint16(magic[2:]))
|
||||
buf = c.MemoryAllocator.Malloc(size)
|
||||
if err = c.ReadNto(size, buf); err != nil {
|
||||
continue
|
||||
|
||||
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
|
||||
var req *util.Request
|
||||
if req, err = c.ReadRequest(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if req.Method == MethodOptions {
|
||||
res := &util.Response{Request: req}
|
||||
if sendMode {
|
||||
c.StartWrite()
|
||||
}
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
return
|
||||
}
|
||||
break
|
||||
if sendMode {
|
||||
c.StopWrite()
|
||||
}
|
||||
}
|
||||
continue
|
||||
|
||||
default:
|
||||
c.Error("wrong input")
|
||||
//c.Fire("RTSP wrong input")
|
||||
//
|
||||
//for i := 0; ; i++ {
|
||||
// // search next start symbol
|
||||
// if _, err = c.reader.ReadBytes('$'); err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// if channelID, err = c.reader.ReadByte(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// // TODO: better check maximum good channel ID
|
||||
// if channelID >= 20 {
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// buf4 = make([]byte, 2)
|
||||
// if _, err = io.ReadFull(c.reader, buf4); err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// // check if size good for RTP
|
||||
// size = binary.BigEndian.Uint16(buf4)
|
||||
// if size <= 1500 {
|
||||
// break
|
||||
// }
|
||||
//
|
||||
// // 10 tries to find good packet
|
||||
// if i >= 10 {
|
||||
// return fmt.Errorf("RTSP wrong input")
|
||||
// }
|
||||
//}
|
||||
for err = c.Skip(1); err == nil; {
|
||||
if magic[0], err = c.ReadByte(); magic[0] == '*' {
|
||||
var channelID byte
|
||||
channelID, err = c.ReadByte()
|
||||
magic[2], err = c.ReadByte()
|
||||
magic[3], err = c.ReadByte()
|
||||
size = int(binary.BigEndian.Uint16(magic[2:]))
|
||||
buf := c.MemoryAllocator.Malloc(size)
|
||||
if err = c.ReadNto(size, buf); err != nil {
|
||||
c.MemoryAllocator.Free(buf)
|
||||
return
|
||||
} else if onReceive != nil {
|
||||
onReceive(channelID, buf)
|
||||
} else {
|
||||
c.MemoryAllocator.Free(buf)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// hope that the odd channels are always RTCP
|
||||
} else {
|
||||
// hope that the odd channels are always RTCP
|
||||
channelID := magic[1]
|
||||
|
||||
channelID = magic[1]
|
||||
|
||||
// get data size
|
||||
size = int(binary.BigEndian.Uint16(magic[2:]))
|
||||
// skip 4 bytes from c.reader.Peek
|
||||
if err = c.Skip(4); err != nil {
|
||||
return
|
||||
}
|
||||
buf = c.MemoryAllocator.Malloc(size)
|
||||
if err = c.ReadNto(size, buf); err != nil {
|
||||
// get data size
|
||||
size = int(binary.BigEndian.Uint16(magic[2:]))
|
||||
// skip 4 bytes from c.reader.Peek
|
||||
if err = c.Skip(4); err != nil {
|
||||
return
|
||||
}
|
||||
buf := c.MemoryAllocator.Malloc(size)
|
||||
if err = c.ReadNto(size, buf); err != nil {
|
||||
c.MemoryAllocator.Free(buf)
|
||||
return
|
||||
}
|
||||
if channelID&1 == 0 {
|
||||
if onReceive != nil {
|
||||
onReceive(channelID, buf)
|
||||
continue
|
||||
}
|
||||
} else if onRTCP != nil {
|
||||
onRTCP(channelID, buf)
|
||||
continue
|
||||
}
|
||||
c.MemoryAllocator.Free(buf)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if ts.After(c.keepaliveTS) {
|
||||
req := &util.Request{Method: MethodOptions, URL: c.URL}
|
||||
if err = c.WriteRequest(req); err != nil {
|
||||
return
|
||||
if ts.After(c.keepaliveTS) {
|
||||
req := &util.Request{Method: MethodOptions, URL: c.URL}
|
||||
if err = c.WriteRequest(req); err != nil {
|
||||
return
|
||||
}
|
||||
c.keepaliveTS = ts.Add(25 * time.Second)
|
||||
}
|
||||
c.keepaliveTS = ts.Add(25 * time.Second)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user