mirror of
https://github.com/pion/ice.git
synced 2025-09-26 19:41:11 +08:00

Replace manual atomic operations with the atomic.Bool type for better type safety and cleaner code. This modernizes the atomic usage pattern from atomic.LoadInt32/StoreInt32 to the newer Load/Store methods on atomic.Bool:
- Update the activeTCPConn.closed field type from int32 to atomic.Bool.
- Replace atomic.LoadInt32(&a.closed) with a.closed.Load().
- Replace atomic.StoreInt32(&a.closed, 1) with a.closed.Store(true).
All existing functionality is preserved, with improved type safety.
Signed-off-by: Xiaobo Liu <cppcoffee@gmail.com>
tweak
Signed-off-by: Xiaobo Liu <cppcoffee@gmail.com>
178 lines
3.7 KiB
Go
178 lines
3.7 KiB
Go
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package ice
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"net/netip"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pion/logging"
|
|
"github.com/pion/transport/v3/packetio"
|
|
)
|
|
|
|
type activeTCPConn struct {
|
|
readBuffer, writeBuffer *packetio.Buffer
|
|
localAddr, remoteAddr atomic.Value
|
|
closed atomic.Bool
|
|
}
|
|
|
|
func newActiveTCPConn(
|
|
ctx context.Context,
|
|
localAddress string,
|
|
remoteAddress netip.AddrPort,
|
|
log logging.LeveledLogger,
|
|
) (a *activeTCPConn) {
|
|
a = &activeTCPConn{
|
|
readBuffer: packetio.NewBuffer(),
|
|
writeBuffer: packetio.NewBuffer(),
|
|
}
|
|
|
|
laddr, err := getTCPAddrOnInterface(localAddress)
|
|
if err != nil {
|
|
a.closed.Store(true)
|
|
log.Infof("Failed to dial TCP address %s: %v", remoteAddress, err)
|
|
|
|
return a
|
|
}
|
|
a.localAddr.Store(laddr)
|
|
|
|
go func() {
|
|
defer func() {
|
|
a.closed.Store(true)
|
|
}()
|
|
|
|
dialer := &net.Dialer{
|
|
LocalAddr: laddr,
|
|
}
|
|
conn, err := dialer.DialContext(ctx, "tcp", remoteAddress.String())
|
|
if err != nil {
|
|
log.Infof("Failed to dial TCP address %s: %v", remoteAddress, err)
|
|
|
|
return
|
|
}
|
|
a.remoteAddr.Store(conn.RemoteAddr())
|
|
|
|
go func() {
|
|
buff := make([]byte, receiveMTU)
|
|
|
|
for !a.closed.Load() {
|
|
n, err := readStreamingPacket(conn, buff)
|
|
if err != nil {
|
|
log.Infof("Failed to read streaming packet: %s", err)
|
|
|
|
break
|
|
}
|
|
|
|
if _, err := a.readBuffer.Write(buff[:n]); err != nil {
|
|
log.Infof("Failed to write to buffer: %s", err)
|
|
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
|
|
buff := make([]byte, receiveMTU)
|
|
|
|
for !a.closed.Load() {
|
|
n, err := a.writeBuffer.Read(buff)
|
|
if err != nil {
|
|
log.Infof("Failed to read from buffer: %s", err)
|
|
|
|
break
|
|
}
|
|
|
|
if _, err = writeStreamingPacket(conn, buff[:n]); err != nil {
|
|
log.Infof("Failed to write streaming packet: %s", err)
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if err := conn.Close(); err != nil {
|
|
log.Infof("Failed to close connection: %s", err)
|
|
}
|
|
}()
|
|
|
|
return a
|
|
}
|
|
|
|
func (a *activeTCPConn) ReadFrom(buff []byte) (n int, srcAddr net.Addr, err error) {
|
|
if a.closed.Load() {
|
|
return 0, nil, io.ErrClosedPipe
|
|
}
|
|
|
|
n, err = a.readBuffer.Read(buff)
|
|
// RemoteAddr is assuredly set *after* we can read from the buffer
|
|
srcAddr = a.RemoteAddr()
|
|
|
|
return
|
|
}
|
|
|
|
func (a *activeTCPConn) WriteTo(buff []byte, _ net.Addr) (n int, err error) {
|
|
if a.closed.Load() {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
|
|
return a.writeBuffer.Write(buff)
|
|
}
|
|
|
|
func (a *activeTCPConn) Close() error {
|
|
a.closed.Store(true)
|
|
_ = a.readBuffer.Close()
|
|
_ = a.writeBuffer.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *activeTCPConn) LocalAddr() net.Addr {
|
|
if v, ok := a.localAddr.Load().(*net.TCPAddr); ok {
|
|
return v
|
|
}
|
|
|
|
return &net.TCPAddr{}
|
|
}
|
|
|
|
// RemoteAddr returns the remote address of the connection which is only
|
|
// set once a background goroutine has successfully dialed. That means
|
|
// this may return ":0" for the address prior to that happening. If this
|
|
// becomes an issue, we can introduce a synchronization point between Dial
|
|
// and these methods.
|
|
func (a *activeTCPConn) RemoteAddr() net.Addr {
|
|
if v, ok := a.remoteAddr.Load().(*net.TCPAddr); ok {
|
|
return v
|
|
}
|
|
|
|
return &net.TCPAddr{}
|
|
}
|
|
|
|
// SetDeadline ignores the supplied deadline and always returns io.EOF.
func (a *activeTCPConn) SetDeadline(time.Time) error {
	return io.EOF
}
|
|
// SetReadDeadline ignores the supplied deadline and always returns io.EOF.
func (a *activeTCPConn) SetReadDeadline(time.Time) error {
	return io.EOF
}
|
|
// SetWriteDeadline ignores the supplied deadline and always returns io.EOF.
func (a *activeTCPConn) SetWriteDeadline(time.Time) error {
	return io.EOF
}
|
|
|
|
func getTCPAddrOnInterface(address string) (*net.TCPAddr, error) {
|
|
addr, err := net.ResolveTCPAddr("tcp", address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
l, err := net.ListenTCP("tcp", addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
_ = l.Close()
|
|
}()
|
|
|
|
tcpAddr, ok := l.Addr().(*net.TCPAddr)
|
|
if !ok {
|
|
return nil, errInvalidAddress
|
|
}
|
|
|
|
return tcpAddr, nil
|
|
}
|