Files
cursor-auto-register/get_email_code.py
2025-03-29 00:10:12 +08:00

302 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from logger import info, error
import time
import re
import requests
from config import (
EMAIL_USERNAME,
EMAIL_DOMAIN,
EMAIL_PIN,
EMAIL_VERIFICATION_RETRIES,
EMAIL_VERIFICATION_WAIT,
EMAIL_TYPE,
EMAIL_PROXY_ADDRESS,
EMAIL_PROXY_ENABLED,
EMAIL_API,
EMAIL_CODE_TYPE
)
class EmailVerificationHandler:
def __init__(self, username=None, domain=None, pin=None):
self.email = EMAIL_TYPE
self.username = username or EMAIL_USERNAME
self.domain = domain or EMAIL_DOMAIN
self.session = requests.Session()
self.emailApi = EMAIL_API
self.emailExtension = f"@{self.domain}"
self.pin = pin or EMAIL_PIN
if self.email == "tempemail":
info(
f"初始化邮箱验证器成功: {self.username}{self.emailExtension} pin: {self.pin}"
)
elif self.email == "zmail":
info(
f"初始化邮箱验证器成功: {self.emailApi}"
)
def check(self):
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin={self.pin}"
try:
mail_list_response = self.session.get(mail_list_url, timeout=10) # 添加超时参数
mail_list_data = mail_list_response.json()
time.sleep(0.5)
if not mail_list_data.get("result"):
return True
except requests.exceptions.Timeout:
error("获取邮件列表超时")
except requests.exceptions.ConnectionError:
error("获取邮件列表连接错误")
except Exception as e:
error(f"获取邮件列表发生错误: {str(e)}")
return False
def get_verification_code(
self, source_email=None, max_retries=None, wait_time=None
):
"""
获取验证码,增加了重试机制
Args:
max_retries: 最大重试次数
wait_time: 每次重试间隔时间(秒)
Returns:
str: 验证码或None
"""
# 如果邮箱验证码获取方式为输入,则直接返回输入的验证码
if EMAIL_CODE_TYPE == "INPUT":
return self.prompt_manual_code()
max_retries = max_retries or EMAIL_VERIFICATION_RETRIES
wait_time = wait_time or EMAIL_VERIFICATION_WAIT
info(f"开始获取邮箱验证码=>最大重试次数:{max_retries}, 等待时间:{wait_time}")
for attempt in range(max_retries):
try:
info(f"当前EMail类型为 {EMAIL_TYPE}")
if self.email == "tempemail":
code, mail_id = self.get_tempmail_email_code(source_email)
elif self.email == "zmail":
code, mail_id = self.get_zmail_email_code(source_email)
if code:
info(f"成功获取验证码: {code}")
return code
if attempt < max_retries - 1:
info(
f"未找到验证码,{wait_time}秒后重试 ({attempt + 1}/{max_retries})..."
)
time.sleep(wait_time)
except Exception as e:
error(f"获取验证码失败: {str(e)}")
if attempt < max_retries - 1:
info(f"将在{wait_time}秒后重试...")
time.sleep(wait_time)
return None
# 手动输入验证码
def prompt_manual_code(self):
"""手动输入验证码"""
info("自动获取验证码失败,开始手动输入验证码。")
code = input("输入6位数字验证码: ").strip()
return code
def get_tempmail_email_code(self, source_email=None):
info("开始获取邮件列表")
# 获取邮件列表
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin={self.pin}"
try:
mail_list_response = self.session.get(
mail_list_url, timeout=10
) # 添加超时参数
mail_list_data = mail_list_response.json()
time.sleep(0.5)
if not mail_list_data.get("result"):
return None, None
except requests.exceptions.Timeout:
error("获取邮件列表超时")
return None, None
except requests.exceptions.ConnectionError:
error("获取邮件列表连接错误")
return None, None
except Exception as e:
error(f"获取邮件列表发生错误: {str(e)}")
return None, None
# 获取最新邮件的ID
first_id = mail_list_data.get("first_id")
if not first_id:
return None, None
info(f"开始获取邮件详情: {first_id}")
# 获取具体邮件内容
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}&epin={self.pin}"
try:
mail_detail_response = self.session.get(
mail_detail_url, timeout=10
) # 添加超时参数
mail_detail_data = mail_detail_response.json()
time.sleep(0.5)
if not mail_detail_data.get("result"):
return None, None
except requests.exceptions.Timeout:
error("获取邮件详情超时")
return None, None
except requests.exceptions.ConnectionError:
error("获取邮件详情连接错误")
return None, None
except Exception as e:
error(f"获取邮件详情发生错误: {str(e)}")
return None, None
# 从邮件文本中提取6位数字验证码
mail_text = mail_detail_data.get("text", "")
# 如果提供了source_email确保邮件内容中包含该邮箱地址
if source_email and source_email.lower() not in mail_text.lower():
error(f"邮件内容不包含指定的邮箱地址: {source_email}")
else:
info(f"邮件内容包含指定的邮箱地址: {source_email}")
code_match = re.search(r"(?<![a-zA-Z@.])\b\d{6}\b", mail_text)
if code_match:
# 清理邮件
self._cleanup_mail(first_id)
return code_match.group(), first_id
return None, None
def _cleanup_mail(self, first_id):
# 构造删除请求的URL和数据
delete_url = "https://tempmail.plus/api/mails/"
payload = {
"email": f"{self.username}{self.emailExtension}",
"first_id": first_id,
"epin": self.pin,
}
# 最多尝试3次
for _ in range(3):
response = self.session.delete(delete_url, data=payload)
try:
result = response.json().get("result")
if result is True:
return True
except:
pass
# 如果失败,等待0.2秒后重试
time.sleep(0.2)
return False
# 如果是zmail 需要先创建邮箱
def create_zmail_email(account_info):
# 如果邮箱类型是zmail 需要先创建邮箱
session = requests.Session()
if EMAIL_PROXY_ENABLED:
proxy = {
"http": f"{EMAIL_PROXY_ADDRESS}",
"https": f"{EMAIL_PROXY_ADDRESS}",
}
session.proxies.update(proxy)
# 创建临时邮箱URL
create_url = f"{EMAIL_API}/api/mailboxes"
username = account_info["email"].split("@")[0]
# 生成临时邮箱地址
payload = {
"address": f"{username}",
"expiresInHours": 24,
}
# 发送POST请求创建临时邮箱
try:
create_response = session.post(
create_url, json=payload, timeout=100
) # 添加超时参数
info(f"创建临时邮箱成功: {create_response.status_code}")
create_data = create_response.json()
info(f"创建临时邮箱返回数据: {create_data}")
# 检查创建邮箱是否成功
time.sleep(0.5)
if create_data.get("success") is True or create_data.get('error') == '邮箱地址已存在':
info(f"邮箱创建成功: {create_data}")
else:
error(f"邮箱创建失败: {create_data}")
return None, None
except requests.exceptions.Timeout:
error("创建临时邮箱超时", create_url)
return None, None
info(f"创建临时邮箱成功: {create_data}, 返回值: {create_data}")
# 获取zmail邮箱验证码
def get_zmail_email_code(self, source_email=None):
info("开始获取邮件列表")
# 获取邮件列表
username = source_email.split("@")[0]
mail_list_url = f"{EMAIL_API}/api/mailboxes/{username}/emails"
proxy = {
"http": f"{EMAIL_PROXY_ADDRESS}",
"https": f"{EMAIL_PROXY_ADDRESS}",
}
self.session.proxies.update(proxy)
try:
mail_list_response = self.session.get(
mail_list_url, timeout=10000
) # 添加超时参数
mail_list_data = mail_list_response.json()
time.sleep(2)
if not mail_list_data.get("emails"):
return None, None
except requests.exceptions.Timeout:
error("获取邮件列表超时")
return None, None
except requests.exceptions.ConnectionError:
error("获取邮件列表连接错误")
return None, None
except Exception as e:
error(f"获取邮件列表发生错误: {str(e)}")
return None, None
# 获取最新邮件的ID、
mail_detail_data_len = len(mail_list_data["emails"])
if mail_detail_data_len == 0:
return None, None
mail_list_data = mail_list_data["emails"][0]
# 获取最新邮件的ID
mail_id = mail_list_data.get("id")
if not mail_id:
return None, None
# 获取具体邮件内容
mail_detail_url = f"{EMAIL_API}/api/emails/{mail_id}"
returnData = ''
try:
mail_detail_response = self.session.get(
mail_detail_url, timeout=10
) # 添加超时参数
returnData = mail_detail_response.json()
time.sleep(2)
except requests.exceptions.Timeout:
error("获取邮件详情超时")
return None, None
except requests.exceptions.ConnectionError:
error("获取邮件详情连接错误")
return None, None
except Exception as e:
error(f"获取邮件详情发生错误: {str(e)}")
return None, None
# 从邮件文本中提取6位数字验证码\
mail_text = returnData.get("email")
mail_text = mail_text.get("textContent")
# 如果提供了source_email确保邮件内容中包含该邮箱地址
if source_email and source_email.lower() not in mail_text.lower():
error(f"邮件内容不包含指定的邮箱地址: {source_email}")
else:
info(f"邮件内容包含指定的邮箱地址: {source_email}")
code_match = re.search(r"(?<![a-zA-Z@.])\b\d{6}\b", mail_text)
info(f"验证码匹配结果: {code_match}")
# 如果找到验证码, 返回验证码和邮件ID
if code_match:
return code_match.group(), mail_id
else:
error("未找到验证码")
return None, None