API: Review DataChannel

Resolves #427
This commit is contained in:
backkem
2019-02-21 10:36:59 +01:00
committed by Michiel De Backker
parent bfba81167e
commit f5d11df18d
10 changed files with 99 additions and 218 deletions

View File

@@ -6,7 +6,6 @@ import (
"sync" "sync"
"github.com/pions/datachannel" "github.com/pions/datachannel"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/rtcerr" "github.com/pions/webrtc/pkg/rtcerr"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@@ -86,12 +85,10 @@ type DataChannel struct {
// "blob". This attribute controls how binary data is exposed to scripts. // "blob". This attribute controls how binary data is exposed to scripts.
// binaryType string // binaryType string
// OnOpen func()
// OnBufferedAmountLow func() // OnBufferedAmountLow func()
// OnError func() // OnError func()
// OnClose func()
onMessageHandler func(sugar.Payload) onMessageHandler func(DataChannelMessage)
onOpenHandler func() onOpenHandler func()
onCloseHandler func() onCloseHandler func()
@@ -265,35 +262,36 @@ func (d *DataChannel) onClose() (done chan struct{}) {
return return
} }
// OnMessage sets an event handler which is invoked on a message // DataChannelMessage represents a message received from the
// arrival over the sctp transport from a remote peer. // data channel. IsString will be set to true if the incoming
// message is of the string type. Otherwise the message is of
// a binary type.
type DataChannelMessage struct {
IsString bool
Data []byte
}
// OnMessage sets an event handler which is invoked on a binary
// message arrival over the sctp transport from a remote peer.
// OnMessage can currently receive messages up to 16384 bytes // OnMessage can currently receive messages up to 16384 bytes
// in size. Check out the detach API if you want to use larger // in size. Check out the detach API if you want to use larger
// message sizes. Note that browser support for larger messages // message sizes. Note that browser support for larger messages
// is also limited. // is also limited.
func (d *DataChannel) OnMessage(f func(p sugar.Payload)) { func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
d.onMessageHandler = f d.onMessageHandler = f
} }
func (d *DataChannel) onMessage(p sugar.Payload) { func (d *DataChannel) onMessage(msg DataChannelMessage) {
d.mu.RLock() d.mu.RLock()
hdlr := d.onMessageHandler hdlr := d.onMessageHandler
d.mu.RUnlock() d.mu.RUnlock()
if hdlr == nil || p == nil { if hdlr == nil {
return return
} }
hdlr(p) hdlr(msg)
}
// Onmessage sets an event handler which is invoked on a message
// arrival over the sctp transport from a remote peer.
//
// Deprecated: use OnMessage instead.
func (d *DataChannel) Onmessage(f func(p sugar.Payload)) {
d.OnMessage(f)
} }
func (d *DataChannel) handleOpen(dc *datachannel.DataChannel) { func (d *DataChannel) handleOpen(dc *datachannel.DataChannel) {
@@ -331,39 +329,38 @@ func (d *DataChannel) readLoop() {
return return
} }
if isString { d.onMessage(DataChannelMessage{Data: buffer[:n], IsString: isString})
d.onMessage(&sugar.PayloadString{Data: buffer[:n]})
continue
}
d.onMessage(&sugar.PayloadBinary{Data: buffer[:n]})
} }
} }
// Send sends the passed message to the DataChannel peer // Send sends the binary message to the DataChannel peer
func (d *DataChannel) Send(payload sugar.Payload) error { func (d *DataChannel) Send(data []byte) error {
err := d.ensureOpen() err := d.ensureOpen()
if err != nil { if err != nil {
return err return err
} }
var data []byte
isString := false
switch p := payload.(type) {
case sugar.PayloadString:
data = p.Data
isString = true
case sugar.PayloadBinary:
data = p.Data
default:
return errors.Errorf("unknown DataChannel Payload (%s)", payload.PayloadType())
}
if len(data) == 0 { if len(data) == 0 {
data = []byte{0} data = []byte{0}
} }
_, err = d.dataChannel.WriteDataChannel(data, isString) _, err = d.dataChannel.WriteDataChannel(data, false)
return err
}
// SendText sends the text message to the DataChannel peer
func (d *DataChannel) SendText(s string) error {
err := d.ensureOpen()
if err != nil {
return err
}
data := []byte(s)
if len(data) == 0 {
data = []byte{0}
}
_, err = d.dataChannel.WriteDataChannel(data, true)
return err return err
} }

View File

