Upadate sctp datachannel

Relates to #652
This commit is contained in:
Yutaka Takeda
2019-05-03 11:22:28 -07:00
parent 7b7cd256e6
commit d4ea7e3838
3 changed files with 213 additions and 6 deletions

View File

@@ -3,12 +3,18 @@
package webrtc
import (
"bytes"
"crypto/rand"
"encoding/binary"
"io"
"io/ioutil"
"math/big"
"reflect"
"sync"
"testing"
"time"
"github.com/pion/logging"
"github.com/pion/transport/test"
"github.com/stretchr/testify/assert"
)
@@ -345,3 +351,204 @@ func TestDataChannelBufferedAmount(t *testing.T) {
assert.True(t, nCbs > 0, "callback should be made at least once")
})
}
func TestEOF(t *testing.T) {
log := logging.NewDefaultLoggerFactory().NewLogger("test")
label := "test-channel"
testData := []byte("this is some test data")
t.Run("Detach", func(t *testing.T) {
// Use Detach data channels mode
s := SettingEngine{}
s.DetachDataChannels()
api := NewAPI(WithSettingEngine(s))
// Set up two peer connections.
config := Configuration{}
pca, err := api.NewPeerConnection(config)
if err != nil {
t.Fatal(err)
}
pcb, err := api.NewPeerConnection(config)
if err != nil {
t.Fatal(err)
}
defer func() { assert.NoError(t, pca.Close(), "should succeed") }()
defer func() { assert.NoError(t, pcb.Close(), "should succeed") }()
var wg sync.WaitGroup
dcChan := make(chan *DataChannel)
pcb.OnDataChannel(func(dc *DataChannel) {
if dc.Label() != label {
return
}
log.Debug("OnDataChannel was called")
dc.OnOpen(func() {
dcChan <- dc
})
})
wg.Add(1)
go func() {
defer wg.Done()
var dc io.ReadWriteCloser
var msg []byte
log.Debug("Waiting for OnDataChannel")
attached := <-dcChan
log.Debug("data channel opened")
dc, err = attached.Detach()
if err != nil {
log.Debugf("Detach failed: %s\n", err.Error())
t.Error(err)
}
defer func() { assert.NoError(t, dc.Close(), "should succeed") }()
log.Debug("Waiting for ping...")
msg, err = ioutil.ReadAll(dc)
log.Debugf("Received ping! \"%s\"\n", string(msg))
if err != nil {
t.Error(err)
}
if !bytes.Equal(msg, testData) {
t.Errorf("expected %q, got %q", string(msg), string(testData))
} else {
log.Debug("Received ping successfully!")
}
}()
if err = signalPair(pca, pcb); err != nil {
t.Fatal(err)
}
attached, err := pca.CreateDataChannel(label, nil)
if err != nil {
t.Fatal(err)
}
log.Debug("Waiting for data channel to open")
open := make(chan struct{})
attached.OnOpen(func() {
open <- struct{}{}
})
<-open
log.Debug("data channel opened")
var dc io.ReadWriteCloser
dc, err = attached.Detach()
if err != nil {
t.Fatal(err)
}
wg.Add(1)
go func() {
defer wg.Done()
log.Debug("Sending ping...")
if _, err := dc.Write(testData); err != nil {
t.Error(err)
}
log.Debug("Sent ping")
assert.NoError(t, dc.Close(), "should succeed")
log.Debug("Wating for EOF")
ret, err := ioutil.ReadAll(dc)
assert.Nil(t, err, "should succeed")
assert.Equal(t, 0, len(ret), "should be empty")
}()
wg.Wait()
})
t.Run("No detach", func(t *testing.T) {
// Use Detach data channels mode
s := SettingEngine{}
//s.DetachDataChannels()
api := NewAPI(WithSettingEngine(s))
// Set up two peer connections.
config := Configuration{}
pca, err := api.NewPeerConnection(config)
if err != nil {
t.Fatal(err)
}
defer func() { assert.NoError(t, pca.Close(), "should succeed") }()
pcb, err := api.NewPeerConnection(config)
if err != nil {
t.Fatal(err)
}
defer func() { assert.NoError(t, pcb.Close(), "should succeed") }()
var dca, dcb *DataChannel
var dcbClosed bool
doneCh := make(chan struct{})
pcb.OnDataChannel(func(dc *DataChannel) {
if dc.Label() != label {
return
}
log.Debugf("pcb: new datachannel: %s\n", dc.Label())
dcb = dc
// Register channel opening handling
dcb.OnOpen(func() {
log.Debug("pcb: datachannel opened")
})
dcb.OnClose(func() {
log.Debug("pcb: data channel closed")
dcbClosed = true
})
// Register the OnMessage to handle incoming messages
log.Debug("pcb: registering onMessage callback")
dcb.OnMessage(func(dcMsg DataChannelMessage) {
log.Debugf("pcb: received ping: %s\n", string(dcMsg.Data))
if !reflect.DeepEqual(dcMsg.Data, testData) {
t.Error("data mismatch")
}
})
})
dca, err = pca.CreateDataChannel(label, nil)
if err != nil {
t.Fatal(err)
}
dca.OnOpen(func() {
log.Debug("pca: data channel opened")
log.Debugf("pca: sending \"%s\"", string(testData))
if err := dca.Send(testData); err != nil {
t.Fatal(err)
}
log.Debug("pca: sent ping")
assert.NoError(t, dca.Close(), "should succeed")
})
dca.OnClose(func() {
log.Debug("pca: data channel closed")
close(doneCh)
})
// Register the OnMessage to handle incoming messages
log.Debug("pca: registering onMessage callback")
dca.OnMessage(func(dcMsg DataChannelMessage) {
log.Debugf("pca: received pong: %s\n", string(dcMsg.Data))
if !reflect.DeepEqual(dcMsg.Data, testData) {
t.Error("data mismatch")
}
})
if err := signalPair(pca, pcb); err != nil {
t.Fatal(err)
}
<-doneCh
assert.True(t, dcbClosed, "dcb should be closed by now")
})
}

