feat: 添加自定义邮箱注册功能,支持手动输入验证码并优化相关逻辑;更新前端界面以支持新功能。

This commit is contained in:
real-jacket
2025-03-30 18:43:47 +08:00
parent 9dd112f468
commit bb725d6d32
7 changed files with 3141 additions and 1850 deletions

View File

@@ -7,6 +7,9 @@ def warn(message):
import time
import re
import requests
import os
import uuid
from datetime import datetime
from config import (
EMAIL_USERNAME,
EMAIL_DOMAIN,
@@ -17,13 +20,22 @@ from config import (
EMAIL_PROXY_ADDRESS,
EMAIL_PROXY_ENABLED,
EMAIL_API,
EMAIL_CODE_TYPE
EMAIL_CODE_TYPE,
EMAIL_DOMAINS
)
# 声明全局变量用于存储验证码请求
pending_verification_codes = {}
class EmailVerificationHandler:
def __init__(self, username=None, domain=None, pin=None, use_proxy=False):
def __init__(self, username=None, domain=None, pin=None, use_proxy=False, custom_email=None):
self.email = EMAIL_TYPE
self.custom_email = custom_email
# 如果提供了自定义邮箱,则解析它
if custom_email and '@' in custom_email:
username, domain = custom_email.split('@', 1)
self.username = username or EMAIL_USERNAME
self.domain = domain or EMAIL_DOMAIN
self.session = requests.Session()
@@ -51,7 +63,14 @@ class EmailVerificationHandler:
info(f"已启用代理: {EMAIL_PROXY_ADDRESS}")
def check(self):
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}%40{self.domain}&limit=20&epin={self.pin}"
# 如果有自定义邮箱,优先使用它
username = self.username
domain = self.domain
if self.custom_email and '@' in self.custom_email:
username, domain = self.custom_email.split('@', 1)
info(f"使用自定义邮箱进行连接检查: {username}@{domain}")
mail_list_url = f"https://tempmail.plus/api/mails?email={username}%40{domain}&limit=20&epin={self.pin}"
try:
# 增加超时时间并添加错误重试
for retry in range(3):
@@ -97,11 +116,22 @@ class EmailVerificationHandler:
Returns:
str: 验证码或None
"""
# 如果邮箱验证码获取方式为输入,则直接返回输入的验证码
# 记录当前的验证码获取模式和邮箱信息
info(f"验证码获取模式: {EMAIL_CODE_TYPE}, 使用邮箱: {source_email}")
# 首先检查是否是自定义邮箱(非系统配置的邮箱)
if source_email and '@' in source_email:
username, domain = source_email.split('@', 1)
if username != EMAIL_USERNAME or domain != EMAIL_DOMAIN:
info(f"检测到使用非系统配置邮箱: {source_email},强制使用手动输入模式")
return self.prompt_manual_code(source_email)
# 如果是INPUT模式始终直接进入手动输入
if EMAIL_CODE_TYPE == "INPUT":
info("EMAIL_CODE_TYPE设为INPUT跳过自动获取直接手动输入")
return self.prompt_manual_code()
return self.prompt_manual_code(source_email)
# 以下是自动获取验证码的逻辑只有在EMAIL_CODE_TYPE不是INPUT且不是自定义邮箱时才会执行
max_retries = max_retries or EMAIL_VERIFICATION_RETRIES
wait_time = wait_time or EMAIL_VERIFICATION_WAIT
info(f"开始获取邮箱验证码=>最大重试次数:{max_retries}, 等待时间:{wait_time}")
@@ -110,7 +140,7 @@ class EmailVerificationHandler:
if self.email not in ["tempemail", "zmail"]:
error(f"不支持的邮箱类型: {self.email},支持的类型为: tempemail, zmail")
warn("自动切换到手动输入模式")
return self.prompt_manual_code()
return self.prompt_manual_code(source_email)
for attempt in range(max_retries):
try:
@@ -142,20 +172,77 @@ class EmailVerificationHandler:
# 所有自动尝试都失败后,询问是否手动输入
response = input("自动获取验证码失败,是否手动输入? (y/n): ").lower()
if response == 'y':
return self.prompt_manual_code()
return self.prompt_manual_code(source_email)
return None
# 手动输入验证码
def prompt_manual_code(self):
"""手动输入验证码"""
info("自动获取验证码失败,开始手动输入验证码。")
code = input("输入6位数字验证码: ").strip()
return code
# 手动输入验证码 - 添加前端输入支持
def prompt_manual_code(self, source_email=None):
"""手动输入验证码,支持前端输入"""
email_display = source_email if source_email else f"{self.username}@{self.domain}"
info("=============手动输入验证码模式=============")
info(f"请查看邮箱 [{email_display}] 中的验证码")
# 检查是否有CURSOR_AUTO_REGISTER_WEB环境变量表示是否通过Web界面运行
web_mode = os.environ.get("CURSOR_AUTO_REGISTER_WEB", "").lower() == "true"
if web_mode:
info("检测到Web模式使用前端验证码输入")
return self.prompt_manual_code_web(email_display)
else:
info("命令行模式,使用控制台输入验证码")
info("通常验证码为6位数字在邮件正文中")
code = input("请输入收到的验证码: ").strip()
info(f"已输入验证码: {code}")
return code
# 前端验证码输入方法
def prompt_manual_code_web(self, source_email):
"""在Web界面请求验证码输入"""
# 生成唯一ID
email_id = str(uuid.uuid4())
# 存储到等待字典中
global pending_verification_codes
pending_verification_codes[email_id] = {
"email": source_email,
"status": "pending",
"created_at": datetime.now().isoformat(),
"code": None
}
info(f"已创建验证码请求 ID: {email_id},等待前端输入验证码")
# 循环等待验证码输入最多等待180秒
start_time = time.time()
while time.time() - start_time < 180:
# 检查是否已提交验证码
if email_id in pending_verification_codes and pending_verification_codes[email_id]["status"] == "submitted":
code = pending_verification_codes[email_id]["code"]
info(f"前端已提交验证码: {code}")
# 删除已使用的记录
pending_verification_codes.pop(email_id, None)
return code
# 等待1秒再检查
time.sleep(1)
# 超时,从等待字典中移除
pending_verification_codes.pop(email_id, None)
info("验证码输入超时")
return None
def get_tempmail_email_code(self, source_email=None):
info("开始获取邮件列表")
# 获取邮件列表
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}%40{self.domain}&limit=20&epin={self.pin}"
# 如果提供了source_email且当前不是使用custom_email则尝试解析它
if source_email and source_email != f"{self.username}@{self.domain}" and '@' in source_email:
username, domain = source_email.split('@', 1)
mail_list_url = f"https://tempmail.plus/api/mails?email={username}%40{domain}&limit=20&epin={self.pin}"
info(f"使用自定义邮箱获取验证码: {source_email}")
else:
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}%40{self.domain}&limit=20&epin={self.pin}"
info(f"使用配置邮箱获取验证码: {self.username}@{self.domain}")
try:
# 增加错误重试和超时时间
for retry in range(3):
@@ -188,8 +275,14 @@ class EmailVerificationHandler:
if not first_id:
return None, None
info(f"开始获取邮件详情: {first_id}")
# 获取具体邮件内容
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}%40{self.domain}&epin={self.pin}"
# 使用相同的用户名和域名获取邮件详情
if source_email and source_email != f"{self.username}@{self.domain}" and '@' in source_email:
username, domain = source_email.split('@', 1)
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={username}%40{domain}&epin={self.pin}"
else:
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}%40{self.domain}&epin={self.pin}"
try:
mail_detail_response = self.session.get(
mail_detail_url, timeout=10
@@ -222,7 +315,7 @@ class EmailVerificationHandler:
if code_match:
# 清理邮件
self._cleanup_mail(first_id)
self._cleanup_mail(first_id, source_email)
return code_match.group(), first_id
return None, None
except requests.exceptions.Timeout:
@@ -235,11 +328,18 @@ class EmailVerificationHandler:
error(f"获取邮件列表发生错误: {str(e)}")
return None, None
def _cleanup_mail(self, first_id):
def _cleanup_mail(self, first_id, source_email=None):
# 如果提供了source_email优先使用它
username = self.username
domain = self.domain
if source_email and '@' in source_email:
username, domain = source_email.split('@', 1)
info(f"使用自定义邮箱清理邮件: {username}@{domain}")
# 构造删除请求的URL和数据
delete_url = "https://tempmail.plus/api/mails/"
payload = {
"email": f"{self.username}@{self.domain}",
"email": f"{username}@{domain}",
"first_id": first_id,
"epin": self.pin,
}
@@ -301,13 +401,24 @@ class EmailVerificationHandler:
def get_zmail_email_code(self, source_email=None):
info("开始获取邮件列表")
# 获取邮件列表
username = source_email.split("@")[0]
# 优先使用传入的source_email其次是自定义邮箱最后才是配置中的邮箱
email_to_use = source_email if source_email else (self.custom_email if self.custom_email else f"{self.username}@{self.domain}")
if '@' not in email_to_use:
error(f"邮箱格式错误: {email_to_use}")
return None, None
username = email_to_use.split("@")[0]
info(f"使用邮箱获取验证码: {email_to_use}, 用户名: {username}")
mail_list_url = f"{EMAIL_API}/api/mailboxes/{username}/emails"
proxy = {
if EMAIL_PROXY_ENABLED:
proxy = {
"http": f"{EMAIL_PROXY_ADDRESS}",
"https": f"{EMAIL_PROXY_ADDRESS}",
}
self.session.proxies.update(proxy)
self.session.proxies.update(proxy)
try:
mail_list_response = self.session.get(
mail_list_url, timeout=10000