@@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/pions/transport/test" "github.com/pions/transport/test"
"github.com/pions/webrtc/pkg/datachannel"
) )
func TestDataChannel_ORTCE2E(t *testing.T) { func TestDataChannel_ORTCE2E(t *testing.T) {
@@ -26,11 +25,11 @@ func TestDataChannel_ORTCE2E(t *testing.T) {
awaitBinary := make(chan struct{}) awaitBinary := make(chan struct{})
stackB.sctp.OnDataChannel(func(d *DataChannel) { stackB.sctp.OnDataChannel(func(d *DataChannel) {
close(awaitSetup) close(awaitSetup)
d.OnMessage(func(payload datachannel.Payload) {
switch payload.(type) { d.OnMessage(func(msg DataChannelMessage) {
case *datachannel.PayloadString: if msg.IsString {
close(awaitString) close(awaitString)
case *datachannel.PayloadBinary: } else {
close(awaitBinary) close(awaitBinary)
} }
}) })
@@ -52,11 +51,11 @@ func TestDataChannel_ORTCE2E(t *testing.T) {
<-awaitSetup <-awaitSetup
err = channelA.Send(datachannel.PayloadString{Data: []byte("ABC")}) err = channelA.SendText("ABC")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = channelA.Send(datachannel.PayloadBinary{Data: []byte("ABC")}) err = channelA.Send([]byte("ABC"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -8,8 +8,6 @@ import (
"testing" "testing"
"time" "time"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/transport/test" "github.com/pions/transport/test"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -85,20 +83,20 @@ func TestDataChannel_Send(t *testing.T) {
assert.True(t, dc.Ordered, "Ordered should be set to true") assert.True(t, dc.Ordered, "Ordered should be set to true")
dc.OnOpen(func() { dc.OnOpen(func() {
e := dc.Send(sugar.PayloadString{Data: []byte("Ping")}) e := dc.SendText("Ping")
if e != nil { if e != nil {
t.Fatalf("Failed to send string on data channel") t.Fatalf("Failed to send string on data channel")
} }
}) })
dc.OnMessage(func(payload sugar.Payload) { dc.OnMessage(func(msg DataChannelMessage) {
done <- true done <- true
}) })
answerPC.OnDataChannel(func(d *DataChannel) { answerPC.OnDataChannel(func(d *DataChannel) {
assert.True(t, d.Ordered, "Ordered should be set to true") assert.True(t, d.Ordered, "Ordered should be set to true")
d.OnMessage(func(payload sugar.Payload) { d.OnMessage(func(msg DataChannelMessage) {
e := d.Send(sugar.PayloadBinary{Data: []byte("Pong")}) e := d.Send([]byte("Pong"))
if e != nil { if e != nil {
t.Fatalf("Failed to send string on data channel") t.Fatalf("Failed to send string on data channel")
} }
@@ -124,43 +122,27 @@ func TestDataChannel_EventHandlers(t *testing.T) {
api := NewAPI() api := NewAPI()
dc := &DataChannel{api: api} dc := &DataChannel{api: api}
onOpenCalled := make(chan bool) onOpenCalled := make(chan struct{})
onMessageCalled := make(chan bool) onMessageCalled := make(chan struct{})
// Verify that the noop case works // Verify that the noop case works
assert.NotPanics(t, func() { dc.onOpen() }) assert.NotPanics(t, func() { dc.onOpen() })
assert.NotPanics(t, func() { dc.onMessage(nil) })
dc.OnOpen(func() { dc.OnOpen(func() {
onOpenCalled <- true close(onOpenCalled)
}) })
dc.OnMessage(func(p sugar.Payload) { dc.OnMessage(func(p DataChannelMessage) {
go func() { close(onMessageCalled)
onMessageCalled <- true
}()
}) })
// Verify that the handlers deal with nil inputs
assert.NotPanics(t, func() { dc.onMessage(nil) })
// Verify that the set handlers are called // Verify that the set handlers are called
assert.NotPanics(t, func() { dc.onOpen() }) assert.NotPanics(t, func() { dc.onOpen() })
assert.NotPanics(t, func() { dc.onMessage(&sugar.PayloadString{Data: []byte("o hai")}) }) assert.NotPanics(t, func() { dc.onMessage(DataChannelMessage{Data: []byte("o hai")}) })
allTrue := func(vals []bool) bool { // Wait for all handlers to be called
for _, val := range vals { <-onOpenCalled
if !val { <-onMessageCalled
return false
}
}
return true
}
assert.True(t, allTrue([]bool{
<-onOpenCalled,
<-onMessageCalled,
}))
} }
func TestDataChannel_MessagesAreOrdered(t *testing.T) { func TestDataChannel_MessagesAreOrdered(t *testing.T) {
@@ -172,7 +154,7 @@ func TestDataChannel_MessagesAreOrdered(t *testing.T) {
max := 512 max := 512
out := make(chan int) out := make(chan int)
inner := func(payload sugar.Payload) { inner := func(msg DataChannelMessage) {
// randomly sleep // randomly sleep
// NB: The big.Int/crypto.Rand is overkill but makes the linter happy // NB: The big.Int/crypto.Rand is overkill but makes the linter happy
randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max))) randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
@@ -180,13 +162,10 @@ func TestDataChannel_MessagesAreOrdered(t *testing.T) {
t.Fatalf("Failed to get random sleep duration: %s", err) t.Fatalf("Failed to get random sleep duration: %s", err)
} }
time.Sleep(time.Duration(randInt.Int64()) * time.Microsecond) time.Sleep(time.Duration(randInt.Int64()) * time.Microsecond)
p, ok := payload.(*sugar.PayloadBinary) s, _ := binary.Varint(msg.Data)
if ok {
s, _ := binary.Varint(p.Data)
out <- int(s) out <- int(s)
} }
} dc.OnMessage(func(p DataChannelMessage) {
dc.OnMessage(func(p sugar.Payload) {
inner(p) inner(p)
}) })
@@ -194,12 +173,12 @@ func TestDataChannel_MessagesAreOrdered(t *testing.T) {
for i := 1; i <= max; i++ { for i := 1; i <= max; i++ {
buf := make([]byte, 8) buf := make([]byte, 8)
binary.PutVarint(buf, int64(i)) binary.PutVarint(buf, int64(i))
dc.onMessage(&sugar.PayloadBinary{Data: buf}) dc.onMessage(DataChannelMessage{Data: buf})
// Change the registered handler a couple of times to make sure // Change the registered handler a couple of times to make sure
// that everything continues to work, we don't lose messages, etc. // that everything continues to work, we don't lose messages, etc.
if i%2 == 0 { if i%2 == 0 {
hdlr := func(p sugar.Payload) { hdlr := func(msg DataChannelMessage) {
inner(p) inner(msg)
} }
dc.OnMessage(hdlr) dc.OnMessage(hdlr)
} }

View File

@@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/pions/webrtc" "github.com/pions/webrtc"
"github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
) )
@@ -56,9 +55,10 @@ func main() {
cnt := *closeAfter cnt := *closeAfter
for range ticker.C { for range ticker.C {
message := signal.RandSeq(15) message := signal.RandSeq(15)
fmt.Printf("Sending %s \n", message) fmt.Printf("Sending '%s'\n", message)
err := d.Send(datachannel.PayloadString{Data: []byte(message)}) // Send the message as text
err := d.SendText(message)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -76,15 +76,8 @@ func main() {
}) })
// Register message handling // Register message handling
d.OnMessage(func(payload datachannel.Payload) { d.OnMessage(func(msg webrtc.DataChannelMessage) {
switch p := payload.(type) { fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label, string(msg.Data))
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
case *datachannel.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), d.Label, p.Data)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
}
}) })
}) })

View File

@@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/pions/webrtc" "github.com/pions/webrtc"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
) )
@@ -46,25 +45,19 @@ func main() {
for range time.NewTicker(5 * time.Second).C { for range time.NewTicker(5 * time.Second).C {
message := signal.RandSeq(15) message := signal.RandSeq(15)
fmt.Printf("Sending %s \n", message) fmt.Printf("Sending '%s'\n", message)
err := dataChannel.Send(sugar.PayloadString{Data: []byte(message)}) // Send the message as text
err := dataChannel.SendText(message)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
}) })
// Register the OnMessage to handle incoming messages // Register text message handling
dataChannel.OnMessage(func(payload sugar.Payload) { dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
switch p := payload.(type) { fmt.Printf("Message from DataChannel '%s': '%s'\n", dataChannel.Label, string(msg.Data))
case *sugar.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), dataChannel.Label, string(p.Data))
case *sugar.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), dataChannel.Label, p.Data)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), dataChannel.Label)
}
}) })
// Create an offer to send to the browser // Create an offer to send to the browser

