mirror of
				https://github.com/521xueweihan/GitHub520.git
				synced 2025-10-31 11:46:22 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			160 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			160 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| # -*- coding:utf-8 -*-
 | |
| #   
 | |
| #   Author  :   XueWeiHan
 | |
| #   E-mail  :   595666367@qq.com
 | |
| #   Date    :   2020-05-19 15:27
 | |
| #   Desc    :   获取最新的 GitHub 相关域名对应 IP
 | |
| import re
 | |
| from typing import Any, Dict, List, Optional
 | |
| from datetime import datetime
 | |
| import sys
 | |
| import asyncio
 | |
| import aiodns
 | |
| 
 | |
| from pythonping import ping
 | |
| from requests_html import HTMLSession
 | |
| from retry import retry
 | |
| 
 | |
| from common import GITHUB_URLS, write_hosts_content
 | |
| 
 | |
| 
 | |
# Timeout, in seconds, passed to every ping() call.
PING_TIMEOUT_SEC: int = 1
# Addresses removed from the candidate set before any pinging happens.
DISCARD_LIST: List[str] = ["1.0.1.1", "1.2.1.1", "127.0.0.1"]


# Ping result cache keyed by IP address; populated by ping_cached().
PING_LIST: Dict[str, int] = {}
 | |
| 
 | |
| 
 | |
def ping_cached(ip: str) -> int:
    """Return the ping figure for *ip*, measuring it only on first request.

    On a cache miss the address is pinged three times, the ``rtt_avg_ms``
    of each run is collected, and the median of the three is stored in
    ``PING_LIST`` and returned. Later calls for the same address return
    the stored value without pinging again.
    """
    try:
        return PING_LIST[ip]
    except KeyError:
        pass  # not measured yet; fall through and ping

    samples = sorted(
        ping(ip, timeout=PING_TIMEOUT_SEC).rtt_avg_ms for _ in range(3)
    )
    print(f'Ping {ip}: {samples} ms')
    median = samples[1]  # middle of three sorted samples
    PING_LIST[ip] = median
    return median
 | |
| 
 | |
| 
 | |
def select_ip_from_list(ip_list: List[str]) -> Optional[str]:
    """Pick the address with the lowest cached ping value.

    Returns ``None`` for an empty list. Otherwise every address is measured
    through ``ping_cached``, the (ip, ping) pairs are ordered by ping value,
    the ordered list is printed, and the first address is returned.
    """
    if len(ip_list) == 0:
        return None

    measured = []
    for candidate in ip_list:
        measured.append((candidate, ping_cached(candidate)))

    # sorted() is stable, so ties keep their input order.
    ranked = sorted(measured, key=lambda pair: pair[1])
    fastest, _ = ranked[0]
    print(f"{ranked}, selected {fastest}")
    return fastest
 | |
| 
 | |
| 
 | |
@retry(tries=3)
def get_ip_list_from_ipaddress_com(session: Any, github_url: str) -> Optional[List[str]]:
    """Collect IPv4-looking strings from the sites.ipaddress.com page of a domain.

    Args:
        session: Object exposing ``get(url, headers=..., timeout=...)`` whose
            response has an ``html.text`` attribute (``main`` passes an
            ``HTMLSession``).
        github_url: Domain name appended to ``https://sites.ipaddress.com/``.

    Returns:
        Every dotted-quad match found in the page text, in page order.
        Duplicates are kept and octet ranges are not validated.

    Raises:
        Exception: When the request or the parsing fails. The message names
            the URL and the underlying error, which is also chained as
            ``__cause__``. The function is wrapped in ``@retry(tries=3)``,
            so the failure is presumably re-attempted before it reaches the
            caller (confirm against the ``retry`` package).
    """
    url = f'https://sites.ipaddress.com/{github_url}'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)'
                      ' AppleWebKit/537.36 (KHTML, like Gecko) Chrome/1'
                      '06.0.0.0 Safari/537.36'}
    try:
        rs = session.get(url, headers=headers, timeout=5)
        pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
        return re.findall(pattern, rs.html.text)
    except Exception as ex:
        message = f"get: {url}, error: {ex}"
        print(message)
        # Keep the plain Exception type callers already catch, but carry the
        # message and the original error instead of raising the bare class.
        raise Exception(message) from ex
 | |
| 
 | |
| 
 | |
# DNS servers that get_ip() passes to get_ip_list_from_dns().
DNS_SERVER_LIST = [
    # Cloudflare
    "1.1.1.1",
    # Google
    "8.8.8.8",
    # Quad101
    "101.101.101.101",
    "101.102.103.104",
]
 | |
| 
 | |
| 
 | |
def windows_compatibility_check():
    """Ensure ``pycares`` can be imported when running on Windows.

    Does nothing on any other platform.

    Raises:
        RuntimeError: On ``win32`` when importing ``pycares`` fails.
    """
    if sys.platform != "win32":
        return

    # Check that pycares loads correctly.
    try:
        import pycares
    except ImportError:
        raise RuntimeError("请先执行 'pip install pycares'")
 | |
