server: support reading with multicast

This commit is contained in:
aler9
2021-06-14 19:39:50 +02:00
committed by Alessandro Ros
parent f80faf5ac5
commit 3f3226b53d
16 changed files with 749 additions and 275 deletions

View File

@@ -16,9 +16,8 @@ import (
type serverHandler struct {
mutex sync.Mutex
stream *gortsplib.ServerStream
publisher *gortsplib.ServerSession
readers map[*gortsplib.ServerSession]struct{}
sdp []byte
}
// called after a connection is opened.
@@ -43,23 +42,22 @@ func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionClo
sh.mutex.Lock()
defer sh.mutex.Unlock()
if ctx.Session == sh.publisher {
sh.publisher = nil
sh.sdp = nil
} else {
delete(sh.readers, ctx.Session)
// close stream-related listeners and disconnect every reader
if sh.stream != nil && ctx.Session == sh.publisher {
sh.stream.Close()
sh.stream = nil
}
}
// called after receiving a DESCRIBE request.
func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) {
func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("describe request")
sh.mutex.Lock()
defer sh.mutex.Unlock()
// no one is publishing yet
if sh.publisher == nil {
if sh.stream == nil {
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, nil
@@ -67,7 +65,7 @@ func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (
return &base.Response{
StatusCode: base.StatusOK,
}, sh.sdp, nil
}, sh.stream, nil
}
// called after receiving an ANNOUNCE request.
@@ -77,14 +75,14 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
sh.mutex.Lock()
defer sh.mutex.Unlock()
if sh.publisher != nil {
if sh.stream != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("someone is already publishing")
}
sh.stream = gortsplib.NewServerStream(ctx.Tracks)
sh.publisher = ctx.Session
sh.sdp = ctx.Tracks.Write()
return &base.Response{
StatusCode: base.StatusOK,
@@ -92,23 +90,25 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (
}
// called after receiving a SETUP request.
func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) {
func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, *uint32, error) {
log.Printf("setup request")
// no one is publishing yet
if sh.stream == nil {
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, nil, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil, nil
}, sh.stream, nil, nil
}
// called after receiving a PLAY request.
func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("play request")
sh.mutex.Lock()
defer sh.mutex.Unlock()
sh.readers[ctx.Session] = struct{}{}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
@@ -130,16 +130,14 @@ func (sh *serverHandler) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
// if we are the publisher, route frames to readers
if ctx.Session == sh.publisher {
for r := range sh.readers {
r.WriteFrame(ctx.TrackID, ctx.StreamType, ctx.Payload)
}
sh.stream.WriteFrame(ctx.TrackID, ctx.StreamType, ctx.Payload)
}
}
func main() {
// configure server
s := &gortsplib.Server{
Handler: &serverHandler{readers: make(map[*gortsplib.ServerSession]struct{})},
Handler: &serverHandler{},
UDPRTPAddress: ":8000",
UDPRTCPAddress: ":8001",
}