View File

@@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/pions/webrtc" "github.com/pions/webrtc"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
) )
@@ -44,25 +43,19 @@ func main() {
for range time.NewTicker(5 * time.Second).C { for range time.NewTicker(5 * time.Second).C {
message := signal.RandSeq(15) message := signal.RandSeq(15)
fmt.Printf("Sending %s \n", message) fmt.Printf("Sending '%s'\n", message)
err := d.Send(sugar.PayloadString{Data: []byte(message)}) // Send the message as text
err := d.SendText(message)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
}) })
// Register message handling // Register text message handling
d.OnMessage(func(payload sugar.Payload) { d.OnMessage(func(msg webrtc.DataChannelMessage) {
switch p := payload.(type) { fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label, string(msg.Data))
case *sugar.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
case *sugar.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), d.Label, p.Data)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
}
}) })
}) })

View File

@@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/pions/webrtc" "github.com/pions/webrtc"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
) )
@@ -51,7 +50,9 @@ func main() {
// Register the handlers // Register the handlers
channel.OnOpen(handleOnOpen(channel)) channel.OnOpen(handleOnOpen(channel))
channel.OnMessage(handleMessage(channel)) channel.OnMessage(func(msg webrtc.DataChannelMessage) {
fmt.Printf("Message from DataChannel '%s': '%s'\n", channel.Label, string(msg.Data))
})
}) })
// Gather candidates // Gather candidates
@@ -129,7 +130,9 @@ func main() {
// Register the handlers // Register the handlers
// channel.OnOpen(handleOnOpen(channel)) // TODO: OnOpen on handle ChannelAck // channel.OnOpen(handleOnOpen(channel)) // TODO: OnOpen on handle ChannelAck
go handleOnOpen(channel)() // Temporary alternative go handleOnOpen(channel)() // Temporary alternative
channel.OnMessage(handleMessage(channel)) channel.OnMessage(func(msg webrtc.DataChannelMessage) {
fmt.Printf("Message from DataChannel '%s': '%s'\n", channel.Label, string(msg.Data))
})
} }
select {} select {}
@@ -151,25 +154,12 @@ func handleOnOpen(channel *webrtc.DataChannel) func() {
for range time.NewTicker(5 * time.Second).C { for range time.NewTicker(5 * time.Second).C {
message := signal.RandSeq(15) message := signal.RandSeq(15)
fmt.Printf("Sending %s \n", message) fmt.Printf("Sending '%s' \n", message)
err := channel.Send(sugar.PayloadString{Data: []byte(message)}) err := channel.SendText(message)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
} }
} }
func handleMessage(channel *webrtc.DataChannel) func(sugar.Payload) {
return func(payload sugar.Payload) {
switch p := payload.(type) {
case *sugar.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), channel.Label, string(p.Data))
case *sugar.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), channel.Label, p.Data)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), channel.Label)
}
}
}

