RTMP streaming example added.

This commit is contained in:
zergon321
2023-03-05 17:16:24 +03:00
parent 4dc6bc6f5c
commit 9e8a6c89ad
7 changed files with 419 additions and 2 deletions

View File

@@ -0,0 +1,9 @@
version: "3.7"
services:
rtmp-streamer:
image: tiangolo/nginx-rtmp
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
ports:
- 1935:1935

365
examples/rtmp/main.go Normal file
View File

@@ -0,0 +1,365 @@
package main
import (
"bytes"
"encoding/binary"
"fmt"
"image"
"time"
"github.com/faiface/beep"
"github.com/faiface/beep/speaker"
"github.com/hajimehoshi/ebiten"
_ "github.com/silbinarywolf/preferdiscretegpu"
"github.com/zergon321/reisen"
)
const (
width = 1280
height = 720
frameBufferSize = 1024
sampleRate = 44100
channelCount = 2
bitDepth = 8
sampleBufferSize = 32 * channelCount * bitDepth * 1024
SpeakerSampleRate beep.SampleRate = 44100
)
// readVideoAndAudio reads video and audio frames
// from the opened media and sends the decoded
// data to che channels to be played.
func readVideoAndAudio(media *reisen.Media) (<-chan *image.RGBA, <-chan [2]float64, chan error, error) {
frameBuffer := make(chan *image.RGBA,
frameBufferSize)
sampleBuffer := make(chan [2]float64, sampleBufferSize)
errs := make(chan error)
err := media.OpenDecode()
if err != nil {
return nil, nil, nil, err
}
videoStream := media.VideoStreams()[0]
err = videoStream.Open()
if err != nil {
return nil, nil, nil, err
}
audioStream := media.AudioStreams()[0]
err = audioStream.Open()
if err != nil {
return nil, nil, nil, err
}
/*err = media.Streams()[0].Rewind(60 * time.Second)
if err != nil {
return nil, nil, nil, err
}*/
/*err = media.Streams()[0].ApplyFilter("h264_mp4toannexb")
if err != nil {
return nil, nil, nil, err
}*/
go func() {
for {
packet, gotPacket, err := media.ReadPacket()
if err != nil {
go func(err error) {
errs <- err
}(err)
}
if !gotPacket {
break
}
/*hash := sha256.Sum256(packet.Data())
fmt.Println(base58.Encode(hash[:]))*/
switch packet.Type() {
case reisen.StreamVideo:
s := media.Streams()[packet.StreamIndex()].(*reisen.VideoStream)
videoFrame, gotFrame, err := s.ReadVideoFrame()
if err != nil {
go func(err error) {
errs <- err
}(err)
}
if !gotFrame {
break
}
if videoFrame == nil {
continue
}
offset, err := videoFrame.PresentationOffset()
fmt.Println("video frame offset:", offset, err)
frameBuffer <- videoFrame.Image()
case reisen.StreamAudio:
s := media.Streams()[packet.StreamIndex()].(*reisen.AudioStream)
audioFrame, gotFrame, err := s.ReadAudioFrame()
if err != nil {
go func(err error) {
errs <- err
}(err)
}
if !gotFrame {
break
}
if audioFrame == nil {
continue
}
offset, err := audioFrame.PresentationOffset()
fmt.Println("audio frame offset:", offset, err)
// Turn the raw byte data into
// audio samples of type [2]float64.
reader := bytes.NewReader(audioFrame.Data())
// See the README.md file for
// detailed scheme of the sample structure.
for reader.Len() > 0 {
sample := [2]float64{0, 0}
var result float64
err = binary.Read(reader, binary.LittleEndian, &result)
if err != nil {
go func(err error) {
errs <- err
}(err)
}
sample[0] = result
err = binary.Read(reader, binary.LittleEndian, &result)
if err != nil {
go func(err error) {
errs <- err
}(err)
}
sample[1] = result
sampleBuffer <- sample
}
}
}
videoStream.Close()
audioStream.Close()
media.CloseDecode()
close(frameBuffer)
close(sampleBuffer)
close(errs)
}()
return frameBuffer, sampleBuffer, errs, nil
}
// streamSamples creates a new custom streamer for
// playing audio samples provided by the source channel.
//
// See https://github.com/faiface/beep/wiki/Making-own-streamers
// for reference.
func streamSamples(sampleSource <-chan [2]float64) beep.Streamer {
return beep.StreamerFunc(func(samples [][2]float64) (n int, ok bool) {
numRead := 0
for i := 0; i < len(samples); i++ {
sample, ok := <-sampleSource
if !ok {
numRead = i + 1
break
}
samples[i] = sample
numRead++
}
if numRead < len(samples) {
return numRead, false
}
return numRead, true
})
}
// Game holds all the data
// necessary for playing video.
type Game struct {
videoSprite *ebiten.Image
ticker <-chan time.Time
errs <-chan error
frameBuffer <-chan *image.RGBA
fps int
videoTotalFramesPlayed int
videoPlaybackFPS int
perSecond <-chan time.Time
last time.Time
deltaTime float64
}
// Strarts reading samples and frames
// of the media file.
func (game *Game) Start(fname string) error {
// For RTMP stream reading.
err := reisen.NetworkInitialize()
handleError(err)
defer reisen.NetworkDeinitialize()
// Initialize the audio speaker.
err = speaker.Init(sampleRate,
SpeakerSampleRate.N(time.Second/10))
if err != nil {
return err
}
// Sprite for drawing video frames.
game.videoSprite, err = ebiten.NewImage(
width, height, ebiten.FilterDefault)
if err != nil {
return err
}
// Open the media file.
media, err := reisen.NewMedia(fname)
if err != nil {
return err
}
// Get the FPS for playing
// video frames.
videoFPS, _ := media.VideoStreams()[0].FrameRate()
if err != nil {
return err
}
// SPF for frame ticker.
spf := 1.0 / float64(videoFPS)
frameDuration, err := time.
ParseDuration(fmt.Sprintf("%fs", spf))
if err != nil {
return err
}
// Start decoding streams.
var sampleSource <-chan [2]float64
game.frameBuffer, sampleSource,
game.errs, err = readVideoAndAudio(media)
if err != nil {
return err
}
// Start playing audio samples.
speaker.Play(streamSamples(sampleSource))
game.ticker = time.Tick(frameDuration)
// Setup metrics.
game.last = time.Now()
game.fps = 0
game.perSecond = time.Tick(time.Second)
game.videoTotalFramesPlayed = 0
game.videoPlaybackFPS = 0
return nil
}
func (game *Game) Update(screen *ebiten.Image) error {
// Compute dt.
game.deltaTime = time.Since(game.last).Seconds()
game.last = time.Now()
// Check for incoming errors.
select {
case err, ok := <-game.errs:
if ok {
return err
}
default:
}
// Read video frames and draw them.
select {
case <-game.ticker:
frame, ok := <-game.frameBuffer
if ok {
game.videoSprite.ReplacePixels(frame.Pix)
game.videoTotalFramesPlayed++
game.videoPlaybackFPS++
}
default:
}
// Draw the video sprite.
op := &ebiten.DrawImageOptions{}
err := screen.DrawImage(game.videoSprite, op)
if err != nil {
return err
}
game.fps++
// Update metrics in the window title.
select {
case <-game.perSecond:
ebiten.SetWindowTitle(fmt.Sprintf("%s | FPS: %d | dt: %f | Frames: %d | Video FPS: %d",
"Video", game.fps, game.deltaTime, game.videoTotalFramesPlayed, game.videoPlaybackFPS))
game.fps = 0
game.videoPlaybackFPS = 0
default:
}
return nil
}
func (game *Game) Layout(a, b int) (int, int) {
return width, height
}
func main() {
game := &Game{}
err := game.Start("rtmp://127.0.0.1/live/stream")
handleError(err)
ebiten.SetWindowSize(width, height)
ebiten.SetWindowTitle("Video")
err = ebiten.RunGame(game)
handleError(err)
}
func handleError(err error) {
if err != nil {
panic(err)
}
}

