mirror of https://github.com/livepeer/lpms (synced 2025-09-26 19:51:36 +08:00)

standalone SRS V0.1

169  README.md
@@ -1,6 +1,167 @@
# LPMS - Livepeer media server

This is meant to be a standalone server, but at the moment it is coupled with the go-livepeer repo. For the time being, development will proceed in the [github.com/livepeer/go-livepeer/lpms package](https://github.com/livepeer/go-livepeer/lpms).

LPMS is a media server that can run independently, or on top of the [Livepeer](https://livepeer.org) network. It allows you to manipulate / broadcast a live video stream. Currently, LPMS supports RTMP as the input format and RTMP/HLS as output formats.

LPMS can be integrated into another service, or run as a standalone service. To try LPMS as a standalone service, simply get the package:
```
go get github.com/livepeer/lpms
```

Go to the lpms root directory and run
```
./lpms
```
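If the `lpms` binary hasn't been built yet, one way to produce it (assuming the entry point at `cmd/lpms.go` shown later in this commit) is `go build -o lpms ./cmd/lpms.go`.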

### Requirements

LPMS requires ffmpeg. To install it on OSX, use Homebrew. As part of this installation, `ffmpeg` and `ffplay` should be installed as command-line utilities.

```
//This may take a few minutes
brew install ffmpeg --with-fdk-aac --with-ffplay --with-freetype --with-libass --with-libquvi --with-libvorbis --with-libvpx --with-opus --with-x265
```

LPMS uses [SRS](http://ossrs.net/srs.release/releases/) as a transcoding backend. It's included in the `/bin` directory for testing purposes. Make sure you are running SRS before testing out LPMS.

To start SRS, run
```
./bin/srs -c ./bin/srs.conf
```

### Testing out LPMS

The test LPMS server exposes a few different endpoints:
1. `rtmp://localhost:1936/stream/test` for uploading/viewing an RTMP video stream.
2. `http://localhost:8000/transcode` for issuing transcode requests.
3. `http://localhost:8000/stream/test_tran.m3u8` for consuming the transcoded video.

Do the following steps to view a live stream video:
1. Upload an RTMP video stream to `rtmp://localhost:1936/stream/test`. We recommend using [OBS](https://obsproject.com/download).



2. If you have successfully uploaded the stream, you should see something like this in the LPMS output:
```
I0324 09:44:14.639405 80673 listener.go:28] RTMP server got upstream
I0324 09:44:14.639429 80673 listener.go:42] Got RTMP Stream: test
```
3. Now that you have an RTMP video stream running, you can view it from the server. Simply run `ffplay rtmp://localhost:1936/stream/test`, and you should see the RTMP video playback.
4. Let's transcode the video to HLS. Before issuing the transcoding request, make sure SRS is running.
```
//To start SRS
./bin/srs -c ./bin/srs.conf
```


5. To issue the transcoding request, we can use curl.
```
curl -H "Content-Type: application/json" -X POST -d '{"StreamID":"test"}' http://localhost:8000/transcode
```
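If the request succeeds, the handler in `lpms.go` writes back the name of the new stream. With the default test setup (which names the transcoded stream `{streamID}_tran`), the response should look roughly like:
```
New Stream: test_tran
```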

6. You should see your SRS console start logging. Now just open up `hlsVideo.html` in Safari, and you should see the HLS video. There may be a delay due to the video transcoding - we'll expose more parameters to lower that delay in the future. Note that in typical internet broadcasting today, there is usually a delay of 30 - 90 seconds.


### Integrating LPMS

LPMS exposes a few different methods for customization. As an example, take a look at `cmd/lpms.go`.

To create a new LPMS server:
```
//Specify the ports you want the server to run on, and the ports SRS is running on
//(these should match srs.conf)
lpms := lpms.New("1936", "8000", "2436", "7936")
```
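The arguments are, in order, the LPMS RTMP port, the LPMS HTTP port, the SRS RTMP port, and the SRS HTTP port (see the `New` signature in `lpms.go`). The SRS ports have to match what `srs.conf` actually listens on; the config shipped in `bin/` uses 2435 and 7935.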

To handle RTMP publish:
```
lpms.HandleRTMPPublish(
	//getStreamID
	func(reqPath string) (string, error) {
		return getStreamIDFromPath(reqPath), nil
	},
	//getStream
	func(reqPath string) (*stream.Stream, error) {
		streamID := getStreamIDFromPath(reqPath)
		stream := stream.NewStream(streamID)
		streamDB.db[streamID] = stream
		return stream, nil
	},
	//finishStream
	func(reqPath string) {
		delete(streamDB.db, getStreamIDFromPath(reqPath))
	})
```

To handle RTMP playback:
```
lpms.HandleRTMPPlay(
	//getStream
	func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
		glog.Infof("Got request: %v", reqPath)
		streamID := getStreamIDFromPath(reqPath)
		src := streamDB.db[streamID]

		if src != nil {
			src.ReadRTMPFromStream(ctx, dst)
		} else {
			glog.Error("Cannot find stream for ", streamID)
			return stream.ErrNotFound
		}
		return nil
	})
```

To handle a transcode request:
```
lpms.HandleTranscode(
	//getInStream
	func(ctx context.Context, streamID string) (*stream.Stream, error) {
		if stream := streamDB.db[streamID]; stream != nil {
			return stream, nil
		}

		return nil, stream.ErrNotFound
	},
	//getOutStream
	func(ctx context.Context, streamID string) (*stream.Stream, error) {
		//For this example, we'll name the transcoded stream "{streamID}_tran"
		newStream := stream.NewStream(streamID + "_tran")
		streamDB.db[newStream.StreamID] = newStream
		return newStream, nil
	})
```

To handle HLS playback:
```
lpms.HandleHLSPlay(
	//getHLSBuffer
	func(reqPath string) (*stream.HLSBuffer, error) {
		streamID := getHLSStreamIDFromPath(reqPath)
		glog.Infof("Got HTTP Req for stream: %v", streamID)
		buffer := bufferDB.db[streamID]
		s := streamDB.db[streamID]

		if s == nil {
			return nil, stream.ErrNotFound
		}

		if buffer == nil {
			//Create the buffer and start copying the stream into the buffer
			buffer = stream.NewHLSBuffer()
			bufferDB.db[streamID] = buffer
			ec := make(chan error, 1)
			go func() { ec <- s.ReadHLSFromStream(buffer) }()
		}
		return buffer, nil
	})
```
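Once the handlers are registered, start the server. This is a minimal sketch that mirrors `cmd/lpms.go`; `Start` blocks until either the RTMP or the HTTP server returns an error:
```
if err := lpms.Start(); err != nil {
	glog.Fatalf("LPMS server exited: %v", err)
}
```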

You can follow the development of LPMS and Livepeer at our [forum](http://forum.livepeer.org).
64  bin/srs.conf  (Executable file)
@@ -0,0 +1,64 @@
listen              2435;
max_connections     200;
daemon              off;
srs_log_tank        console;
http_server {
    enabled         on;
    listen          7935;
    dir             ./objs/nginx/html;
}

vhost __defaultVhost__ {
    hls {
        enabled         on;
        hls_fragment    10;
        hls_window      60;
        hls_path        ./objs/nginx/html;
        hls_m3u8_file   [app]/[stream].m3u8;
        hls_ts_file     [app]/[stream]-[seq].ts;
    }

    transcode {
        enabled     on;
        ffmpeg      /usr/local/bin/ffmpeg;
        engine hls500 {
            enabled     on;
            vfilter {
            }
            vcodec      libx264;
            vbitrate    500;
            vfps        25;
            vwidth      720;
            vheight     480;
            vthreads    12;
            vprofile    main;
            vpreset     medium;
            vparams {
            }
            acodec      libfdk_aac;
            aparams {
            }
            output      rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine];
        }
        engine hls1000 {
            enabled     on;
            vfilter {
            }
            vcodec      libx264;
            vbitrate    1000;
            vfps        25;
            vwidth      720;
            vheight     480;
            vthreads    12;
            vprofile    main;
            vpreset     medium;
            vparams {
            }
            acodec      libfdk_aac;
            aparams {
            }
            output      rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine];
        }
    }
}
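For reference, the `[port]`, `[app]`, `[vhost]`, `[stream]`, and `[engine]` placeholders in the `output` line are standard SRS transcode-engine templating (not something introduced by this commit): SRS fills them in at publish time, so a stream published to `stream/test` would be re-published by the `hls500` engine under roughly the name `test_hls500`, which is the naming LPMS later relies on when it pulls the transcoded HLS output back from SRS.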
138  cmd/lpms.go  (Normal file)
@@ -0,0 +1,138 @@
package main

import (
	"context"
	"flag"
	"net/http"
	"strings"

	"github.com/golang/glog"
	"github.com/livepeer/lpms"
	"github.com/livepeer/lpms/stream"

	"github.com/nareix/joy4/av"
)

//StreamDB is a simple in-memory registry of streams, keyed by stream ID.
type StreamDB struct {
	db map[string]*stream.Stream
}

//BufferDB is a simple in-memory registry of HLS buffers, keyed by stream ID.
type BufferDB struct {
	db map[string]*stream.HLSBuffer
}

func main() {
	flag.Set("logtostderr", "true")
	flag.Parse()

	//Ports: LPMS RTMP, LPMS HTTP, SRS RTMP, SRS HTTP (must match bin/srs.conf)
	lpms := lpms.New("1935", "8000", "2435", "7935")
	streamDB := &StreamDB{db: make(map[string]*stream.Stream)}
	bufferDB := &BufferDB{db: make(map[string]*stream.HLSBuffer)}

	lpms.HandleRTMPPublish(
		//getStreamID
		func(reqPath string) (string, error) {
			return getStreamIDFromPath(reqPath), nil
		},
		//getStream
		func(reqPath string) (*stream.Stream, error) {
			streamID := getStreamIDFromPath(reqPath)
			stream := stream.NewStream(streamID)
			streamDB.db[streamID] = stream
			return stream, nil
		},
		//finishStream
		func(reqPath string) {
			streamID := getStreamIDFromPath(reqPath)
			delete(streamDB.db, streamID)
			tranStreamID := streamID + "_tran"
			delete(streamDB.db, tranStreamID)
		})

	lpms.HandleTranscode(
		//getInStream
		func(ctx context.Context, streamID string) (*stream.Stream, error) {
			if stream := streamDB.db[streamID]; stream != nil {
				return stream, nil
			}

			return nil, stream.ErrNotFound
		},
		//getOutStream
		func(ctx context.Context, streamID string) (*stream.Stream, error) {
			//For this example, we'll name the transcoded stream "{streamID}_tran"
			newStream := stream.NewStream(streamID + "_tran")
			streamDB.db[newStream.StreamID] = newStream
			return newStream, nil
		})

	lpms.HandleHLSPlay(
		//getHLSBuffer
		func(reqPath string) (*stream.HLSBuffer, error) {
			streamID := getHLSStreamIDFromPath(reqPath)
			glog.Infof("Got HTTP Req for stream: %v", streamID)
			buffer := bufferDB.db[streamID]
			s := streamDB.db[streamID]

			if s == nil {
				return nil, stream.ErrNotFound
			}

			if buffer == nil {
				//Create the buffer and start copying the stream into the buffer
				buffer = stream.NewHLSBuffer()
				bufferDB.db[streamID] = buffer
				ec := make(chan error, 1)
				go func() { ec <- s.ReadHLSFromStream(buffer) }()
				//May want to handle the error here
			}
			return buffer, nil

		})

	lpms.HandleRTMPPlay(
		//getStream
		func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
			glog.Infof("Got request: %v", reqPath)
			streamID := getStreamIDFromPath(reqPath)
			src := streamDB.db[streamID]

			if src != nil {
				src.ReadRTMPFromStream(ctx, dst)
			} else {
				glog.Error("Cannot find stream for ", streamID)
				return stream.ErrNotFound
			}
			return nil
		})

	//Helper endpoint to print out all the streams
	http.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) {
		streams := []string{}

		for k := range streamDB.db {
			streams = append(streams, k)
		}

		if len(streams) == 0 {
			w.Write([]byte("no streams"))
			return
		}
		str := strings.Join(streams, ",")
		w.Write([]byte(str))
	})

	lpms.Start()
}

//getStreamIDFromPath is hardcoded for the demo: every RTMP path maps to the "test" stream.
func getStreamIDFromPath(reqPath string) string {
	return "test"
}

//getHLSStreamIDFromPath is also hardcoded: both branches return the demo transcoded stream ID.
func getHLSStreamIDFromPath(reqPath string) string {
	if strings.HasSuffix(reqPath, ".m3u8") {
		return "test_tran"
	} else {
		return "test_tran"
	}
}
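Both path helpers above are hardcoded for the demo. A version that actually extracts the ID from a `/stream/{streamID}` request path might look like the sketch below (`parseStreamIDFromPath` is a hypothetical name; the server code elsewhere in this commit does the same job with a regular expression):
```
func parseStreamIDFromPath(reqPath string) string {
	//Expects paths of the form /stream/{streamID} or /stream/{streamID}.m3u8
	parts := strings.Split(strings.TrimPrefix(reqPath, "/"), "/")
	if len(parts) < 2 || parts[0] != "stream" {
		return ""
	}
	return strings.TrimSuffix(parts[1], ".m3u8")
}
```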
@@ -1,32 +0,0 @@
package common

import "sync"

type config struct {
	SrsRTMPPort  string
	SrsHTTPPort  string
	LpmsRTMPPort string
	LpmsHTTPPort string
}

var instance *config
var once sync.Once

func GetConfig() *config {
	once.Do(func() {
		instance = &config{}
	})
	return instance
}

func SetConfig(srsRTMPPort string, srsHTTPPort string, lpmsRTMPPOrt string, lpmsHTTPPort string) {
	c := GetConfig()
	c.LpmsHTTPPort = lpmsHTTPPort
	c.LpmsRTMPPort = lpmsRTMPPOrt
	c.SrsHTTPPort = srsHTTPPort
	c.SrsRTMPPort = srsRTMPPort
}

// func (self *Config) GetSrsRTMPPort() string {
// 	return self.SrsRTMPPort
// }
9  hlsVideo.html  (Normal file)
@@ -0,0 +1,9 @@
<!DOCTYPE html>
<html>
  <head>
    <meta charset=utf-8 />
    <title>LivePeer</title>
  </head>
  <body>
    <video src="http://localhost:8000/stream/test_tran.m3u8" controls></video>
  </body>
</html>
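The README suggests opening this page in Safari because Safari plays HLS (`.m3u8`) sources natively in a `<video>` tag; other browsers generally need a JavaScript HLS player, which this commit does not include.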
378  io/io.go
@@ -1,378 +0,0 @@
|
||||
package io
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage/streaming"
|
||||
"github.com/golang/groupcache/lru"
|
||||
"github.com/kz26/m3u8"
|
||||
lpmsCommon "github.com/livepeer/lpms/common"
|
||||
"github.com/livepeer/lpms/types"
|
||||
"github.com/nareix/joy4/av"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
func CopyChannelToChannel(inChan chan *streaming.VideoChunk, outChan chan *streaming.VideoChunk) {
|
||||
for {
|
||||
select {
|
||||
case chunk := <-inChan:
|
||||
outChan <- chunk
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Transcode(inChan chan *streaming.VideoChunk, outChan chan *streaming.VideoChunk, newStreamID streaming.StreamID,
|
||||
format string, bitrate string, codecin string, codecout string, closeStreamC chan bool) (err error) {
|
||||
if codecin != "RTMP" {
|
||||
return fmt.Errorf("Only support RTMP as input stream")
|
||||
}
|
||||
|
||||
if format != "HLS" {
|
||||
return fmt.Errorf("Only support HLS as output format")
|
||||
}
|
||||
|
||||
if bitrate != "1000" && bitrate != "500" {
|
||||
return fmt.Errorf("Only support 500 and 1000 bitrate")
|
||||
}
|
||||
|
||||
dstConn, err := joy4rtmp.Dial("rtmp://localhost:" + lpmsCommon.GetConfig().SrsRTMPPort + "/stream/" + string(newStreamID))
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Error connecting to SRS server: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
//Upload the video to SRS
|
||||
go CopyRTMPFromChannel(dstConn, inChan, closeStreamC)
|
||||
|
||||
msChan := make(chan *types.Download, 1024)
|
||||
m3u8Chan := make(chan []byte)
|
||||
hlsSegChan := make(chan streaming.HlsSegment)
|
||||
//Download the playlist
|
||||
go GetHlsPlaylist("http://localhost:"+lpmsCommon.GetConfig().SrsHTTPPort+"/stream/"+string(newStreamID)+"_hls"+bitrate+".m3u8", time.Duration(0), true, msChan, m3u8Chan)
|
||||
//Download the segments
|
||||
go DownloadHlsSegment(msChan, hlsSegChan)
|
||||
//Copy the playlist and hls segments to a stream
|
||||
go CopyHlsToChannel(m3u8Chan, hlsSegChan, outChan, closeStreamC)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
//Copy packets from channels in the streamer to our destination muxer
|
||||
func CopyRTMPFromStream(dst av.Muxer, stream *streaming.Stream, closeStreamC chan bool) (err error) {
|
||||
if len(stream.SrcVideoChan) > 0 {
|
||||
//First check SrcVideoChan, and then check DstVideoChan
|
||||
CopyRTMPFromChannel(dst, stream.SrcVideoChan, closeStreamC)
|
||||
} else {
|
||||
CopyRTMPFromChannel(dst, stream.DstVideoChan, closeStreamC)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func CopyRTMPFromChannel(dst av.Muxer, videoChan chan *streaming.VideoChunk, closeStreamC chan bool) (err error) {
|
||||
chunk := <-videoChan
|
||||
if err := dst.WriteHeader(chunk.HeaderStreams); err != nil {
|
||||
fmt.Println("Error writing header copying from channel")
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case chunk := <-videoChan:
|
||||
// fmt.Println("Copying from channel")
|
||||
if chunk.ID == streaming.EOFStreamMsgID {
|
||||
fmt.Println("Copying EOF from channel")
|
||||
closeStreamC <- true
|
||||
err := dst.WriteTrailer()
|
||||
if err != nil {
|
||||
fmt.Println("Error writing trailer: ", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := dst.WritePacket(chunk.Packet)
|
||||
if chunk.Seq%100 == 0 {
|
||||
glog.V(logger.Info).Infof("Copy RTMP to muxer from channel. %d", chunk.Seq)
|
||||
}
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Error writing packet to video player: %s", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Copy HLS segments and playlist to the streamer channel.
|
||||
func CopyHlsToChannel(m3u8Chan chan []byte, hlsSegChan chan streaming.HlsSegment, outChan chan *streaming.VideoChunk, closeStreamC chan bool) {
|
||||
for {
|
||||
select {
|
||||
case m3u8 := <-m3u8Chan:
|
||||
// stream.M3U8 = m3u8 //Just for testing
|
||||
fmt.Printf("Sending HLS Playlist: %s\n", string(m3u8))
|
||||
CopyPacketsToChannel(1, nil, nil, m3u8, streaming.HlsSegment{}, outChan, closeStreamC)
|
||||
case hlsSeg := <-hlsSegChan:
|
||||
regex, _ := regexp.Compile("-(\\d)*")
|
||||
match := regex.FindString(hlsSeg.Name)
|
||||
segNumStr := match[1:len(match)]
|
||||
segNum, _ := strconv.Atoi(segNumStr)
|
||||
// stream.HlsSegNameMap[hlsSeg.Name] = hlsSeg.Data //Just for testing
|
||||
fmt.Printf("Sending HLS Segment: %d, %s\n", segNum, segNumStr)
|
||||
CopyPacketsToChannel(int64(segNum), nil, nil, nil, hlsSeg, outChan, closeStreamC)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// func CopyHlsToChannel(stream *streaming.Stream) (err error) {
|
||||
// for {
|
||||
// select {
|
||||
// case m3u8 := <-stream.M3U8Chan:
|
||||
// // stream.M3U8 = m3u8 //Just for testing
|
||||
// fmt.Printf("Sending HLS Playlist: %s\n", string(m3u8))
|
||||
// CopyPacketsToChannel(1, nil, nil, m3u8, streaming.HlsSegment{}, stream.SrcVideoChan)
|
||||
// case hlsSeg := <-stream.HlsSegChan:
|
||||
// regex, _ := regexp.Compile("-(\\d)*")
|
||||
// match := regex.FindString(hlsSeg.Name)
|
||||
// segNumStr := match[1:len(match)]
|
||||
// segNum, _ := strconv.Atoi(segNumStr)
|
||||
// // stream.HlsSegNameMap[hlsSeg.Name] = hlsSeg.Data //Just for testing
|
||||
// fmt.Printf("Sending HLS Segment: %d, %s\n", segNum, segNumStr)
|
||||
// CopyPacketsToChannel(int64(segNum), nil, nil, nil, hlsSeg, stream.SrcVideoChan)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
//Copy packets from our source demuxer to the streamer channels. For now we put the header in every packet. We can
|
||||
//optimize for packet size later.
|
||||
func CopyToChannel(src av.Demuxer, stream *streaming.Stream, closeStreamC chan bool) (err error) {
|
||||
var streams []av.CodecData
|
||||
if streams, err = src.Streams(); err != nil {
|
||||
return
|
||||
}
|
||||
for seq := int64(0); ; seq++ {
|
||||
if err = CopyPacketsToChannel(seq, src, streams, nil, streaming.HlsSegment{}, stream.SrcVideoChan, closeStreamC); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// func CopyPacketsToChannel(seq int64, src av.PacketReader, headerStreams []av.CodecData, m3u8 []byte, hlsSeg streaming.HlsSegment, stream *streaming.Stream) (err error) {
|
||||
func CopyPacketsToChannel(seq int64, src av.PacketReader, headerStreams []av.CodecData, m3u8 []byte, hlsSeg streaming.HlsSegment, outVideoChan chan *streaming.VideoChunk, closeStreamC chan bool) (err error) {
|
||||
// for seq := int64(0); ; seq++ {
|
||||
var pkt av.Packet
|
||||
if src != nil {
|
||||
if pkt, err = src.ReadPacket(); err != nil {
|
||||
if err == io.EOF {
|
||||
chunk := &streaming.VideoChunk{
|
||||
ID: streaming.EOFStreamMsgID,
|
||||
Seq: seq,
|
||||
HeaderStreams: headerStreams,
|
||||
Packet: pkt,
|
||||
}
|
||||
// stream.SrcVideoChan <- chunk
|
||||
outVideoChan <- chunk
|
||||
fmt.Println("Done with packet reading: ", err)
|
||||
|
||||
// Close the channel so that the protocol.go loop
|
||||
// reading from the channel doesn't block
|
||||
close(outVideoChan)
|
||||
closeStreamC <- true
|
||||
return fmt.Errorf("EOF")
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
chunk := &streaming.VideoChunk{
|
||||
ID: streaming.DeliverStreamMsgID,
|
||||
Seq: seq,
|
||||
HeaderStreams: headerStreams,
|
||||
Packet: pkt,
|
||||
M3U8: m3u8,
|
||||
HLSSegData: hlsSeg.Data,
|
||||
HLSSegName: hlsSeg.Name,
|
||||
}
|
||||
|
||||
select {
|
||||
// case stream.SrcVideoChan <- chunk:
|
||||
case outVideoChan <- chunk:
|
||||
if chunk.Seq%100 == 0 {
|
||||
fmt.Printf("sent video chunk: %d, %s\n", chunk.Seq, hlsSeg.Name)
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func doRequest(c *http.Client, req *http.Request) (*http.Response, error) {
|
||||
// req.Header.Set("User-Agent", USER_AGENT)
|
||||
resp, err := c.Do(req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func DownloadHlsSegment(dlc chan *types.Download, segChan chan streaming.HlsSegment) {
|
||||
for v := range dlc {
|
||||
req, err := http.NewRequest("GET", v.URI, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
resp, err := doRequest(&http.Client{}, req)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
log.Printf("Received HTTP %v for %v\n", resp.StatusCode, v.URI)
|
||||
continue
|
||||
}
|
||||
|
||||
// Get the segment name - need to store in a map
|
||||
match := strings.Split(v.URI, "/")
|
||||
filename := match[len(match)-1]
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = io.Copy(buf, resp.Body)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
seg := &streaming.HlsSegment{
|
||||
Data: buf.Bytes(),
|
||||
Name: filename,
|
||||
}
|
||||
// fmt.Println("Got HLS segment: ", filename)
|
||||
|
||||
segChan <- *seg
|
||||
resp.Body.Close()
|
||||
// log.Printf("Downloaded %v\n", v.URI)
|
||||
}
|
||||
}
|
||||
|
||||
func GetHlsPlaylist(urlStr string, recTime time.Duration, useLocalTime bool, dlc chan *types.Download, playlistChan chan []byte) {
|
||||
fmt.Println("Getting playlist: ", urlStr)
|
||||
startTime := time.Now()
|
||||
var recDuration time.Duration = 0
|
||||
cache := lru.New(1024)
|
||||
playlistUrl, err := url.Parse(urlStr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
for {
|
||||
req, err := http.NewRequest("GET", urlStr, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
resp, err := doRequest(&http.Client{}, req)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
time.Sleep(time.Duration(3) * time.Second)
|
||||
}
|
||||
|
||||
playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
|
||||
if playlist == nil {
|
||||
//SRS doesn't serve the video right away. It takes a few seconds. This may be a param we can tune later.
|
||||
waitTime := time.Second * 5
|
||||
fmt.Println("Cannot read playlist from ", urlStr, resp.StatusCode, "Waiting", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
} else {
|
||||
// fmt.Println("Got playlist", urlStr)
|
||||
buf := playlist.Encode()
|
||||
bytes := buf.Bytes()
|
||||
// fmt.Println("sending playlist to playlistChan", bytes)
|
||||
playlistChan <- bytes
|
||||
resp.Body.Close()
|
||||
if listType == m3u8.MEDIA {
|
||||
mpl := playlist.(*m3u8.MediaPlaylist)
|
||||
for _, v := range mpl.Segments {
|
||||
if v != nil {
|
||||
var msURI string
|
||||
if strings.HasPrefix(v.URI, "http") {
|
||||
msURI, err = url.QueryUnescape(v.URI)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
msUrl, err := playlistUrl.Parse(v.URI)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
continue
|
||||
}
|
||||
msURI, err = url.QueryUnescape(msUrl.String())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
_, hit := cache.Get(msURI)
|
||||
if !hit {
|
||||
cache.Add(msURI, nil)
|
||||
if useLocalTime {
|
||||
recDuration = time.Now().Sub(startTime)
|
||||
} else {
|
||||
recDuration += time.Duration(int64(v.Duration * 1000000000))
|
||||
}
|
||||
dlc <- &types.Download{
|
||||
URI: msURI,
|
||||
TotalDuration: recDuration}
|
||||
}
|
||||
if recTime != 0 && recDuration != 0 && recDuration >= recTime {
|
||||
close(dlc)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if mpl.Closed {
|
||||
close(dlc)
|
||||
return
|
||||
} else {
|
||||
time.Sleep(time.Duration(int64(mpl.TargetDuration * 1000000000)))
|
||||
}
|
||||
} else {
|
||||
log.Fatal("Not a valid media playlist")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func rememberHlsSegs(nameSegMap *map[string][]byte, segChan chan streaming.HlsSegment) {
|
||||
for {
|
||||
select {
|
||||
case seg := <-segChan:
|
||||
fmt.Println("Got a HLS segment:", seg.Name)
|
||||
(*nameSegMap)[seg.Name] = seg.Data
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createTranscodeId(streamID streaming.StreamID, bReq types.BroadcastReq) common.Hash {
|
||||
//Create a "transcodeID" in the same keyspace as stream.ID
|
||||
fmt.Println("Creating transcode ID with: ", streamID, bReq)
|
||||
h := sha256.New()
|
||||
h.Write([]byte(streamID))
|
||||
h.Write([]byte(fmt.Sprintf("%v", bReq)))
|
||||
id := h.Sum(nil)
|
||||
|
||||
var x common.Hash
|
||||
if len(x) != len(id) {
|
||||
panic("Error creating transcode ID")
|
||||
}
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = id[i]
|
||||
}
|
||||
|
||||
fmt.Println("Transcode ID: ", x)
|
||||
|
||||
return x
|
||||
}
|
158  lpms.go
@@ -1,26 +1,148 @@
|
||||
//Adding the RTMP server. This will put up a RTMP endpoint when starting up Swarm.
|
||||
//It's a simple RTMP server that will take a video stream and play it right back out.
|
||||
//After bringing up the Swarm node with RTMP enabled, try it out using:
|
||||
//
|
||||
//ffmpeg -re -i bunny.mp4 -c copy -f flv rtmp://localhost/movie
|
||||
//ffplay rtmp://localhost/movie
|
||||
|
||||
//The RTMP server. This will put up a RTMP endpoint when starting up Swarm.
|
||||
//To integrate with LPMS means your code will become the source / destination of the media server.
|
||||
//This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream
|
||||
package lpms
|
||||
|
||||
import (
|
||||
"github.com/livepeer/go-livepeer/livepeer/network"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage/streaming"
|
||||
"github.com/livepeer/lpms/common"
|
||||
"github.com/livepeer/lpms/server"
|
||||
streamingVizClient "github.com/livepeer/streamingviz/client"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/livepeer/lpms/stream"
|
||||
"github.com/livepeer/lpms/transcoder"
|
||||
"github.com/livepeer/lpms/vidlistener"
|
||||
"github.com/livepeer/lpms/vidplayer"
|
||||
"github.com/nareix/joy4/av"
|
||||
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
func StartVideoServer(rtmpPort string, httpPort string, srsRtmpPort string, srsHttpPort string, streamer *streaming.Streamer,
|
||||
forwarder storage.CloudStore, streamdb *network.StreamDB, viz *streamingVizClient.Client) {
|
||||
type LPMS struct {
|
||||
rtmpServer *joy4rtmp.Server
|
||||
vidPlayer *vidplayer.VidPlayer
|
||||
vidListen *vidlistener.VidListener
|
||||
httpPort string
|
||||
srsRTMPPort string
|
||||
srsHTTPPort string
|
||||
}
|
||||
|
||||
common.SetConfig(srsRtmpPort, srsHttpPort, rtmpPort, httpPort)
|
||||
server.StartRTMPServer(rtmpPort, srsRtmpPort, srsHttpPort, streamer, forwarder, viz)
|
||||
server.StartHTTPServer(rtmpPort, httpPort, srsRtmpPort, srsHttpPort, streamer, forwarder, streamdb, viz)
|
||||
type transcodeReq struct {
|
||||
Formats []string
|
||||
Bitrates []string
|
||||
Codecin string
|
||||
Codecout []string
|
||||
StreamID string
|
||||
}
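For reference, a request body that fills in every field of this struct could look like the JSON below; the README example only sends `StreamID`, and the other values here are purely illustrative:
```
{"StreamID": "test", "Formats": ["HLS"], "Bitrates": ["500"], "Codecin": "RTMP", "Codecout": ["H264"]}
```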
|
||||
|
||||
//New creates a new LPMS server object. It really just brokers everything to the components.
|
||||
func New(rtmpPort string, httpPort string, srsRTMPPort string, srsHTTPPort string) *LPMS {
|
||||
server := &joy4rtmp.Server{Addr: (":" + rtmpPort)}
|
||||
player := &vidplayer.VidPlayer{RtmpServer: server}
|
||||
listener := &vidlistener.VidListener{RtmpServer: server}
|
||||
return &LPMS{rtmpServer: server, vidPlayer: player, vidListen: listener, srsRTMPPort: srsRTMPPort, srsHTTPPort: srsHTTPPort, httpPort: httpPort}
|
||||
}
|
||||
|
||||
//Start starts the rtmp and http server
|
||||
func (l *LPMS) Start() error {
|
||||
ec := make(chan error, 1)
|
||||
go func() {
|
||||
glog.Infof("Starting LPMS Server at :%v", l.rtmpServer.Addr)
|
||||
ec <- l.rtmpServer.ListenAndServe()
|
||||
}()
|
||||
go func() {
|
||||
glog.Infof("Starting HTTP Server at :%v", l.httpPort)
|
||||
ec <- http.ListenAndServe(":"+l.httpPort, nil)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-ec:
|
||||
glog.Infof("LPMS Server Error: %v. Quitting...", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//HandleRTMPPublish offloads to the video listener
|
||||
func (l *LPMS) HandleRTMPPublish(
|
||||
getStreamID func(reqPath string) (string, error),
|
||||
stream func(reqPath string) (*stream.Stream, error),
|
||||
endStream func(reqPath string)) error {
|
||||
|
||||
return l.vidListen.HandleRTMPPublish(getStreamID, stream, endStream)
|
||||
}
|
||||
|
||||
//HandleRTMPPlay offloads to the video player
|
||||
func (l *LPMS) HandleRTMPPlay(getStream func(ctx context.Context, reqPath string, dst av.MuxCloser) error) error {
|
||||
return l.vidPlayer.HandleRTMPPlay(getStream)
|
||||
}
|
||||
|
||||
//HandleHLSPlay offloads to the video player
|
||||
func (l *LPMS) HandleHLSPlay(getStream func(reqPath string) (*stream.HLSBuffer, error)) error {
|
||||
return l.vidPlayer.HandleHLSPlay(getStream)
|
||||
}
|
||||
|
||||
//HandleTranscode kicks off a transcoding process, keeps a local HLS buffer, and returns the new stream ID.
|
||||
//stream is the video stream you want to be transcoded. getNewStreamID gives you a way to name the transcoded stream.
|
||||
func (l *LPMS) HandleTranscode(getInStream func(ctx context.Context, streamID string) (*stream.Stream, error), getOutStream func(ctx context.Context, streamID string) (*stream.Stream, error)) {
|
||||
http.HandleFunc("/transcode", func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
// defer cancel()
|
||||
|
||||
//parse transcode request
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
var tReq transcodeReq
|
||||
if r.Body == nil {
|
||||
http.Error(w, "Please send a request body", 400)
|
||||
return
|
||||
}
|
||||
err := decoder.Decode(&tReq)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
|
||||
//Get the RTMP Stream
|
||||
inStream, err := getInStream(ctx, tReq.StreamID)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
|
||||
//Get the HLS Stream
|
||||
newStream, err := getOutStream(ctx, tReq.StreamID)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
}
|
||||
|
||||
ec := make(chan error, 1)
|
||||
go func() { ec <- l.doTranscoding(ctx, inStream, newStream) }()
|
||||
|
||||
w.Write([]byte("New Stream: " + newStream.StreamID))
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LPMS) doTranscoding(ctx context.Context, inStream *stream.Stream, newStream *stream.Stream) error {
|
||||
t := transcoder.New(l.srsRTMPPort, l.srsHTTPPort, newStream.StreamID)
|
||||
//Should kick off a goroutine for this, so we can return the new streamID right away.
|
||||
|
||||
tranMux, err := t.LocalSRSUploadMux()
|
||||
if err != nil {
|
||||
return err
|
||||
// http.Error(w, "Cannot create a connection with local transcoder", 400)
|
||||
}
|
||||
|
||||
uec := make(chan error, 1)
|
||||
go func() { uec <- t.StartUpload(ctx, tranMux, inStream) }()
|
||||
dec := make(chan error, 1)
|
||||
go func() { dec <- t.StartDownload(ctx, newStream) }()
|
||||
|
||||
select {
|
||||
case err := <-uec:
|
||||
return err
|
||||
// http.Error(w, "Cannot upload stream to transcoder: "+err.Error(), 400)
|
||||
case err := <-dec:
|
||||
return err
|
||||
// http.Error(w, "Cannot download stream from transcoder: "+err.Error(), 400)
|
||||
}
|
||||
|
||||
}
|
||||
|
0  objs/srs.log  (Normal file)
@@ -1,232 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/livepeer/go-livepeer/livepeer/network"
|
||||
"github.com/ethereum/go-ethereum/swarm/network/kademlia"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage/streaming"
|
||||
lpmsIo "github.com/livepeer/lpms/io"
|
||||
streamingVizClient "github.com/livepeer/streamingviz/client"
|
||||
"github.com/nareix/joy4/format/flv"
|
||||
)
|
||||
|
||||
//This is for flushing to http request handlers (joy4 concept)
|
||||
type writeFlusher struct {
|
||||
httpflusher http.Flusher
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (self writeFlusher) Flush() error {
|
||||
self.httpflusher.Flush()
|
||||
return nil
|
||||
}
|
||||
|
||||
type broadcastReq struct {
|
||||
Formats []string
|
||||
Bitrates []string
|
||||
Codecin string
|
||||
Codecout []string
|
||||
StreamID string
|
||||
}
|
||||
|
||||
func StartHTTPServer(rtmpPort string, httpPort string, srsRtmpPort string, srsHttpPort string, streamer *streaming.Streamer, forwarder storage.CloudStore, streamdb *network.StreamDB, viz *streamingVizClient.Client) {
|
||||
glog.V(logger.Info).Infof("Starting HTTP Server at port: ", httpPort)
|
||||
|
||||
http.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("In handleFunc, Path: ", r.URL.Path)
|
||||
|
||||
var strmID string
|
||||
//Example path: /stream/133bd3c4e543e3cd53e2cf2b366eeeace7eae483b651b8b1e2a2072b250864fc62b0bac9f64df186c4fb74d427f136647dcf0ead9198dc7d9f881b1d5c2d2132-0.ts
|
||||
regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*")
|
||||
match := regex.FindString(r.URL.Path)
|
||||
if match != "" {
|
||||
strmID = strings.Replace(match, "/stream/", "", -1)
|
||||
}
|
||||
|
||||
glog.V(logger.Info).Infof("Got streamID as %v", strmID)
|
||||
|
||||
if strings.HasSuffix(r.URL.Path, ".m3u8") == true {
|
||||
stream, err := streamer.GetStreamByStreamID(streaming.StreamID(strmID))
|
||||
if stream == nil {
|
||||
stream, err = streamer.SubscribeToStream(strmID)
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infof("Error subscribing to stream %v", err)
|
||||
return
|
||||
}
|
||||
//Send subscribe request
|
||||
forwarder.Stream(strmID, kademlia.Address(common.HexToHash("")))
|
||||
}
|
||||
|
||||
//HLS request. Example: http://localhost:8080/stream/streamid.m3u8
|
||||
countdown := 12
|
||||
for countdown > 0 {
|
||||
if stream.M3U8 != nil {
|
||||
break
|
||||
} else {
|
||||
fmt.Println("Waiting for playlist")
|
||||
time.Sleep(time.Second * 5)
|
||||
}
|
||||
countdown = countdown - 1
|
||||
}
|
||||
if countdown == 0 {
|
||||
w.WriteHeader(404)
|
||||
w.Write([]byte("Cannot find playlist for HLS"))
|
||||
}
|
||||
// w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
|
||||
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Write(stream.M3U8)
|
||||
fmt.Println("Writing Playlist in handler: ", string(stream.M3U8))
|
||||
// go rememberHlsSegs(&stream.HlsSegNameMap, stream.HlsSegChan) // this is only used for testing viewer on publisher. Publisher doesn't need to remember HLS segments
|
||||
// return
|
||||
} else if strings.HasSuffix(r.URL.Path, ".ts") == true {
|
||||
//HLS video segments
|
||||
|
||||
stream, _ := streamer.GetStreamByStreamID(streaming.StreamID(strmID))
|
||||
fmt.Println("Got requests for: ", r.URL.Path)
|
||||
match := strings.Split(r.URL.Path, "/")
|
||||
filename := match[len(match)-1]
|
||||
|
||||
countdown := 60 //Total wait time is 60 seconds. Make the single wait smaller to minimize total delay.
|
||||
for countdown > 0 {
|
||||
if stream.HlsSegNameMap[filename] != nil {
|
||||
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
|
||||
w.Write(stream.HlsSegNameMap[filename])
|
||||
break
|
||||
} else {
|
||||
fmt.Println("Waiting 1s for segment", filename, ", ", countdown)
|
||||
time.Sleep(time.Second * 1)
|
||||
}
|
||||
countdown = countdown - 1
|
||||
}
|
||||
|
||||
if countdown == 0 {
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
//Assume rtmp
|
||||
fmt.Println("Assumign rtmp: ", r.URL.Path)
|
||||
stream, err := streamer.GetStreamByStreamID(streaming.StreamID(strmID))
|
||||
if stream == nil {
|
||||
stream, err = streamer.SubscribeToStream(strmID)
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infof("Error subscribing to stream %v", err)
|
||||
return
|
||||
}
|
||||
//Send subscribe request
|
||||
forwarder.Stream(strmID, kademlia.Address(common.HexToHash("")))
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "video/x-flv")
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
w.WriteHeader(200)
|
||||
flusher := w.(http.Flusher)
|
||||
flusher.Flush()
|
||||
|
||||
muxer := flv.NewMuxerWriteFlusher(writeFlusher{httpflusher: flusher, Writer: w})
|
||||
//Cannot kick off a go routine here because the ResponseWriter is not a pointer (so a copy of the writer doesn't make any sense)
|
||||
lpmsIo.CopyRTMPFromStream(muxer, stream, stream.CloseChan)
|
||||
}
|
||||
})
|
||||
|
||||
http.HandleFunc("/broadcast", func(w http.ResponseWriter, r *http.Request) {
|
||||
glog.V(logger.Info).Infof("Got broadcast request")
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
var bReq broadcastReq
|
||||
if r.Body == nil {
|
||||
http.Error(w, "Please send a request body", 400)
|
||||
return
|
||||
}
|
||||
err := decoder.Decode(&bReq)
|
||||
// glog.V(logger.Info).Infof("http body: ", r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
bReq.Codecin = "RTMP"
|
||||
glog.V(logger.Info).Infof("Broadcast request: ", bReq)
|
||||
|
||||
var transcodeId common.Hash
|
||||
if len(r.URL.Query()["transcodeId"]) > 0 {
|
||||
str := r.URL.Query()["transcodeId"][0]
|
||||
transcodeId = common.HexToHash(str)
|
||||
glog.V(logger.Info).Infof("transcodeId %x", transcodeId[:])
|
||||
} else {
|
||||
//generate a completely random id
|
||||
transcodeId = common.HexToHash(string(streaming.MakeStreamID(streaming.RandomStreamID(), fmt.Sprintf("%x", streaming.RandomStreamID()))))
|
||||
}
|
||||
|
||||
// streamID := r.URL.Query()["streamId"][0]
|
||||
streamID := bReq.StreamID
|
||||
stream, _ := streamer.GetStreamByStreamID(streaming.StreamID(streamID))
|
||||
if stream == nil {
|
||||
// stream, _ = streamer.AddNewStream()
|
||||
//Require a stream to exist first
|
||||
w.WriteHeader(404)
|
||||
w.Write([]byte("Cannot find stream with ID: " + streamID))
|
||||
}
|
||||
forwarder.Transcode(string(stream.ID), transcodeId, bReq.Formats, bReq.Bitrates, bReq.Codecin, bReq.Codecout)
|
||||
glog.V(logger.Info).Infof("Broadcast Original Stream: %s. Waiting for ack...", stream.ID)
|
||||
})
|
||||
|
||||
http.HandleFunc("/transcodedVideo", func(w http.ResponseWriter, r *http.Request) {
|
||||
glog.V(logger.Info).Infof("Getting transcoded video")
|
||||
videos := streamdb.TranscodedStreams[streaming.StreamID(r.URL.Query()["originStreamID"][0])]
|
||||
js, err := json.Marshal(videos)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(js)
|
||||
})
|
||||
|
||||
http.HandleFunc("/streamIDs", func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("Getting stream ids")
|
||||
streams := streamer.GetAllStreams()
|
||||
js, err := json.Marshal(streams)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(js)
|
||||
return
|
||||
})
|
||||
|
||||
http.HandleFunc("/streamEndpoint", func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("Getting stream endpoint")
|
||||
resp := map[string]string{"url": "rtmp://localhost:" + rtmpPort + "/live/stream"}
|
||||
js, _ := json.Marshal(resp)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(js)
|
||||
})
|
||||
|
||||
//For serving static HTML files (web-based broadcaster and viewer)
|
||||
fs := http.FileServer(http.Dir("static"))
|
||||
fmt.Println("Serving static files from: ", fs)
|
||||
http.Handle("/static/", http.StripPrefix("/static/", fs))
|
||||
|
||||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, "/static/broadcast.html", 301)
|
||||
})
|
||||
|
||||
go http.ListenAndServe(":"+httpPort, nil)
|
||||
}
|
@@ -1,121 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/swarm/network/kademlia"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage"
|
||||
"github.com/livepeer/go-livepeer/livepeer/storage/streaming"
|
||||
"github.com/livepeer/lpms/io"
|
||||
"github.com/livepeer/lpms/types"
|
||||
streamingVizClient "github.com/livepeer/streamingviz/client"
|
||||
"github.com/nareix/joy4/av/avutil"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
var srsRTMPPort string
|
||||
|
||||
func SrsRTMPPort() string {
|
||||
return srsRTMPPort
|
||||
}
|
||||
|
||||
func StartRTMPServer(rtmpPort string, srsRtmpPort string, srsHttpPort string, streamer *streaming.Streamer, forwarder storage.CloudStore, viz *streamingVizClient.Client) {
|
||||
if rtmpPort == "" {
|
||||
rtmpPort = "1935"
|
||||
}
|
||||
fmt.Println("Starting RTMP Server on port: ", rtmpPort)
|
||||
server := &joy4rtmp.Server{Addr: ":" + rtmpPort}
|
||||
|
||||
srsRTMPPort = srsRtmpPort
|
||||
|
||||
server.HandlePlay = func(conn *joy4rtmp.Conn) {
|
||||
glog.V(logger.Info).Infof("Trying to play stream at %v", conn.URL)
|
||||
|
||||
// Parse the streamID from the path host:port/stream/{streamID}
|
||||
var strmID string
|
||||
regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*")
|
||||
match := regex.FindString(conn.URL.Path)
|
||||
if match != "" {
|
||||
strmID = strings.Replace(match, "/stream/", "", -1)
|
||||
}
|
||||
|
||||
glog.V(logger.Info).Infof("Got streamID as %v", strmID)
|
||||
viz.LogConsume(strmID)
|
||||
stream, err := streamer.GetStreamByStreamID(streaming.StreamID(strmID))
|
||||
if stream == nil {
|
||||
stream, err = streamer.SubscribeToStream(strmID)
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infof("Error subscribing to stream %v", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
fmt.Println("Found stream: ", strmID)
|
||||
}
|
||||
|
||||
//Send subscribe request
|
||||
forwarder.Stream(strmID, kademlia.Address(common.HexToHash("")))
|
||||
|
||||
//Copy chunks to outgoing connection
|
||||
go io.CopyRTMPFromStream(conn, stream, stream.CloseChan)
|
||||
}
|
||||
|
||||
server.HandlePublish = func(conn *joy4rtmp.Conn) {
|
||||
transcodeParam := conn.URL.Query()["transcode"]
|
||||
if (len(transcodeParam) > 0) && (transcodeParam[0] == "true") {
|
||||
//For now, we rely on SRS. The next iteration will look into directly integrating ffmpeg
|
||||
//First, forward the rtmp stream to the local SRS server (always running on .
|
||||
//Then, issue http req through the HLS endpoint.
|
||||
stream, _ := streamer.AddNewStream()
|
||||
glog.V(logger.Info).Infof("Added a new stream with id: %v", stream.ID)
|
||||
viz.LogBroadcast(string(stream.ID))
|
||||
dstConn, err := joy4rtmp.Dial("rtmp://localhost:" + srsRtmpPort + "/stream/" + string(stream.ID))
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Error connecting to SRS server: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
//To pass segment name from the playlist to the segment download routine.
|
||||
msChan := make(chan *types.Download, 1024)
|
||||
|
||||
//Copy to SRS rtmp
|
||||
go avutil.CopyFile(dstConn, conn)
|
||||
//Kick off goroutine to listen for HLS playlist file
|
||||
go io.GetHlsPlaylist("http://localhost:"+srsHttpPort+"/stream/"+string(stream.ID)+".m3u8", time.Duration(0), true, msChan, stream.M3U8Chan)
|
||||
//Download the segments
|
||||
go io.DownloadHlsSegment(msChan, stream.HlsSegChan)
|
||||
//Copy Hls segments to swarm
|
||||
go io.CopyHlsToChannel(stream.M3U8Chan, stream.HlsSegChan, stream.SrcVideoChan, stream.CloseChan)
|
||||
// go io.CopyHlsToChannel(stream)
|
||||
} else {
|
||||
//Do regular RTMP stuff - create a new stream, copy the video to the stream.
|
||||
var strmID string
|
||||
var stream *streaming.Stream
|
||||
regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*")
|
||||
match := regex.FindString(conn.URL.Path)
|
||||
if match != "" {
|
||||
strmID = strings.Replace(match, "/stream/", "", -1)
|
||||
stream, _ = streamer.GetStreamByStreamID(streaming.StreamID(strmID))
|
||||
}
|
||||
|
||||
if stream == nil {
|
||||
stream, _ = streamer.AddNewStream()
|
||||
glog.V(logger.Info).Infof("Added a new stream with id: %v", stream.ID)
|
||||
} else {
|
||||
glog.V(logger.Info).Infof("Got streamID as %v", strmID)
|
||||
}
|
||||
|
||||
viz.LogBroadcast(string(stream.ID))
|
||||
|
||||
//Send video to streamer channels
|
||||
go io.CopyToChannel(conn, stream, stream.CloseChan)
|
||||
}
|
||||
}
|
||||
|
||||
go server.ListenAndServe()
|
||||
}
|
317  stream/cmap.go  (Normal file)
@@ -0,0 +1,317 @@
|
||||
package stream
|
||||
|
||||
//Borrowed from https://github.com/orcaman/concurrent-map
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var SHARD_COUNT = 32
|
||||
|
||||
// A "thread" safe map of type string:Anything.
|
||||
// To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
|
||||
type ConcurrentMap []*ConcurrentMapShared
|
||||
|
||||
// A "thread" safe string to anything map.
|
||||
type ConcurrentMapShared struct {
|
||||
items map[string]interface{}
|
||||
sync.RWMutex // Read Write mutex, guards access to internal map.
|
||||
}
|
||||
|
||||
// Creates a new concurrent map.
|
||||
func NewCMap() ConcurrentMap {
|
||||
m := make(ConcurrentMap, SHARD_COUNT)
|
||||
for i := 0; i < SHARD_COUNT; i++ {
|
||||
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Returns shard under given key
|
||||
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
|
||||
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
|
||||
}
|
||||
|
||||
func (m ConcurrentMap) MSet(data map[string]interface{}) {
|
||||
for key, value := range data {
|
||||
shard := m.GetShard(key)
|
||||
shard.Lock()
|
||||
shard.items[key] = value
|
||||
shard.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Sets the given value under the specified key.
|
||||
func (m *ConcurrentMap) Set(key string, value interface{}) {
|
||||
// Get map shard.
|
||||
shard := m.GetShard(key)
|
||||
shard.Lock()
|
||||
shard.items[key] = value
|
||||
shard.Unlock()
|
||||
}
|
||||
|
||||
// Callback to return new element to be inserted into the map
|
||||
// It is called while lock is held, therefore it MUST NOT
|
||||
// try to access other keys in same map, as it can lead to deadlock since
|
||||
// Go sync.RWLock is not reentrant
|
||||
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
|
||||
|
||||
// Insert or Update - updates existing element or inserts a new one using UpsertCb
|
||||
func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
|
||||
shard := m.GetShard(key)
|
||||
shard.Lock()
|
||||
v, ok := shard.items[key]
|
||||
res = cb(ok, v, value)
|
||||
shard.items[key] = res
|
||||
shard.Unlock()
|
||||
return res
|
||||
}
|
||||
|
||||
// Sets the given value under the specified key if no value was associated with it.
|
||||
func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
|
||||
// Get map shard.
|
||||
shard := m.GetShard(key)
|
||||
shard.Lock()
|
||||
_, ok := shard.items[key]
|
||||
if !ok {
|
||||
shard.items[key] = value
|
||||
}
|
||||
shard.Unlock()
|
||||
return !ok
|
||||
}
|
||||
|
||||
// Retrieves an element from map under given key.
|
||||
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
|
||||
// Get shard
|
||||
shard := m.GetShard(key)
|
||||
shard.RLock()
|
||||
// Get item from shard.
|
||||
val, ok := shard.items[key]
|
||||
shard.RUnlock()
|
||||
return val, ok
|
||||
}
|
||||
|
||||
// Returns the number of elements within the map.
|
||||
func (m ConcurrentMap) Count() int {
|
||||
count := 0
|
||||
for i := 0; i < SHARD_COUNT; i++ {
|
||||
shard := m[i]
|
||||
shard.RLock()
|
||||
count += len(shard.items)
|
||||
shard.RUnlock()
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// Looks up an item under specified key
|
||||
func (m *ConcurrentMap) Has(key string) bool {
|
||||
// Get shard
|
||||
shard := m.GetShard(key)
|
||||
shard.RLock()
|
||||
// See if element is within shard.
|
||||
_, ok := shard.items[key]
|
||||
shard.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// Removes an element from the map.
|
||||
func (m *ConcurrentMap) Remove(key string) {
|
||||
// Try to get shard.
|
||||
shard := m.GetShard(key)
|
||||
shard.Lock()
|
||||
delete(shard.items, key)
|
||||
shard.Unlock()
|
||||
}
|
||||
|
||||
// Removes an element from the map and returns it
|
||||
func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
|
||||
// Try to get shard.
|
||||
shard := m.GetShard(key)
|
||||
shard.Lock()
|
||||
v, exists = shard.items[key]
|
||||
delete(shard.items, key)
|
||||
shard.Unlock()
|
||||
return v, exists
|
||||
}
|
||||
|
||||
// Checks if map is empty.
|
||||
func (m *ConcurrentMap) IsEmpty() bool {
|
||||
return m.Count() == 0
|
||||
}
|
||||
|
||||
// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
|
||||
type Tuple struct {
|
||||
Key string
|
||||
Val interface{}
|
||||
}
|
||||
|
||||
// Returns an iterator which could be used in a for range loop.
|
||||
//
|
||||
// Deprecated: using IterBuffered() will get better performance
|
||||
func (m ConcurrentMap) Iter() <-chan Tuple {
|
||||
chans := snapshot(&m)
|
||||
ch := make(chan Tuple)
|
||||
go fanIn(chans, ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
// Returns a buffered iterator which could be used in a for range loop.
|
||||
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
|
||||
chans := snapshot(&m)
|
||||
total := 0
|
||||
for _, c := range chans {
|
||||
total += cap(c)
|
||||
}
|
||||
ch := make(chan Tuple, total)
|
||||
go fanIn(chans, ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
// Returns an array of channels that contain the elements in each shard,
|
||||
// which likely takes a snapshot of `m`.
|
||||
// It returns once the size of each buffered channel is determined,
|
||||
// before all the channels are populated using goroutines.
|
||||
func snapshot(m *ConcurrentMap) (chans []chan Tuple) {
|
||||
chans = make([]chan Tuple, SHARD_COUNT)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(SHARD_COUNT)
|
||||
// Foreach shard.
|
||||
for index, shard := range *m {
|
||||
go func(index int, shard *ConcurrentMapShared) {
|
||||
// Foreach key, value pair.
|
||||
shard.RLock()
|
||||
chans[index] = make(chan Tuple, len(shard.items))
|
||||
wg.Done()
|
||||
for key, val := range shard.items {
|
||||
chans[index] <- Tuple{key, val}
|
||||
}
|
||||
shard.RUnlock()
|
||||
close(chans[index])
|
||||
}(index, shard)
|
||||
}
|
||||
wg.Wait()
|
||||
return chans
|
||||
}
|
||||
|
||||
// fanIn reads elements from channels `chans` into channel `out`
|
||||
func fanIn(chans []chan Tuple, out chan Tuple) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(chans))
|
||||
for _, ch := range chans {
|
||||
go func(ch chan Tuple) {
|
||||
for t := range ch {
|
||||
out <- t
|
||||
}
|
||||
wg.Done()
|
||||
}(ch)
|
||||
}
|
||||
wg.Wait()
|
||||
close(out)
|
||||
}
|
||||
|
||||
// Returns all items as map[string]interface{}
|
||||
func (m ConcurrentMap) Items() map[string]interface{} {
|
||||
tmp := make(map[string]interface{})
|
||||
|
||||
// Insert items to temporary map.
|
||||
for item := range m.IterBuffered() {
|
||||
tmp[item.Key] = item.Val
|
||||
}
|
||||
|
||||
return tmp
|
||||
}
|
||||
|
||||
// Iterator callback, called for every key, value found in
// maps. RLock is held for all calls for a given shard
// therefore the callback sees a consistent view of a shard,
// but not across the shards
|
||||
type IterCb func(key string, v interface{})
|
||||
|
||||
// Callback based iterator, cheapest way to read
|
||||
// all elements in a map.
|
||||
func (m *ConcurrentMap) IterCb(fn IterCb) {
|
||||
for idx := range *m {
|
||||
shard := (*m)[idx]
|
||||
shard.RLock()
|
||||
for key, value := range shard.items {
|
||||
fn(key, value)
|
||||
}
|
||||
shard.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Return all keys as []string
|
||||
func (m ConcurrentMap) Keys() []string {
|
||||
count := m.Count()
|
||||
ch := make(chan string, count)
|
||||
go func() {
|
||||
// Foreach shard.
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(SHARD_COUNT)
|
||||
for _, shard := range m {
|
||||
go func(shard *ConcurrentMapShared) {
|
||||
// Foreach key, value pair.
|
||||
shard.RLock()
|
||||
for key := range shard.items {
|
||||
ch <- key
|
||||
}
|
||||
shard.RUnlock()
|
||||
wg.Done()
|
||||
}(shard)
|
||||
}
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
// Generate keys
|
||||
keys := make([]string, 0, count)
|
||||
for k := range ch {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
//Reveals ConcurrentMap "private" variables to JSON marshal.
|
||||
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
|
||||
// Create a temporary map, which will hold all item spread across shards.
|
||||
tmp := make(map[string]interface{})
|
||||
|
||||
// Insert items to temporary map.
|
||||
for item := range m.IterBuffered() {
|
||||
tmp[item.Key] = item.Val
|
||||
}
|
||||
return json.Marshal(tmp)
|
||||
}
|
||||
|
||||
func fnv32(key string) uint32 {
|
||||
hash := uint32(2166136261)
|
||||
const prime32 = uint32(16777619)
|
||||
for i := 0; i < len(key); i++ {
|
||||
hash *= prime32
|
||||
hash ^= uint32(key[i])
|
||||
}
|
||||
return hash
|
||||
}
|
||||
|
||||
// Concurrent map uses interface{} as its value, therefore JSON Unmarshal
// probably won't know which type to unmarshal into; in such a case
// we'll end up with a value of type map[string]interface{}. In most cases this isn't
// our value type, which is why we've decided to remove this functionality.
|
||||
|
||||
// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
|
||||
// // Reverse process of Marshal.
|
||||
|
||||
// tmp := make(map[string]interface{})
|
||||
|
||||
// // Unmarshal into a single map.
|
||||
// if err := json.Unmarshal(b, &tmp); err != nil {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// // foreach key,value pair in temporary map insert into our concurrent map.
|
||||
// for key, val := range tmp {
|
||||
// m.Set(key, val)
|
||||
// }
|
||||
// return nil
|
||||
// }
|
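A minimal usage sketch of this map (the `HLSBuffer` in `stream/hls.go` stores segment data in it the same way):
```
m := NewCMap()
m.Set("test_tran-1.ts", []byte{0x47})
if v, ok := m.Get("test_tran-1.ts"); ok {
	segData := v.([]byte)
	_ = segData
}
```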
94  stream/hls.go  (Normal file)
@@ -0,0 +1,94 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/kz26/m3u8"
|
||||
)
|
||||
|
||||
var ErrNotFound = errors.New("Not Found")
|
||||
|
||||
type HLSDemuxer interface {
|
||||
//This method should ONLY push a playlist onto a chan when it's a NEW playlist
|
||||
WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error)
|
||||
//This method should ONLY push a segment onto a chan when it's a NEW segment
|
||||
WaitAndPopSegment(ctx context.Context, name string) ([]byte, error)
|
||||
}
|
||||
|
||||
type HLSMuxer interface {
|
||||
WritePlaylist(m3u8.MediaPlaylist) error
|
||||
WriteSegment(name string, s []byte) error
|
||||
}
|
||||
|
||||
//TODO: Write tests, set buffer size, kick out segments / playlists if too full
|
||||
type HLSBuffer struct {
|
||||
HoldTime time.Duration
|
||||
plCacheNew bool
|
||||
segCache *Queue
|
||||
// pq *Queue
|
||||
plCache m3u8.MediaPlaylist
|
||||
sq *ConcurrentMap
|
||||
lock sync.Locker
|
||||
}
|
||||
|
||||
func NewHLSBuffer() *HLSBuffer {
|
||||
m := NewCMap()
|
||||
return &HLSBuffer{plCacheNew: false, segCache: &Queue{}, HoldTime: time.Second, sq: &m, lock: &sync.Mutex{}}
|
||||
}
|
||||
|
||||
func (b *HLSBuffer) WritePlaylist(p m3u8.MediaPlaylist) error {
|
||||
|
||||
b.lock.Lock()
|
||||
b.plCache = p
|
||||
b.plCacheNew = true
|
||||
b.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *HLSBuffer) WriteSegment(name string, s []byte) error {
|
||||
b.lock.Lock()
|
||||
b.segCache.Put(name)
|
||||
b.sq.Set(name, s)
|
||||
b.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *HLSBuffer) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error) {
|
||||
for {
|
||||
|
||||
if b.plCacheNew {
b.plCacheNew = false
return b.plCache, nil
}
|
||||
time.Sleep(time.Second * 1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return m3u8.MediaPlaylist{}, ctx.Err()
|
||||
default:
|
||||
//Fall through here so we can loop back
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *HLSBuffer) WaitAndPopSegment(ctx context.Context, name string) ([]byte, error) {
|
||||
for {
|
||||
seg, found := b.sq.Get(name)
|
||||
glog.Infof("GetSegment: %v, %v", name, found)
|
||||
if found {
|
||||
b.sq.Remove(name)
|
||||
return seg.([]byte), nil
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
//Fall through here so we can loop back
|
||||
}
|
||||
}
|
||||
}
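A rough producer/consumer sketch of the buffer above (the segment name and payload are made up): one goroutine writes segments as they arrive from the transcoder, while another blocks in WaitAndPopSegment until the named segment shows up or the context expires.

```
buf := NewHLSBuffer()

go func() {
	// Hypothetical segment produced by the downloader side.
	buf.WriteSegment("test_1.ts", []byte{0x47})
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
seg, err := buf.WaitAndPopSegment(ctx, "test_1.ts")
if err != nil {
	// err is ctx.Err() if the segment never arrived in time
}
_ = seg
```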
|
375
stream/queue.go
Normal file
@@ -0,0 +1,375 @@
|
||||
//Mostly taken from github.com/Workiva/go-datastructures.
|
||||
package stream
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrDisposed is returned when an operation is performed on a disposed
|
||||
// queue.
|
||||
ErrDisposed = errors.New(`queue: disposed`)
|
||||
|
||||
// ErrTimeout is returned when an applicable queue operation times out.
|
||||
ErrTimeout = errors.New(`queue: poll timed out`)
|
||||
|
||||
// ErrEmptyQueue is returned when a non-applicable queue operation was called
// because the queue is empty.
|
||||
ErrEmptyQueue = errors.New(`queue: empty queue`)
|
||||
)
|
||||
|
||||
type waiters []*sema
|
||||
|
||||
func (w *waiters) get() *sema {
|
||||
if len(*w) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sema := (*w)[0]
|
||||
copy((*w)[0:], (*w)[1:])
|
||||
(*w)[len(*w)-1] = nil // or the zero value of T
|
||||
*w = (*w)[:len(*w)-1]
|
||||
return sema
|
||||
}
|
||||
|
||||
func (w *waiters) put(sema *sema) {
|
||||
*w = append(*w, sema)
|
||||
}
|
||||
|
||||
func (w *waiters) remove(sema *sema) {
|
||||
if len(*w) == 0 {
|
||||
return
|
||||
}
|
||||
// build new slice, copy all except sema
|
||||
ws := *w
|
||||
newWs := make(waiters, 0, len(*w))
|
||||
for i := range ws {
|
||||
if ws[i] != sema {
|
||||
newWs = append(newWs, ws[i])
|
||||
}
|
||||
}
|
||||
*w = newWs
|
||||
}
|
||||
|
||||
type items []interface{}
|
||||
|
||||
func (items *items) get(number int64) []interface{} {
|
||||
returnItems := make([]interface{}, 0, number)
|
||||
index := int64(0)
|
||||
for i := int64(0); i < number; i++ {
|
||||
if i >= int64(len(*items)) {
|
||||
break
|
||||
}
|
||||
|
||||
returnItems = append(returnItems, (*items)[i])
|
||||
(*items)[i] = nil
|
||||
index++
|
||||
}
|
||||
|
||||
*items = (*items)[index:]
|
||||
return returnItems
|
||||
}
|
||||
|
||||
func (items *items) peek() (interface{}, bool) {
|
||||
length := len(*items)
|
||||
|
||||
if length == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return (*items)[0], true
|
||||
}
|
||||
|
||||
func (items *items) getUntil(checker func(item interface{}) bool) []interface{} {
|
||||
length := len(*items)
|
||||
|
||||
if len(*items) == 0 {
|
||||
// returning nil here actually wraps that nil in a list
|
||||
// of interfaces... thanks go
|
||||
return []interface{}{}
|
||||
}
|
||||
|
||||
returnItems := make([]interface{}, 0, length)
|
||||
index := -1
|
||||
for i, item := range *items {
|
||||
if !checker(item) {
|
||||
break
|
||||
}
|
||||
|
||||
returnItems = append(returnItems, item)
|
||||
index = i
|
||||
(*items)[i] = nil // prevent memory leak
|
||||
}
|
||||
|
||||
*items = (*items)[index+1:]
|
||||
return returnItems
|
||||
}
|
||||
|
||||
type sema struct {
|
||||
ready chan bool
|
||||
response *sync.WaitGroup
|
||||
}
|
||||
|
||||
func newSema() *sema {
|
||||
return &sema{
|
||||
ready: make(chan bool, 1),
|
||||
response: &sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
// Queue is the struct responsible for tracking the state
|
||||
// of the queue.
|
||||
type Queue struct {
|
||||
waiters waiters
|
||||
items items
|
||||
lock sync.Mutex
|
||||
disposed bool
|
||||
}
|
||||
|
||||
// Put will add the specified items to the queue.
|
||||
func (q *Queue) Put(items ...interface{}) error {
|
||||
if len(items) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
q.lock.Lock()
|
||||
|
||||
if q.disposed {
|
||||
q.lock.Unlock()
|
||||
return ErrDisposed
|
||||
}
|
||||
|
||||
q.items = append(q.items, items...)
|
||||
for {
|
||||
sema := q.waiters.get()
|
||||
if sema == nil {
|
||||
break
|
||||
}
|
||||
sema.response.Add(1)
|
||||
select {
|
||||
case sema.ready <- true:
|
||||
sema.response.Wait()
|
||||
default:
|
||||
// This semaphore timed out.
|
||||
}
|
||||
if len(q.items) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
q.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves items from the queue. If there are some items in the
|
||||
// queue, get will return a number UP TO the number passed in as a
|
||||
// parameter. If no items are in the queue, this method will pause
|
||||
// until items are added to the queue.
|
||||
func (q *Queue) Get(number int64) ([]interface{}, error) {
|
||||
return q.Poll(number, 0)
|
||||
}
|
||||
|
||||
// Poll retrieves items from the queue. If there are some items in the queue,
|
||||
// Poll will return a number UP TO the number passed in as a parameter. If no
|
||||
// items are in the queue, this method will pause until items are added to the
|
||||
// queue or the provided timeout is reached. A non-positive timeout will block
|
||||
// until items are added. If a timeout occurs, ErrTimeout is returned.
|
||||
func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) {
|
||||
if number < 1 {
|
||||
// thanks again go
|
||||
return []interface{}{}, nil
|
||||
}
|
||||
|
||||
q.lock.Lock()
|
||||
|
||||
if q.disposed {
|
||||
q.lock.Unlock()
|
||||
return nil, ErrDisposed
|
||||
}
|
||||
|
||||
var items []interface{}
|
||||
|
||||
if len(q.items) == 0 {
|
||||
sema := newSema()
|
||||
q.waiters.put(sema)
|
||||
q.lock.Unlock()
|
||||
|
||||
var timeoutC <-chan time.Time
|
||||
if timeout > 0 {
|
||||
timeoutC = time.After(timeout)
|
||||
}
|
||||
select {
|
||||
case <-sema.ready:
|
||||
// we are now inside the put's lock
|
||||
if q.disposed {
|
||||
return nil, ErrDisposed
|
||||
}
|
||||
items = q.items.get(number)
|
||||
sema.response.Done()
|
||||
return items, nil
|
||||
case <-timeoutC:
|
||||
// cleanup the sema that was added to waiters
|
||||
select {
|
||||
case sema.ready <- true:
|
||||
// we called this before Put() could
|
||||
// Remove sema from waiters.
|
||||
q.lock.Lock()
|
||||
q.waiters.remove(sema)
|
||||
q.lock.Unlock()
|
||||
default:
|
||||
// Put() got it already, we need to call Done() so Put() can move on
|
||||
sema.response.Done()
|
||||
}
|
||||
return nil, ErrTimeout
|
||||
}
|
||||
}
|
||||
|
||||
items = q.items.get(number)
|
||||
q.lock.Unlock()
|
||||
return items, nil
|
||||
}
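A short sketch of the Put/Poll contract described above: Poll blocks until items arrive or the timeout fires, returning ErrTimeout in the latter case (the values here are made up).

```
q := NewQueue(10)
_ = q.Put("segment-1", "segment-2")

items, err := q.Poll(2, 100*time.Millisecond)
if err == ErrTimeout {
	// nothing was available within 100ms
}
_ = items
```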
|
||||
|
||||
// Peek returns the first item in the queue by value
|
||||
// without modifying the queue.
|
||||
func (q *Queue) Peek() (interface{}, error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
if q.disposed {
|
||||
return nil, ErrDisposed
|
||||
}
|
||||
|
||||
peekItem, ok := q.items.peek()
|
||||
if !ok {
|
||||
return nil, ErrEmptyQueue
|
||||
}
|
||||
|
||||
return peekItem, nil
|
||||
}
|
||||
|
||||
// TakeUntil takes a function and returns a list of items that
|
||||
// match the checker until the checker returns false. This does not
|
||||
// wait if there are no items in the queue.
|
||||
func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) {
|
||||
if checker == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
q.lock.Lock()
|
||||
|
||||
if q.disposed {
|
||||
q.lock.Unlock()
|
||||
return nil, ErrDisposed
|
||||
}
|
||||
|
||||
result := q.items.getUntil(checker)
|
||||
q.lock.Unlock()
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Empty returns a bool indicating if this queue is empty.
|
||||
func (q *Queue) Empty() bool {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
return len(q.items) == 0
|
||||
}
|
||||
|
||||
// Len returns the number of items in this queue.
|
||||
func (q *Queue) Len() int64 {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
return int64(len(q.items))
|
||||
}
|
||||
|
||||
// Disposed returns a bool indicating if this queue
|
||||
// has had disposed called on it.
|
||||
func (q *Queue) Disposed() bool {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
return q.disposed
|
||||
}
|
||||
|
||||
// Dispose will dispose of this queue and returns
|
||||
// the items disposed. Any subsequent calls to Get
|
||||
// or Put will return an error.
|
||||
func (q *Queue) Dispose() []interface{} {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
q.disposed = true
|
||||
for _, waiter := range q.waiters {
|
||||
waiter.response.Add(1)
|
||||
select {
|
||||
case waiter.ready <- true:
|
||||
// release Poll immediately
|
||||
default:
|
||||
// ignore if it's a timeout or in the get
|
||||
}
|
||||
}
|
||||
|
||||
disposedItems := q.items
|
||||
|
||||
q.items = nil
|
||||
q.waiters = nil
|
||||
|
||||
return disposedItems
|
||||
}
|
||||
|
||||
// NewQueue is a constructor for a new threadsafe queue.
|
||||
func NewQueue(hint int64) *Queue {
|
||||
return &Queue{
|
||||
items: make([]interface{}, 0, hint),
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteInParallel will (in parallel) call the provided function
|
||||
// with each item in the queue until the queue is exhausted. When the queue
|
||||
// is exhausted execution is complete and all goroutines will be killed.
|
||||
// This means that the queue will be disposed so cannot be used again.
|
||||
func ExecuteInParallel(q *Queue, fn func(interface{})) {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
|
||||
q.lock.Lock() // so no one touches anything in the middle
|
||||
// of this process
|
||||
todo, done := uint64(len(q.items)), int64(-1)
|
||||
// this is important or we might face an infinite loop
|
||||
if todo == 0 {
q.lock.Unlock()
return
}
|
||||
|
||||
numCPU := 1
|
||||
if runtime.NumCPU() > 1 {
|
||||
numCPU = runtime.NumCPU() - 1
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numCPU)
|
||||
items := q.items
|
||||
|
||||
for i := 0; i < numCPU; i++ {
|
||||
go func() {
|
||||
for {
|
||||
index := atomic.AddInt64(&done, 1)
|
||||
if index >= int64(todo) {
|
||||
wg.Done()
|
||||
break
|
||||
}
|
||||
|
||||
fn(items[index])
|
||||
items[index] = nil // release the reference, matching getUntil above
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
q.lock.Unlock()
|
||||
q.Dispose()
|
||||
}
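A usage sketch for the helper above: the callback runs concurrently on roughly NumCPU-1 goroutines, and the queue is disposed once the call returns.

```
q := NewQueue(4)
_ = q.Put(1, 2, 3, 4)
ExecuteInParallel(q, func(item interface{}) {
	_ = item.(int) * 2 // per-item work goes here
})
// q is now disposed; further Put/Get calls return ErrDisposed.
```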
|
193
stream/stream.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"reflect"
|
||||
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/kz26/m3u8"
|
||||
"github.com/nareix/joy4/av"
|
||||
)
|
||||
|
||||
var ErrBufferFull = errors.New("Stream Buffer Full")
|
||||
var ErrBufferEmpty = errors.New("Stream Buffer Empty")
|
||||
var ErrBufferItemType = errors.New("Buffer Item Type Not Recognized")
|
||||
var ErrDroppedRTMPStream = errors.New("RTMP Stream Stopped Without EOF")
|
||||
var ErrHttpReqFailed = errors.New("Http Request Failed")
|
||||
|
||||
type RTMPEOF struct{}
|
||||
|
||||
type streamBuffer struct {
|
||||
q *Queue
|
||||
}
|
||||
|
||||
func newStreamBuffer() *streamBuffer {
|
||||
return &streamBuffer{q: NewQueue(1000)}
|
||||
}
|
||||
|
||||
func (b *streamBuffer) push(in interface{}) error {
|
||||
b.q.Put(in)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *streamBuffer) poll(wait time.Duration) (interface{}, error) {
|
||||
results, err := b.q.Poll(1, wait)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := results[0]
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *streamBuffer) pop() (interface{}, error) {
|
||||
results, err := b.q.Get(1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := results[0]
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *streamBuffer) len() int64 {
|
||||
return b.q.Len()
|
||||
}
|
||||
|
||||
type HLSSegment struct {
|
||||
Name string
|
||||
Data []byte
|
||||
}
|
||||
|
||||
type Stream struct {
|
||||
StreamID string
|
||||
RTMPTimeout time.Duration
|
||||
HLSTimeout time.Duration
|
||||
buffer *streamBuffer
|
||||
}
|
||||
|
||||
func (s *Stream) Len() int64 {
|
||||
return s.buffer.len()
|
||||
}
|
||||
|
||||
func NewStream(id string) *Stream {
|
||||
return &Stream{buffer: newStreamBuffer(), StreamID: id}
|
||||
}
|
||||
|
||||
//ReadRTMPFromStream reads the content of the RTMP stream out into dst.
|
||||
func (s *Stream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error {
|
||||
defer dst.Close()
|
||||
|
||||
//TODO: Make sure to listen to ctx.Done()
|
||||
for {
|
||||
item, err := s.buffer.poll(s.RTMPTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch item.(type) {
|
||||
case []av.CodecData:
|
||||
headers := item.([]av.CodecData)
|
||||
err = dst.WriteHeader(headers)
|
||||
if err != nil {
|
||||
glog.Infof("Error writing RTMP header from Stream %v to mux", s.StreamID)
|
||||
return err
|
||||
}
|
||||
case av.Packet:
|
||||
packet := item.(av.Packet)
|
||||
err = dst.WritePacket(packet)
|
||||
if err != nil {
|
||||
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
|
||||
return err
|
||||
}
|
||||
case RTMPEOF:
|
||||
err := dst.WriteTrailer()
|
||||
if err != nil {
|
||||
glog.Infof("Error writing RTMP trailer from Stream %v", s.StreamID)
|
||||
return err
|
||||
}
|
||||
return io.EOF
|
||||
default:
|
||||
glog.Infof("Cannot recognize buffer iteam type: ", reflect.TypeOf(item))
|
||||
return ErrBufferItemType
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//WriteRTMPToStream writes a video stream from src into the stream.
|
||||
func (s *Stream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error {
|
||||
defer src.Close()
|
||||
|
||||
c := make(chan error, 1)
|
||||
go func() {
|
||||
c <- func() error {
|
||||
header, err := src.Streams()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.buffer.push(header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// var lastKeyframe av.Packet
|
||||
for {
|
||||
packet, err := src.ReadPacket()
|
||||
if err == io.EOF {
|
||||
s.buffer.push(RTMPEOF{})
|
||||
return err
|
||||
} else if err != nil {
|
||||
return err
|
||||
} else if len(packet.Data) == 0 { //TODO: Investigate if it's possible for packet to be nil (what happens when RTMP stopped publishing because of a dropped connection? Is it possible to have err and packet both nil?)
|
||||
return ErrDroppedRTMPStream
|
||||
}
|
||||
|
||||
if packet.IsKeyFrame {
|
||||
// lastKeyframe = packet
|
||||
}
|
||||
|
||||
err = s.buffer.push(packet)
|
||||
if err == ErrBufferFull {
|
||||
//TODO: Delete all packets until last keyframe, insert headers in front - trying to get rid of streaming artifacts.
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
glog.Infof("Finished writing RTMP to Stream %v", s.StreamID)
|
||||
return ctx.Err()
|
||||
case err := <-c:
|
||||
return err
|
||||
}
|
||||
}
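The two halves above are typically wired to a joy4 RTMP server, as the vidlistener and vidplayer packages later in this commit do: the published connection serves as the av.DemuxCloser and a playback connection as the av.MuxCloser. A condensed sketch, assuming the joy4rtmp import used elsewhere in this commit (port and stream ID are made up):

```
s := NewStream("demo")
s.RTMPTimeout = 10 * time.Second

server := &joy4rtmp.Server{Addr: ":1935"}
server.HandlePublish = func(conn *joy4rtmp.Conn) {
	_ = s.WriteRTMPToStream(context.Background(), conn)
}
server.HandlePlay = func(conn *joy4rtmp.Conn) {
	_ = s.ReadRTMPFromStream(context.Background(), conn)
}
go server.ListenAndServe()
```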
|
||||
|
||||
func (s *Stream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error {
|
||||
return s.buffer.push(pl)
|
||||
}
|
||||
|
||||
func (s *Stream) WriteHLSSegmentToStream(seg HLSSegment) error {
|
||||
return s.buffer.push(seg)
|
||||
}
|
||||
|
||||
//ReadHLSFromStream reads an HLS stream into an HLSBuffer
|
||||
func (s *Stream) ReadHLSFromStream(buffer HLSMuxer) error {
|
||||
for {
|
||||
item, err := s.buffer.poll(s.HLSTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch item.(type) {
|
||||
case m3u8.MediaPlaylist:
|
||||
buffer.WritePlaylist(item.(m3u8.MediaPlaylist))
|
||||
case HLSSegment:
|
||||
buffer.WriteSegment(item.(HLSSegment).Name, item.(HLSSegment).Data)
|
||||
default:
|
||||
return ErrBufferItemType
|
||||
}
|
||||
}
|
||||
}
|
351
stream/stream_test.go
Normal file
@@ -0,0 +1,351 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"time"
|
||||
|
||||
"github.com/kz26/m3u8"
|
||||
"github.com/nareix/joy4/av"
|
||||
)
|
||||
|
||||
//Testing WriteRTMP errors
|
||||
var ErrPacketRead = errors.New("packet read error")
|
||||
var ErrStreams = errors.New("streams error")
|
||||
|
||||
type BadStreamsDemuxer struct{}
|
||||
|
||||
func (d BadStreamsDemuxer) Close() error { return nil }
|
||||
func (d BadStreamsDemuxer) Streams() ([]av.CodecData, error) { return nil, ErrStreams }
|
||||
func (d BadStreamsDemuxer) ReadPacket() (av.Packet, error) { return av.Packet{Data: []byte{0, 0}}, nil }
|
||||
|
||||
type BadPacketsDemuxer struct{}
|
||||
|
||||
func (d BadPacketsDemuxer) Close() error { return nil }
|
||||
func (d BadPacketsDemuxer) Streams() ([]av.CodecData, error) { return nil, nil }
|
||||
func (d BadPacketsDemuxer) ReadPacket() (av.Packet, error) {
|
||||
return av.Packet{Data: []byte{0, 0}}, ErrPacketRead
|
||||
}
|
||||
|
||||
type NoEOFDemuxer struct {
|
||||
c *Counter
|
||||
}
|
||||
|
||||
type Counter struct {
|
||||
Count int
|
||||
}
|
||||
|
||||
func (d NoEOFDemuxer) Close() error { return nil }
|
||||
func (d NoEOFDemuxer) Streams() ([]av.CodecData, error) { return nil, nil }
|
||||
func (d NoEOFDemuxer) ReadPacket() (av.Packet, error) {
|
||||
if d.c.Count == 10 {
|
||||
return av.Packet{}, nil
|
||||
}
|
||||
|
||||
d.c.Count = d.c.Count + 1
|
||||
return av.Packet{Data: []byte{0}}, nil
|
||||
}
|
||||
|
||||
func TestWriteRTMPErrors(t *testing.T) {
|
||||
// stream := Stream{Buffer: &StreamBuffer{}, StreamID: "test"}
|
||||
stream := NewStream("test")
|
||||
err := stream.WriteRTMPToStream(context.Background(), BadStreamsDemuxer{})
|
||||
if err != ErrStreams {
|
||||
t.Error("Expecting Streams Error, but got: ", err)
|
||||
}
|
||||
|
||||
err = stream.WriteRTMPToStream(context.Background(), BadPacketsDemuxer{})
|
||||
if err != ErrPacketRead {
|
||||
t.Error("Expecting Packet Read Error, but got: ", err)
|
||||
}
|
||||
|
||||
err = stream.WriteRTMPToStream(context.Background(), NoEOFDemuxer{c: &Counter{Count: 0}})
|
||||
if err != ErrDroppedRTMPStream {
|
||||
t.Error("Expecting RTMP Dropped Error, but got: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
//Testing WriteRTMP
|
||||
type PacketsDemuxer struct {
|
||||
c *Counter
|
||||
}
|
||||
|
||||
func (d PacketsDemuxer) Close() error { return nil }
|
||||
func (d PacketsDemuxer) Streams() ([]av.CodecData, error) { return nil, nil }
|
||||
func (d PacketsDemuxer) ReadPacket() (av.Packet, error) {
|
||||
if d.c.Count == 10 {
|
||||
return av.Packet{Data: []byte{0, 0}}, io.EOF
|
||||
}
|
||||
|
||||
d.c.Count = d.c.Count + 1
|
||||
return av.Packet{Data: []byte{0, 0}}, nil
|
||||
}
|
||||
|
||||
func TestWriteRTMP(t *testing.T) {
|
||||
// stream := Stream{Buffer: NewStreamBuffer(), StreamID: "test"}
|
||||
stream := NewStream("test")
|
||||
err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}})
|
||||
|
||||
if err != io.EOF {
|
||||
t.Error("Expecting EOF, but got: ", err)
|
||||
}
|
||||
|
||||
if stream.Len() != 12 { //10 packets, 1 header, 1 trailer
|
||||
t.Error("Expecting buffer length to be 12, but got: ", stream.Len())
|
||||
}
|
||||
|
||||
// fmt.Println(stream.buffer.q.Get(12))
|
||||
|
||||
//TODO: Test what happens when the buffer is full (should evict everything before the last keyframe)
|
||||
}
|
||||
|
||||
var ErrBadHeader = errors.New("BadHeader")
|
||||
var ErrBadPacket = errors.New("BadPacket")
|
||||
|
||||
type BadHeaderMuxer struct{}
|
||||
|
||||
func (d BadHeaderMuxer) Close() error { return nil }
|
||||
func (d BadHeaderMuxer) WriteHeader([]av.CodecData) error { return ErrBadHeader }
|
||||
func (d BadHeaderMuxer) WriteTrailer() error { return nil }
|
||||
func (d BadHeaderMuxer) WritePacket(av.Packet) error { return nil }
|
||||
|
||||
type BadPacketMuxer struct{}
|
||||
|
||||
func (d BadPacketMuxer) Close() error { return nil }
|
||||
func (d BadPacketMuxer) WriteHeader([]av.CodecData) error { return nil }
|
||||
func (d BadPacketMuxer) WriteTrailer() error { return nil }
|
||||
func (d BadPacketMuxer) WritePacket(av.Packet) error { return ErrBadPacket }
|
||||
|
||||
func TestReadRTMPError(t *testing.T) {
|
||||
stream := NewStream("test")
|
||||
err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}})
|
||||
if err != io.EOF {
|
||||
t.Error("Error setting up the test - while inserting packet.")
|
||||
}
|
||||
err = stream.ReadRTMPFromStream(context.Background(), BadHeaderMuxer{})
|
||||
|
||||
if err != ErrBadHeader {
|
||||
t.Error("Expecting bad header error, but got ", err)
|
||||
}
|
||||
|
||||
err = stream.ReadRTMPFromStream(context.Background(), BadPacketMuxer{})
|
||||
if err != ErrBadPacket {
|
||||
t.Error("Expecting bad packet error, but got ", err)
|
||||
}
|
||||
}
|
||||
|
||||
//Test ReadRTMP
|
||||
type PacketsMuxer struct{}
|
||||
|
||||
func (d PacketsMuxer) Close() error { return nil }
|
||||
func (d PacketsMuxer) WriteHeader([]av.CodecData) error { return nil }
|
||||
func (d PacketsMuxer) WriteTrailer() error { return nil }
|
||||
func (d PacketsMuxer) WritePacket(av.Packet) error { return nil }
|
||||
|
||||
func TestReadRTMP(t *testing.T) {
|
||||
stream := NewStream("test")
|
||||
err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}})
|
||||
if err != io.EOF {
|
||||
t.Error("Error setting up the test - while inserting packet.")
|
||||
}
|
||||
readErr := stream.ReadRTMPFromStream(context.Background(), PacketsMuxer{})
|
||||
|
||||
if readErr != io.EOF {
|
||||
t.Error("Expecting buffer to be empty, but got ", err)
|
||||
}
|
||||
|
||||
if stream.Len() != 0 {
|
||||
t.Error("Expecting buffer length to be 0, but got ", stream.Len())
|
||||
}
|
||||
|
||||
stream2 := NewStream("test2")
|
||||
stream2.RTMPTimeout = time.Millisecond * 50
|
||||
err2 := stream.WriteRTMPToStream(context.Background(), NoEOFDemuxer{c: &Counter{Count: 0}})
|
||||
if err2 != ErrDroppedRTMPStream {
|
||||
t.Error("Error setting up the test - while inserting packet.")
|
||||
}
|
||||
err2 = stream2.ReadRTMPFromStream(context.Background(), PacketsMuxer{})
|
||||
if err2 != ErrTimeout {
|
||||
t.Error("Expecting timeout, but got", err2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteHLS(t *testing.T) {
|
||||
stream := NewStream("test")
|
||||
err1 := stream.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{})
|
||||
err2 := stream.WriteHLSSegmentToStream(HLSSegment{})
|
||||
if err1 != nil {
|
||||
t.Error("Shouldn't be error writing playlist, but got:", err1)
|
||||
}
|
||||
if err2 != nil {
|
||||
t.Error("Shouldn't be error writing segment, but got:", err2)
|
||||
}
|
||||
if stream.buffer.len() != 2 {
|
||||
t.Error("Should have 2 packet, but got:", stream.buffer.len())
|
||||
}
|
||||
}
|
||||
|
||||
// struct TestHLSBuffer struct{}
|
||||
// func (b *TestHLSBuffer) WritePlaylist(m3u8.MediaPlaylist) error {
|
||||
|
||||
// }
|
||||
|
||||
// func (b *TestHLSBuffer) WriteSegment(name string, s []byte) error {
|
||||
|
||||
// }
|
||||
|
||||
func TestReadHLS(t *testing.T) {
|
||||
stream := NewStream("test")
|
||||
stream.HLSTimeout = time.Millisecond * 100
|
||||
buffer := NewHLSBuffer()
|
||||
grBefore := runtime.NumGoroutine()
|
||||
stream.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{SeqNo: 100})
|
||||
for i := 0; i < 9; i++ {
|
||||
stream.WriteHLSSegmentToStream(HLSSegment{Name: "test" + strconv.Itoa(i), Data: []byte{0}})
|
||||
}
|
||||
|
||||
ec := make(chan error, 1)
|
||||
go func() { ec <- stream.ReadHLSFromStream(buffer) }()
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
if buffer.sq.Count() != 9 {
|
||||
t.Error("Should have 9 packets in the buffer, but got:", buffer.sq.Count())
|
||||
}
|
||||
|
||||
if buffer.plCache.SeqNo != 100 {
|
||||
t.Error("Should have inserted a playlist with SeqNo of 100")
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
grAfter := runtime.NumGoroutine()
|
||||
if grBefore != grAfter {
|
||||
t.Errorf("Should have %v Go routines, but have %v", grBefore, grAfter)
|
||||
}
|
||||
}
|
||||
|
||||
// type GoodHLSDemux struct{}
|
||||
|
||||
// func (d GoodHLSDemux) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error) {
|
||||
// return m3u8.MediaPlaylist{}, nil
|
||||
// for i := 0; i < 2; i++ {
|
||||
// pc <- m3u8.MediaPlaylist{}
|
||||
// time.Sleep(time.Millisecond * 50)
|
||||
// }
|
||||
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// return ctx.Err()
|
||||
// }
|
||||
// }
|
||||
// func (d GoodHLSDemux) WaitAndGetSegment(ctx context.Context, name string) ([]byte, error) {
|
||||
// return nil, nil
|
||||
// }
|
||||
|
||||
// func (d GoodHLSDemux) PollPlaylist(ctx context.Context, pc chan m3u8.MediaPlaylist) error {
|
||||
// for i := 0; i < 2; i++ {
|
||||
// pc <- m3u8.MediaPlaylist{}
|
||||
// time.Sleep(time.Millisecond * 50)
|
||||
// }
|
||||
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// return ctx.Err()
|
||||
// }
|
||||
// }
|
||||
|
||||
// func (d GoodHLSDemux) PollSegment(ctx context.Context, sc chan m3u8.MediaSegment) error {
|
||||
// for i := 0; i < 4; i++ {
|
||||
// sc <- m3u8.MediaSegment{}
|
||||
// time.Sleep(time.Millisecond * 50)
|
||||
// }
|
||||
|
||||
// return io.EOF
|
||||
// }
|
||||
|
||||
// func TestWriteHLS(t *testing.T) {
|
||||
// stream := NewStream("test")
|
||||
// numGR := runtime.NumGoroutine()
|
||||
// ctx, cancel := context.WithCancel(context.Background())
|
||||
// err := stream.WriteHLSToStream(ctx, GoodHLSDemux{})
|
||||
// cancel()
|
||||
|
||||
// if err != io.EOF {
|
||||
// t.Error("Expecting EOF, but got:", err)
|
||||
// }
|
||||
|
||||
// if stream.buffer.len() != 6 {
|
||||
// t.Error("Expecting 6 packets in buffer, but got:", stream.buffer.len())
|
||||
// }
|
||||
|
||||
// time.Sleep(time.Millisecond * 100)
|
||||
// if numGR != runtime.NumGoroutine() {
|
||||
// t.Errorf("NumGoroutine not equal. Before:%v, After:%v", numGR, runtime.NumGoroutine())
|
||||
// }
|
||||
// }
|
||||
|
||||
// type TimeoutHLSDemux struct{}
|
||||
|
||||
// func (d TimeoutHLSDemux) PollPlaylist(ctx context.Context, pc chan m3u8.MediaPlaylist) error {
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// return ctx.Err()
|
||||
// }
|
||||
// }
|
||||
|
||||
// func (d TimeoutHLSDemux) PollSegment(ctx context.Context, sc chan m3u8.MediaSegment) error {
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// return ctx.Err()
|
||||
// }
|
||||
// }
|
||||
|
||||
// //This test is more for documentation - this is how timeout works here.
|
||||
// func TestWriteHLSTimeout(t *testing.T) {
|
||||
// stream := NewStream("test")
|
||||
// numGR := runtime.NumGoroutine()
|
||||
// ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
|
||||
// defer cancel()
|
||||
// err := stream.WriteHLSToStream(ctx, TimeoutHLSDemux{})
|
||||
|
||||
// if err != context.DeadlineExceeded {
|
||||
// t.Error("Expecting EOF, but got:", err)
|
||||
// }
|
||||
|
||||
// if stream.buffer.len() != 0 {
|
||||
// t.Error("Expecting 0 packets in buffer, but got:", stream.buffer.len())
|
||||
// }
|
||||
|
||||
// if numGR != runtime.NumGoroutine() {
|
||||
// t.Errorf("NumGoroutine not equal. Before:%v, After:%v", numGR, runtime.NumGoroutine())
|
||||
// }
|
||||
// }
|
||||
|
||||
// //Test ReadRTMP Errors
|
||||
// type FakeStreamBuffer struct {
|
||||
// c *Counter
|
||||
// }
|
||||
|
||||
// func (b *FakeStreamBuffer) Push(in interface{}) error { return nil }
|
||||
// func (b *FakeStreamBuffer) Pop() (interface{}, error) {
|
||||
// // fmt.Println("pop, count:", b.c.Count)
|
||||
// switch b.c.Count {
|
||||
// case 10:
|
||||
// b.c.Count = b.c.Count - 1
|
||||
// // i := &BufferItem{Type: RTMPHeader, Data: []av.CodecData{}}
|
||||
// // h, _ := Serialize(i)
|
||||
// // return h, nil
|
||||
// return []av.CodecData{}, nil
|
||||
// case 0:
|
||||
// return nil, ErrBufferEmpty
|
||||
// default:
|
||||
// b.c.Count = b.c.Count - 1
|
||||
// // i := &BufferItem{Type: RTMPPacket, Data: av.Packet{}}
|
||||
// // p, _ := Serialize(i)
|
||||
// // return p, nil
|
||||
// return av.Packet{}, nil
|
||||
// }
|
||||
// }
|
202
transcoder/external.go
Normal file
@@ -0,0 +1,202 @@
|
||||
package transcoder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/kz26/m3u8"
|
||||
"github.com/livepeer/lpms/stream"
|
||||
"github.com/nareix/joy4/av"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
)
|
||||
|
||||
var ErrTranscoderConnRefused = errors.New("Connection Refused for Local External Transcoder")
|
||||
var ErrHLSDownloadTimeout = errors.New("HLS Download Timeout")
|
||||
var ErrUnsupportFormat = errors.New("Unsupported Format")
|
||||
var ErrNotFound = errors.New("Not Found")
|
||||
|
||||
type ExternalTranscoder struct {
|
||||
localSRSRTMPPort string
|
||||
localSRSHTTPPort string
|
||||
streamID string
|
||||
downloader HLSDownloader
|
||||
|
||||
//TODO: Keep track of local SRS instance
|
||||
}
|
||||
|
||||
func New(rtmpPort string, srsHTTPPort string, streamID string) *ExternalTranscoder {
|
||||
m := cmap.New()
|
||||
d := SRSHLSDownloader{cache: &m, localEndpoint: "http://localhost:" + srsHTTPPort + "/stream/", streamID: streamID, startDownloadWaitTime: time.Second * 10, hlsIntervalWaitTime: time.Second}
|
||||
return &ExternalTranscoder{localSRSRTMPPort: rtmpPort, localSRSHTTPPort: srsHTTPPort, streamID: streamID, downloader: d}
|
||||
}
|
||||
|
||||
func (et *ExternalTranscoder) StartService() {
|
||||
//Start SRS
|
||||
}
|
||||
|
||||
//LocalSRSUploadMux Convenience method to get a mux
|
||||
func (et *ExternalTranscoder) LocalSRSUploadMux() (av.MuxCloser, error) {
|
||||
url := "rtmp://localhost:" + et.localSRSRTMPPort + "/stream/" + et.streamID
|
||||
glog.Infof("SRS Upload path: %v", url)
|
||||
rtmpMux, err := joy4rtmp.Dial(url)
|
||||
if err != nil {
|
||||
glog.Errorf("Transcoder RTMP Stream Publish Error: %v. Make sure you have started your local SRS instance correctly.", err)
|
||||
return nil, err
|
||||
}
|
||||
return rtmpMux, nil
|
||||
}
|
||||
|
||||
//StartUpload takes a io.Stream of RTMP stream, and loads it into a local RTMP endpoint. The streamID will be used as the streaming endpoint.
|
||||
//So if you want to create a new stream, make sure to do that before passing in the stream.
|
||||
func (et *ExternalTranscoder) StartUpload(ctx context.Context, rtmpMux av.MuxCloser, src *stream.Stream) error {
|
||||
upErrC := make(chan error, 1)
|
||||
|
||||
go func() { upErrC <- src.ReadRTMPFromStream(ctx, rtmpMux) }()
|
||||
|
||||
select {
|
||||
case err := <-upErrC:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
//StartDownload pushes hls playlists and segments into the stream as they become available from the transcoder.
|
||||
func (et *ExternalTranscoder) StartDownload(ctx context.Context, hlsMux *stream.Stream) error {
|
||||
pc := make(chan *m3u8.MediaPlaylist)
|
||||
sc := make(chan *stream.HLSSegment)
|
||||
ec := make(chan error)
|
||||
go func() { ec <- et.downloader.Download(pc, sc) }()
|
||||
for {
|
||||
select {
|
||||
case pl := <-pc:
|
||||
err := hlsMux.WriteHLSPlaylistToStream(*pl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case seg := <-sc:
|
||||
err := hlsMux.WriteHLSSegmentToStream(*seg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case err := <-ec:
|
||||
glog.Errorf("HLS Download Error: %v", err)
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//HLSDownloader doesn't take m3u8.MediaSegment because it doesn't contain the actual data
|
||||
type HLSDownloader interface {
|
||||
Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSegment) error
|
||||
}
|
||||
|
||||
type SRSHLSDownloader struct {
|
||||
cache *cmap.ConcurrentMap
|
||||
localEndpoint string
|
||||
streamID string
|
||||
startDownloadWaitTime time.Duration
|
||||
hlsIntervalWaitTime time.Duration
|
||||
}
|
||||
|
||||
//Download only pushes a playlist onto the channel when there is a new segment in it.
|
||||
func (d SRSHLSDownloader) Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSegment) error {
|
||||
before := time.Now()
|
||||
plURL := d.localEndpoint + d.streamID + ".m3u8"
|
||||
glog.Infof("SRS Playlist Download Path: ", plURL)
|
||||
|
||||
for {
|
||||
pl, errp := DownloadPlaylist(plURL)
|
||||
if errp == ErrNotFound && time.Since(before) < d.startDownloadWaitTime { //only retry until the start-download wait time has elapsed
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
} else if errp != nil {
|
||||
glog.Errorf("Transcoder HLS Playlist Download Error: %v", errp)
|
||||
return errp
|
||||
}
|
||||
|
||||
sendpl := false
|
||||
for _, seginfo := range pl.Segments {
|
||||
if seginfo == nil {
|
||||
continue
|
||||
}
|
||||
if _, found := d.cache.Get(seginfo.URI); !found {
|
||||
seg, errs := DownloadSegment(d.localEndpoint, seginfo)
|
||||
if errs != nil {
|
||||
glog.Errorf("Transcoder HLS Segment Download Error: %v", errp)
|
||||
return errs
|
||||
}
|
||||
sc <- &stream.HLSSegment{Name: seginfo.URI, Data: seg}
|
||||
|
||||
d.cache.Set(seginfo.URI, true)
|
||||
sendpl = true
|
||||
}
|
||||
}
|
||||
|
||||
if sendpl {
|
||||
pc <- pl
|
||||
}
|
||||
|
||||
time.Sleep(d.hlsIntervalWaitTime)
|
||||
}
|
||||
}
|
||||
|
||||
func DownloadSegment(endpoint string, seginfo *m3u8.MediaSegment) ([]byte, error) {
|
||||
req, err := http.NewRequest("GET", endpoint+seginfo.URI, nil)
|
||||
if err != nil {
|
||||
glog.Errorf("Transcoder HLS Segment Download Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
client := http.Client{}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
glog.Errorf("Transcoder HLS Segment Download Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = io.Copy(buf, resp.Body)
|
||||
if err != nil {
|
||||
glog.Errorf("Segment Download Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func DownloadPlaylist(endpointUrl string) (*m3u8.MediaPlaylist, error) {
|
||||
req, err := http.NewRequest("GET", endpointUrl, nil)
|
||||
if err != nil {
|
||||
glog.Errorf("Transcoder HLS Download Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
client := http.Client{}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
glog.Errorf("Transcoder HLS Download Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
if err != nil {
glog.Errorf("Transcoder HLS Playlist Decode Error: %v", err)
}
|
||||
|
||||
if playlist == nil {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
if listType == m3u8.MEDIA {
|
||||
mpl := playlist.(*m3u8.MediaPlaylist)
|
||||
return mpl, nil
|
||||
}
|
||||
|
||||
return nil, ErrUnsupportFormat
|
||||
}
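A one-shot sketch of the two download helpers above, outside the polling loop (the localhost URL is made up and assumes a running SRS instance):

```
pl, err := DownloadPlaylist("http://localhost:8000/stream/test.m3u8")
if err == nil && len(pl.Segments) > 0 && pl.Segments[0] != nil {
	data, _ := DownloadSegment("http://localhost:8000/stream/", pl.Segments[0])
	_ = data
}
```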
|
171
transcoder/external_test.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package transcoder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/kz26/m3u8"
|
||||
"github.com/livepeer/lpms/stream"
|
||||
"github.com/nareix/joy4/av"
|
||||
)
|
||||
|
||||
type Counter struct {
|
||||
Count int
|
||||
}
|
||||
type PacketsDemuxer struct {
|
||||
c *Counter
|
||||
}
|
||||
|
||||
func (d PacketsDemuxer) Close() error { return nil }
|
||||
func (d PacketsDemuxer) Streams() ([]av.CodecData, error) { return []av.CodecData{}, nil }
|
||||
func (d PacketsDemuxer) ReadPacket() (av.Packet, error) {
|
||||
if d.c.Count == 10 {
|
||||
return av.Packet{}, io.EOF
|
||||
}
|
||||
|
||||
d.c.Count = d.c.Count + 1
|
||||
return av.Packet{Data: []byte{0, 0}}, nil
|
||||
}
|
||||
|
||||
type PacketsMuxer struct{ NumWrites int32 }
|
||||
|
||||
func (d *PacketsMuxer) Close() error { return nil }
|
||||
func (d *PacketsMuxer) WriteHeader([]av.CodecData) error {
|
||||
d.NumWrites = d.NumWrites + 1
|
||||
return nil
|
||||
}
|
||||
func (d *PacketsMuxer) WriteTrailer() error {
|
||||
// fmt.Println("writing Trailer")
|
||||
d.NumWrites = d.NumWrites + 1
|
||||
return nil
|
||||
}
|
||||
func (d *PacketsMuxer) WritePacket(av.Packet) error {
|
||||
// fmt.Println("writing packet")
|
||||
d.NumWrites = d.NumWrites + 1
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestStartUpload(t *testing.T) {
|
||||
tr := &ExternalTranscoder{}
|
||||
mux := &PacketsMuxer{}
|
||||
demux := &PacketsDemuxer{c: &Counter{}}
|
||||
stream := stream.NewStream("test")
|
||||
stream.WriteRTMPToStream(context.Background(), demux)
|
||||
ctx := context.Background()
|
||||
|
||||
err := tr.StartUpload(ctx, mux, stream)
|
||||
if err != io.EOF {
|
||||
t.Error("Should have gotten EOF, but got:", err)
|
||||
}
|
||||
|
||||
if mux.NumWrites != 12 {
|
||||
t.Error("Should have written 12 packets. Instead we got:", mux.NumWrites)
|
||||
}
|
||||
}
|
||||
|
||||
type Downloader struct{}
|
||||
|
||||
func (d Downloader) Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSegment) error {
|
||||
pl := m3u8.MediaPlaylist{}
|
||||
pc <- &pl
|
||||
for i := 0; i < 9; i++ {
|
||||
seg := stream.HLSSegment{}
|
||||
sc <- &seg
|
||||
}
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
func TestStartDownload(t *testing.T) {
|
||||
// fmt.Println("Testing Download")
|
||||
d := Downloader{}
|
||||
s := stream.NewStream("test")
|
||||
tr := &ExternalTranscoder{downloader: d}
|
||||
err := tr.StartDownload(context.Background(), s)
|
||||
|
||||
if err != io.EOF {
|
||||
t.Error("Expecting EOF, got", err)
|
||||
}
|
||||
|
||||
if s.Len() != 10 {
|
||||
t.Error("Expecting 10 packets, got ", s.Len())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//SRS must be running for this integration test
|
||||
// func TestDownloader(t *testing.T) {
|
||||
// fmt.Println("Testing Downloader - Integration Test")
|
||||
// m := cmap.New()
|
||||
// d := SRSHLSDownloader{cache: &m, localEndpoint: "http://localhost:7936/stream/", streamID: "live.m3u8", startDownloadWaitTime: time.Second, hlsIntervalWaitTime: time.Second * 5}
|
||||
// pc := make(chan *m3u8.MediaPlaylist)
|
||||
// sc := make(chan *lpmsio.HLSSegment)
|
||||
// ec := make(chan error, 1)
|
||||
// hlsBuffer := lpmsio.NewHLSBuffer()
|
||||
|
||||
// //Do the download into the channel (refer to the end of the method for copying into hlsBuffer)
|
||||
// go func() { ec <- d.Download(pc, sc) }()
|
||||
|
||||
// //Set up the player
|
||||
// player := vidplayer.VidPlayer{}
|
||||
// player.HandleHTTPPlay(func(ctx context.Context, reqPath string, writer io.Writer) error {
|
||||
// if strings.HasSuffix(reqPath, ".m3u8") {
|
||||
// fmt.Println("Got m3u8 req:", reqPath)
|
||||
// pl, err := hlsBuffer.WaitAndGetPlaylist(ctx)
|
||||
// buf := pl.Encode()
|
||||
// bytes := buf.Bytes()
|
||||
// _, werr := writer.Write(bytes)
|
||||
// if werr != nil {
|
||||
// fmt.Println("Error Writing m3u8 playlist: ", err)
|
||||
// }
|
||||
// return nil
|
||||
|
||||
// }
|
||||
|
||||
// if strings.HasSuffix(reqPath, ".ts") {
|
||||
// fmt.Println("Got ts req:", reqPath)
|
||||
// segID := strings.Split(reqPath, "/")[2]
|
||||
// seg, err := hlsBuffer.WaitAndGetSegment(ctx, segID)
|
||||
// fmt.Println("Got seg: ", len(seg))
|
||||
// if err != nil {
|
||||
// fmt.Println("Error Writing ts segs: ", err)
|
||||
// }
|
||||
// _, werr := writer.Write(seg)
|
||||
// if werr != nil {
|
||||
// fmt.Println("Error Writing ts segs: ", err)
|
||||
// }
|
||||
// return nil
|
||||
|
||||
// }
|
||||
|
||||
// return errors.New("Unrecognized req string: " + reqPath)
|
||||
// })
|
||||
|
||||
// //Get the server running
|
||||
// go http.ListenAndServe(":8000", nil)
|
||||
|
||||
// //Do the copying into the buffer
|
||||
// for {
|
||||
// select {
|
||||
// case e := <-ec:
|
||||
// fmt.Println(e)
|
||||
// return
|
||||
// case pl := <-pc:
|
||||
// for _, s := range pl.Segments {
|
||||
// if s != nil {
|
||||
// fmt.Println(s)
|
||||
// }
|
||||
// }
|
||||
// fmt.Println("Writing playlist to hlsBuffer")
|
||||
// hlsBuffer.WritePlaylist(*pl)
|
||||
// case seg := <-sc:
|
||||
// if seg.Name != "" {
|
||||
// fmt.Printf("Writing %v to hlsBuffer\n", seg.Name)
|
||||
// hlsBuffer.WriteSegment(seg.Name, seg.Data)
|
||||
// } else {
|
||||
// fmt.Printf("Skipping writting %v:%v to hlsBuffer\n", seg.Name, len(seg.Data))
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// }
|
@@ -1,20 +0,0 @@
|
||||
package types
|
||||
|
||||
import "time"
|
||||
|
||||
type HlsSegment struct {
|
||||
Data []byte
|
||||
Name string
|
||||
}
|
||||
|
||||
type Download struct {
|
||||
URI string
|
||||
TotalDuration time.Duration
|
||||
}
|
||||
|
||||
type BroadcastReq struct {
|
||||
formats []string
|
||||
bitrates []string
|
||||
codecin string
|
||||
codecout []string
|
||||
}
|
53
vidlistener/listener.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package vidlistener
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/livepeer/lpms/stream"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
type LocalStream struct {
|
||||
StreamID string
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
type VidListener struct {
|
||||
RtmpServer *joy4rtmp.Server
|
||||
}
|
||||
|
||||
//HandleRTMPPublish writes the published RTMP stream into a stream. It exposes getStreamID so the
|
||||
//user can name the stream, and getStream so the user can keep track of all the streams.
|
||||
func (s *VidListener) HandleRTMPPublish(
|
||||
getStreamID func(reqPath string) (string, error),
|
||||
getStream func(reqPath string) (*stream.Stream, error),
|
||||
endStream func(reqPath string)) error {
|
||||
|
||||
s.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
|
||||
glog.Infof("RTMP server got upstream")
|
||||
|
||||
streamID, err := getStreamID(conn.URL.Path)
|
||||
if err != nil {
|
||||
glog.Errorf("RTMP Stream Publish Error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
stream, err := getStream(conn.URL.Path)
|
||||
if err != nil {
|
||||
glog.Errorf("RTMP Publish couldn't get a destination stream for %v", conn.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
glog.Infof("Got RTMP Stream: %v", streamID)
|
||||
c := make(chan error, 0)
|
||||
go func() { c <- stream.WriteRTMPToStream(context.Background(), conn) }()
|
||||
select {
|
||||
case err := <-c:
|
||||
endStream(conn.URL.Path)
|
||||
glog.Error("Got error writing RTMP: ", err)
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
73
vidlistener/listener_test.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package vidlistener
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/livepeer/lpms/stream"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
server := &joy4rtmp.Server{Addr: ":1937"}
|
||||
listener := &VidListener{RtmpServer: server}
|
||||
listener.HandleRTMPPublish(
|
||||
func(reqPath string) (string, error) {
|
||||
return "test", nil
|
||||
},
|
||||
func(reqPath string) (*stream.Stream, error) {
|
||||
// return errors.New("Some Error")
|
||||
return &stream.Stream{}, nil
|
||||
},
|
||||
func(reqPath string) {})
|
||||
|
||||
ffmpegCmd := "ffmpeg"
|
||||
ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1937/movie/stream"}
|
||||
go exec.Command(ffmpegCmd, ffmpegArgs...).Run()
|
||||
|
||||
go listener.RtmpServer.ListenAndServe()
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
}
|
||||
|
||||
// Integration test.
|
||||
// func TestRTMPWithServer(t *testing.T) {
|
||||
// server := &joy4rtmp.Server{Addr: ":1936"}
|
||||
// listener := &VidListener{RtmpServer: server}
|
||||
// listener.HandleRTMPPublish(
|
||||
// func(reqPath string) (string, error) {
|
||||
// return "teststream", nil
|
||||
// },
|
||||
// func(reqPath string) (*lpmsio.Stream, error) {
|
||||
// header, err := demux.Streams()
|
||||
// if err != nil {
|
||||
// t.Fatal("Failed ot read stream header")
|
||||
// }
|
||||
// fmt.Println("header: ", header)
|
||||
|
||||
// counter := 0
|
||||
// fmt.Println("data: ")
|
||||
// for {
|
||||
// packet, err := demux.ReadPacket()
|
||||
// if err != nil {
|
||||
// t.Fatal("Failed to read packets")
|
||||
// }
|
||||
// fmt.Print("\r", len(packet.Data))
|
||||
// counter = counter + 1
|
||||
// }
|
||||
// },
|
||||
// func(reqPath string) {})
|
||||
// ffmpegCmd := "ffmpeg"
|
||||
// ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1936/movie/stream"}
|
||||
// go exec.Command(ffmpegCmd, ffmpegArgs...).Run()
|
||||
|
||||
// go listener.RtmpServer.ListenAndServe()
|
||||
|
||||
// time.Sleep(time.Second * 1)
|
||||
// if stream := listener.Streams["teststream"]; stream.StreamID != "teststream" {
|
||||
// t.Fatal("Server did not set stream")
|
||||
// }
|
||||
|
||||
// time.Sleep(time.Second * 1)
|
||||
// }
|
88
vidplayer/player.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package vidplayer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"strings"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/livepeer/lpms/stream"
|
||||
"github.com/nareix/joy4/av"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
//VidPlayer is the module that handles playing video. For now we only support RTMP and HLS play.
|
||||
type VidPlayer struct {
|
||||
RtmpServer *joy4rtmp.Server
|
||||
}
|
||||
|
||||
//HandleRTMPPlay is the handler when there is a RTMP request for a video. The source should write
|
||||
//into the MuxCloser. The easiest way is through avutil.Copy.
|
||||
func (s *VidPlayer) HandleRTMPPlay(getStream func(ctx context.Context, reqPath string, dst av.MuxCloser) error) error {
|
||||
s.RtmpServer.HandlePlay = func(conn *joy4rtmp.Conn) {
|
||||
glog.Infof("LPMS got RTMP request @ %v", conn.URL)
|
||||
|
||||
ctx := context.Background()
|
||||
c := make(chan error, 1)
|
||||
go func() { c <- getStream(ctx, conn.URL.Path, conn) }()
|
||||
select {
|
||||
case err := <-c:
|
||||
glog.Errorf("Rtmp getStream Error: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//HandleHLSPlay is the handler when there is an HLS request. The source should write the raw bytes into the io.Writer,
|
||||
//for either the playlist or the segment.
|
||||
func (s *VidPlayer) HandleHLSPlay(getHLSBuffer func(reqPath string) (*stream.HLSBuffer, error)) error {
|
||||
http.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) {
|
||||
glog.Infof("LPMS got HTTP request @ %v", r.URL.Path)
|
||||
|
||||
if !strings.HasSuffix(r.URL.Path, ".m3u8") && !strings.HasSuffix(r.URL.Path, ".ts") {
http.Error(w, "LPMS only accepts HLS requests over HTTP (m3u8, ts).", 500)
return
}
|
||||
|
||||
ctx := context.Background()
|
||||
// c := make(chan error, 1)
|
||||
// go func() { c <- getStream(ctx, r.URL.Path, w) }()
|
||||
buffer, err := getHLSBuffer(r.URL.Path)
|
||||
if err != nil {
glog.Errorf("Error getting HLS Buffer: %v", err)
return
}
|
||||
|
||||
if strings.HasSuffix(r.URL.Path, ".m3u8") {
|
||||
pl, err := buffer.WaitAndPopPlaylist(ctx)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting HLS playlist %v: %v", r.URL.Path, err)
|
||||
return
|
||||
}
|
||||
_, err = w.Write(pl.Encode().Bytes())
|
||||
if err != nil {
|
||||
glog.Errorf("Error writting HLS playlist %v: %v", r.URL.Path, err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if strings.HasSuffix(r.URL.Path, ".ts") {
|
||||
pathArr := strings.Split(r.URL.Path, "/")
|
||||
segName := pathArr[len(pathArr)-1]
|
||||
seg, err := buffer.WaitAndPopSegment(ctx, segName)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting HLS segment %v: %v", segName, err)
|
||||
return
|
||||
}
|
||||
_, err = w.Write(seg)
|
||||
if err != nil {
|
||||
glog.Errorf("Error writting HLS segment %v: %v", segName, err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "Cannot find HTTP video resource: "+r.URL.Path, 500)
|
||||
})
|
||||
return nil
|
||||
}
|
109
vidplayer/player_test.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package vidplayer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"time"
|
||||
|
||||
"github.com/kz26/m3u8"
|
||||
"github.com/livepeer/lpms/stream"
|
||||
"github.com/nareix/joy4/av"
|
||||
"github.com/nareix/joy4/av/avutil"
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
func TestRTMP(t *testing.T) {
|
||||
server := &joy4rtmp.Server{Addr: ":1936"}
|
||||
player := &VidPlayer{RtmpServer: server}
|
||||
var demuxer av.Demuxer
|
||||
gotUpvid := false
|
||||
gotPlayvid := false
|
||||
player.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
|
||||
gotUpvid = true
|
||||
demuxer = conn
|
||||
}
|
||||
|
||||
player.HandleRTMPPlay(func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
|
||||
gotPlayvid = true
|
||||
fmt.Println(reqPath)
|
||||
avutil.CopyFile(dst, demuxer)
|
||||
return nil
|
||||
})
|
||||
|
||||
// go server.ListenAndServe()
|
||||
|
||||
// ffmpegCmd := "ffmpeg"
|
||||
// ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1936/movie/stream"}
|
||||
// go exec.Command(ffmpegCmd, ffmpegArgs...).Run()
|
||||
|
||||
// time.Sleep(time.Second * 1)
|
||||
|
||||
// if gotUpvid == false {
|
||||
// t.Fatal("Didn't get the upstream video")
|
||||
// }
|
||||
|
||||
// ffplayCmd := "ffplay"
|
||||
// ffplayArgs := []string{"rtmp://localhost:1936/movie/stream"}
|
||||
// go exec.Command(ffplayCmd, ffplayArgs...).Run()
|
||||
|
||||
// time.Sleep(time.Second * 1)
|
||||
// if gotPlayvid == false {
|
||||
// t.Fatal("Didn't get the downstream video")
|
||||
// }
|
||||
}
|
||||
|
||||
func TestHLS(t *testing.T) {
|
||||
player := &VidPlayer{}
|
||||
s := stream.NewStream("test")
|
||||
s.HLSTimeout = time.Second * 5
|
||||
//Write some packets into the stream
|
||||
s.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{})
|
||||
s.WriteHLSSegmentToStream(stream.HLSSegment{})
|
||||
var buffer *stream.HLSBuffer
|
||||
player.HandleHLSPlay(func(reqPath string) (*stream.HLSBuffer, error) {
|
||||
//if can't find local cache, start downloading, and store in cache.
|
||||
if buffer == nil {
|
||||
buffer = stream.NewHLSBuffer()
|
||||
ec := make(chan error, 1)
|
||||
go func() { ec <- s.ReadHLSFromStream(buffer) }()
|
||||
// select {
|
||||
// case err := <-ec:
|
||||
// return err
|
||||
// }
|
||||
}
|
||||
return buffer, nil
|
||||
|
||||
// if strings.HasSuffix(reqPath, ".m3u8") {
|
||||
// pl, err := buffer.WaitAndPopPlaylist(ctx)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// _, err = writer.Write(pl.Encode().Bytes())
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return nil, nil
|
||||
// }
|
||||
|
||||
// if strings.HasSuffix(reqPath, ".ts") {
|
||||
// pathArr := strings.Split(reqPath, "/")
|
||||
// segName := pathArr[len(pathArr)-1]
|
||||
// seg, err := buffer.WaitAndPopSegment(ctx, segName)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// _, err = writer.Write(seg)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
|
||||
// return nil, lpmsio.ErrNotFound
|
||||
})
|
||||
|
||||
// go http.ListenAndServe(":8000", nil)
|
||||
|
||||
//TODO: Add tests for checking if packets were written, etc.
|
||||
}
|