mirror of
https://github.com/oscar-davids/lpmsdemo.git
synced 2025-12-24 12:37:59 +08:00
229 lines
6.6 KiB
Go
229 lines
6.6 KiB
Go
//The RTMP server. This will put up a RTMP endpoint when starting up Swarm.
|
|
//To integrate with LPMS means your code will become the source / destination of the media server.
|
|
//This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream
|
|
package core
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/oscar-davids/lpmsdemo/ffmpeg"
|
|
"github.com/oscar-davids/lpmsdemo/segmenter"
|
|
"github.com/oscar-davids/lpmsdemo/stream"
|
|
"github.com/oscar-davids/lpmsdemo/vidlistener"
|
|
"github.com/oscar-davids/lpmsdemo/vidplayer"
|
|
"github.com/oscar-davids/lpmsdemo/m3u8"
|
|
|
|
joy4rtmp "github.com/livepeer/joy4/format/rtmp"
|
|
)
|
|
|
|
var RetryCount = 3
|
|
var SegmenterRetryWait = 500 * time.Millisecond
|
|
|
|
// LPMS struct is the container of all the LPMS server related tools.
|
|
type LPMS struct {
|
|
// rtmpServer *joy4rtmp.Server
|
|
vidPlayer *vidplayer.VidPlayer
|
|
vidListener *vidlistener.VidListener
|
|
workDir string
|
|
httpAddr string
|
|
rtmpAddr string
|
|
}
|
|
|
|
type transcodeReq struct {
|
|
Formats []string
|
|
Bitrates []string
|
|
Codecin string
|
|
Codecout []string
|
|
StreamID string
|
|
}
|
|
|
|
type LPMSOpts struct {
|
|
RtmpAddr string
|
|
RtmpDisabled bool
|
|
HttpAddr string
|
|
HttpDisabled bool
|
|
VodPath string
|
|
WorkDir string
|
|
|
|
// Pass in a custom HTTP mux.
|
|
// Useful if a non-default mux needs to be used.
|
|
// The caller is responsible for setting up the listener
|
|
// on the mux; LPMS won't initialize it.
|
|
// If set, HttpPort and HttpDisabled are ignored.
|
|
HttpMux *http.ServeMux
|
|
}
|
|
|
|
func defaultLPMSOpts(opts *LPMSOpts) {
|
|
if opts.RtmpAddr == "" {
|
|
opts.RtmpAddr = "0.0.0.0:1935"
|
|
}
|
|
if opts.HttpAddr == "" {
|
|
opts.HttpAddr = "0.0.0.0:7935"
|
|
}
|
|
}
|
|
|
|
//New creates a new LPMS server object. It really just brokers everything to the components.
|
|
func New(opts *LPMSOpts) *LPMS {
|
|
defaultLPMSOpts(opts)
|
|
var rtmpServer *joy4rtmp.Server
|
|
if !opts.RtmpDisabled {
|
|
rtmpServer = &joy4rtmp.Server{Addr: opts.RtmpAddr}
|
|
}
|
|
var httpAddr string
|
|
if !opts.HttpDisabled && opts.HttpMux == nil {
|
|
httpAddr = opts.HttpAddr
|
|
}
|
|
player := vidplayer.NewVidPlayer(rtmpServer, opts.VodPath, opts.HttpMux)
|
|
listener := &vidlistener.VidListener{RtmpServer: rtmpServer}
|
|
return &LPMS{vidPlayer: player, vidListener: listener, workDir: opts.WorkDir, rtmpAddr: opts.RtmpAddr, httpAddr: httpAddr}
|
|
}
|
|
|
|
//Start starts the rtmp and http servers, and initializes ffmpeg
|
|
func (l *LPMS) Start(ctx context.Context) error {
|
|
ec := make(chan error, 1)
|
|
ffmpeg.InitFFmpeg()
|
|
if l.vidListener.RtmpServer != nil {
|
|
go func() {
|
|
glog.Infof("LPMS Server listening on rtmp://%v", l.vidListener.RtmpServer.Addr)
|
|
ec <- l.vidListener.RtmpServer.ListenAndServe()
|
|
}()
|
|
}
|
|
startHTTP := l.httpAddr != ""
|
|
if startHTTP {
|
|
go func() {
|
|
glog.Infof("HTTP Server listening on http://%v", l.httpAddr)
|
|
ec <- http.ListenAndServe(l.httpAddr, nil)
|
|
}()
|
|
}
|
|
|
|
if l.vidListener.RtmpServer != nil || startHTTP {
|
|
select {
|
|
case err := <-ec:
|
|
glog.Errorf("LPMS Server Error: %v. Quitting...", err)
|
|
return err
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
//HandleRTMPPublish offload to the video listener. To understand how it works, look at videoListener.HandleRTMPPublish.
|
|
func (l *LPMS) HandleRTMPPublish(
|
|
makeStreamID func(url *url.URL) (strmID stream.AppData),
|
|
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error),
|
|
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {
|
|
|
|
l.vidListener.HandleRTMPPublish(makeStreamID, gotStream, endStream)
|
|
}
|
|
|
|
//HandleRTMPPlay offload to the video player
|
|
func (l *LPMS) HandleRTMPPlay(getStream func(url *url.URL) (stream.RTMPVideoStream, error)) error {
|
|
return l.vidPlayer.HandleRTMPPlay(getStream)
|
|
}
|
|
|
|
//HandleHLSPlay offload to the video player
|
|
func (l *LPMS) HandleHLSPlay(
|
|
getMasterPlaylist func(url *url.URL) (*m3u8.MasterPlaylist, error),
|
|
getMediaPlaylist func(url *url.URL) (*m3u8.MediaPlaylist, error),
|
|
getSegment func(url *url.URL) ([]byte, error)) {
|
|
|
|
l.vidPlayer.HandleHLSPlay(getMasterPlaylist, getMediaPlaylist, getSegment)
|
|
}
|
|
|
|
//SegmentRTMPToHLS takes a rtmp stream and re-packages it into a HLS stream with the specified segmenter options
|
|
func (l *LPMS) SegmentRTMPToHLS(ctx context.Context, rs stream.RTMPVideoStream, hs stream.HLSVideoStream, segOptions segmenter.SegmenterOptions) error {
|
|
// set localhost if necessary. Check more problematic addrs? [::] ?
|
|
rtmpAddr := l.rtmpAddr
|
|
if strings.HasPrefix(rtmpAddr, "0.0.0.0") {
|
|
rtmpAddr = "127.0.0.1" + rtmpAddr[len("0.0.0.0"):]
|
|
}
|
|
localRtmpUrl := "rtmp://" + rtmpAddr + "/stream/" + rs.GetStreamID()
|
|
|
|
glog.Infof("Segment RTMP Req: %v", localRtmpUrl)
|
|
|
|
//Invoke Segmenter
|
|
s := segmenter.NewFFMpegVideoSegmenter(l.workDir, hs.GetStreamID(), localRtmpUrl, segOptions)
|
|
c := make(chan error, 1)
|
|
ffmpegCtx, ffmpegCancel := context.WithCancel(context.Background())
|
|
go func() { c <- rtmpToHLS(s, ffmpegCtx, true) }()
|
|
|
|
//Kick off go routine to write HLS segments
|
|
segCtx, segCancel := context.WithCancel(context.Background())
|
|
go func() {
|
|
c <- func() error {
|
|
var seg *segmenter.VideoSegment
|
|
var err error
|
|
for {
|
|
if hs == nil {
|
|
glog.Errorf("HLS Stream is nil")
|
|
return segmenter.ErrSegmenter
|
|
}
|
|
|
|
for i := 1; i <= RetryCount; i++ {
|
|
seg, err = s.PollSegment(segCtx)
|
|
if err == nil || err == context.Canceled || err == context.DeadlineExceeded {
|
|
break
|
|
} else if i < RetryCount {
|
|
glog.Errorf("Error polling Segment: %v, Retrying", err)
|
|
time.Sleep(SegmenterRetryWait)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ss := stream.HLSSegment{SeqNo: seg.SeqNo, Data: seg.Data, Name: seg.Name, Duration: seg.Length.Seconds()}
|
|
//glog.Infof("Writing stream: %v, duration:%v, len:%v", ss.Name, ss.Duration, len(seg.Data))
|
|
//if err = hs.AddHLSSegment(&ss); err != nil {
|
|
// glog.Errorf("Error adding segment: %v", err)
|
|
//}
|
|
//TrigerTranscode
|
|
if err = hs.TrigerTranscode(&ss); err != nil {
|
|
glog.Errorf("Error adding segment: %v", err)
|
|
}
|
|
select {
|
|
case <-segCtx.Done():
|
|
return segCtx.Err()
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
}()
|
|
|
|
select {
|
|
case err := <-c:
|
|
if err != nil && err != context.Canceled {
|
|
glog.Errorf("Error segmenting stream: %v", err)
|
|
}
|
|
ffmpegCancel()
|
|
segCancel()
|
|
return err
|
|
case <-ctx.Done():
|
|
ffmpegCancel()
|
|
segCancel()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func rtmpToHLS(s segmenter.VideoSegmenter, ctx context.Context, cleanup bool) error {
|
|
var err error
|
|
for i := 1; i <= RetryCount; i++ {
|
|
err = s.RTMPToHLS(ctx, cleanup)
|
|
if err == nil {
|
|
break
|
|
} else if i < RetryCount {
|
|
glog.Errorf("Error Invoking Segmenter: %v, Retrying", err)
|
|
time.Sleep(SegmenterRetryWait)
|
|
}
|
|
}
|
|
return err
|
|
}
|