diff --git a/examples/gstreamer-receive/gst/gst.c b/examples/gstreamer-receive/gst/gst.c index 6c29758d..4539af60 100644 --- a/examples/gstreamer-receive/gst/gst.c +++ b/examples/gstreamer-receive/gst/gst.c @@ -43,7 +43,6 @@ GstElement *gstreamer_recieve_create_pipeline(char *pipeline) { } void gstreamer_recieve_start_pipeline(GstElement *pipeline) { - GstElement *source, *demuxer, *decoder, *conv, *sink; GstBus *bus; guint bus_watch_id; diff --git a/examples/gstreamer-send/gst/gst.c b/examples/gstreamer-send/gst/gst.c index 552ab148..08de302f 100644 --- a/examples/gstreamer-send/gst/gst.c +++ b/examples/gstreamer-send/gst/gst.c @@ -2,8 +2,6 @@ #include -#define PIPELINE "videotestsrc ! vp8enc ! appsink name=appsink" - static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { GMainLoop *loop = (GMainLoop *)data; @@ -52,10 +50,10 @@ GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer use return GST_FLOW_OK; } -GstElement *gstreamer_send_create_pipeline() { +GstElement *gstreamer_send_create_pipeline(char *pipeline) { gst_init(NULL, NULL); GError *error = NULL; - return gst_parse_launch(PIPELINE, &error); + return gst_parse_launch(pipeline, &error); } void gstreamer_send_start_pipeline(GstElement *pipeline) { diff --git a/examples/gstreamer-send/gst/gst.go b/examples/gstreamer-send/gst/gst.go index 9ef8d6c6..738dd05b 100644 --- a/examples/gstreamer-send/gst/gst.go +++ b/examples/gstreamer-send/gst/gst.go @@ -9,28 +9,46 @@ package gst import "C" import ( "fmt" + "github.com/pions/webrtc" "unsafe" ) // Pipeline is a wrapper for a GStreamer Pipeline type Pipeline struct { Pipeline *C.GstElement - in chan<- []byte + in chan<- webrtc.RTCSample + samples uint32 } -// This allows cgo to access pipeline, this will not work if you want multiple -var globalPipeline *Pipeline - // CreatePipeline creates a GStreamer Pipeline -func CreatePipeline(in chan<- []byte) *Pipeline { +func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipeline { + pipelineStr := "appsink name=appsink" + var samples uint32 + switch codec { + case webrtc.VP8: + pipelineStr = "videotestsrc ! vp8enc ! " + pipelineStr + case webrtc.VP9: + pipelineStr = "videotestsrc ! vp9enc ! " + pipelineStr + case webrtc.Opus: + pipelineStr = "audiotestsrc ! opusenc ! " + pipelineStr + default: + panic("Unhandled codec " + codec.String()) + } + + pipelineStrUnsafe := C.CString(pipelineStr) + defer C.free(unsafe.Pointer(pipelineStrUnsafe)) globalPipeline = &Pipeline{ - Pipeline: C.gstreamer_send_create_pipeline(), + Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe), in: in, + samples: samples, } return globalPipeline } +// This allows cgo to access pipeline, this will not work if you want multiple +var globalPipeline *Pipeline + // Start starts the GStreamer Pipeline func (p *Pipeline) Start() { C.gstreamer_send_start_pipeline(p.Pipeline) @@ -42,9 +60,9 @@ func (p *Pipeline) Stop() { } //export goHandlePipelineBuffer -func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int) { +func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, samples C.int) { if globalPipeline != nil { - globalPipeline.in <- C.GoBytes(buffer, bufferLen) + globalPipeline.in <- webrtc.RTCSample{C.GoBytes(buffer, bufferLen), samples} } else { fmt.Println("discarding buffer, globalPipeline not set") } diff --git a/examples/gstreamer-send/gst/gst.h b/examples/gstreamer-send/gst/gst.h index ea3050c0..034197ed 100644 --- a/examples/gstreamer-send/gst/gst.h +++ b/examples/gstreamer-send/gst/gst.h @@ -8,7 +8,7 @@ extern void goHandlePipelineBuffer(void *buffer, int bufferLen); -GstElement *gstreamer_send_create_pipeline(); +GstElement *gstreamer_send_create_pipeline(char *pipeline); void gstreamer_send_start_pipeline(GstElement *pipeline); void gstreamer_send_stop_pipeline(GstElement *pipeline); diff --git a/examples/gstreamer-send/main.go b/examples/gstreamer-send/main.go index ca6837f9..116e9d46 100644 --- a/examples/gstreamer-send/main.go +++ b/examples/gstreamer-send/main.go @@ -31,7 +31,7 @@ func main() { peerConnection := &webrtc.RTCPeerConnection{} // Create a video track, and start pushing buffers - in, err := peerConnection.AddTrack(webrtc.VP8) + in, err := peerConnection.AddTrack(webrtc.Opus) if err != nil { panic(err) } @@ -56,6 +56,6 @@ func main() { localDescriptionStr := peerConnection.LocalDescription.Marshal() fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr))) - gst.CreatePipeline(in).Start() + gst.CreatePipeline(webrtc.Opus, in).Start() select {} } diff --git a/pkg/rtp/codecs/opus_packet.go b/pkg/rtp/codecs/opus_packet.go new file mode 100644 index 00000000..41d1dafd --- /dev/null +++ b/pkg/rtp/codecs/opus_packet.go @@ -0,0 +1,22 @@ +package codecs + +import "github.com/pions/webrtc/pkg/rtp" + +// OpusPayloader payloads Opus packets +type OpusPayloader struct{} + +// Payload fragments an Opus packet across one or more byte arrays +func (p *OpusPayloader) Payload(mtu int, payload []byte) [][]byte { + return [][]byte{payload} +} + +// OpusPacket represents the VP8 header that is stored in the payload of an RTP Packet +type OpusPacket struct { + Payload []byte +} + +// Unmarshal parses the passed byte slice and stores the result in the OpusPacket this method is called upon +func (p *OpusPacket) Unmarshal(packet *rtp.Packet) error { + p.Payload = packet.Payload + return nil +} diff --git a/pkg/rtp/packetizer.go b/pkg/rtp/packetizer.go index 057a2152..d1d1ca70 100644 --- a/pkg/rtp/packetizer.go +++ b/pkg/rtp/packetizer.go @@ -7,7 +7,7 @@ type Payloader interface { // Packetizer packetizes a payload type Packetizer interface { - Packetize(payload []byte) []*Packet + Packetize(payload []byte, samples uint32) []*Packet } type packetizer struct { @@ -17,10 +17,11 @@ type packetizer struct { Payloader Payloader Sequencer Sequencer Timestamp uint32 + ClockRate uint32 } // NewPacketizer returns a new instance of a Packetizer for a specific payloader -func NewPacketizer(mtu int, pt uint8, ssrc uint32, payloader Payloader, sequencer Sequencer) Packetizer { +func NewPacketizer(mtu int, pt uint8, ssrc uint32, payloader Payloader, sequencer Sequencer, clockRate uint32) Packetizer { return &packetizer{ mtu, pt, @@ -28,11 +29,12 @@ func NewPacketizer(mtu int, pt uint8, ssrc uint32, payloader Payloader, sequence payloader, sequencer, 0, + clockRate, } } // Packetize packetizes the payload of an RTP packet and returns one or more RTP packets -func (p *packetizer) Packetize(payload []byte) []*Packet { +func (p *packetizer) Packetize(payload []byte, samples uint32) []*Packet { payloads := p.Payloader.Payload(p.MTU-12, payload) packets := make([]*Packet, len(payloads)) @@ -48,7 +50,7 @@ func (p *packetizer) Packetize(payload []byte) []*Packet { SSRC: p.SSRC, Payload: pp, } - p.Timestamp++ + p.Timestamp += samples } return packets diff --git a/rtcpeerconnection.go b/rtcpeerconnection.go index 5b7c0e8e..118ec605 100644 --- a/rtcpeerconnection.go +++ b/rtcpeerconnection.go @@ -21,6 +21,11 @@ func init() { rand.Seed(time.Now().UTC().UnixNano()) } +type RTCSample struct { + Data []byte + Samples uint32 +} + // TrackType determines the type of media we are sending receiving type TrackType int @@ -117,23 +122,33 @@ func (r *RTCPeerConnection) CreateOffer() error { // AddTrack adds a new track to the RTCPeerConnection // This function returns a channel to push buffers on, and an error if the channel can't be added // Closing the channel ends this stream -func (r *RTCPeerConnection) AddTrack(mediaType TrackType) (buffers chan<- []byte, err error) { - if mediaType != VP8 { +func (r *RTCPeerConnection) AddTrack(mediaType TrackType, clockRate uint32) (samples chan<- RTCSample, err error) { + if mediaType != VP8 && mediaType != Opus { panic("TODO Discarding packet, need media parsing") } - trackInput := make(chan []byte, 15) + trackInput := make(chan RTCSample, 15) go func() { ssrc := rand.Uint32() sdpTrack := &sdp.SessionBuilderTrack{SSRC: ssrc} - if mediaType == Opus { + var payloader rtp.Payloader + var payloadType uint8 + switch mediaType { + case Opus: sdpTrack.IsAudio = true + payloader = &codecs.OpusPayloader{} + payloadType = 111 + + case VP8: + payloader = &codecs.VP8Payloader{} + payloadType = 96 } r.localTracks = append(r.localTracks, sdpTrack) - packetizer := rtp.NewPacketizer(1500, 96, ssrc, &codecs.VP8Payloader{}, rtp.NewRandomSequencer()) + packetizer := rtp.NewPacketizer(1500, payloadType, ssrc, payloader, rtp.NewRandomSequencer(), clockRate) for { - packets := packetizer.Packetize(<-trackInput) + in := <-trackInput + packets := packetizer.Packetize(in.Data, in.Samples) for _, p := range packets { for _, port := range r.ports { port.Send(p)