diff --git a/examples/save-to-disk/ivf-writer.go b/examples/save-to-disk/ivf-writer.go new file mode 100644 index 00000000..7e9dbace --- /dev/null +++ b/examples/save-to-disk/ivf-writer.go @@ -0,0 +1,63 @@ +package main + +import ( + "encoding/binary" + "fmt" + "os" + "time" +) + +type IVFWriter struct { + fd *os.File + time time.Time + count uint64 +} + +func panicWrite(fd *os.File, data []byte) { + if _, err := fd.Write(data); err != nil { + panic(err) + } +} + +func NewIVFWriter(fileName string) (*IVFWriter, error) { + f, err := os.Create(fileName) + if err != nil { + return nil, err + } + + panicWrite(f, []byte("DKIF")) // DKIF + panicWrite(f, []byte{0, 0}) // Version + panicWrite(f, []byte{32, 0}) // Header Size + panicWrite(f, []byte("VP80")) // FOURCC + panicWrite(f, []byte{128, 2}) // Width (640) + panicWrite(f, []byte{224, 1}) // Height (480) + panicWrite(f, []byte{232, 3, 0, 0}) // Framerate numerator + panicWrite(f, []byte{1, 0, 0, 0}) // Framerate denominator + panicWrite(f, []byte{132, 3, 0, 0}) // Frame count + panicWrite(f, []byte{0, 0, 0, 0}) // Unused + + i := &IVFWriter{fd: f} + return i, nil +} + +func (i *IVFWriter) AddBuffer(buffer []byte) { + if len(buffer) == 0 { + fmt.Println("skipping") + return + } + bufferLen := make([]byte, 4) + fmt.Println(len(buffer)) + binary.LittleEndian.PutUint32(bufferLen, uint32(len(buffer))) + + pts := make([]byte, 8) + binary.LittleEndian.PutUint64(pts, i.count) + i.count += 33 + + panicWrite(i.fd, bufferLen) + panicWrite(i.fd, pts) + panicWrite(i.fd, buffer) +} + +func (i *IVFWriter) Close() error { + return i.fd.Close() +} diff --git a/examples/save-to-disk/main.go b/examples/save-to-disk/main.go index a3d16cc9..0bda73f2 100644 --- a/examples/save-to-disk/main.go +++ b/examples/save-to-disk/main.go @@ -5,8 +5,10 @@ import ( "encoding/base64" "fmt" "os" + "sync/atomic" "github.com/pions/webrtc" + "github.com/pions/webrtc/rtp" ) func main() { @@ -30,9 +32,29 @@ func main() { peerConnection := &webrtc.RTCPeerConnection{} // Set a handler for when a new remote track starts, this handler saves buffers to disk as - // an ivf file of the users choosing - peerConnection.Ontrack = func(mediaType webrtc.MediaType, buffers <-chan []byte) { - fmt.Println("We track was discovered") + // an ivf file, since we could have multiple video tracks we provide a counter + var trackCount uint64 + peerConnection.Ontrack = func(mediaType webrtc.MediaType, packets chan *rtp.Packet) { + go func() { + track := atomic.AddUint64(&trackCount, 1) + fmt.Printf("Track %d has started \n", track) + + i, err := NewIVFWriter(fmt.Sprintf("output-%d.ivf", track)) + if err != nil { + panic(err) + } + + currentFrame := []byte{} + for { + packet := <-packets + currentFrame = append(currentFrame, packet.Payload[4:]...) + + if packet.Marker { + i.AddBuffer(currentFrame) + currentFrame = nil + } + } + }() } // Set the remote SessionDescription diff --git a/internal/network/network.go b/internal/network/network.go index a33359bd..59e38052 100644 --- a/internal/network/network.go +++ b/internal/network/network.go @@ -7,16 +7,18 @@ import ( "github.com/pions/pkg/stun" "github.com/pions/webrtc/internal/dtls" - "github.com/pions/webrtc/internal/rtp" "github.com/pions/webrtc/internal/srtp" + "github.com/pions/webrtc/rtp" "golang.org/x/net/ipv4" ) -func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tlscfg *dtls.TLSCfg) { +func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tlscfg *dtls.TLSCfg, b BufferTransportGenerator) { const MTU = 8192 buffer := make([]byte, MTU) dtlsStates := make(map[string]*dtls.DTLSState) + bufferTransports := make(map[uint32]chan *rtp.Packet) + var srtpSession *srtp.Session for { n, _, rawDstAddr, _ := conn.ReadFrom(buffer) @@ -55,16 +57,28 @@ func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tl fmt.Println("Failed to decrypt packet") continue } + // addr := &net.UDPAddr{ + // Port: 5000, + // IP: net.ParseIP("127.0.0.1"), + // } + // conn.WriteTo(unencrypted, nil, addr) - packet := rtp.Packet{} + packet := &rtp.Packet{} if err := packet.Unmarshal(unencrypted); err != nil { fmt.Println("Failed to unmarshal RTP packet") continue } - fmt.Printf("<- RTP SSRC: %d, SN: %d, TS: %d, PT: %d\n", packet.SSRC, packet.SequenceNumber, - packet.Timestamp, packet.PayloadType) - + bufferTransport := bufferTransports[packet.SSRC] + if bufferTransport == nil { + bufferTransport = b(packet.SSRC) + if bufferTransport == nil { + fmt.Println("Failed to generate buffer transport, onTrack should be defined") + continue + } + bufferTransports[packet.SSRC] = bufferTransport + } + bufferTransport <- packet } else { fmt.Println("SRTP packet, but no srtpSession") } @@ -82,7 +96,9 @@ func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tl } } -func UdpListener(ip string, remoteKey []byte, tlscfg *dtls.TLSCfg) (int, error) { +type BufferTransportGenerator func(uint32) chan *rtp.Packet + +func UdpListener(ip string, remoteKey []byte, tlscfg *dtls.TLSCfg, b BufferTransportGenerator) (int, error) { listener, err := net.ListenPacket("udp4", ip+":0") if err != nil { return 0, err @@ -99,6 +115,6 @@ func UdpListener(ip string, remoteKey []byte, tlscfg *dtls.TLSCfg) (int, error) srcString := ip + ":" + strconv.Itoa(addr.Port) dtls.AddListener(srcString, conn) - go packetHandler(conn, srcString, remoteKey, tlscfg) + go packetHandler(conn, srcString, remoteKey, tlscfg, b) return addr.Port, err } diff --git a/rtcpeerconnection.go b/rtcpeerconnection.go index f5c5d6f4..edecc7ff 100644 --- a/rtcpeerconnection.go +++ b/rtcpeerconnection.go @@ -4,14 +4,18 @@ import ( "fmt" "math/rand" + "github.com/pions/pkg/stun" "github.com/pions/webrtc/internal/dtls" "github.com/pions/webrtc/internal/ice" "github.com/pions/webrtc/internal/network" "github.com/pions/webrtc/internal/sdp" + "github.com/pions/webrtc/rtp" "github.com/pkg/errors" ) +type ChannelGenerator func(username string, srcAddr *stun.TransportAddr) (password string, ok bool) + type MediaType int const ( @@ -25,7 +29,7 @@ const ( ) type RTCPeerConnection struct { - Ontrack func(mediaType MediaType, buffers <-chan []byte) + Ontrack func(mediaType MediaType, buffers chan *rtp.Packet) LocalDescription *sdp.SessionDescription tlscfg *dtls.TLSCfg @@ -51,7 +55,7 @@ func (r *RTCPeerConnection) CreateOffer() error { candidates := []string{} basePriority := uint16(rand.Uint32() & (1<<16 - 1)) for id, c := range ice.HostInterfaces() { - dstPort, err := network.UdpListener(c, []byte(r.icePassword), r.tlscfg) + dstPort, err := network.UdpListener(c, []byte(r.icePassword), r.tlscfg, r.generateChannel) if err != nil { panic(err) } @@ -69,3 +73,12 @@ func (r *RTCPeerConnection) AddStream(mediaType MediaType) (buffers chan<- []byt } // Private +func (r *RTCPeerConnection) generateChannel(ssrc uint32) (buffers chan *rtp.Packet) { + if r.Ontrack == nil { + return nil + } + + bufferTransport := make(chan *rtp.Packet, 15) + r.Ontrack(VP8, bufferTransport) // TODO look up media via SSRC in remote SD + return bufferTransport +} diff --git a/internal/rtp/rtp_packet.go b/rtp/rtp_packet.go similarity index 100% rename from internal/rtp/rtp_packet.go rename to rtp/rtp_packet.go diff --git a/internal/rtp/unmarshal.go b/rtp/unmarshal.go similarity index 100% rename from internal/rtp/unmarshal.go rename to rtp/unmarshal.go