mirror of
https://github.com/pion/webrtc.git
synced 2025-10-04 23:02:48 +08:00
Add save-to-disk example
Currently the constructed IVF don't work, but everything saves properly. Hopefully off-by-one somewhere
This commit is contained in:
63
examples/save-to-disk/ivf-writer.go
Normal file
63
examples/save-to-disk/ivf-writer.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type IVFWriter struct {
|
||||
fd *os.File
|
||||
time time.Time
|
||||
count uint64
|
||||
}
|
||||
|
||||
func panicWrite(fd *os.File, data []byte) {
|
||||
if _, err := fd.Write(data); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func NewIVFWriter(fileName string) (*IVFWriter, error) {
|
||||
f, err := os.Create(fileName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
panicWrite(f, []byte("DKIF")) // DKIF
|
||||
panicWrite(f, []byte{0, 0}) // Version
|
||||
panicWrite(f, []byte{32, 0}) // Header Size
|
||||
panicWrite(f, []byte("VP80")) // FOURCC
|
||||
panicWrite(f, []byte{128, 2}) // Width (640)
|
||||
panicWrite(f, []byte{224, 1}) // Height (480)
|
||||
panicWrite(f, []byte{232, 3, 0, 0}) // Framerate numerator
|
||||
panicWrite(f, []byte{1, 0, 0, 0}) // Framerate denominator
|
||||
panicWrite(f, []byte{132, 3, 0, 0}) // Frame count
|
||||
panicWrite(f, []byte{0, 0, 0, 0}) // Unused
|
||||
|
||||
i := &IVFWriter{fd: f}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (i *IVFWriter) AddBuffer(buffer []byte) {
|
||||
if len(buffer) == 0 {
|
||||
fmt.Println("skipping")
|
||||
return
|
||||
}
|
||||
bufferLen := make([]byte, 4)
|
||||
fmt.Println(len(buffer))
|
||||
binary.LittleEndian.PutUint32(bufferLen, uint32(len(buffer)))
|
||||
|
||||
pts := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(pts, i.count)
|
||||
i.count += 33
|
||||
|
||||
panicWrite(i.fd, bufferLen)
|
||||
panicWrite(i.fd, pts)
|
||||
panicWrite(i.fd, buffer)
|
||||
}
|
||||
|
||||
func (i *IVFWriter) Close() error {
|
||||
return i.fd.Close()
|
||||
}
|
@@ -5,8 +5,10 @@ import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pions/webrtc"
|
||||
"github.com/pions/webrtc/rtp"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -30,9 +32,29 @@ func main() {
|
||||
peerConnection := &webrtc.RTCPeerConnection{}
|
||||
|
||||
// Set a handler for when a new remote track starts, this handler saves buffers to disk as
|
||||
// an ivf file of the users choosing
|
||||
peerConnection.Ontrack = func(mediaType webrtc.MediaType, buffers <-chan []byte) {
|
||||
fmt.Println("We track was discovered")
|
||||
// an ivf file, since we could have multiple video tracks we provide a counter
|
||||
var trackCount uint64
|
||||
peerConnection.Ontrack = func(mediaType webrtc.MediaType, packets chan *rtp.Packet) {
|
||||
go func() {
|
||||
track := atomic.AddUint64(&trackCount, 1)
|
||||
fmt.Printf("Track %d has started \n", track)
|
||||
|
||||
i, err := NewIVFWriter(fmt.Sprintf("output-%d.ivf", track))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
currentFrame := []byte{}
|
||||
for {
|
||||
packet := <-packets
|
||||
currentFrame = append(currentFrame, packet.Payload[4:]...)
|
||||
|
||||
if packet.Marker {
|
||||
i.AddBuffer(currentFrame)
|
||||
currentFrame = nil
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Set the remote SessionDescription
|
||||
|
@@ -7,16 +7,18 @@ import (
|
||||
|
||||
"github.com/pions/pkg/stun"
|
||||
"github.com/pions/webrtc/internal/dtls"
|
||||
"github.com/pions/webrtc/internal/rtp"
|
||||
"github.com/pions/webrtc/internal/srtp"
|
||||
"github.com/pions/webrtc/rtp"
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tlscfg *dtls.TLSCfg) {
|
||||
func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tlscfg *dtls.TLSCfg, b BufferTransportGenerator) {
|
||||
const MTU = 8192
|
||||
buffer := make([]byte, MTU)
|
||||
|
||||
dtlsStates := make(map[string]*dtls.DTLSState)
|
||||
bufferTransports := make(map[uint32]chan *rtp.Packet)
|
||||
|
||||
var srtpSession *srtp.Session
|
||||
for {
|
||||
n, _, rawDstAddr, _ := conn.ReadFrom(buffer)
|
||||
@@ -55,16 +57,28 @@ func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tl
|
||||
fmt.Println("Failed to decrypt packet")
|
||||
continue
|
||||
}
|
||||
// addr := &net.UDPAddr{
|
||||
// Port: 5000,
|
||||
// IP: net.ParseIP("127.0.0.1"),
|
||||
// }
|
||||
// conn.WriteTo(unencrypted, nil, addr)
|
||||
|
||||
packet := rtp.Packet{}
|
||||
packet := &rtp.Packet{}
|
||||
if err := packet.Unmarshal(unencrypted); err != nil {
|
||||
fmt.Println("Failed to unmarshal RTP packet")
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf("<- RTP SSRC: %d, SN: %d, TS: %d, PT: %d\n", packet.SSRC, packet.SequenceNumber,
|
||||
packet.Timestamp, packet.PayloadType)
|
||||
|
||||
bufferTransport := bufferTransports[packet.SSRC]
|
||||
if bufferTransport == nil {
|
||||
bufferTransport = b(packet.SSRC)
|
||||
if bufferTransport == nil {
|
||||
fmt.Println("Failed to generate buffer transport, onTrack should be defined")
|
||||
continue
|
||||
}
|
||||
bufferTransports[packet.SSRC] = bufferTransport
|
||||
}
|
||||
bufferTransport <- packet
|
||||
} else {
|
||||
fmt.Println("SRTP packet, but no srtpSession")
|
||||
}
|
||||
@@ -82,7 +96,9 @@ func packetHandler(conn *ipv4.PacketConn, srcString string, remoteKey []byte, tl
|
||||
}
|
||||
}
|
||||
|
||||
func UdpListener(ip string, remoteKey []byte, tlscfg *dtls.TLSCfg) (int, error) {
|
||||
type BufferTransportGenerator func(uint32) chan *rtp.Packet
|
||||
|
||||
func UdpListener(ip string, remoteKey []byte, tlscfg *dtls.TLSCfg, b BufferTransportGenerator) (int, error) {
|
||||
listener, err := net.ListenPacket("udp4", ip+":0")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -99,6 +115,6 @@ func UdpListener(ip string, remoteKey []byte, tlscfg *dtls.TLSCfg) (int, error)
|
||||
srcString := ip + ":" + strconv.Itoa(addr.Port)
|
||||
|
||||
dtls.AddListener(srcString, conn)
|
||||
go packetHandler(conn, srcString, remoteKey, tlscfg)
|
||||
go packetHandler(conn, srcString, remoteKey, tlscfg, b)
|
||||
return addr.Port, err
|
||||
}
|
||||
|
@@ -4,14 +4,18 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
"github.com/pions/pkg/stun"
|
||||
"github.com/pions/webrtc/internal/dtls"
|
||||
"github.com/pions/webrtc/internal/ice"
|
||||
"github.com/pions/webrtc/internal/network"
|
||||
"github.com/pions/webrtc/internal/sdp"
|
||||
"github.com/pions/webrtc/rtp"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ChannelGenerator func(username string, srcAddr *stun.TransportAddr) (password string, ok bool)
|
||||
|
||||
type MediaType int
|
||||
|
||||
const (
|
||||
@@ -25,7 +29,7 @@ const (
|
||||
)
|
||||
|
||||
type RTCPeerConnection struct {
|
||||
Ontrack func(mediaType MediaType, buffers <-chan []byte)
|
||||
Ontrack func(mediaType MediaType, buffers chan *rtp.Packet)
|
||||
LocalDescription *sdp.SessionDescription
|
||||
|
||||
tlscfg *dtls.TLSCfg
|
||||
@@ -51,7 +55,7 @@ func (r *RTCPeerConnection) CreateOffer() error {
|
||||
candidates := []string{}
|
||||
basePriority := uint16(rand.Uint32() & (1<<16 - 1))
|
||||
for id, c := range ice.HostInterfaces() {
|
||||
dstPort, err := network.UdpListener(c, []byte(r.icePassword), r.tlscfg)
|
||||
dstPort, err := network.UdpListener(c, []byte(r.icePassword), r.tlscfg, r.generateChannel)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -69,3 +73,12 @@ func (r *RTCPeerConnection) AddStream(mediaType MediaType) (buffers chan<- []byt
|
||||
}
|
||||
|
||||
// Private
|
||||
func (r *RTCPeerConnection) generateChannel(ssrc uint32) (buffers chan *rtp.Packet) {
|
||||
if r.Ontrack == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
bufferTransport := make(chan *rtp.Packet, 15)
|
||||
r.Ontrack(VP8, bufferTransport) // TODO look up media via SSRC in remote SD
|
||||
return bufferTransport
|
||||
}
|
||||
|
Reference in New Issue
Block a user