mirror of
https://github.com/pion/stun.git
synced 2025-10-20 14:35:19 +08:00
{client,agent}: don't alloc on client.Do
This commit is contained in:
81
README.md
81
README.md
@@ -73,46 +73,47 @@ goos: linux
|
|||||||
goarch: amd64
|
goarch: amd64
|
||||||
pkg: github.com/gortc/stun
|
pkg: github.com/gortc/stun
|
||||||
PASS
|
PASS
|
||||||
benchmark iter time/iter throughput bytes alloc allocs
|
benchmark iter time/iter throughput bytes alloc allocs
|
||||||
--------- ---- --------- ---------- ----------- ------
|
--------- ---- --------- ---------- ----------- ------
|
||||||
BenchmarkMappedAddress_AddTo-12 100000000 22.50 ns/op 0 B/op 0 allocs/op
|
BenchmarkMappedAddress_AddTo-12 1000000000 22.90 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkAlternateServer_AddTo-12 100000000 22.20 ns/op 0 B/op 0 allocs/op
|
BenchmarkAlternateServer_AddTo-12 1000000000 23.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkAgent_GC-12 1000000 2038.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkAgent_GC-12 10000000 2217.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkAgent_Process-12 30000000 47.60 ns/op 0 B/op 0 allocs/op
|
BenchmarkAgent_Process-12 300000000 52.80 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessage_GetNotFound-12 300000000 4.29 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessage_GetNotFound-12 3000000000 4.13 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkClient_Do-12 2000000 534.00 ns/op 16 B/op 1 allocs/op
|
BenchmarkClient_Do-12 30000000 495.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkErrorCode_AddTo-12 30000000 42.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkErrorCode_AddTo-12 300000000 41.10 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkErrorCodeAttribute_AddTo-12 50000000 30.90 ns/op 0 B/op 0 allocs/op
|
BenchmarkErrorCodeAttribute_AddTo-12 500000000 30.90 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkErrorCodeAttribute_GetFrom-12 200000000 7.78 ns/op 0 B/op 0 allocs/op
|
BenchmarkErrorCodeAttribute_GetFrom-12 2000000000 7.84 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkFingerprint_AddTo-12 30000000 47.30 ns/op 931.09 MB/s 0 B/op 0 allocs/op
|
BenchmarkFingerprint_AddTo-12 300000000 46.60 ns/op 944.52 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkFingerprint_Check-12 50000000 38.20 ns/op 1360.04 MB/s 0 B/op 0 allocs/op
|
BenchmarkFingerprint_Check-12 500000000 37.20 ns/op 1397.50 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkBuildOverhead/Build-12 10000000 139.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkBuildOverhead/Build-12 100000000 133.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkBuildOverhead/BuildNonPointer-12 5000000 249.00 ns/op 100 B/op 4 allocs/op
|
BenchmarkBuildOverhead/BuildNonPointer-12 100000000 262.00 ns/op 100 B/op 4 allocs/op
|
||||||
BenchmarkBuildOverhead/Raw-12 20000000 114.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkBuildOverhead/Raw-12 100000000 119.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessageIntegrity_AddTo-12 2000000 1022.00 ns/op 19.56 MB/s 480 B/op 6 allocs/op
|
BenchmarkMessageIntegrity_AddTo-12 20000000 1058.00 ns/op 18.89 MB/s 480 B/op 6 allocs/op
|
||||||
BenchmarkMessageIntegrity_Check-12 1000000 1084.00 ns/op 29.50 MB/s 480 B/op 6 allocs/op
|
BenchmarkMessageIntegrity_Check-12 20000000 1023.00 ns/op 31.27 MB/s 480 B/op 6 allocs/op
|
||||||
BenchmarkMessage_Write-12 100000000 16.10 ns/op 1743.03 MB/s 0 B/op 0 allocs/op
|
BenchmarkMessage_Write-12 1000000000 16.10 ns/op 1734.19 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkMessageType_Value-12 2000000000 0.23 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessageType_Value-12 10000000000 0.23 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessage_WriteTo-12 200000000 8.11 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessage_WriteTo-12 2000000000 8.19 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessage_ReadFrom-12 100000000 18.30 ns/op 1095.75 MB/s 0 B/op 0 allocs/op
|
BenchmarkMessage_ReadFrom-12 1000000000 18.00 ns/op 1108.32 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkMessage_ReadBytes-12 100000000 10.70 ns/op 1870.83 MB/s 0 B/op 0 allocs/op
|
BenchmarkMessage_ReadBytes-12 2000000000 10.50 ns/op 1896.39 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkIsMessage-12 2000000000 0.68 ns/op 29576.35 MB/s 0 B/op 0 allocs/op
|
BenchmarkIsMessage-12 10000000000 0.64 ns/op 31241.13 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkMessage_NewTransactionID-12 3000000 393.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessage_NewTransactionID-12 50000000 385.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessageFull-12 10000000 138.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessageFull-12 100000000 134.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessageFullHardcore-12 30000000 53.90 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessageFullHardcore-12 300000000 52.70 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkMessage_WriteHeader-12 300000000 5.45 ns/op 0 B/op 0 allocs/op
|
BenchmarkMessage_WriteHeader-12 3000000000 5.21 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkUsername_AddTo-12 100000000 15.00 ns/op 0 B/op 0 allocs/op
|
BenchmarkUsername_AddTo-12 1000000000 14.60 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkUsername_GetFrom-12 100000000 11.90 ns/op 0 B/op 0 allocs/op
|
BenchmarkUsername_GetFrom-12 2000000000 11.70 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkNonce_AddTo-12 100000000 20.70 ns/op 0 B/op 0 allocs/op
|
BenchmarkNonce_AddTo-12 1000000000 20.00 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkNonce_AddTo_BadLength-12 100000000 27.40 ns/op 32 B/op 1 allocs/op
|
BenchmarkNonce_AddTo_BadLength-12 300000000 49.40 ns/op 32 B/op 1 allocs/op
|
||||||
BenchmarkNonce_GetFrom-12 200000000 11.90 ns/op 0 B/op 0 allocs/op
|
BenchmarkNonce_GetFrom-12 2000000000 11.80 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkUnknownAttributes/AddTo-12 100000000 18.90 ns/op 0 B/op 0 allocs/op
|
BenchmarkUnknownAttributes/AddTo-12 1000000000 18.50 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkUnknownAttributes/GetFrom-12 100000000 13.70 ns/op 0 B/op 0 allocs/op
|
BenchmarkUnknownAttributes/GetFrom-12 1000000000 13.40 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkXOR-12 100000000 14.60 ns/op 70066.01 MB/s 0 B/op 0 allocs/op
|
BenchmarkXOR-12 1000000000 14.00 ns/op 73071.52 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkXORSafe-12 20000000 98.80 ns/op 10363.12 MB/s 0 B/op 0 allocs/op
|
BenchmarkXORSafe-12 100000000 102.00 ns/op 9945.44 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkXORFast-12 100000000 13.80 ns/op 74327.93 MB/s 0 B/op 0 allocs/op
|
BenchmarkXORFast-12 1000000000 13.60 ns/op 75332.68 MB/s 0 B/op 0 allocs/op
|
||||||
BenchmarkXORMappedAddress_AddTo-12 50000000 35.20 ns/op 0 B/op 0 allocs/op
|
BenchmarkXORMappedAddress_AddTo-12 500000000 35.20 ns/op 0 B/op 0 allocs/op
|
||||||
BenchmarkXORMappedAddress_GetFrom-12 100000000 22.30 ns/op 0 B/op 0 allocs/op
|
BenchmarkXORMappedAddress_GetFrom-12 1000000000 22.30 ns/op 0 B/op 0 allocs/op
|
||||||
|
ok github.com/gortc/stun 698.712s
|
||||||
```
|
```
|
||||||
|
|
||||||
# development
|
# development
|
||||||
|
55
agent.go
55
agent.go
@@ -8,7 +8,7 @@ import (
|
|||||||
|
|
||||||
// AgentOptions are required to initialize Agent.
|
// AgentOptions are required to initialize Agent.
|
||||||
type AgentOptions struct {
|
type AgentOptions struct {
|
||||||
Handler AgentFn // Default handler, can be nil.
|
Handler Handler // Default handler, can be nil.
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAgent initializes and returns new Agent from options.
|
// NewAgent initializes and returns new Agent from options.
|
||||||
@@ -32,17 +32,28 @@ type Agent struct {
|
|||||||
transactions map[transactionID]agentTransaction
|
transactions map[transactionID]agentTransaction
|
||||||
closed bool // all calls are invalid if true
|
closed bool // all calls are invalid if true
|
||||||
mux sync.Mutex // protects transactions and closed
|
mux sync.Mutex // protects transactions and closed
|
||||||
zeroHandler AgentFn // handles non-registered transactions if set
|
zeroHandler Handler // handles non-registered transactions if set
|
||||||
}
|
}
|
||||||
|
|
||||||
// AgentFn is called on transaction state change.
|
// Handler handles state changes of transaction.
|
||||||
// Usage of e is valid only during call, user must
|
type Handler interface {
|
||||||
// copy needed fields explicitly.
|
// HandleEvent is called on transaction state change.
|
||||||
type AgentFn func(e AgentEvent)
|
// Usage of e is valid only during call, user must
|
||||||
|
// copy needed fields explicitly.
|
||||||
|
HandleEvent(e Event)
|
||||||
|
}
|
||||||
|
|
||||||
// AgentEvent is set of arguments passed to AgentFn, describing
|
// HandlerFunc is function that implements Handler interface.
|
||||||
|
type HandlerFunc func(e Event)
|
||||||
|
|
||||||
|
// HandleEvent implements Handler.
|
||||||
|
func (f HandlerFunc) HandleEvent(e Event) {
|
||||||
|
f(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Event is set of arguments passed to AgentFn, describing
|
||||||
// an transaction event. Do not reuse outside AgentFn.
|
// an transaction event. Do not reuse outside AgentFn.
|
||||||
type AgentEvent struct {
|
type Event struct {
|
||||||
Message *Message
|
Message *Message
|
||||||
Error error
|
Error error
|
||||||
}
|
}
|
||||||
@@ -54,7 +65,7 @@ type AgentEvent struct {
|
|||||||
type agentTransaction struct {
|
type agentTransaction struct {
|
||||||
id transactionID
|
id transactionID
|
||||||
deadline time.Time
|
deadline time.Time
|
||||||
f AgentFn
|
h Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -81,7 +92,7 @@ func (a *Agent) StopWithError(id [TransactionIDSize]byte, err error) error {
|
|||||||
if !exists {
|
if !exists {
|
||||||
return ErrTransactionNotExists
|
return ErrTransactionNotExists
|
||||||
}
|
}
|
||||||
t.f(AgentEvent{
|
t.h.HandleEvent(Event{
|
||||||
Error: err,
|
Error: err,
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
@@ -101,7 +112,7 @@ var ErrAgentClosed = errors.New("agent is closed")
|
|||||||
// Could return ErrAgentClosed, ErrTransactionExists.
|
// Could return ErrAgentClosed, ErrTransactionExists.
|
||||||
// Callback f is guaranteed to be eventually called. See AgentFn for
|
// Callback f is guaranteed to be eventually called. See AgentFn for
|
||||||
// callback processing constraints.
|
// callback processing constraints.
|
||||||
func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time, f AgentFn) error {
|
func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time, h Handler) error {
|
||||||
a.mux.Lock()
|
a.mux.Lock()
|
||||||
defer a.mux.Unlock()
|
defer a.mux.Unlock()
|
||||||
if a.closed {
|
if a.closed {
|
||||||
@@ -113,7 +124,7 @@ func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time, f AgentFn)
|
|||||||
}
|
}
|
||||||
a.transactions[id] = agentTransaction{
|
a.transactions[id] = agentTransaction{
|
||||||
id: id,
|
id: id,
|
||||||
f: f,
|
h: h,
|
||||||
deadline: deadline,
|
deadline: deadline,
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -132,7 +143,7 @@ var ErrTransactionTimeOut = errors.New("transaction is timed out")
|
|||||||
//
|
//
|
||||||
// It is safe to call Collect concurrently but makes no sense.
|
// It is safe to call Collect concurrently but makes no sense.
|
||||||
func (a *Agent) Collect(gcTime time.Time) error {
|
func (a *Agent) Collect(gcTime time.Time) error {
|
||||||
toCall := make([]AgentFn, 0, agentCollectCap)
|
toCall := make([]Handler, 0, agentCollectCap)
|
||||||
toRemove := make([]transactionID, 0, agentCollectCap)
|
toRemove := make([]transactionID, 0, agentCollectCap)
|
||||||
a.mux.Lock()
|
a.mux.Lock()
|
||||||
if a.closed {
|
if a.closed {
|
||||||
@@ -149,7 +160,7 @@ func (a *Agent) Collect(gcTime time.Time) error {
|
|||||||
for id, t := range a.transactions {
|
for id, t := range a.transactions {
|
||||||
if t.deadline.Before(gcTime) {
|
if t.deadline.Before(gcTime) {
|
||||||
toRemove = append(toRemove, id)
|
toRemove = append(toRemove, id)
|
||||||
toCall = append(toCall, t.f)
|
toCall = append(toCall, t.h)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Un-registering timed out transactions.
|
// Un-registering timed out transactions.
|
||||||
@@ -161,11 +172,11 @@ func (a *Agent) Collect(gcTime time.Time) error {
|
|||||||
a.mux.Unlock()
|
a.mux.Unlock()
|
||||||
// Sending ErrTransactionTimeOut to all callbacks, blocking
|
// Sending ErrTransactionTimeOut to all callbacks, blocking
|
||||||
// Collect until last one.
|
// Collect until last one.
|
||||||
event := AgentEvent{
|
event := Event{
|
||||||
Error: ErrTransactionTimeOut,
|
Error: ErrTransactionTimeOut,
|
||||||
}
|
}
|
||||||
for _, f := range toCall {
|
for _, handler := range toCall {
|
||||||
f(event)
|
handler.HandleEvent(event)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -175,7 +186,7 @@ func (a *Agent) Collect(gcTime time.Time) error {
|
|||||||
// handle is not provided, message is silently ignored.
|
// handle is not provided, message is silently ignored.
|
||||||
// Call blocks until handler returns.
|
// Call blocks until handler returns.
|
||||||
func (a *Agent) Process(m *Message) error {
|
func (a *Agent) Process(m *Message) error {
|
||||||
e := AgentEvent{
|
e := Event{
|
||||||
Message: m,
|
Message: m,
|
||||||
}
|
}
|
||||||
a.mux.Lock()
|
a.mux.Lock()
|
||||||
@@ -187,9 +198,9 @@ func (a *Agent) Process(m *Message) error {
|
|||||||
delete(a.transactions, m.TransactionID)
|
delete(a.transactions, m.TransactionID)
|
||||||
a.mux.Unlock()
|
a.mux.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
t.f(e)
|
t.h.HandleEvent(e)
|
||||||
} else if a.zeroHandler != nil {
|
} else if a.zeroHandler != nil {
|
||||||
a.zeroHandler(e)
|
a.zeroHandler.HandleEvent(e)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -197,7 +208,7 @@ func (a *Agent) Process(m *Message) error {
|
|||||||
// Close terminates all transactions with ErrAgentClosed and renders Agent to
|
// Close terminates all transactions with ErrAgentClosed and renders Agent to
|
||||||
// closed state.
|
// closed state.
|
||||||
func (a *Agent) Close() error {
|
func (a *Agent) Close() error {
|
||||||
e := AgentEvent{
|
e := Event{
|
||||||
Error: ErrAgentClosed,
|
Error: ErrAgentClosed,
|
||||||
}
|
}
|
||||||
a.mux.Lock()
|
a.mux.Lock()
|
||||||
@@ -206,7 +217,7 @@ func (a *Agent) Close() error {
|
|||||||
return ErrAgentClosed
|
return ErrAgentClosed
|
||||||
}
|
}
|
||||||
for _, t := range a.transactions {
|
for _, t := range a.transactions {
|
||||||
t.f(e)
|
t.h.HandleEvent(e)
|
||||||
}
|
}
|
||||||
a.transactions = nil
|
a.transactions = nil
|
||||||
a.closed = true
|
a.closed = true
|
||||||
|
@@ -8,22 +8,21 @@ import (
|
|||||||
func TestAgent_ProcessInTransaction(t *testing.T) {
|
func TestAgent_ProcessInTransaction(t *testing.T) {
|
||||||
m := New()
|
m := New()
|
||||||
a := NewAgent(AgentOptions{
|
a := NewAgent(AgentOptions{
|
||||||
Handler: func(e AgentEvent) {
|
Handler: HandlerFunc(func(e Event) {
|
||||||
t.Error("should not be called")
|
t.Error("should not be called")
|
||||||
},
|
}),
|
||||||
})
|
})
|
||||||
if err := m.NewTransactionID(); err != nil {
|
if err := m.NewTransactionID(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := a.Start(m.TransactionID, time.Time{}, func(e AgentEvent) {
|
if err := a.Start(m.TransactionID, time.Time{}, HandlerFunc(func(e Event) {
|
||||||
if e.Error != nil {
|
if e.Error != nil {
|
||||||
t.Errorf("got error: %s", e.Error)
|
t.Errorf("got error: %s", e.Error)
|
||||||
}
|
}
|
||||||
if !e.Message.Equal(m) {
|
if !e.Message.Equal(m) {
|
||||||
t.Errorf("%s (got) != %s (expected)", e.Message, m)
|
t.Errorf("%s (got) != %s (expected)", e.Message, m)
|
||||||
}
|
}
|
||||||
|
})); err != nil {
|
||||||
}); err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := a.Process(m); err != nil {
|
if err := a.Process(m); err != nil {
|
||||||
@@ -37,14 +36,14 @@ func TestAgent_ProcessInTransaction(t *testing.T) {
|
|||||||
func TestAgent_Process(t *testing.T) {
|
func TestAgent_Process(t *testing.T) {
|
||||||
m := New()
|
m := New()
|
||||||
a := NewAgent(AgentOptions{
|
a := NewAgent(AgentOptions{
|
||||||
Handler: func(e AgentEvent) {
|
Handler: HandlerFunc(func(e Event) {
|
||||||
if e.Error != nil {
|
if e.Error != nil {
|
||||||
t.Errorf("got error: %s", e.Error)
|
t.Errorf("got error: %s", e.Error)
|
||||||
}
|
}
|
||||||
if !e.Message.Equal(m) {
|
if !e.Message.Equal(m) {
|
||||||
t.Errorf("%s (got) != %s (expected)", e.Message, m)
|
t.Errorf("%s (got) != %s (expected)", e.Message, m)
|
||||||
}
|
}
|
||||||
},
|
}),
|
||||||
})
|
})
|
||||||
if err := m.NewTransactionID(); err != nil {
|
if err := m.NewTransactionID(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -91,11 +90,11 @@ func TestAgent_Stop(t *testing.T) {
|
|||||||
t.Fatalf("unexpected error: %s, should be %s", err, ErrTransactionNotExists)
|
t.Fatalf("unexpected error: %s, should be %s", err, ErrTransactionNotExists)
|
||||||
}
|
}
|
||||||
id := NewTransactionID()
|
id := NewTransactionID()
|
||||||
called := make(chan AgentEvent, 1)
|
called := make(chan Event, 1)
|
||||||
timeout := time.Millisecond * 200
|
timeout := time.Millisecond * 200
|
||||||
if err := a.Start(id, time.Now().Add(timeout), func(e AgentEvent) {
|
if err := a.Start(id, time.Now().Add(timeout), HandlerFunc(func(e Event) {
|
||||||
called <- e
|
called <- e
|
||||||
}); err != nil {
|
})); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := a.Stop(id); err != nil {
|
if err := a.Stop(id); err != nil {
|
||||||
@@ -119,18 +118,18 @@ func TestAgent_Stop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var noopHandler = func(e AgentEvent) {}
|
var noopHandler HandlerFunc = func(e Event) {}
|
||||||
|
|
||||||
func TestAgent_GC(t *testing.T) {
|
func TestAgent_GC(t *testing.T) {
|
||||||
a := NewAgent(AgentOptions{
|
a := NewAgent(AgentOptions{
|
||||||
Handler: noopHandler,
|
Handler: noopHandler,
|
||||||
})
|
})
|
||||||
shouldTimeOut := func(e AgentEvent) {
|
var shouldTimeOut HandlerFunc = func(e Event) {
|
||||||
if e.Error != ErrTransactionTimeOut {
|
if e.Error != ErrTransactionTimeOut {
|
||||||
t.Errorf("should time out, but got <%s>", e.Error)
|
t.Errorf("should time out, but got <%s>", e.Error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
shouldNotTimeOut := func(e AgentEvent) {
|
var shouldNotTimeOut HandlerFunc = func(e Event) {
|
||||||
if e.Error == ErrTransactionTimeOut {
|
if e.Error == ErrTransactionTimeOut {
|
||||||
t.Error("should not time out")
|
t.Error("should not time out")
|
||||||
}
|
}
|
||||||
|
78
client.go
78
client.go
@@ -65,7 +65,7 @@ type Connection interface {
|
|||||||
type ClientAgent interface {
|
type ClientAgent interface {
|
||||||
Process(*Message) error
|
Process(*Message) error
|
||||||
Close() error
|
Close() error
|
||||||
Start(id [TransactionIDSize]byte, deadline time.Time, f AgentFn) error
|
Start(id [TransactionIDSize]byte, deadline time.Time, f Handler) error
|
||||||
Stop(id [TransactionIDSize]byte) error
|
Stop(id [TransactionIDSize]byte) error
|
||||||
Collect(time.Time) error
|
Collect(time.Time) error
|
||||||
}
|
}
|
||||||
@@ -184,21 +184,14 @@ func (c *Client) Indicate(m *Message) error {
|
|||||||
return c.Start(m, time.Time{}, nil)
|
return c.Start(m, time.Time{}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientCallState struct {
|
// callbackWaitHandler blocks on wait() call until callback is called.
|
||||||
callback func(event AgentEvent)
|
type callbackWaitHandler struct {
|
||||||
|
callback func(event Event)
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
processed bool
|
processed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *clientCallState) wait() {
|
func (s *callbackWaitHandler) HandleEvent(e Event) {
|
||||||
s.cond.L.Lock()
|
|
||||||
for !s.processed {
|
|
||||||
s.cond.Wait()
|
|
||||||
}
|
|
||||||
s.cond.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *clientCallState) wrapper(e AgentEvent) {
|
|
||||||
if s.callback == nil {
|
if s.callback == nil {
|
||||||
panic("s.callback is nil")
|
panic("s.callback is nil")
|
||||||
}
|
}
|
||||||
@@ -209,66 +202,73 @@ func (s *clientCallState) wrapper(e AgentEvent) {
|
|||||||
s.cond.L.Unlock()
|
s.cond.L.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *clientCallState) setCallback(f func(event AgentEvent)) {
|
func (s *callbackWaitHandler) wait() {
|
||||||
|
s.cond.L.Lock()
|
||||||
|
for !s.processed {
|
||||||
|
s.cond.Wait()
|
||||||
|
}
|
||||||
|
s.cond.L.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *callbackWaitHandler) setCallback(f func(event Event)) {
|
||||||
if f == nil {
|
if f == nil {
|
||||||
panic("f is nil")
|
panic("f is nil")
|
||||||
}
|
}
|
||||||
s.callback = f
|
s.callback = f
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *clientCallState) reset() {
|
func (s *callbackWaitHandler) reset() {
|
||||||
s.processed = false
|
s.processed = false
|
||||||
s.callback = nil
|
s.callback = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClientCallState(f func(event AgentEvent)) *clientCallState {
|
var callbackWaitHandlerPool = sync.Pool{
|
||||||
return &clientCallState{
|
|
||||||
cond: sync.NewCond(new(sync.Mutex)),
|
|
||||||
callback: f,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var clientCallStatePool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
return newClientCallState(nil)
|
return &callbackWaitHandler{
|
||||||
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do is Start wrapper that waits until callback is called. If no callback
|
// Do is Start wrapper that waits until callback is called. If no callback
|
||||||
// provided, Indicate is called instead.
|
// provided, Indicate is called instead.
|
||||||
//
|
//
|
||||||
// Do has memory allocation overhead due to blocking, see BenchmarkClient_Do.
|
// Do has cpu overhead due to blocking, see BenchmarkClient_Do.
|
||||||
// Use Start for zero overhead.
|
// Use Start method for less overhead.
|
||||||
func (c *Client) Do(m *Message, d time.Time, f func(AgentEvent)) error {
|
func (c *Client) Do(m *Message, d time.Time, f func(Event)) error {
|
||||||
if f == nil {
|
if f == nil {
|
||||||
return c.Indicate(m)
|
return c.Indicate(m)
|
||||||
}
|
}
|
||||||
state := clientCallStatePool.Get().(*clientCallState)
|
h := callbackWaitHandlerPool.Get().(*callbackWaitHandler)
|
||||||
state.setCallback(f)
|
h.setCallback(f)
|
||||||
err := c.Start(m, d, state.wrapper)
|
defer func() {
|
||||||
state.wait()
|
h.reset()
|
||||||
state.reset()
|
callbackWaitHandlerPool.Put(h)
|
||||||
clientCallStatePool.Put(state)
|
}()
|
||||||
return err
|
if err := c.Start(m, d, h); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.wait()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts transaction (if f set) and writes message to server, callback
|
// Start starts transaction (if f set) and writes message to server, handler
|
||||||
// is called asynchronously.
|
// is called asynchronously.
|
||||||
func (c *Client) Start(m *Message, d time.Time, f func(AgentEvent)) error {
|
func (c *Client) Start(m *Message, d time.Time, h Handler) error {
|
||||||
c.closedMux.RLock()
|
c.closedMux.RLock()
|
||||||
closed := c.closed
|
closed := c.closed
|
||||||
c.closedMux.RUnlock()
|
c.closedMux.RUnlock()
|
||||||
if closed {
|
if closed {
|
||||||
return ErrClientClosed
|
return ErrClientClosed
|
||||||
}
|
}
|
||||||
if f != nil {
|
if h != nil {
|
||||||
// Starting transaction only if f is set. Useful for indications.
|
// Starting transaction only if h is set. Useful for indications.
|
||||||
if err := c.a.Start(m.TransactionID, d, f); err != nil {
|
if err := c.a.Start(m.TransactionID, d, h); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err := m.WriteTo(c.c)
|
_, err := m.WriteTo(c.c)
|
||||||
if err != nil && f != nil {
|
if err != nil && h != nil {
|
||||||
// Stopping transaction instead of waiting until deadline.
|
// Stopping transaction instead of waiting until deadline.
|
||||||
if stopErr := c.a.Stop(m.TransactionID); stopErr != nil {
|
if stopErr := c.a.Stop(m.TransactionID); stopErr != nil {
|
||||||
return StopErr{
|
return StopErr{
|
||||||
|
@@ -8,7 +8,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TestAgent struct {
|
type TestAgent struct {
|
||||||
f chan AgentFn
|
f chan Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *TestAgent) Close() error {
|
func (n *TestAgent) Close() error {
|
||||||
@@ -20,7 +20,7 @@ func (TestAgent) Collect(time.Time) error { return nil }
|
|||||||
|
|
||||||
func (TestAgent) Process(m *Message) error { return nil }
|
func (TestAgent) Process(m *Message) error { return nil }
|
||||||
|
|
||||||
func (n *TestAgent) Start(id [TransactionIDSize]byte, deadline time.Time, f AgentFn) error {
|
func (n *TestAgent) Start(id [TransactionIDSize]byte, deadline time.Time, f Handler) error {
|
||||||
n.f <- f
|
n.f <- f
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -47,7 +47,7 @@ func (noopConnection) Close() error {
|
|||||||
func BenchmarkClient_Do(b *testing.B) {
|
func BenchmarkClient_Do(b *testing.B) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
agent := &TestAgent{
|
agent := &TestAgent{
|
||||||
f: make(chan AgentFn, 1000),
|
f: make(chan Handler, 1000),
|
||||||
}
|
}
|
||||||
client := NewClient(ClientOptions{
|
client := NewClient(ClientOptions{
|
||||||
Agent: agent,
|
Agent: agent,
|
||||||
@@ -55,17 +55,17 @@ func BenchmarkClient_Do(b *testing.B) {
|
|||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
go func() {
|
go func() {
|
||||||
e := AgentEvent{
|
e := Event{
|
||||||
Error: nil,
|
Error: nil,
|
||||||
Message: nil,
|
Message: nil,
|
||||||
}
|
}
|
||||||
for f := range agent.f {
|
for f := range agent.f {
|
||||||
f(e)
|
f.HandleEvent(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
m := new(Message)
|
m := new(Message)
|
||||||
m.Encode()
|
m.Encode()
|
||||||
noopF := func(event AgentEvent) {
|
noopF := func(event Event) {
|
||||||
// pass
|
// pass
|
||||||
}
|
}
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
@@ -140,7 +140,7 @@ func TestClient_Do(t *testing.T) {
|
|||||||
m.TransactionID = response.TransactionID
|
m.TransactionID = response.TransactionID
|
||||||
m.Encode()
|
m.Encode()
|
||||||
d := time.Now().Add(time.Second)
|
d := time.Now().Add(time.Second)
|
||||||
if err := c.Do(m, d, func(event AgentEvent) {
|
if err := c.Do(m, d, func(event Event) {
|
||||||
if event.Error != nil {
|
if event.Error != nil {
|
||||||
t.Error(event.Error)
|
t.Error(event.Error)
|
||||||
}
|
}
|
||||||
|
@@ -25,7 +25,7 @@ func main() {
|
|||||||
log.Fatal("dial:", err)
|
log.Fatal("dial:", err)
|
||||||
}
|
}
|
||||||
deadline := time.Now().Add(time.Second * 5)
|
deadline := time.Now().Add(time.Second * 5)
|
||||||
if err := c.Do(stun.MustBuild(stun.TransactionID, stun.BindingRequest), deadline, func(res stun.AgentEvent) {
|
if err := c.Do(stun.MustBuild(stun.TransactionID, stun.BindingRequest), deadline, func(res stun.Event) {
|
||||||
if res.Error != nil {
|
if res.Error != nil {
|
||||||
log.Fatalln(err)
|
log.Fatalln(err)
|
||||||
}
|
}
|
||||||
|
@@ -44,7 +44,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
timeout := time.Second
|
timeout := time.Second
|
||||||
deadline := time.Now().Add(timeout)
|
deadline := time.Now().Add(timeout)
|
||||||
if err := client.Do(request, deadline, func(event stun.AgentEvent) {
|
if err := client.Do(request, deadline, func(event stun.Event) {
|
||||||
if event.Error != nil {
|
if event.Error != nil {
|
||||||
log.Fatalln("got event with error:", event.Error)
|
log.Fatalln("got event with error:", event.Error)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user