Refactor code

Kalit Inani
2024-10-24 16:42:30 -04:00
parent b4b599326b
commit 26adcb73ec
9 changed files with 708 additions and 766 deletions

.gitignore

@@ -1 +1,2 @@
bin/*
*.jpg

Binary file not shown (image, 46 KiB before this commit).


@@ -5,217 +5,8 @@ import (
"log"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v3"
)
type Message struct {
Type string `json:"type"`
Content string `json:"content"`
}
var (
answerChan = make(chan string) // Global variable for the channel
userPeerConnection *webrtc.PeerConnection = nil
connectionEstablishedChan = make(chan bool)
userVideoTrack *webrtc.TrackLocalStaticSample = nil
)
func createPeerConnection(conn *websocket.Conn) (*webrtc.PeerConnection, *webrtc.TrackLocalStaticSample, error) {
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{
"stun:stun.l.google.com:19302",
"stun:stun1.l.google.com:19302",
"stun:stun2.l.google.com:19302",
},
},
},
}
// Create a new RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, nil, err
}
peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
return
}
// Send this candidate to the remote peer
fmt.Println("New ICE candidate:", candidate.ToJSON())
iceCandidateMsg := Message{
Type: "iceCandidate",
Content: candidate.ToJSON().Candidate,
}
conn.WriteJSON(iceCandidateMsg)
})
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("Connection State has changed: %s\n", connectionState.String())
switch connectionState {
case webrtc.ICEConnectionStateConnected:
fmt.Println("Successfully connected!")
case webrtc.ICEConnectionStateDisconnected:
fmt.Println("Disconnected!")
case webrtc.ICEConnectionStateFailed:
fmt.Println("Connection failed!")
}
})
videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion")
if err != nil {
return nil, nil, err
}
// Add the track to the peer connection
_, err = peerConnection.AddTrack(videoTrack)
if err != nil {
return nil, nil, err
}
return peerConnection, videoTrack, nil
}
func openCameraFeed(peerConnection *webrtc.PeerConnection, videoTrack *webrtc.TrackLocalStaticSample) error {
// Handle incoming tracks
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
fmt.Println("Track received:", track.Kind())
fmt.Println("Track Codec:", track.Codec())
fmt.Println("Track Codec MimeType:", track.Codec().MimeType)
go func() {
for {
// Read frames from the track
_, _, err := track.ReadRTP()
if err != nil {
log.Println("Error reading RTP:", err)
return
}
// Handle the frames as needed and render into a video element
// fmt.Println(packet)
}
}()
})
fmt.Println("Writing to tracks")
go writeH264ToTrackAR(videoTrack)
// go writeH264ToTrack(videoTrack)
return nil
}
func establishConnectionWithPeer(conn *websocket.Conn){
peerConnection, videoTrack, err := createPeerConnection(conn)
if err != nil {
panic(err)
}
// Create offer
offer, err := peerConnection.CreateOffer(nil)
if err != nil {
log.Fatal("Failed to create offer:", err)
}
// Set the local description
if err = peerConnection.SetLocalDescription(offer); err != nil {
log.Fatal("Failed to set local description:", err)
}
offerMsg := Message{
Type: "offer",
Content: offer.SDP,
}
// fmt.Println(offerMsg)
conn.WriteJSON(offerMsg)
answer := <-answerChan
fmt.Println("Setting remote description with answer.")
// Set the remote description
answerSDP := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: answer,
}
if err := peerConnection.SetRemoteDescription(answerSDP); err != nil {
log.Fatal("Failed to set remote description:", err)
}
userPeerConnection = peerConnection
userVideoTrack = videoTrack
connectionEstablishedChan <- true
}
func handleOffer(conn *websocket.Conn, msg Message){
fmt.Println("Received offer")
peerConnection, videoTrack, err := createPeerConnection(conn)
if err != nil {
panic(err)
}
offerSDP := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: msg.Content,
}
if err := peerConnection.SetRemoteDescription(offerSDP); err != nil {
log.Fatal("Failed to set remote description:", err)
}
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Fatal("Failed to create answer:", err)
}
// Set the local description
if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Fatal("Failed to set local description:", err)
}
answerMsg := Message{
Type: "answer",
Content: answer.SDP,
}
// fmt.Println(answerMsg)
conn.WriteJSON(answerMsg)
userPeerConnection = peerConnection
userVideoTrack = videoTrack
connectionEstablishedChan <- true
}
func handleAnswer(msg Message){
answerChan <- msg.Content
}
func addICECandidate(msg Message){
fmt.Println("Received ICE Candidate:", msg.Content)
if (userPeerConnection == nil){
fmt.Println("Peer connection not created yet. Returning...")
return
}
// Create a new ICE candidate from the received content
candidate := webrtc.ICECandidateInit{
Candidate: msg.Content,
}
// Add the ICE candidate to the peer connection
if err := userPeerConnection.AddICECandidate(candidate); err != nil {
log.Println("Failed to add ICE candidate:", err)
return
}
fmt.Println("ICE Candidate added successfully.")
}
func Run() {
// Connect to the WebSocket server
url := "ws://localhost:8080/ws"

client/connection.go (new file, 122 lines)

@@ -0,0 +1,122 @@
package client
import (
"fmt"
"log"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v3"
)
var (
userPeerConnection *webrtc.PeerConnection
userVideoTrack *webrtc.TrackLocalStaticSample
connectionEstablishedChan = make(chan bool)
)
func createPeerConnection(conn *websocket.Conn) (*webrtc.PeerConnection, *webrtc.TrackLocalStaticSample, error) {
/*
Initializes a new WebRTC peer connection
*/
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{
"stun:stun.l.google.com:19302",
"stun:stun1.l.google.com:19302",
"stun:stun2.l.google.com:19302",
},
},
},
}
// Create a new RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, nil, err
}
peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
return
}
// Send this candidate to the remote peer
fmt.Println("New ICE candidate:", candidate.ToJSON())
iceCandidateMsg := Message{
Type: "iceCandidate",
Content: candidate.ToJSON().Candidate,
}
if err := conn.WriteJSON(iceCandidateMsg); err != nil {
log.Println("Failed to send ICE candidate:", err)
}
})
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("Connection State has changed: %s\n", connectionState.String())
switch connectionState {
case webrtc.ICEConnectionStateConnected:
fmt.Println("Successfully connected!")
case webrtc.ICEConnectionStateDisconnected:
fmt.Println("Disconnected!")
case webrtc.ICEConnectionStateFailed:
fmt.Println("Connection failed!")
}
})
videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion")
if err != nil {
return nil, nil, err
}
// Add the track to the peer connection
_, err = peerConnection.AddTrack(videoTrack)
if err != nil {
return nil, nil, err
}
return peerConnection, videoTrack, nil
}
func establishConnectionWithPeer(conn *websocket.Conn){
peerConnection, videoTrack, err := createPeerConnection(conn)
if err != nil {
panic(err)
}
// Create offer
offer, err := peerConnection.CreateOffer(nil)
if err != nil {
log.Fatal("Failed to create offer: ", err)
}
// Set the local description
if err = peerConnection.SetLocalDescription(offer); err != nil {
log.Fatal("Failed to set local description: ", err)
}
offerMsg := Message{
Type: "offer",
Content: offer.SDP,
}
if err := conn.WriteJSON(offerMsg); err != nil {
log.Fatal("Failed to send offer: ", err)
}
answer := <-answerChan
fmt.Println("Setting remote description with answer.")
// Set the remote description
answerSDP := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: answer,
}
if err := peerConnection.SetRemoteDescription(answerSDP); err != nil {
log.Fatal("Failed to set remote description: ", err)
}
userPeerConnection = peerConnection
userVideoTrack = videoTrack
connectionEstablishedChan <- true
}
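
A minimal sketch of how a caller might drive this handshake end to end; it is not part of this commit, and the startCall name is an assumption (the real wiring lives in Run(), which is truncated in this view). It starts the offer/answer exchange, waits on connectionEstablishedChan, and then hands the resulting peer connection and track to openCameraFeed.

// signaling_example.go (hypothetical sketch, not part of this commit)
package client

import (
	"log"

	"github.com/gorilla/websocket"
)

// startCall drives the offer/answer handshake from the caller side and
// blocks until the connection is reported as established, then starts
// streaming the local AR-processed feed.
func startCall(conn *websocket.Conn) {
	go establishConnectionWithPeer(conn)
	<-connectionEstablishedChan // signalled by establishConnectionWithPeer / handleOffer
	if err := openCameraFeed(userPeerConnection, userVideoTrack); err != nil {
		log.Println("Failed to open camera feed:", err)
	}
}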

client/messaging.go (new file, 82 lines)

@@ -0,0 +1,82 @@
package client
import (
"fmt"
"log"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v3"
)
type Message struct {
Type string `json:"type"`
Content string `json:"content"`
}
var (
answerChan = make(chan string) // Global variable for the channel
)
func handleOffer(conn *websocket.Conn, msg Message){
fmt.Println("Received offer")
peerConnection, videoTrack, err := createPeerConnection(conn)
if err != nil {
log.Fatal("Failed to create peer connection: ", err)
}
offerSDP := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: msg.Content,
}
if err := peerConnection.SetRemoteDescription(offerSDP); err != nil {
log.Fatal("Failed to set remote description: ", err)
}
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Fatal("Failed to create answer: ", err)
}
// Set the local description
if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Fatal("Failed to set local description: ", err)
}
answerMsg := Message{
Type: "answer",
Content: answer.SDP,
}
if err := conn.WriteJSON(answerMsg); err != nil {
log.Fatal("Failed to send answer: ", err)
}
userPeerConnection = peerConnection
userVideoTrack = videoTrack
connectionEstablishedChan <- true
}
func handleAnswer(msg Message){
answerChan <- msg.Content
}
func addICECandidate(msg Message){
fmt.Println("Received ICE Candidate:", msg.Content)
if userPeerConnection == nil {
fmt.Println("Peer connection not created yet. Returning...")
return
}
// Create a new ICE candidate from the received content
candidate := webrtc.ICECandidateInit{
Candidate: msg.Content,
}
// Add the ICE candidate to the peer connection
if err := userPeerConnection.AddICECandidate(candidate); err != nil {
fmt.Println("Failed to add ICE candidate:", err)
return
}
fmt.Println("ICE Candidate added successfully.")
}
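
For context, a hedged sketch of the dispatch loop these handlers imply. The Run() function that actually reads from the WebSocket is not shown in full here, so the loop below, including its name, is an assumption based on the Message.Type values used above.

// dispatch_example.go (hypothetical sketch, not part of this commit)
package client

import (
	"log"

	"github.com/gorilla/websocket"
)

// readSignalingMessages routes incoming signaling messages to the handlers
// defined in messaging.go until the WebSocket read fails.
func readSignalingMessages(conn *websocket.Conn) {
	for {
		var msg Message
		if err := conn.ReadJSON(&msg); err != nil {
			log.Println("WebSocket read failed:", err)
			return
		}
		switch msg.Type {
		case "offer":
			// Run in a goroutine: handleOffer blocks on connectionEstablishedChan.
			go handleOffer(conn, msg)
		case "answer":
			handleAnswer(msg)
		case "iceCandidate":
			addICECandidate(msg)
		default:
			log.Println("Unknown message type:", msg.Type)
		}
	}
}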


@@ -0,0 +1,120 @@
package client
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"image"
"image/draw"
"image/jpeg"
"net"
"os"
"github.com/asticode/go-astiav"
)
func processImageFrame(conn net.Conn, frame *astiav.Frame) error {
// Convert frame to RGBA image
width := frame.Width()
height := frame.Height()
img := image.NewRGBA(image.Rect(0, 0, int(width), int(height)))
frame.Data().ToImage(img)
// Encode the RGBA image to a JPEG buffer
var buf bytes.Buffer
if err := jpeg.Encode(&buf, img, nil); err != nil {
return err
}
// Send the size of the frame data
frameSize := uint32(buf.Len())
if err := binary.Write(conn, binary.BigEndian, frameSize); err != nil {
return fmt.Errorf("failed to send frame size: %w", err)
}
// Send the frame data for processing
_, err := conn.Write(buf.Bytes())
if err != nil {
return fmt.Errorf("failed to send frame data: %w", err)
}
return nil
}
func receiveProcessedImageFrame(conn net.Conn) (*image.Image, error) {
// Read the size of the processed frame
var processedFrameSize uint32
if err := binary.Read(conn, binary.BigEndian, &processedFrameSize); err != nil {
return nil, fmt.Errorf("failed to read processed frame size: %w", err)
}
// Read the processed frame data (io.ReadFull avoids short TCP reads)
processedFrameData := make([]byte, processedFrameSize)
if _, err := io.ReadFull(conn, processedFrameData); err != nil {
return nil, fmt.Errorf("failed to read processed frame data: %w", err)
}
// Decode the image buffer to image.Image
reader := bytes.NewReader(processedFrameData)
processed_img, err := jpeg.Decode(reader)
if err != nil {
return nil, fmt.Errorf("failed to decode image: %w", err)
}
return &processed_img, nil
}
func convertImageToFrame(img *image.Image, frame *astiav.Frame) (*astiav.Frame, error) {
imgRGBA := image.NewRGBA((*img).Bounds())
if imgRGBA == nil {
return frame, errors.New("Failed to convert image to RGBA format")
}
draw.Draw(imgRGBA, imgRGBA.Bounds(), (*img), (*img).Bounds().Min, draw.Over)
processedFrame := frame.Clone()
if err := processedFrame.MakeWritable(); err != nil {
return frame, fmt.Errorf("main: making frame writable failed: %w", err)
}
if err := processedFrame.Data().FromImage(imgRGBA); err != nil {
return frame, fmt.Errorf("converting processed image to frame failed: %w", err)
}
return processedFrame, nil
}
func dumpImageToFile(filename string, img *image.Image) error {
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
err = jpeg.Encode(file, *img, nil)
if err != nil {
return fmt.Errorf("failed to encode image: %w", err)
}
return nil
}
func OverlayARFilter(conn net.Conn, frame *astiav.Frame) (*astiav.Frame, error) {
if err := processImageFrame(conn, frame); err != nil {
return frame, err
}
processed_image, err := receiveProcessedImageFrame(conn)
if err != nil {
return frame, err
}
processed_frame, err := convertImageToFrame(processed_image, frame)
if err != nil {
return frame, err
}
if err := dumpImageToFile("processed.jpg", processed_image); err != nil {
fmt.Println("Failed to dump processed image:", err)
}
return processed_frame, nil
}
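
These helpers speak a simple length-prefixed protocol to the frame-processing service on localhost:5005: a 4-byte big-endian size followed by the JPEG bytes, with the processed frame returned in the same format. That service is not part of this diff, so the loopback echo server below is only a hypothetical stand-in for exercising the protocol locally.

// protocol_echo_server.go (hypothetical test stand-in, not part of this commit)
package main

import (
	"encoding/binary"
	"io"
	"log"
	"net"
)

func main() {
	ln, err := net.Listen("tcp", "localhost:5005")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go echoFrames(conn)
	}
}

// echoFrames reads size-prefixed JPEG frames and writes each one back
// unchanged, mimicking the processing service's wire format.
func echoFrames(conn net.Conn) {
	defer conn.Close()
	for {
		var size uint32
		if err := binary.Read(conn, binary.BigEndian, &size); err != nil {
			return
		}
		frame := make([]byte, size)
		if _, err := io.ReadFull(conn, frame); err != nil {
			return
		}
		if err := binary.Write(conn, binary.BigEndian, size); err != nil {
			return
		}
		if _, err := conn.Write(frame); err != nil {
			return
		}
	}
}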

client/video_processor.go (new file, 266 lines)

@@ -0,0 +1,266 @@
package client
import (
"errors"
"fmt"
"log"
"strconv"
"time"
"github.com/asticode/go-astiav"
)
type VideoProcessor struct {
inputFormatContext *astiav.FormatContext
videoStream *astiav.Stream
decodeCodecContext *astiav.CodecContext
decodePacket *astiav.Packet
decodeFrame *astiav.Frame
encodeCodecContext *astiav.CodecContext
encodePacket *astiav.Packet
convertToRGBAContext *astiav.SoftwareScaleContext
convertToYUV420PContext *astiav.SoftwareScaleContext
rgbaFrame *astiav.Frame
yuv420PFrame *astiav.Frame
filterGraph *astiav.FilterGraph
filterFrame *astiav.Frame
buffersinkContext *astiav.BuffersinkFilterContext
buffersrcContext *astiav.BuffersrcFilterContext
arFilterFrame *astiav.Frame
pts int64
}
const h264FrameDuration = time.Millisecond * 20
func NewVideoProcessor() *VideoProcessor {
vp := &VideoProcessor{}
astiav.RegisterAllDevices()
if err := vp.initTestSrc(); err != nil{
log.Fatal("Failed to initialize source: ", err)
}
if err := vp.initVideoEncoding(); err != nil{
log.Fatal("Failed to initialize video encoding: ", err)
}
return vp
}
func (vp *VideoProcessor) initTestSrc() error {
if vp.inputFormatContext = astiav.AllocFormatContext(); vp.inputFormatContext == nil {
return errors.New("Failed to AllocCodecContext")
}
// Open input
if err := vp.inputFormatContext.OpenInput("udp://224.0.0.251:5353", nil, nil); err != nil {
return err
}
// Find stream info
if err := vp.inputFormatContext.FindStreamInfo(nil); err != nil {
return err
}
// Set stream
vp.videoStream = vp.inputFormatContext.Streams()[0]
// Find decoder
decoder := astiav.FindDecoder(vp.videoStream.CodecParameters().CodecID())
if decoder == nil {
return errors.New("FindDecoder returned nil")
}
// Allocate decoding context
if vp.decodeCodecContext = astiav.AllocCodecContext(decoder); vp.decodeCodecContext == nil {
return errors.New("Failed to allocate context for decoder")
}
// Update codec context of video stream
if err := vp.videoStream.CodecParameters().ToCodecContext(vp.decodeCodecContext); err != nil {
return err
}
// Set framerate
vp.decodeCodecContext.SetFramerate(vp.inputFormatContext.GuessFrameRate(vp.videoStream, nil))
// Open decoding codec context
if err := vp.decodeCodecContext.Open(decoder, nil); err != nil {
panic(err)
}
vp.decodePacket = astiav.AllocPacket()
vp.decodeFrame = astiav.AllocFrame()
return nil
}
func (vp *VideoProcessor) initVideoEncoding() error {
if vp.encodeCodecContext != nil {
return nil
}
// Find H264 encoder
h264Encoder := astiav.FindEncoder(astiav.CodecIDH264)
if h264Encoder == nil {
return errors.New("No H264 Encoder Found")
}
// Allocate encoding codec context
if vp.encodeCodecContext = astiav.AllocCodecContext(h264Encoder); vp.encodeCodecContext == nil {
return errors.New("Failed to AllocCodecContext Decoder")
}
// Update encoding codec context
vp.encodeCodecContext.SetPixelFormat(astiav.PixelFormatYuv420P)
vp.encodeCodecContext.SetSampleAspectRatio(vp.decodeCodecContext.SampleAspectRatio())
vp.encodeCodecContext.SetTimeBase(astiav.NewRational(1, 30))
vp.encodeCodecContext.SetWidth(vp.decodeCodecContext.Width())
vp.encodeCodecContext.SetHeight(vp.decodeCodecContext.Height())
// Open encoding codec context
err := vp.encodeCodecContext.Open(h264Encoder, nil)
if err != nil {
return err
}
// create a scale context to convert frames to RGBA format
vp.convertToRGBAContext, err = astiav.CreateSoftwareScaleContext(
vp.decodeCodecContext.Width(),
vp.decodeCodecContext.Height(),
vp.decodeCodecContext.PixelFormat(),
vp.decodeCodecContext.Width(),
vp.decodeCodecContext.Height(),
astiav.PixelFormatRgba,
astiav.NewSoftwareScaleContextFlags(astiav.SoftwareScaleContextFlagBilinear),
)
if err != nil {
return err
}
// create a scale context to convert frames to YUV420P format
vp.convertToYUV420PContext, err = astiav.CreateSoftwareScaleContext(
vp.convertToRGBAContext.DestinationWidth(),
vp.convertToRGBAContext.DestinationHeight(),
vp.convertToRGBAContext.DestinationPixelFormat(),
vp.convertToRGBAContext.SourceWidth(),
vp.convertToRGBAContext.SourceHeight(),
astiav.PixelFormatYuv420P,
vp.convertToRGBAContext.Flags(),
)
if err != nil {
return err
}
vp.rgbaFrame = astiav.AllocFrame()
vp.yuv420PFrame = astiav.AllocFrame()
return nil
}
func (vp *VideoProcessor) initFilters() error {
if vp.filterGraph = astiav.AllocFilterGraph(); vp.filterGraph == nil {
return errors.New("filtergraph could not be created")
}
// Alloc outputs
outputs := astiav.AllocFilterInOut()
if outputs == nil {
return errors.New("main: outputs is nil")
}
// Alloc inputs
inputs := astiav.AllocFilterInOut()
if inputs == nil {
return errors.New("main: inputs is nil")
}
// Create source buffer filters
buffersrc := astiav.FindFilterByName("buffer")
if buffersrc == nil {
return errors.New("buffersrc is nil")
}
// Create sink buffer filters
buffersink := astiav.FindFilterByName("buffersink")
if buffersink == nil {
return errors.New("buffersink is nil")
}
// Create filter contexts
var err error
if vp.buffersrcContext, err = vp.filterGraph.NewBuffersrcFilterContext(
buffersrc,
"in",
astiav.FilterArgs{
"pix_fmt": strconv.Itoa(int(vp.decodeCodecContext.PixelFormat())),
"video_size": strconv.Itoa(vp.decodeCodecContext.Width()) + "x" + strconv.Itoa(vp.decodeCodecContext.Height()),
"time_base": vp.videoStream.TimeBase().String(),
}); err != nil {
return err
}
if vp.buffersinkContext, err = vp.filterGraph.NewBuffersinkFilterContext(
buffersink,
"in",
nil); err != nil {
return fmt.Errorf("main: creating buffersink context failed: %w", err)
}
// Update outputs
outputs.SetName("in")
outputs.SetFilterContext(vp.buffersrcContext.FilterContext())
outputs.SetPadIdx(0)
outputs.SetNext(nil)
// Update inputs
inputs.SetName("out")
inputs.SetFilterContext(vp.buffersinkContext.FilterContext())
inputs.SetPadIdx(0)
inputs.SetNext(nil)
// Link buffersrc and buffersink through the eq filter for brightness
if err := vp.filterGraph.Parse("eq=brightness=0.5", inputs, outputs); err != nil {
return err
}
if err := vp.filterGraph.Configure(); err != nil {
return fmt.Errorf("main: configuring filter failed: %w", err)
}
// Allocate frame to store the filtered contents
vp.filterFrame = astiav.AllocFrame()
return nil
}
func (vp *VideoProcessor) freeVideoCoding() {
vp.inputFormatContext.CloseInput()
vp.inputFormatContext.Free()
vp.decodeCodecContext.Free()
vp.decodePacket.Free()
vp.decodeFrame.Free()
vp.encodeCodecContext.Free()
vp.encodePacket.Free()
vp.convertToRGBAContext.Free()
vp.convertToYUV420PContext.Free()
vp.rgbaFrame.Free()
vp.yuv420PFrame.Free()
vp.buffersrcContext.FilterContext().Free()
vp.buffersinkContext.FilterContext().Free()
vp.filterFrame.Free()
vp.filterGraph.Free()
}
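
A caller-side sketch, hypothetical and not part of this commit: NewVideoProcessor does not call initFilters, while freeVideoCoding unconditionally frees the filter-graph fields, so this sketch initializes the graph up front before relying on that teardown path.

// processor_example.go (hypothetical sketch, not part of this commit)
package client

import "log"

// newFilteredProcessor returns a VideoProcessor whose eq brightness filter
// graph is initialized as well, so that freeVideoCoding can safely free the
// filter-graph fields later.
func newFilteredProcessor() *VideoProcessor {
	vp := NewVideoProcessor()
	if err := vp.initFilters(); err != nil {
		log.Fatal("Failed to initialize filters: ", err)
	}
	return vp
}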

client/video_stream.go (new file, 116 lines)

@@ -0,0 +1,116 @@
package client
import (
"errors"
"fmt"
"log"
"net"
"time"
"github.com/asticode/go-astiav"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
)
func openCameraFeed(peerConnection *webrtc.PeerConnection, videoTrack *webrtc.TrackLocalStaticSample) error {
// Handle incoming tracks
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
fmt.Println("Track received:", track.Kind())
fmt.Println("Track Codec:", track.Codec())
fmt.Println("Track Codec MimeType:", track.Codec().MimeType)
go func() {
for {
// Read frames from the track
_, _, err := track.ReadRTP()
if err != nil {
log.Println("Error reading RTP:", err)
return
}
// Handle the frames as needed and render into a video element
// fmt.Println(packet)
}
}()
})
fmt.Println("Writing to tracks")
vp := NewVideoProcessor()
go vp.writeH264ToTrackAR(videoTrack)
return nil
}
func (vp *VideoProcessor) writeH264ToTrackAR(track *webrtc.TrackLocalStaticSample) {
defer vp.freeVideoCoding()
conn, err := net.Dial("tcp", "localhost:5005")
if err != nil {
panic(err)
}
defer conn.Close()
ticker := time.NewTicker(h264FrameDuration)
for ; true; <-ticker.C {
if err = vp.inputFormatContext.ReadFrame(vp.decodePacket); err != nil {
if errors.Is(err, astiav.ErrEof) {
break
}
panic(err)
}
vp.decodePacket.RescaleTs(vp.videoStream.TimeBase(), vp.decodeCodecContext.TimeBase())
if err = vp.decodeCodecContext.SendPacket(vp.decodePacket); err != nil {
panic(err)
}
for {
if err = vp.decodeCodecContext.ReceiveFrame(vp.decodeFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
fmt.Println("Error while receiving decoded framed: ", err)
break
}
panic(err)
}
if err = vp.convertToRGBAContext.ScaleFrame(vp.decodeFrame, vp.rgbaFrame); err != nil {
panic(err)
}
vp.pts++
vp.rgbaFrame.SetPts(vp.pts)
vp.arFilterFrame, err = OverlayARFilter(conn, vp.rgbaFrame)
if err != nil {
fmt.Println("Failed to add AR filter to frame: ", err)
}
if err = vp.convertToYUV420PContext.ScaleFrame(vp.arFilterFrame, vp.yuv420PFrame); err != nil {
panic(err)
}
vp.pts++
vp.yuv420PFrame.SetPts(vp.pts)
if err = vp.encodeCodecContext.SendFrame(vp.yuv420PFrame); err != nil {
panic(err)
}
for {
// Read encoded packets
vp.encodePacket = astiav.AllocPacket()
if err = vp.encodeCodecContext.ReceivePacket(vp.encodePacket); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
panic(err)
}
// Write H264 to track
if err = track.WriteSample(media.Sample{Data: vp.encodePacket.Data(), Duration: h264FrameDuration}); err != nil {
panic(err)
}
}
}
}
}
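
A hypothetical smoke test for the decode, AR overlay, and encode path: it drives the pipeline into a throwaway Pion track without any signaling. All identifiers except runPipelineSmokeTest already exist in the package.

// pipeline_smoke_example.go (hypothetical sketch, not part of this commit)
package client

import "github.com/pion/webrtc/v3"

// runPipelineSmokeTest exercises the full decode/AR/encode loop against a
// local H264 sample track; it blocks until the UDP input stream ends.
func runPipelineSmokeTest() error {
	track, err := webrtc.NewTrackLocalStaticSample(
		webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion")
	if err != nil {
		return err
	}
	vp := NewVideoProcessor()
	vp.writeH264ToTrackAR(track)
	return nil
}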


@@ -1,556 +0,0 @@
package client
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"image"
"image/draw"
"image/jpeg"
"net"
"os"
"strconv"
"time"
"github.com/asticode/go-astiav"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
)
// nolint: gochecknoglobals
var (
inputFormatContext *astiav.FormatContext
decodeCodecContext *astiav.CodecContext
decodePacket *astiav.Packet
decodeFrame *astiav.Frame
videoStream *astiav.Stream
encodeCodecContext *astiav.CodecContext
encodePacket *astiav.Packet
softwareScaleContext *astiav.SoftwareScaleContext
scaledFrame *astiav.Frame
softwareScaleContext2 *astiav.SoftwareScaleContext
scaledFrame2 *astiav.Frame
processedFrame *astiav.Frame
filterFrame *astiav.Frame
filterGraph *astiav.FilterGraph
brightnessFilter *astiav.FilterContext
buffersinkCtx *astiav.FilterContext
buffersrcCtx *astiav.FilterContext
pts int64
err error
)
const h264FrameDuration = time.Millisecond * 20
func writeH264ToTrackAR(track *webrtc.TrackLocalStaticSample) {
astiav.RegisterAllDevices()
initTestSrc()
initFilters()
defer freeVideoCoding()
conn, err := net.Dial("tcp", "localhost:5005")
if err != nil {
panic(err)
}
defer conn.Close()
ticker := time.NewTicker(h264FrameDuration)
for ; true; <-ticker.C {
if err = inputFormatContext.ReadFrame(decodePacket); err != nil {
if errors.Is(err, astiav.ErrEof) {
break
}
panic(err)
}
decodePacket.RescaleTs(videoStream.TimeBase(), decodeCodecContext.TimeBase())
if err = decodeCodecContext.SendPacket(decodePacket); err != nil {
panic(err)
}
for {
if err = decodeCodecContext.ReceiveFrame(decodeFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
fmt.Println("Error while receiving decoded framed: ", err)
break
}
panic(err)
}
initVideoEncoding()
// Scale the video
if err = softwareScaleContext.ScaleFrame(decodeFrame, scaledFrame); err != nil {
panic(err)
}
// We don't care about the PTS, but encoder complains if unset
pts++
scaledFrame.SetPts(pts)
processedFrame, err = sendFrameToPython(conn, scaledFrame)
if(err != nil){
fmt.Println("ERROR WHILE SENDING FRAME TO PYTHON: ", err)
}
if err = softwareScaleContext2.ScaleFrame(processedFrame, scaledFrame2); err != nil {
panic(err)
}
// We don't care about the PTS, but encoder complains if unset
pts++
scaledFrame2.SetPts(pts)
// fmt.Println("scaledFrame2 PIXEL FMT: ", scaledFrame2.PixelFormat())
// Encode the frame
if err = encodeCodecContext.SendFrame(scaledFrame2); err != nil {
panic(err)
}
// fmt.Println("writeH264ToTrackAR: H2")
for {
// fmt.Println("writeH264ToTrackAR: H4")
// Read encoded packets and write to file
encodePacket = astiav.AllocPacket()
if err = encodeCodecContext.ReceivePacket(encodePacket); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
panic(err)
}
// fmt.Println("writeH264ToTrackAR: H5")
// Write H264 to track
if err = track.WriteSample(media.Sample{Data: encodePacket.Data(), Duration: h264FrameDuration}); err != nil {
panic(err)
}
// fmt.Println("writeH264ToTrackAR: H")
}
}
}
}
func sendFrameToPython(conn net.Conn, frame *astiav.Frame) (*astiav.Frame, error) {
// fmt.Println("sendFrameToPython: H1")
width := frame.Width()
height := frame.Height()
fmt.Println("FRAME WIDTH: ", width)
fmt.Println("FRAME HEIGHT: ", height)
fmt.Println("FRAME LINESIZE: ")
lineSize2 := frame.Linesize()
for i := 0; i < len(lineSize2); i++ {
fmt.Println(lineSize2[i])
}
img := image.NewRGBA(image.Rect(0, 0, int(width), int(height)))
// fmt.Println("sendFrameToPython: H2")
frame.Data().ToImage(img)
// fmt.Println("sendFrameToPython: H3")
var buf bytes.Buffer
if err := jpeg.Encode(&buf, img, nil); err != nil {
return frame, err
}
// fmt.Println("sendFrameToPython: H4")
// Send the size of the frame data
frameSize := uint32(buf.Len())
if err := binary.Write(conn, binary.BigEndian, frameSize); err != nil {
return frame, fmt.Errorf("failed to send frame size: %w", err)
}
// fmt.Println("sendFrameToPython: H5")
// Send the frame data
_, err = conn.Write(buf.Bytes())
if err != nil {
return frame, fmt.Errorf("failed to send frame data: %w", err)
}
// fmt.Println("sendFrameToPython: H6")
// Read the size of the processed frame
var processedFrameSize uint32
if err := binary.Read(conn, binary.BigEndian, &processedFrameSize); err != nil {
return frame, fmt.Errorf("failed to read processed frame size: %w", err)
}
// fmt.Println("sendFrameToPython: H7")
// Read the processed frame data
processedFrameData := make([]byte, processedFrameSize)
_, err = conn.Read(processedFrameData)
if err != nil {
return frame, fmt.Errorf("failed to read processed frame data: %w", err)
}
// fmt.Println("sendFrameToPython: H8")
reader := bytes.NewReader(processedFrameData)
// Decode the JPEG image from the reader
processed_img, err := jpeg.Decode(reader)
if err != nil {
fmt.Println("DECODING FAILED: ", err)
return frame, fmt.Errorf("failed to decode image: %w", err)
}
rgba_img := image.NewRGBA(processed_img.Bounds())
draw.Draw(rgba_img, rgba_img.Bounds(), processed_img, processed_img.Bounds().Min, draw.Over)
fmt.Println("HEIGHT: ", rgba_img.Bounds().Dy())
fmt.Println("WIDTH: ", rgba_img.Bounds().Dx())
processedFrame = frame.Clone()
// processedFrame.SetHeight(processed_img.Bounds().Dy())
// processedFrame.SetWidth(processed_img.Bounds().Dx())
// processedFrame.SetPixelFormat(astiav.PixelFormatRgba)
fmt.Println("AFTER SETTING")
fmt.Println("HEIGHT: ", processedFrame.Height())
fmt.Println("WIDTH: ", processedFrame.Width())
fmt.Println("PIXEL FMT: ", processedFrame.PixelFormat())
// fmt.Println("sendFrameToPython: H10")
// align := 0
// if err := processedFrame.AllocBuffer(align); err != nil {
// return frame, fmt.Errorf("main: allocating buffer failed: %w", err)
// }
// fmt.Println("sendFrameToPython: H11")
// // Alloc image
// if err := processedFrame.AllocImage(align); err != nil {
// return frame, fmt.Errorf("main: allocating image failed: %w", err)
// }
// fmt.Println("sendFrameToPython: H12")
// if err := processedFrame.MakeWritable(); err != nil {
// return frame, fmt.Errorf("main: making frame writable failed: %w", err)
// }
// iswritable := processedFrame.IsWritable()
// fmt.Println("IS WRITABLE: ", iswritable)
// // processedFrame.
fmt.Println("FRAME ISWRITABLE?: ", processedFrame.IsWritable())
fmt.Println("sendFrameToPython: H12")
if err := processedFrame.MakeWritable(); err != nil {
return frame, fmt.Errorf("main: making frame writable failed: %w", err)
}
fmt.Println("LINESIZE: ")
lineSize := processedFrame.Linesize()
for i := 0; i < len(lineSize); i++ {
fmt.Println(lineSize[i])
}
fmt.Println("sendFrameToPython: H13")
if err := processedFrame.Data().FromImage(rgba_img); err != nil {
return frame, fmt.Errorf("converting processed image to frame failed: %w", err)
}
fmt.Println("sendFrameToPython: H14")
filename := "processed.jpg"
file, err := os.Create(filename)
if err != nil {
return frame, fmt.Errorf("failed to decode image: %w", err)
}
defer file.Close()
fmt.Println("sendFrameToPython: H15")
err = jpeg.Encode(file, processed_img, nil)
if err != nil {
return frame, fmt.Errorf("failed to decode image: %w", err)
}
fmt.Println("sendFrameToPython: H16")
return processedFrame, nil
}
/*
func writeH264ToTrack(track *webrtc.TrackLocalStaticSample) {
This function continuously reads video frames from a specified input, decodes them,
scales them, encodes them back into H.264 format, and writes the samples to a WebRTC track.
astiav.RegisterAllDevices()
initTestSrc()
initFilters()
defer freeVideoCoding()
ticker := time.NewTicker(h264FrameDuration)
for ; true; <-ticker.C {
if err = inputFormatContext.ReadFrame(decodePacket); err != nil {
if errors.Is(err, astiav.ErrEof) {
break
}
panic(err)
}
decodePacket.RescaleTs(videoStream.TimeBase(), decodeCodecContext.TimeBase())
// Send the packet
if err = decodeCodecContext.SendPacket(decodePacket); err != nil {
panic(err)
}
for {
// Read Decoded Frame
if err = decodeCodecContext.ReceiveFrame(decodeFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
fmt.Println("In error block")
break
}
panic(err)
}
initVideoEncoding()
fmt.Println("Pixel format of decoded frame: ", decodeFrame.PixelFormat());
if err = buffersrcCtx.BuffersrcAddFrame(decodeFrame, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil {
err = fmt.Errorf("main: adding frame failed: %w", err)
return
}
for{
filterFrame.Unref()
if err = buffersinkCtx.BuffersinkGetFrame(filterFrame, astiav.NewBuffersinkFlags()); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
panic(err)
}
pts++
filterFrame.SetPts(pts)
// Encode the frame
if err = encodeCodecContext.SendFrame(filterFrame); err != nil {
panic(err)
}
for {
// Read encoded packets and write to file
encodePacket = astiav.AllocPacket()
if err = encodeCodecContext.ReceivePacket(encodePacket); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
panic(err)
}
// Write H264 to track
if err = track.WriteSample(media.Sample{Data: encodePacket.Data(), Duration: h264FrameDuration}); err != nil {
panic(err)
}
}
}
}
}
}
*/
func initTestSrc() {
if inputFormatContext = astiav.AllocFormatContext(); inputFormatContext == nil {
panic("Failed to AllocCodecContext")
}
// Open input
if err = inputFormatContext.OpenInput("udp://224.0.0.251:5353", nil, nil); err != nil {
panic(err)
}
// Find stream info
if err = inputFormatContext.FindStreamInfo(nil); err != nil {
panic(err)
}
videoStream = inputFormatContext.Streams()[0]
// Find decoder
decodeCodec := astiav.FindDecoder(videoStream.CodecParameters().CodecID())
if decodeCodec == nil {
panic("FindDecoder returned nil")
}
// Find decoder
if decodeCodecContext = astiav.AllocCodecContext(decodeCodec); decodeCodecContext == nil {
panic(err)
}
// Update codec context
if err = videoStream.CodecParameters().ToCodecContext(decodeCodecContext); err != nil {
panic(err)
}
// Set framerate
decodeCodecContext.SetFramerate(inputFormatContext.GuessFrameRate(videoStream, nil))
// Open codec context
if err = decodeCodecContext.Open(decodeCodec, nil); err != nil {
panic(err)
}
fmt.Println("decodeCodecContext.PixelFormat().Name(): ", decodeCodecContext.PixelFormat().Name())
decodePacket = astiav.AllocPacket()
decodeFrame = astiav.AllocFrame()
}
func initVideoEncoding() {
if encodeCodecContext != nil {
return
}
// Find encoder
h264Encoder := astiav.FindEncoder(astiav.CodecIDH264)
if h264Encoder == nil {
panic("No H264 Encoder Found")
}
// Alloc codec context
if encodeCodecContext = astiav.AllocCodecContext(h264Encoder); encodeCodecContext == nil {
panic("Failed to AllocCodecContext Decoder")
}
// Update codec context
encodeCodecContext.SetPixelFormat(astiav.PixelFormatYuv420P)
encodeCodecContext.SetSampleAspectRatio(decodeCodecContext.SampleAspectRatio())
encodeCodecContext.SetTimeBase(astiav.NewRational(1, 30))
encodeCodecContext.SetWidth(decodeCodecContext.Width())
encodeCodecContext.SetHeight(decodeCodecContext.Height())
// Open codec context
if err = encodeCodecContext.Open(h264Encoder, nil); err != nil {
panic(err)
}
softwareScaleContext, err = astiav.CreateSoftwareScaleContext(
decodeCodecContext.Width(),
decodeCodecContext.Height(),
decodeCodecContext.PixelFormat(),
decodeCodecContext.Width(),
decodeCodecContext.Height(),
astiav.PixelFormatRgba,
astiav.NewSoftwareScaleContextFlags(astiav.SoftwareScaleContextFlagBilinear),
)
if err != nil {
panic(err)
}
scaledFrame = astiav.AllocFrame()
softwareScaleContext2, err = astiav.CreateSoftwareScaleContext(
softwareScaleContext.DestinationWidth(),
softwareScaleContext.DestinationHeight(),
softwareScaleContext.DestinationPixelFormat(),
softwareScaleContext.SourceWidth(),
softwareScaleContext.SourceHeight(),
astiav.PixelFormatYuv420P,
softwareScaleContext.Flags(),
)
if err != nil {
panic(err)
}
scaledFrame2 = astiav.AllocFrame()
}
func initFilters() {
filterGraph = astiav.AllocFilterGraph()
if filterGraph == nil {
panic("filtergraph could not be created")
}
// Alloc outputs
outputs := astiav.AllocFilterInOut()
if outputs == nil {
err = errors.New("main: outputs is nil")
return
}
// Alloc inputs
inputs := astiav.AllocFilterInOut()
if inputs == nil {
err = errors.New("main: inputs is nil")
return
}
// Create buffersrc and buffersink filter contexts
buffersrc := astiav.FindFilterByName("buffer")
if buffersrc == nil {
panic("buffersrc is nil")
}
buffersink := astiav.FindFilterByName("buffersink")
if buffersink == nil {
panic("buffersink is nil")
}
fmt.Println("decodeCodecContext.PixelFormat(): ", decodeCodecContext.PixelFormat().Name())
var err error
if buffersrcCtx, err = filterGraph.NewFilterContext(buffersrc, "in", astiav.FilterArgs{
"pix_fmt": strconv.Itoa(int(decodeCodecContext.PixelFormat())),
"video_size": strconv.Itoa(decodeCodecContext.Width()) + "x" + strconv.Itoa(decodeCodecContext.Height()),
"time_base": videoStream.TimeBase().String(),
}); err != nil {
panic(err)
}
if buffersinkCtx, err = filterGraph.NewFilterContext(buffersink, "in", nil); err != nil {
err = fmt.Errorf("main: creating buffersink context failed: %w", err)
return
}
// Update outputs
outputs.SetName("in")
outputs.SetFilterContext(buffersrcCtx)
outputs.SetPadIdx(0)
outputs.SetNext(nil)
// Update inputs
inputs.SetName("out")
inputs.SetFilterContext(buffersinkCtx)
inputs.SetPadIdx(0)
inputs.SetNext(nil)
// Link buffersrc and buffersink through the eq filter for brightness
if err = filterGraph.Parse("eq=brightness=0.5", inputs, outputs); err != nil {
panic(err)
}
if err = filterGraph.Configure(); err != nil {
err = fmt.Errorf("main: configuring filter failed: %w", err)
return
}
filterFrame = astiav.AllocFrame()
}
func freeVideoCoding() {
inputFormatContext.CloseInput()
inputFormatContext.Free()
decodeCodecContext.Free()
decodePacket.Free()
decodeFrame.Free()
encodeCodecContext.Free()
encodePacket.Free()
}