diff --git a/examples/rtmp/docker-compose.yml b/examples/rtmp/docker-compose.yml new file mode 100644 index 0000000..238bd60 --- /dev/null +++ b/examples/rtmp/docker-compose.yml @@ -0,0 +1,9 @@ +version: "3.7" + +services: + rtmp-streamer: + image: tiangolo/nginx-rtmp + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf + ports: + - 1935:1935 \ No newline at end of file diff --git a/examples/rtmp/main.go b/examples/rtmp/main.go new file mode 100644 index 0000000..9f8e6ed --- /dev/null +++ b/examples/rtmp/main.go @@ -0,0 +1,365 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "image" + "time" + + "github.com/faiface/beep" + "github.com/faiface/beep/speaker" + "github.com/hajimehoshi/ebiten" + _ "github.com/silbinarywolf/preferdiscretegpu" + "github.com/zergon321/reisen" +) + +const ( + width = 1280 + height = 720 + frameBufferSize = 1024 + sampleRate = 44100 + channelCount = 2 + bitDepth = 8 + sampleBufferSize = 32 * channelCount * bitDepth * 1024 + SpeakerSampleRate beep.SampleRate = 44100 +) + +// readVideoAndAudio reads video and audio frames +// from the opened media and sends the decoded +// data to che channels to be played. +func readVideoAndAudio(media *reisen.Media) (<-chan *image.RGBA, <-chan [2]float64, chan error, error) { + frameBuffer := make(chan *image.RGBA, + frameBufferSize) + sampleBuffer := make(chan [2]float64, sampleBufferSize) + errs := make(chan error) + + err := media.OpenDecode() + + if err != nil { + return nil, nil, nil, err + } + + videoStream := media.VideoStreams()[0] + err = videoStream.Open() + + if err != nil { + return nil, nil, nil, err + } + + audioStream := media.AudioStreams()[0] + err = audioStream.Open() + + if err != nil { + return nil, nil, nil, err + } + + /*err = media.Streams()[0].Rewind(60 * time.Second) + + if err != nil { + return nil, nil, nil, err + }*/ + + /*err = media.Streams()[0].ApplyFilter("h264_mp4toannexb") + + if err != nil { + return nil, nil, nil, err + }*/ + + go func() { + for { + packet, gotPacket, err := media.ReadPacket() + + if err != nil { + go func(err error) { + errs <- err + }(err) + } + + if !gotPacket { + break + } + + /*hash := sha256.Sum256(packet.Data()) + fmt.Println(base58.Encode(hash[:]))*/ + + switch packet.Type() { + case reisen.StreamVideo: + s := media.Streams()[packet.StreamIndex()].(*reisen.VideoStream) + videoFrame, gotFrame, err := s.ReadVideoFrame() + + if err != nil { + go func(err error) { + errs <- err + }(err) + } + + if !gotFrame { + break + } + + if videoFrame == nil { + continue + } + + offset, err := videoFrame.PresentationOffset() + fmt.Println("video frame offset:", offset, err) + + frameBuffer <- videoFrame.Image() + + case reisen.StreamAudio: + s := media.Streams()[packet.StreamIndex()].(*reisen.AudioStream) + audioFrame, gotFrame, err := s.ReadAudioFrame() + + if err != nil { + go func(err error) { + errs <- err + }(err) + } + + if !gotFrame { + break + } + + if audioFrame == nil { + continue + } + + offset, err := audioFrame.PresentationOffset() + fmt.Println("audio frame offset:", offset, err) + + // Turn the raw byte data into + // audio samples of type [2]float64. + reader := bytes.NewReader(audioFrame.Data()) + + // See the README.md file for + // detailed scheme of the sample structure. + for reader.Len() > 0 { + sample := [2]float64{0, 0} + var result float64 + err = binary.Read(reader, binary.LittleEndian, &result) + + if err != nil { + go func(err error) { + errs <- err + }(err) + } + + sample[0] = result + + err = binary.Read(reader, binary.LittleEndian, &result) + + if err != nil { + go func(err error) { + errs <- err + }(err) + } + + sample[1] = result + sampleBuffer <- sample + } + } + } + + videoStream.Close() + audioStream.Close() + media.CloseDecode() + close(frameBuffer) + close(sampleBuffer) + close(errs) + }() + + return frameBuffer, sampleBuffer, errs, nil +} + +// streamSamples creates a new custom streamer for +// playing audio samples provided by the source channel. +// +// See https://github.com/faiface/beep/wiki/Making-own-streamers +// for reference. +func streamSamples(sampleSource <-chan [2]float64) beep.Streamer { + return beep.StreamerFunc(func(samples [][2]float64) (n int, ok bool) { + numRead := 0 + + for i := 0; i < len(samples); i++ { + sample, ok := <-sampleSource + + if !ok { + numRead = i + 1 + break + } + + samples[i] = sample + numRead++ + } + + if numRead < len(samples) { + return numRead, false + } + + return numRead, true + }) +} + +// Game holds all the data +// necessary for playing video. +type Game struct { + videoSprite *ebiten.Image + ticker <-chan time.Time + errs <-chan error + frameBuffer <-chan *image.RGBA + fps int + videoTotalFramesPlayed int + videoPlaybackFPS int + perSecond <-chan time.Time + last time.Time + deltaTime float64 +} + +// Strarts reading samples and frames +// of the media file. +func (game *Game) Start(fname string) error { + // For RTMP stream reading. + err := reisen.NetworkInitialize() + handleError(err) + defer reisen.NetworkDeinitialize() + + // Initialize the audio speaker. + err = speaker.Init(sampleRate, + SpeakerSampleRate.N(time.Second/10)) + + if err != nil { + return err + } + + // Sprite for drawing video frames. + game.videoSprite, err = ebiten.NewImage( + width, height, ebiten.FilterDefault) + + if err != nil { + return err + } + + // Open the media file. + media, err := reisen.NewMedia(fname) + + if err != nil { + return err + } + + // Get the FPS for playing + // video frames. + videoFPS, _ := media.VideoStreams()[0].FrameRate() + + if err != nil { + return err + } + + // SPF for frame ticker. + spf := 1.0 / float64(videoFPS) + frameDuration, err := time. + ParseDuration(fmt.Sprintf("%fs", spf)) + + if err != nil { + return err + } + + // Start decoding streams. + var sampleSource <-chan [2]float64 + game.frameBuffer, sampleSource, + game.errs, err = readVideoAndAudio(media) + + if err != nil { + return err + } + + // Start playing audio samples. + speaker.Play(streamSamples(sampleSource)) + + game.ticker = time.Tick(frameDuration) + + // Setup metrics. + game.last = time.Now() + game.fps = 0 + game.perSecond = time.Tick(time.Second) + game.videoTotalFramesPlayed = 0 + game.videoPlaybackFPS = 0 + + return nil +} + +func (game *Game) Update(screen *ebiten.Image) error { + // Compute dt. + game.deltaTime = time.Since(game.last).Seconds() + game.last = time.Now() + + // Check for incoming errors. + select { + case err, ok := <-game.errs: + if ok { + return err + } + + default: + } + + // Read video frames and draw them. + select { + case <-game.ticker: + frame, ok := <-game.frameBuffer + + if ok { + game.videoSprite.ReplacePixels(frame.Pix) + + game.videoTotalFramesPlayed++ + game.videoPlaybackFPS++ + } + + default: + } + + // Draw the video sprite. + op := &ebiten.DrawImageOptions{} + err := screen.DrawImage(game.videoSprite, op) + + if err != nil { + return err + } + + game.fps++ + + // Update metrics in the window title. + select { + case <-game.perSecond: + ebiten.SetWindowTitle(fmt.Sprintf("%s | FPS: %d | dt: %f | Frames: %d | Video FPS: %d", + "Video", game.fps, game.deltaTime, game.videoTotalFramesPlayed, game.videoPlaybackFPS)) + + game.fps = 0 + game.videoPlaybackFPS = 0 + + default: + } + + return nil +} + +func (game *Game) Layout(a, b int) (int, int) { + return width, height +} + +func main() { + game := &Game{} + err := game.Start("rtmp://127.0.0.1/live/stream") + handleError(err) + + ebiten.SetWindowSize(width, height) + ebiten.SetWindowTitle("Video") + err = ebiten.RunGame(game) + handleError(err) +} + +func handleError(err error) { + if err != nil { + panic(err) + } +} diff --git a/examples/rtmp/nginx.conf b/examples/rtmp/nginx.conf new file mode 100644 index 0000000..bb4c0fc --- /dev/null +++ b/examples/rtmp/nginx.conf @@ -0,0 +1,16 @@ +worker_processes auto; +rtmp_auto_push on; +events {} + +rtmp { + server { + listen 1935; + chunk_size 4096; + allow publish 0.0.0.0; + + application live { + live on; + record off; + } + } +} \ No newline at end of file diff --git a/examples/rtmp/stream.sh b/examples/rtmp/stream.sh new file mode 100755 index 0000000..d44fefb --- /dev/null +++ b/examples/rtmp/stream.sh @@ -0,0 +1 @@ +ffmpeg -re -i video.mp4 -c:v copy -c:a aac -ar 44100 -ac 1 -f flv rtmp://127.0.0.1/live/stream \ No newline at end of file diff --git a/examples/rtmp/video.mp4 b/examples/rtmp/video.mp4 new file mode 100755 index 0000000..d6f6f8a Binary files /dev/null and b/examples/rtmp/video.mp4 differ diff --git a/media.go b/media.go index 3e1a7e0..edb8571 100644 --- a/media.go +++ b/media.go @@ -105,8 +105,6 @@ func (media *Media) FormatMIMEType() string { // from the media container. func (media *Media) findStreams() error { streams := []Stream{} - innerStreams := unsafe.Slice( - media.ctx.streams, media.ctx.nb_streams) status := C.avformat_find_stream_info(media.ctx, nil) if status < 0 { @@ -114,6 +112,9 @@ func (media *Media) findStreams() error { "couldn't find stream information") } + innerStreams := unsafe.Slice( + media.ctx.streams, media.ctx.nb_streams) + for _, innerStream := range innerStreams { codecParams := innerStream.codecpar codec := C.avcodec_find_decoder(codecParams.codec_id) diff --git a/network.go b/network.go new file mode 100644 index 0000000..abc6bad --- /dev/null +++ b/network.go @@ -0,0 +1,25 @@ +package reisen + +// #include +import "C" +import "fmt" + +func NetworkInitialize() error { + code := C.avformat_network_init() + + if code < 0 { + return fmt.Errorf("error occurred: 0x%X", code) + } + + return nil +} + +func NetworkDeinitialize() error { + code := C.avformat_network_deinit() + + if code < 0 { + return fmt.Errorf("error occurred: 0x%X", code) + } + + return nil +}