mirror of
https://github.com/MikeWang000000/Natter.git
synced 2025-10-05 09:16:52 +08:00
584 lines
23 KiB
Python
584 lines
23 KiB
Python
import threading
|
|
import socket
|
|
import struct
|
|
import codecs
|
|
import time
|
|
import sys
|
|
import os
|
|
|
|
|
|
# Fix OpenWRT Python codecs issues:
|
|
# Always fallback to ASCII when specified codec is not available.
|
|
try:
|
|
codecs.lookup('idna')
|
|
codecs.lookup('utf-8')
|
|
except LookupError:
|
|
def search_codec(_):
|
|
return codecs.CodecInfo(codecs.ascii_encode, codecs.ascii_decode, name='ascii')
|
|
codecs.register(search_codec)
|
|
|
|
|
|
class Logger(object):
|
|
DEBUG = 1
|
|
INFO = 2
|
|
WARNING = 3
|
|
ERROR = 4
|
|
|
|
def __init__(self, level = INFO):
|
|
self.level = level
|
|
|
|
def debug(self, msg):
|
|
if self.level <= Logger.DEBUG:
|
|
sys.stdout.write("[DEBUG] - " + str(msg) + "\n")
|
|
sys.stdout.flush()
|
|
|
|
def info(self, msg):
|
|
if self.level <= Logger.INFO:
|
|
sys.stdout.write("[INFO] - " + str(msg) + "\n")
|
|
sys.stdout.flush()
|
|
|
|
def warning(self, msg):
|
|
if self.level <= Logger.WARNING:
|
|
sys.stderr.write("[WARNING] - " + str(msg) + "\n")
|
|
sys.stderr.flush()
|
|
|
|
def error(self, msg):
|
|
if self.level <= Logger.ERROR:
|
|
sys.stderr.write("[ERROR] - " + str(msg) + "\n")
|
|
sys.stderr.flush()
|
|
|
|
|
|
class StunClient(object):
|
|
# Note: IPv4 Only.
|
|
# Reference:
|
|
# https://www.rfc-editor.org/rfc/rfc3489
|
|
# https://www.rfc-editor.org/rfc/rfc5389
|
|
# https://www.rfc-editor.org/rfc/rfc8489
|
|
|
|
# Servers in this list must be compatible with rfc5389 or rfc8489
|
|
stun_server_tcp = [
|
|
"fwa.lifesizecloud.com",
|
|
"stun.isp.net.au",
|
|
"stun.freeswitch.org",
|
|
"stun.voip.blackberry.com",
|
|
"stun.nextcloud.com",
|
|
"stun.stunprotocol.org",
|
|
"stun.sipnet.com",
|
|
"stun.radiojar.com",
|
|
"stun.sonetel.com",
|
|
"stun.voipgate.com"
|
|
]
|
|
# Servers in this list must be compatible with rfc3489, with "change IP" and "change port" functions available
|
|
stun_server_udp = [
|
|
"stun.miwifi.com",
|
|
"stun.qq.com"
|
|
]
|
|
MTU = 1500
|
|
STUN_PORT = 3478
|
|
MAGIC_COOKIE = 0x2112a442
|
|
BIND_REQUEST = 0x0001
|
|
BIND_RESPONSE = 0x0101
|
|
FAMILY_IPV4 = 0x01
|
|
FAMILY_IPV6 = 0x02
|
|
CHANGE_PORT = 0x0002
|
|
CHANGE_IP = 0x0004
|
|
ATTRIB_MAPPED_ADDRESS = 0x0001
|
|
ATTRIB_CHANGE_REQUEST = 0x0003
|
|
ATTRIB_XOR_MAPPED_ADDRESS = 0x0020
|
|
NAT_OPEN_INTERNET = 0
|
|
NAT_FULL_CONE = 1
|
|
NAT_RESTRICTED = 2
|
|
NAT_PORT_RESTRICTED = 3
|
|
NAT_SYMMETRIC = 4
|
|
NAT_SYM_UDP_FIREWALL = 5
|
|
|
|
def __init__(self, source_ip = "0.0.0.0", log_level = Logger.INFO):
|
|
self.logger = Logger(log_level)
|
|
self.source_ip = source_ip
|
|
self.stun_ip_tcp = []
|
|
self.stun_ip_udp = []
|
|
if not self.check_reuse_ability():
|
|
raise OSError("This OS or Python does not support reusing ports!")
|
|
self.logger.info("Getting STUN server IP...")
|
|
for hostname in self.stun_server_tcp:
|
|
self.stun_ip_tcp.extend(self.resolve_hostname(hostname))
|
|
for hostname in self.stun_server_udp:
|
|
self.stun_ip_udp.extend(self.resolve_hostname(hostname))
|
|
if not self.stun_ip_tcp or not self.stun_ip_udp:
|
|
raise Exception("No public STUN server is avaliable. Please check your Internet connection.")
|
|
|
|
def get_free_port(self, udp=False):
|
|
if udp:
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
else:
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
sock.bind(('', 0))
|
|
ret = sock.getsockname()[1]
|
|
sock.close()
|
|
return ret
|
|
|
|
def check_reuse_ability(self):
|
|
try:
|
|
test_port = self.get_free_port()
|
|
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
s1.bind(("0.0.0.0", test_port))
|
|
s1.listen(1)
|
|
s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
s2.bind(("0.0.0.0", test_port))
|
|
s2.listen(1)
|
|
s1.close()
|
|
s2.close()
|
|
return True
|
|
except OSError as e:
|
|
self.logger.debug("%s: %s" % (e.__class__.__name__, e))
|
|
return False
|
|
|
|
def resolve_hostname(self, hostname):
|
|
self.logger.debug("Resolving hostname [%s]..." % hostname)
|
|
try:
|
|
host, alias, ip_addresses = socket.gethostbyname_ex(hostname)
|
|
return ip_addresses
|
|
except Exception as e:
|
|
self.logger.debug("%s: %s" % (e.__class__.__name__, e))
|
|
return []
|
|
|
|
def random_tran_id(self, use_magic_cookie = False):
|
|
if use_magic_cookie:
|
|
# Compatible with rfc3489, rfc5389 and rfc8489
|
|
return struct.pack("!L", self.MAGIC_COOKIE) + os.urandom(12)
|
|
else:
|
|
# Compatible with rfc3489
|
|
return os.urandom(16)
|
|
|
|
def pack_stun_message(self, msg_type, tran_id, payload = b""):
|
|
return struct.pack("!HH", msg_type, len(payload)) + tran_id + payload
|
|
|
|
def unpack_stun_message(self, data):
|
|
msg_type, msg_length = struct.unpack("!HH", data[:4])
|
|
tran_id = data[4:20]
|
|
payload = data[20:20 + msg_length]
|
|
return msg_type, tran_id, payload
|
|
|
|
def get_mapped_addr(self, payload):
|
|
while payload:
|
|
attrib_type, attrib_length = struct.unpack("!HH", payload[:4])
|
|
attrib_value = payload[4:4 + attrib_length]
|
|
payload = payload[4 + attrib_length:]
|
|
if attrib_type == self.ATTRIB_MAPPED_ADDRESS:
|
|
_, family, port = struct.unpack("!BBH", attrib_value[:4])
|
|
if family == self.FAMILY_IPV4:
|
|
ip = socket.inet_ntoa(attrib_value[4:8])
|
|
return ip, port
|
|
elif attrib_type == self.ATTRIB_XOR_MAPPED_ADDRESS:
|
|
# rfc5389 and rfc8489
|
|
_, family, xor_port = struct.unpack("!BBH", attrib_value[:4])
|
|
if family == self.FAMILY_IPV4:
|
|
xor_iip, = struct.unpack("!L", attrib_value[4:8])
|
|
ip = socket.inet_ntoa(struct.pack("!L", self.MAGIC_COOKIE ^ xor_iip))
|
|
port = (self.MAGIC_COOKIE >> 16) ^ xor_port
|
|
return ip, port
|
|
return None
|
|
|
|
def tcp_test(self, stun_host, source_port, timeout = 1):
|
|
# rfc5389 and rfc8489 only
|
|
self.logger.debug("Trying TCP STUN: %s" % stun_host)
|
|
tran_id = self.random_tran_id(use_magic_cookie = True)
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
sock.settimeout(timeout)
|
|
sock.bind((self.source_ip, source_port))
|
|
sock.connect((stun_host, self.STUN_PORT))
|
|
data = self.pack_stun_message(self.BIND_REQUEST, tran_id)
|
|
sock.sendall(data)
|
|
buf = sock.recv(self.MTU)
|
|
msg_type, msg_id, payload = self.unpack_stun_message(buf)
|
|
if tran_id == msg_id and msg_type == self.BIND_RESPONSE:
|
|
source_addr = sock.getsockname()
|
|
mapped_addr = self.get_mapped_addr(payload)
|
|
ret = source_addr, mapped_addr
|
|
self.logger.debug("(TCP) %s says: %s" % (stun_host, mapped_addr))
|
|
else:
|
|
ret = None
|
|
sock.shutdown(socket.SHUT_RDWR)
|
|
sock.close()
|
|
except Exception as e:
|
|
self.logger.debug("%s: %s" % (e.__class__.__name__, e))
|
|
sock.close()
|
|
ret = None
|
|
return ret
|
|
|
|
def udp_test(self, stun_host, source_port, change_ip = False, change_port = False, timeout = 1, repeat = 3):
|
|
# Note:
|
|
# Assuming STUN is being multiplexed with other protocols,
|
|
# the packet must be inspected to check if it is a STUN packet.
|
|
self.logger.debug("Trying UDP STUN: %s (change ip:%d/port:%d)" % (stun_host, change_ip, change_port))
|
|
time_start = time.time()
|
|
tran_id = self.random_tran_id()
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
sock.settimeout(timeout)
|
|
sock.bind((self.source_ip, source_port))
|
|
flags = 0
|
|
if change_ip:
|
|
flags |= self.CHANGE_IP
|
|
if change_port:
|
|
flags |= self.CHANGE_PORT
|
|
if flags:
|
|
payload = struct.pack("!HHL", self.ATTRIB_CHANGE_REQUEST, 0x4, flags)
|
|
data = self.pack_stun_message(self.BIND_REQUEST, tran_id, payload)
|
|
else:
|
|
data = self.pack_stun_message(self.BIND_REQUEST, tran_id)
|
|
# Send packets repeatedly to avoid packet loss.
|
|
for _ in range(repeat):
|
|
sock.sendto(data, (stun_host, self.STUN_PORT))
|
|
while True:
|
|
time_left = time_start + timeout - time.time()
|
|
if time_left <= 0:
|
|
raise socket.timeout("timed out")
|
|
sock.settimeout(time_left)
|
|
buf, recv_addr = sock.recvfrom(self.MTU)
|
|
recv_host, recv_port = recv_addr
|
|
# check STUN packet
|
|
if len(buf) < 20:
|
|
continue
|
|
msg_type, msg_id, payload = self.unpack_stun_message(buf)
|
|
if tran_id != msg_id or msg_type != self.BIND_RESPONSE:
|
|
continue
|
|
source_addr = sock.getsockname()
|
|
mapped_addr = self.get_mapped_addr(payload)
|
|
ip_changed = (recv_host != self.STUN_PORT)
|
|
port_changed = (recv_port != self.STUN_PORT)
|
|
self.logger.debug("(UDP) %s says: %s" % (recv_addr, mapped_addr))
|
|
return source_addr, mapped_addr, ip_changed, port_changed
|
|
except Exception as e:
|
|
self.logger.debug("%s: %s" % (e.__class__.__name__, e))
|
|
return None
|
|
finally:
|
|
sock.close()
|
|
|
|
def get_tcp_mapping(self, source_port):
|
|
server_ip = first = self.stun_ip_tcp[0]
|
|
while True:
|
|
ret = self.tcp_test(server_ip, source_port)
|
|
if ret is None:
|
|
# server unavailable, put it at the end of the list
|
|
self.stun_ip_tcp.append(self.stun_ip_tcp.pop(0))
|
|
server_ip = self.stun_ip_tcp[0]
|
|
if server_ip == first:
|
|
raise Exception("No public STUN server is avaliable. Please check your Internet connection.")
|
|
else:
|
|
source_addr, mapped_addr = ret
|
|
return source_addr, mapped_addr
|
|
|
|
def get_udp_mapping(self, source_port):
|
|
server_ip = first = self.stun_ip_udp[0]
|
|
while True:
|
|
ret = self.udp_test(server_ip, source_port)
|
|
if ret is None:
|
|
# server unavailable, put it at the end of the list
|
|
self.stun_ip_udp.append(self.stun_ip_udp.pop(0))
|
|
server_ip = self.stun_ip_udp[0]
|
|
if server_ip == first:
|
|
raise Exception("No public STUN server is avaliable. Please check your Internet connection.")
|
|
else:
|
|
source_addr, mapped_addr, ip_changed, port_changed = ret
|
|
return source_addr, mapped_addr
|
|
|
|
def check_nat_type(self, source_port = 0):
|
|
# Like classic STUN (rfc3489). Detect NAT behavior for UDP.
|
|
# Modified from rfc3489. Requires at least two STUN servers.
|
|
ret_test1_1 = None
|
|
ret_test1_2 = None
|
|
ret_test2 = None
|
|
ret_test3 = None
|
|
if source_port == 0:
|
|
source_port = self.get_free_port(udp=True)
|
|
|
|
for server_ip in self.stun_ip_udp:
|
|
ret = self.udp_test(server_ip, source_port, change_ip=False, change_port=False)
|
|
if ret is None:
|
|
self.logger.debug("No response. Trying another STUN server...")
|
|
continue
|
|
if ret_test1_1 is None:
|
|
ret_test1_1 = ret
|
|
continue
|
|
ret_test1_2 = ret
|
|
ret = self.udp_test(server_ip, source_port, change_ip=True, change_port=True)
|
|
if ret is not None:
|
|
source_addr, mapped_addr, ip_changed, port_changed = ret
|
|
if not ip_changed or not port_changed:
|
|
self.logger.debug("Trying another STUN server because current server do not have another available IP or port...")
|
|
continue
|
|
ret_test2 = ret
|
|
ret_test3 = self.udp_test(server_ip, source_port, change_ip=False, change_port=True)
|
|
break
|
|
else:
|
|
raise Exception("UDP Blocked or not enough STUN servers available.")
|
|
|
|
source_addr_1_1, mapped_addr_1_1, _, _ = ret_test1_1
|
|
source_addr_1_2, mapped_addr_1_2, _, _ = ret_test1_2
|
|
if mapped_addr_1_1 != mapped_addr_1_2:
|
|
return StunClient.NAT_SYMMETRIC
|
|
if source_addr_1_1 == mapped_addr_1_1:
|
|
if ret_test2 is not None:
|
|
return StunClient.NAT_OPEN_INTERNET
|
|
else:
|
|
return StunClient.NAT_SYM_UDP_FIREWALL
|
|
else:
|
|
if ret_test2 is not None:
|
|
return StunClient.NAT_FULL_CONE
|
|
else:
|
|
if ret_test3 is not None:
|
|
return StunClient.NAT_RESTRICTED
|
|
else:
|
|
return StunClient.NAT_PORT_RESTRICTED
|
|
|
|
def is_tcp_cone(self, source_port = 0):
|
|
# Detect NAT behavior for TCP. Requires at least three STUN servers for accuracy.
|
|
if source_port == 0:
|
|
source_port = self.get_free_port()
|
|
mapped_addr_first = None
|
|
count = 0
|
|
for server_ip in self.stun_ip_tcp:
|
|
if count >= 3:
|
|
return True
|
|
ret = self.tcp_test(server_ip, source_port)
|
|
if ret is not None:
|
|
source_addr, mapped_addr = ret
|
|
if mapped_addr_first is not None and mapped_addr != mapped_addr_first:
|
|
return False
|
|
mapped_addr_first = ret[1]
|
|
count += 1
|
|
raise Exception("Not enough STUN servers available.")
|
|
|
|
|
|
class HttpTestServer(object):
|
|
# HTTP Server for testing purpose
|
|
# On success, you can see the text "It works!".
|
|
|
|
def __init__(self, listen_addr):
|
|
self.running = False
|
|
self.listen_addr = listen_addr
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
|
|
def run(self):
|
|
self.running = True
|
|
self.sock.bind(self.listen_addr)
|
|
self.sock.listen(1)
|
|
while self.running:
|
|
try:
|
|
conn, addr = self.sock.accept()
|
|
except Exception:
|
|
return
|
|
try:
|
|
conn.recv(4096)
|
|
conn.sendall(b"HTTP/1.1 200 OK\r\n")
|
|
conn.sendall(b"Content-Type: text/html\r\n")
|
|
conn.sendall(b"\r\n")
|
|
conn.sendall(b"<h1>It works!</h1><hr/>Natter\r\n")
|
|
conn.shutdown(socket.SHUT_RDWR)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
def start(self):
|
|
threading.Thread(target=self.run).start()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(0.1)
|
|
result = sock.connect_ex(self.listen_addr)
|
|
sock.close()
|
|
self.sock.close()
|
|
|
|
|
|
class Natter(object):
|
|
def __init__(self, source_ip, source_port, test_http = False,
|
|
keep_alive_host = "www.qq.com", keep_alive_interval = 10, retry_sec = 3, log_level = Logger.INFO):
|
|
self.logger = Logger(log_level)
|
|
self.source_ip = source_ip
|
|
self.source_port = source_port
|
|
self.test_http = test_http
|
|
self.keep_alive_host = keep_alive_host
|
|
self.keep_alive_interval = keep_alive_interval
|
|
self.retry_sec = retry_sec
|
|
self.stun_client = StunClient(source_ip, log_level = log_level)
|
|
self.keep_alive_sock = self._init_keep_alive_sock()
|
|
self.http_test_server = HttpTestServer((source_ip, source_port))
|
|
|
|
def _init_keep_alive_sock(self):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
if 'SO_REUSEPORT' in dir(socket):
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
s.bind((self.source_ip, self.source_port))
|
|
s.connect((self.keep_alive_host, 80))
|
|
s.settimeout(self.keep_alive_interval)
|
|
return s
|
|
|
|
def _keep_alive(self):
|
|
s = self.keep_alive_sock
|
|
try:
|
|
s.sendall(b"GET /~ HTTP/1.1\r\n")
|
|
s.sendall(b"Host: %s\r\n" % self.keep_alive_host.encode())
|
|
s.sendall(b"Connection: keep-alive\r\n")
|
|
s.sendall(b"\r\n")
|
|
except Exception as e:
|
|
self.logger.debug("%s: %s" % (e.__class__.__name__, e))
|
|
return False
|
|
try:
|
|
while s.recv(4096):
|
|
self.logger.debug("[%s] Keep-Alive OK!" % time.asctime())
|
|
self.logger.debug("Server closed connection")
|
|
return False
|
|
except socket.timeout:
|
|
return True
|
|
except Exception as e:
|
|
self.logger.debug("%s: %s" % (e.__class__.__name__, e))
|
|
return False
|
|
|
|
def test_port_open(self, dst_addr, timeout = 3):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(timeout)
|
|
result = sock.connect_ex(dst_addr)
|
|
sock.close()
|
|
return result == 0
|
|
|
|
def tcp_punch(self):
|
|
self.logger.info("Checking NAT Type for UDP...")
|
|
nat_type = self.stun_client.check_nat_type()
|
|
if nat_type == StunClient.NAT_OPEN_INTERNET:
|
|
nat_type_txt = "Open Internet"
|
|
elif nat_type == StunClient.NAT_SYM_UDP_FIREWALL:
|
|
nat_type_txt = "Symmetric UDP firewall"
|
|
elif nat_type == StunClient.NAT_FULL_CONE:
|
|
nat_type_txt = "Full cone (NAT 1)"
|
|
elif nat_type == StunClient.NAT_RESTRICTED:
|
|
nat_type_txt = "Restricted (NAT 2)"
|
|
elif nat_type == StunClient.NAT_PORT_RESTRICTED:
|
|
nat_type_txt = "Port restricted (NAT 3)"
|
|
elif nat_type == StunClient.NAT_SYMMETRIC:
|
|
nat_type_txt = "Symmetric (NAT 4)"
|
|
else:
|
|
nat_type_txt = "Unknown"
|
|
self.logger.info("NAT Type for UDP: [ %s ]" % nat_type_txt)
|
|
if nat_type == StunClient.NAT_OPEN_INTERNET:
|
|
self.logger.warning("It looks like you are not in a NAT network, so there is no need to use this tool.")
|
|
elif nat_type != StunClient.NAT_FULL_CONE:
|
|
self.logger.warning("The NAT type of your network is not full cone (NAT 1). TCP hole punching may fail.")
|
|
|
|
self.logger.info("Checking NAT Type for TCP...")
|
|
if self.stun_client.is_tcp_cone():
|
|
self.logger.info("NAT Type for TCP: [ Cone NAT ]")
|
|
else:
|
|
self.logger.info("NAT Type for TCP: [ Symmetric ]")
|
|
self.logger.error("You cannot perform TCP hole punching in a symmetric NAT network.")
|
|
return
|
|
|
|
self.logger.info("Start punching...")
|
|
self.http_test_server.start()
|
|
source_addr, mapped_addr = self.stun_client.get_tcp_mapping(self.source_port)
|
|
if not self.test_port_open(source_addr):
|
|
self.logger.error("Local address %s is not available. Check your firewall settings." % source_addr)
|
|
return
|
|
if self.test_port_open(mapped_addr):
|
|
self.logger.info(
|
|
"The TCP hole punching appears to be successful. "
|
|
"Please test this address from another network: %s" % str(mapped_addr)
|
|
)
|
|
print("\n================================\n %s\n================================\n"% str(mapped_addr))
|
|
if self.test_http:
|
|
print("HTTP test server is enabled. Please check [ http://%s:%d/ ]\n" % mapped_addr)
|
|
else:
|
|
self.logger.warning(
|
|
"TCP hole punching seems to fail. Maybe you are behind a firewall. "
|
|
"However, you may check this address from another network: %s" % str(mapped_addr)
|
|
)
|
|
if not self.test_http:
|
|
self.http_test_server.stop()
|
|
# Keep alive
|
|
self.logger.info("TCP keep-alive...")
|
|
while True:
|
|
ok = self._keep_alive()
|
|
if not ok:
|
|
self.keep_alive_sock.close()
|
|
time.sleep(self.retry_sec)
|
|
self.keep_alive_sock = self._init_keep_alive_sock()
|
|
self._keep_alive()
|
|
source_addr, mapped_addr = self.stun_client.get_tcp_mapping(self.source_port)
|
|
self.logger.info("Mapped address: %s" % str(mapped_addr))
|
|
|
|
def close(self):
|
|
try:
|
|
self.keep_alive_sock.shutdown(socket.SHUT_RDWR)
|
|
except Exception:
|
|
pass
|
|
self.keep_alive_sock.close()
|
|
self.http_test_server.stop()
|
|
|
|
|
|
def main():
|
|
try:
|
|
src_host = "0.0.0.0"
|
|
src_port = -1
|
|
verbose = False
|
|
test_http = False
|
|
l = []
|
|
for arg in sys.argv[1:]:
|
|
if arg[0] == "-":
|
|
if arg == "-v":
|
|
verbose = True
|
|
elif arg == "-t":
|
|
test_http = True
|
|
else:
|
|
raise ValueError
|
|
else:
|
|
l.append(arg)
|
|
if len(l) == 1:
|
|
src_port = int(l[0])
|
|
elif len(l) == 2:
|
|
src_host = l[0]
|
|
src_port = int(l[1])
|
|
else:
|
|
raise ValueError
|
|
except ValueError:
|
|
print("Usage: python natter.py [-v] [-t] [SRC_HOST] SRC_PORT\n")
|
|
return
|
|
|
|
if verbose:
|
|
log_level=Logger.DEBUG
|
|
else:
|
|
log_level=Logger.INFO
|
|
natter = Natter(src_host, src_port, test_http=test_http, log_level=log_level)
|
|
try:
|
|
natter.tcp_punch()
|
|
except KeyboardInterrupt:
|
|
print("\nExiting...\n")
|
|
natter.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|