View File

@@ -8,7 +8,6 @@ import (
"time" "time"
"github.com/pions/webrtc" "github.com/pions/webrtc"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
) )
@@ -50,25 +49,19 @@ func main() {
for range time.NewTicker(5 * time.Second).C { for range time.NewTicker(5 * time.Second).C {
message := signal.RandSeq(15) message := signal.RandSeq(15)
fmt.Printf("Sending %s \n", message) fmt.Printf("Sending '%s'\n", message)
err := d.Send(sugar.PayloadString{Data: []byte(message)}) // Send the message as text
err := d.SendText(message)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
}) })
// Register message handling // Register text message handling
d.OnMessage(func(payload sugar.Payload) { d.OnMessage(func(msg webrtc.DataChannelMessage) {
switch p := payload.(type) { fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label, string(msg.Data))
case *sugar.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
case *sugar.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), d.Label, p.Data)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
}
}) })
}) })

View File

@@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/pions/webrtc" "github.com/pions/webrtc"
sugar "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
) )
@@ -53,25 +52,19 @@ func main() {
for range time.NewTicker(5 * time.Second).C { for range time.NewTicker(5 * time.Second).C {
message := signal.RandSeq(15) message := signal.RandSeq(15)
fmt.Printf("Sending %s \n", message) fmt.Printf("Sending '%s'\n", message)
err := dataChannel.Send(sugar.PayloadString{Data: []byte(message)}) // Send the message as text
err := dataChannel.SendText(message)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
}) })
// Register the OnMessage to handle incoming messages // Register text message handling
dataChannel.OnMessage(func(payload sugar.Payload) { dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
switch p := payload.(type) { fmt.Printf("Message from DataChannel '%s': '%s'\n", dataChannel.Label, string(msg.Data))
case *sugar.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), dataChannel.Label, string(p.Data))
case *sugar.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), dataChannel.Label, p.Data)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), dataChannel.Label)
}
}) })
// Create an offer to send to the browser // Create an offer to send to the browser

View File

@@ -1,49 +0,0 @@
package datachannel
import "fmt"
// PayloadType are the different types of data that can be
// represented in a DataChannel message
type PayloadType int
// PayloadType enums
const (
PayloadTypeString = iota + 1
PayloadTypeBinary
)
func (p PayloadType) String() string {
switch p {
case PayloadTypeString:
return "Payload Type String"
case PayloadTypeBinary:
return "Payload Type Binary"
default:
return fmt.Sprintf("Invalid PayloadType (%d)", p)
}
}
// Payload is the body of a DataChannel message
type Payload interface {
PayloadType() PayloadType
}
// PayloadString is a string DataChannel message
type PayloadString struct {
Data []byte
}
//PayloadType returns the type of payload
func (p PayloadString) PayloadType() PayloadType {
return PayloadTypeString
}
// PayloadBinary is a binary DataChannel message
type PayloadBinary struct {
Data []byte
}
//PayloadType returns the type of payload
func (p PayloadBinary) PayloadType() PayloadType {
return PayloadTypeBinary
}