16
examples/rtmp/nginx.conf Normal file
View File

@@ -0,0 +1,16 @@
worker_processes auto;
rtmp_auto_push on;
events {}
rtmp {
server {
listen 1935;
chunk_size 4096;
allow publish 0.0.0.0;
application live {
live on;
record off;
}
}
}

1
examples/rtmp/stream.sh Executable file
View File

@@ -0,0 +1 @@
ffmpeg -re -i video.mp4 -c:v copy -c:a aac -ar 44100 -ac 1 -f flv rtmp://127.0.0.1/live/stream

BIN
examples/rtmp/video.mp4 Executable file

Binary file not shown.

View File

@@ -105,8 +105,6 @@ func (media *Media) FormatMIMEType() string {
// from the media container.
func (media *Media) findStreams() error {
streams := []Stream{}
innerStreams := unsafe.Slice(
media.ctx.streams, media.ctx.nb_streams)
status := C.avformat_find_stream_info(media.ctx, nil)
if status < 0 {
@@ -114,6 +112,9 @@ func (media *Media) findStreams() error {
"couldn't find stream information")
}
innerStreams := unsafe.Slice(
media.ctx.streams, media.ctx.nb_streams)
for _, innerStream := range innerStreams {
codecParams := innerStream.codecpar
codec := C.avcodec_find_decoder(codecParams.codec_id)

25
network.go Normal file
View File

@@ -0,0 +1,25 @@
package reisen
// #include <libavformat/avformat.h>
import "C"
import "fmt"
func NetworkInitialize() error {
code := C.avformat_network_init()
if code < 0 {
return fmt.Errorf("error occurred: 0x%X", code)
}
return nil
}
func NetworkDeinitialize() error {
code := C.avformat_network_deinit()
if code < 0 {
return fmt.Errorf("error occurred: 0x%X", code)
}
return nil
}