diff --git a/core/input.go b/core/input.go index 967a435..66a7f68 100755 --- a/core/input.go +++ b/core/input.go @@ -84,7 +84,7 @@ func peekNextProto(ipv ipver, p []byte) (proto, error) { } } -func Input(pkt []byte) (int, error) { +func input(pkt []byte) (int, error) { if len(pkt) == 0 { return 0, nil } diff --git a/core/lwip.go b/core/lwip.go index 0635e03..9ba76e4 100644 --- a/core/lwip.go +++ b/core/lwip.go @@ -8,6 +8,8 @@ package core */ import "C" import ( + "context" + "errors" "sync" "time" "unsafe" @@ -28,6 +30,9 @@ var lwipMutex = &sync.Mutex{} type lwipStack struct { tpcb *C.struct_tcp_pcb upcb *C.struct_udp_pcb + + ctx context.Context + cancel context.CancelFunc } // NewLWIPStack listens for any incoming connections/packets and registers @@ -70,6 +75,8 @@ func NewLWIPStack() LWIPStack { setUDPRecvCallback(udpPCB, nil) + ctx, cancel := context.WithCancel(context.Background()) + go func() { for { select { @@ -77,33 +84,72 @@ func NewLWIPStack() LWIPStack { lwipMutex.Lock() C.sys_check_timeouts() lwipMutex.Unlock() + case <-ctx.Done(): + return } } }() return &lwipStack{ - tpcb: tcpPCB, - upcb: udpPCB, + tpcb: tcpPCB, + upcb: udpPCB, + ctx: ctx, + cancel: cancel, } } +// Write writes IP packets to the stack. func (s *lwipStack) Write(data []byte) (int, error) { - return Input(data) + select { + case <-s.ctx.Done(): + return 0, errors.New("stack closed") + default: + return input(data) + } } +// RestartTimeouts rebases the timeout times to the current time. +// +// This is necessary if sys_check_timeouts() hasn't been called for a long +// time (e.g. while saving energy) to prevent all timer functions of that +// period being called. func (s *lwipStack) RestartTimeouts() { + lwipMutex.Lock() C.sys_restart_timeouts() + lwipMutex.Unlock() } +// Close closes the stack. +// +// Timer events will be canceled and existing connections will be closed. +// Note this function will not free objects allocated in lwIP initialization +// stage, e.g. the loop interface. func (s *lwipStack) Close() error { + // Stop firing timer events. + s.cancel() + + // Abort and close all TCP and UDP connections. tcpConns.Range(func(_, c interface{}) bool { c.(*tcpConn).Abort() return true }) udpConns.Range(func(_, c interface{}) bool { + // This only closes UDP connections in the core, + // UDP connections in the handler will wait till + // timeout, they are not closed immediately for + // now. c.(*udpConn).Close() return true }) + + // Remove callbacks and close listening pcbs. + lwipMutex.Lock() + C.tcp_accept(s.tpcb, nil) + C.udp_recv(s.upcb, nil, nil) + C.tcp_close(s.tpcb) // FIXME handle error + C.udp_remove(s.upcb) + lwipMutex.Unlock() + return nil } diff --git a/core/tcp_conn.go b/core/tcp_conn.go index 6b87298..de5f659 100755 --- a/core/tcp_conn.go +++ b/core/tcp_conn.go @@ -430,10 +430,12 @@ func (conn *tcpConn) abortInternal() { func (conn *tcpConn) Abort() { conn.Lock() - defer conn.Unlock() - conn.state = tcpAborting - conn.canWrite.Broadcast() + conn.Unlock() + + lwipMutex.Lock() + conn.checkState() + lwipMutex.Unlock() } func (conn *tcpConn) Err(err error) { diff --git a/core/udp_conn.go b/core/udp_conn.go index cd50d09..6b30284 100755 --- a/core/udp_conn.go +++ b/core/udp_conn.go @@ -46,7 +46,7 @@ func newUDPConn(pcb *C.struct_udp_pcb, handler UDPConnHandler, localIP C.ip_addr localIP: localIP, localPort: localPort, state: udpConnecting, - pending: make(chan *udpPacket, 1), // To hold the first packet on the connection + pending: make(chan *udpPacket, 64), // To hold the early packets on the connection } go func() { @@ -68,6 +68,7 @@ func newUDPConn(pcb *C.struct_udp_pcb, handler UDPConnHandler, localIP C.ip_addr } continue DrainPending default: + conn.pending = nil break DrainPending } }