4
go.mod
View File

@@ -3,14 +3,14 @@ module github.com/pion/webrtc/v2
go 1.12
require (
github.com/pion/datachannel v1.4.1
github.com/pion/datachannel v1.4.2
github.com/pion/dtls v1.3.4
github.com/pion/ice v0.2.6
github.com/pion/logging v0.2.1
github.com/pion/quic v0.1.1
github.com/pion/rtcp v1.2.0
github.com/pion/rtp v1.1.2
github.com/pion/sctp v1.6.1
github.com/pion/sctp v1.6.2
github.com/pion/sdp/v2 v2.1.1
github.com/pion/srtp v1.2.4
github.com/pion/transport v0.7.0

8
go.sum
View File

@@ -20,8 +20,8 @@ github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pion/datachannel v1.4.1 h1:VwK+oAzq72vup9aXBxTeIYvVOgHj0MBvrJnzOgCvet4=
github.com/pion/datachannel v1.4.1/go.mod h1:044W2uxb6Wnm1c7X/jjhztShfD4e9p9yUNtdhvrTtaI=
github.com/pion/datachannel v1.4.2 h1:DyNBYmM73irlbLQpRBfUqTs5P9WFVhjg4yXE/4XmBW0=
github.com/pion/datachannel v1.4.2/go.mod h1:KCNXb2VWEZYfdXTzN719Pgkg5fO0wsr3iokIaO/E6zo=
github.com/pion/dtls v1.3.4 h1:MdOMsCfd44m2iTrxtkzA6UndvYVjLWWjua7hxU8EXEA=
github.com/pion/dtls v1.3.4/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk=
github.com/pion/ice v0.2.6 h1:N/xhQtO6WfWlyMvIZgE+cqG5AHJnKL+aEboade72qno=
@@ -34,8 +34,8 @@ github.com/pion/rtcp v1.2.0 h1:rT2FptW5YHIern+4XlbGYnnsT26XGxurnkNLnzhtDXg=
github.com/pion/rtcp v1.2.0/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM=
github.com/pion/rtp v1.1.2 h1:ERNugzYHW9F2ldpwoARbeFGKRoq1REe5Jxdjvm/rOx8=
github.com/pion/rtp v1.1.2/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE=
github.com/pion/sctp v1.6.1 h1:4o1M+xCaz7Q8P5EmdqvbkECLIbkOk2yIRv9b1Z01MKc=
github.com/pion/sctp v1.6.1/go.mod h1:2OUthw9DpDB3AGoHFHMUrxBlhLXPJfrn/Q52qECtGZI=
github.com/pion/sctp v1.6.2 h1:K/j9rsv4Lv97znjMum6xGNfeBl7LwVlVjGbTfJ9a+0Q=
github.com/pion/sctp v1.6.2/go.mod h1:cCqpLdYvgEUdl715+qbWtgT439CuQrAgy8BZTp0aEfA=
github.com/pion/sdp/v2 v2.1.1 h1:i3fAyjiLuQseYNo0BtCOPfzp91Ppb7vasRGmUUTog28=
github.com/pion/sdp/v2 v2.1.1/go.mod h1:idSlWxhfWQDtTy9J05cgxpHBu/POwXN2VDRGYxT/EjU=
github.com/pion/srtp v1.2.4 h1:wwGKC5ewuBukkZ+i+pZ8aO33+t6z2y/XRiYtyP0Xpv0=