| 
 | |
| 
 | |
async def get_ip_list_from_dns(
    domain: str,
    record_type: str = "A",
    dns_server_list: Optional[List[str]] = None,
) -> List[str]:
    """Resolve *domain* through the given DNS servers with aiodns.

    Args:
        domain: Host name to query.
        record_type: DNS record type handed to the resolver.
        dns_server_list: Name servers to use. When omitted (``None``),
            ``["1.2.4.8", "114.114.114.114"]`` is used.

    Returns:
        The ``host`` attribute of every answer, or an empty list when the
        query raises ``aiodns.error.DNSError``.

    Raises:
        RuntimeError: Propagated from ``windows_compatibility_check`` on
            Windows when ``pycares`` cannot be imported.
    """
    # A fresh list per call; a list literal in the signature would be one
    # shared object across all calls.
    if dns_server_list is None:
        dns_server_list = ["1.2.4.8", "114.114.114.114"]

    # Windows compatibility check.
    windows_compatibility_check()

    # Configure the DNS servers.
    resolver = aiodns.DNSResolver()
    resolver.nameservers = dns_server_list

    try:
        # Run the asynchronous query.
        result = await resolver.query(domain, record_type)
    except aiodns.error.DNSError as e:
        print(f"{domain}: DNS 查询失败: {e}")
        return []
    else:
        return [answer.host for answer in result]
 | |
| 
 | |
| 
 | |
async def get_ip(session: Any, github_url: str) -> Optional[str]:
    """Find the best IP address for *github_url*.

    Candidates are gathered from two sources: the ipaddress.com scraper and
    a DNS query against ``DNS_SERVER_LIST``. A failure in either source is
    reported and treated as "no candidates from that source". The merged
    set has the ``DISCARD_LIST`` entries removed, is sorted, and is handed
    to ``select_ip_from_list``.

    Args:
        session: HTTP session forwarded to ``get_ip_list_from_ipaddress_com``.
        github_url: Domain name to look up.

    Returns:
        The address chosen by ``select_ip_from_list``, or ``None`` when no
        candidate remains after filtering.
    """
    ip_list_web: List[str] = []
    try:
        # The scraper is annotated Optional, so normalise a None result.
        ip_list_web = get_ip_list_from_ipaddress_com(session, github_url) or []
    except Exception as ex:
        # Best effort: keep going with the other source, but say why.
        print(f"{github_url}: ipaddress.com lookup failed: {ex}")

    ip_list_dns: List[str] = []
    try:
        ip_list_dns = await get_ip_list_from_dns(github_url, dns_server_list=DNS_SERVER_LIST)
    except Exception as ex:
        # Surfaces, among others, the RuntimeError raised by the Windows
        # pycares check instead of hiding it.
        print(f"{github_url}: DNS lookup failed: {ex}")

    candidates = (set(ip_list_web) | set(ip_list_dns)) - set(DISCARD_LIST)
    ip_list = sorted(candidates)
    if not ip_list:
        return None
    print(f"{github_url}: {ip_list}")
    return select_ip_from_list(ip_list)
 | |
| 
 | |
| 
 | |
async def main() -> None:
    """Resolve every entry of ``GITHUB_URLS`` and write the hosts content.

    For each domain the best IP is looked up with ``get_ip``. A domain
    without an address gets the placeholder ``# IP Address Not Found``.
    A domain whose lookup raises is reported and skipped. The accumulated
    text and the list of ``(ip, domain)`` pairs are then passed to
    ``write_hosts_content``.
    """
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{current_time} - Start script.')

    session = HTMLSession()
    content_lines = []
    content_list = []
    total = len(GITHUB_URLS)
    try:
        for index, github_url in enumerate(GITHUB_URLS, start=1):
            print(f'Start Processing url: {index}/{total}, {github_url}')
            try:
                ip = await get_ip(session, github_url)
            except Exception as ex:
                # Skip this domain but leave a trace of what went wrong.
                print(f"{github_url}: skipped, error: {ex}")
                continue
            if ip is None:
                print(f"{github_url}: IP Not Found")
                ip = "# IP Address Not Found"
            line = ip.ljust(30) + github_url
            # A cached ping value equal to the timeout (in ms) is flagged;
            # presumably that is what ping reports when every probe timed out.
            if PING_LIST.get(ip) == PING_TIMEOUT_SEC * 1000:
                line += "  # Timeout"
            content_lines.append(line + "\n")
            content_list.append((ip, github_url))
    finally:
        # Release the HTTP session even if the loop is interrupted.
        session.close()

    write_hosts_content("".join(content_lines), content_list)
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{current_time} - End script.')
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    running_on_windows = sys.platform == "win32"
    if running_on_windows:
        # Configure the Windows event loop policy before the loop starts.
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)
    asyncio.run(main())
