style: ruff check --fix format

This commit is contained in:
ff
2025-03-27 15:47:11 +08:00
parent 0b61d96f2b
commit de091fea54
14 changed files with 373 additions and 274 deletions

193
api.py
View File

@@ -1,12 +1,11 @@
from fastapi import FastAPI, HTTPException, Depends, status, Body
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, delete, desc
from pathlib import Path
from database import get_session, AccountModel, init_db
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime, timedelta
from datetime import datetime
import uvicorn
import asyncio
import os
@@ -15,13 +14,18 @@ from fastapi.responses import JSONResponse, FileResponse
from cursor_pro_keep_alive import main as register_account
from browser_utils import BrowserManager
from logger import info, error
from tokenManager.oneapi_cursor_cleaner import handle_oneapi_cursor_channel
from tokenManager.oneapi_manager import OneAPIManager
from contextlib import asynccontextmanager
from tokenManager.cursor import Cursor # 添加这个导入
import concurrent.futures
from functools import lru_cache
from config import MAX_ACCOUNTS, REGISTRATION_INTERVAL, API_HOST, API_PORT, API_DEBUG, API_WORKERS
from config import (
MAX_ACCOUNTS,
REGISTRATION_INTERVAL,
API_HOST,
API_PORT,
API_DEBUG,
API_WORKERS,
)
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
@@ -43,6 +47,7 @@ static_path.mkdir(exist_ok=True) # 确保目录存在
# 全局任务存储
background_tasks = {"registration_task": None}
# 添加lifespan管理器在应用启动时初始化数据库
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -67,12 +72,6 @@ app = FastAPI(
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
# 使用startup事件初始化数据库
@app.on_event("startup")
async def startup_event():
await init_db()
info("数据库已初始化")
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
@@ -102,12 +101,18 @@ class AccountResponse(BaseModel):
data: Optional[Account] = None
message: str = ""
async def get_active_account_count() -> int:
"""获取当前账号总数"""
async with get_session() as session:
result = await session.execute(select(func.count()).select_from(AccountModel).where(AccountModel.status == "active"))
result = await session.execute(
select(func.count())
.select_from(AccountModel)
.where(AccountModel.status == "active")
)
return result.scalar()
async def get_account_count() -> int:
"""获取当前账号总数"""
async with get_session() as session:
@@ -127,18 +132,18 @@ async def run_registration():
try:
count = await get_active_account_count()
info(f"当前数据库已激活账号数: {count}")
if count >= MAX_ACCOUNTS:
# 修改:不再结束任务,而是进入监控模式
info(f"已达到最大账号数量 ({count}/{MAX_ACCOUNTS}),进入监控模式")
registration_status["last_status"] = "monitoring"
# 等待检测间隔时间
next_check = datetime.now().timestamp() + REGISTRATION_INTERVAL
registration_status["next_run"] = next_check
info(f"将在 {REGISTRATION_INTERVAL} 秒后重新检查账号数量")
await asyncio.sleep(REGISTRATION_INTERVAL)
# 跳过当前循环的剩余部分,继续下一次循环检查
continue
@@ -215,6 +220,7 @@ async def run_registration():
error(f"清理浏览器资源时出错: {str(e)}")
error(traceback.format_exc())
@app.get("/", tags=["UI"])
async def serve_index():
"""提供Web UI界面"""
@@ -344,24 +350,24 @@ async def health_check():
@app.get("/accounts", response_model=List[Account], tags=["Accounts"])
async def get_accounts(status: Optional[str] = None):
"""获取所有可用的账号和token
可选参数 status 用于过滤账号状态:
- active: 只返回正常账号
- disabled: 只返回停用账号
- deleted: 只返回已删除账号
- 不提供参数则返回所有账号
结果按id降序排列最新的账号排在前面
"""
try:
async with get_session() as session:
# 构建基本查询添加按id降序排序
query = select(AccountModel).order_by(desc(AccountModel.id))
# 根据状态过滤
if status:
query = query.where(AccountModel.status == status)
result = await session.execute(query)
accounts = result.scalars().all()
@@ -422,7 +428,7 @@ async def create_account(account: Account):
@app.delete("/account/{email}", response_model=AccountResponse, tags=["Accounts"])
async def delete_account(email: str, hard_delete: bool = False):
"""删除或停用指定邮箱的账号
如果 hard_delete=True则物理删除账号
否则仅将状态设置为'deleted'
"""
@@ -435,9 +441,7 @@ async def delete_account(email: str, hard_delete: bool = False):
account = result.scalar_one_or_none()
if not account:
return AccountResponse(
success=False, message=f"账号 {email} 不存在"
)
return AccountResponse(success=False, message=f"账号 {email} 不存在")
if hard_delete:
# 物理删除账号
@@ -449,12 +453,10 @@ async def delete_account(email: str, hard_delete: bool = False):
# 逻辑删除:将状态更新为'deleted'
account.status = "deleted"
delete_message = f"账号 {email} 已标记为删除状态"
await session.commit()
return AccountResponse(
success=True, message=delete_message
)
return AccountResponse(success=True, message=delete_message)
except Exception as e:
error(f"删除账号失败: {str(e)}")
error(traceback.format_exc())
@@ -468,10 +470,11 @@ async def delete_account(email: str, hard_delete: bool = False):
class StatusUpdate(BaseModel):
status: str
@app.put("/account/id/{id}/status", response_model=AccountResponse, tags=["Accounts"])
async def update_account_status(id: str, update: StatusUpdate):
"""更新账号状态
可选状态: active (正常), disabled (停用), deleted (已删除)
"""
# 使用update.status替代原先的status参数
@@ -480,10 +483,10 @@ async def update_account_status(id: str, update: StatusUpdate):
valid_statuses = ["active", "disabled", "deleted"]
if update.status not in valid_statuses:
return AccountResponse(
success=False,
message=f"无效的状态值。允许的值: {', '.join(valid_statuses)}"
success=False,
message=f"无效的状态值。允许的值: {', '.join(valid_statuses)}",
)
async with get_session() as session:
# 通过邮箱查询账号
result = await session.execute(
@@ -501,7 +504,8 @@ async def update_account_status(id: str, update: StatusUpdate):
await session.commit()
return AccountResponse(
success=True, message=f"账号 {account.email} 状态已更新为 '{update.status}'"
success=True,
message=f"账号 {account.email} 状态已更新为 '{update.status}'",
)
except Exception as e:
error(f"通过邮箱更新账号状态失败: {str(e)}")
@@ -520,20 +524,25 @@ async def start_registration():
try:
# 检查是否已达到最大账号数
count = await get_active_account_count()
# 检查任务是否已在运行
if (
background_tasks["registration_task"]
and not background_tasks["registration_task"].done()
):
# 确定当前状态
current_status = "monitoring" if registration_status["last_status"] == "monitoring" else "running"
current_status = (
"monitoring"
if registration_status["last_status"] == "monitoring"
else "running"
)
status_message = (
f"注册任务已在运行中 (状态: {current_status})" if current_status == "running"
f"注册任务已在运行中 (状态: {current_status})"
if current_status == "running"
else f"已达到最大账号数量({count}/{MAX_ACCOUNTS}),正在监控账号状态,当账号数量减少时将自动继续注册"
)
info(f"注册请求被忽略 - 任务已在{current_status}状态")
return {
"success": True,
@@ -677,9 +686,12 @@ async def get_registration_status():
try:
count = await get_account_count()
active_count = await get_active_account_count() # 添加获取活跃账号数
# 更新任务状态逻辑
if background_tasks["registration_task"] and not background_tasks["registration_task"].done():
if (
background_tasks["registration_task"]
and not background_tasks["registration_task"].done()
):
if registration_status["last_status"] == "monitoring":
task_status = "monitoring" # 新增监控状态
else:
@@ -714,13 +726,19 @@ async def get_registration_status():
# 添加状态解释信息
if task_status == "monitoring":
status_info["status_message"] = f"已达到最大账号数量({active_count}/{MAX_ACCOUNTS}),正在监控账号状态,当账号数量减少时将自动继续注册"
status_info["status_message"] = (
f"已达到最大账号数量({active_count}/{MAX_ACCOUNTS}),正在监控账号状态,当账号数量减少时将自动继续注册"
)
elif task_status == "running":
status_info["status_message"] = f"正在执行注册流程,当前账号数:{active_count}/{MAX_ACCOUNTS}"
status_info["status_message"] = (
f"正在执行注册流程,当前账号数:{active_count}/{MAX_ACCOUNTS}"
)
else:
status_info["status_message"] = "注册任务未运行"
info(f"请求注册状态 (当前账号数: {count}, 活跃账号数: {active_count}, 状态: {task_status})")
info(
f"请求注册状态 (当前账号数: {count}, 活跃账号数: {active_count}, 状态: {task_status})"
)
return status_info
except Exception as e:
@@ -844,11 +862,11 @@ async def get_account_usage(email: str):
remaining_days = Cursor.get_trial_remaining_days(
account.user, account.token
)
# 计算总额度和已使用额度
total_limit = 150 # 默认总额度
used_limit = 0
if remaining_balance is not None:
used_limit = total_limit - remaining_balance
if remaining_days is not None and remaining_days == 0:
@@ -884,16 +902,14 @@ async def get_account_usage(email: str):
except Exception as e:
error(f"查询账号使用量失败: {str(e)}")
error(traceback.format_exc())
return {
"success": False,
"message": f"Failed to get account usage: {str(e)}"
}
return {"success": False, "message": f"Failed to get account usage: {str(e)}"}
# 添加通过ID删除账号的API
@app.delete("/account/id/{id}", response_model=AccountResponse, tags=["Accounts"])
async def delete_account_by_id(id: int, hard_delete: bool = False):
"""通过ID删除或停用账号
如果 hard_delete=True则物理删除账号
否则仅将状态设置为'deleted'
"""
@@ -906,28 +922,22 @@ async def delete_account_by_id(id: int, hard_delete: bool = False):
account = result.scalar_one_or_none()
if not account:
return AccountResponse(
success=False, message=f"ID为 {id} 的账号不存在"
)
return AccountResponse(success=False, message=f"ID为 {id} 的账号不存在")
email = account.email # 保存邮箱以在响应中显示
if hard_delete:
# 物理删除账号
await session.execute(
delete(AccountModel).where(AccountModel.id == id)
)
await session.execute(delete(AccountModel).where(AccountModel.id == id))
delete_message = f"账号 {email} (ID: {id}) 已永久删除"
else:
# 逻辑删除:将状态更新为'deleted'
account.status = "deleted"
delete_message = f"账号 {email} (ID: {id}) 已标记为删除状态"
await session.commit()
return AccountResponse(
success=True, message=delete_message
)
return AccountResponse(success=True, message=delete_message)
except Exception as e:
error(f"通过ID删除账号失败: {str(e)}")
error(traceback.format_exc())
@@ -936,6 +946,7 @@ async def delete_account_by_id(id: int, hard_delete: bool = False):
detail=f"删除账号失败: {str(e)}",
)
# 添加"使用Token"功能
@app.post("/account/use-token/{id}", tags=["Accounts"])
async def use_account_token(id: int):
@@ -950,28 +961,41 @@ async def use_account_token(id: int):
if not account:
return {"success": False, "message": f"ID为 {id} 的账号不存在"}
# 调用CursorAuthManager更新认证
from cursor_auth_manager import CursorAuthManager
auth_manager = CursorAuthManager()
success = auth_manager.update_auth(email=account.email, access_token=account.token, refresh_token=account.token)
success = auth_manager.update_auth(
email=account.email,
access_token=account.token,
refresh_token=account.token,
)
# 重置Cursor的机器ID
from cursor_shadow_patcher import CursorShadowPatcher
resetter = CursorShadowPatcher()
patch_success = resetter.reset_machine_ids()
if success and patch_success:
return {"success": True, "message": f"成功使用账号 {account.email} 的Token并重置了机器ID"}
return {
"success": True,
"message": f"成功使用账号 {account.email} 的Token并重置了机器ID",
}
elif success:
return {"success": True, "message": f"成功使用账号 {account.email} 的Token但机器ID重置失败"}
return {
"success": True,
"message": f"成功使用账号 {account.email} 的Token但机器ID重置失败",
}
else:
return {"success": False, "message": "Token更新失败"}
except Exception as e:
error(f"使用账号Token失败: {str(e)}")
error(traceback.format_exc())
return {"success": False, "message": f"使用Token失败: {str(e)}"}
# 添加配置相关模型
class ConfigModel(BaseModel):
BROWSER_HEADLESS: bool
@@ -983,6 +1007,7 @@ class ConfigModel(BaseModel):
BROWSER_PATH: Optional[str] = None
CURSOR_PATH: Optional[str] = None
# 获取配置端点
@app.get("/config", tags=["Config"])
async def get_config():
@@ -990,7 +1015,7 @@ async def get_config():
try:
# 重新加载配置以确保获取最新值
load_dotenv()
config = {
"BROWSER_HEADLESS": os.getenv("BROWSER_HEADLESS", "True").lower() == "true",
"BROWSER_USER_AGENT": os.getenv("BROWSER_USER_AGENT", ""),
@@ -999,15 +1024,16 @@ async def get_config():
"EMAIL_USERNAME": os.getenv("EMAIL_USERNAME", ""),
"EMAIL_PIN": os.getenv("EMAIL_PIN", ""),
"BROWSER_PATH": os.getenv("BROWSER_PATH", ""),
"CURSOR_PATH": os.getenv("CURSOR_PATH", "")
"CURSOR_PATH": os.getenv("CURSOR_PATH", ""),
}
return {"success": True, "data": config}
except Exception as e:
error(f"获取配置失败: {str(e)}")
error(traceback.format_exc())
return {"success": False, "message": f"获取配置失败: {str(e)}"}
# 更新配置端点
@app.put("/config", tags=["Config"])
async def update_config(config: ConfigModel):
@@ -1015,13 +1041,13 @@ async def update_config(config: ConfigModel):
try:
# 获取.env文件路径
env_path = Path(__file__).parent / ".env"
# 读取当前.env文件内容
current_lines = []
if env_path.exists():
with open(env_path, "r", encoding="utf-8") as f:
current_lines = f.readlines()
# 构建配置字典
config_dict = {
"BROWSER_HEADLESS": str(config.BROWSER_HEADLESS),
@@ -1029,52 +1055,53 @@ async def update_config(config: ConfigModel):
"MAX_ACCOUNTS": str(config.MAX_ACCOUNTS),
"EMAIL_DOMAINS": config.EMAIL_DOMAINS,
"EMAIL_USERNAME": config.EMAIL_USERNAME,
"EMAIL_PIN": config.EMAIL_PIN
"EMAIL_PIN": config.EMAIL_PIN,
}
# 添加可选配置(如果提供)
if config.BROWSER_PATH:
config_dict["BROWSER_PATH"] = config.BROWSER_PATH
if config.CURSOR_PATH:
config_dict["CURSOR_PATH"] = config.CURSOR_PATH
# 处理现有行或创建新行
updated_lines = []
updated_keys = set()
for line in current_lines:
line = line.strip()
if not line or line.startswith("#"):
updated_lines.append(line)
continue
key, value = line.split("=", 1) if "=" in line else (line, "")
key = key.strip()
if key in config_dict:
updated_lines.append(f"{key}={config_dict[key]}")
updated_keys.add(key)
else:
updated_lines.append(line)
# 添加未更新的配置项
for key, value in config_dict.items():
if key not in updated_keys and value:
updated_lines.append(f"{key}={value}")
# 写入更新后的配置
with open(env_path, "w", encoding="utf-8") as f:
f.write("\n".join(updated_lines))
# 重新加载环境变量
load_dotenv(override=True)
return {"success": True, "message": "配置已更新"}
except Exception as e:
error(f"更新配置失败: {str(e)}")
error(traceback.format_exc())
return {"success": False, "message": f"更新配置失败: {str(e)}"}
if __name__ == "__main__":
uvicorn.run(
"api:app",

View File

@@ -2,7 +2,7 @@ from DrissionPage import ChromiumOptions, Chromium
import sys
import os
from dotenv import load_dotenv
from logger import info, warning
from logger import info
load_dotenv()
@@ -20,7 +20,7 @@ class BrowserManager:
def _get_browser_options(self):
"""获取浏览器配置"""
co = ChromiumOptions()
# 设置浏览器路径 - 添加这段代码
browser_path = os.getenv("BROWSER_PATH")
if not browser_path and sys.platform == "win32":
@@ -29,20 +29,20 @@ class BrowserManager:
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
r"C:\Program Files\Microsoft\Edge\Application\msedge.exe",
r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe"
r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe",
]
for path in possible_paths:
if os.path.exists(path):
browser_path = path
info(f"自动找到浏览器路径: {browser_path}")
break
if not browser_path:
info("未找到浏览器路径,请在.env文件中设置BROWSER_PATH环境变量")
if browser_path:
co.set_browser_path(browser_path)
try:
extension_path = self._get_extension_path()
co.add_extension(extension_path)
@@ -118,4 +118,4 @@ class BrowserManager:
try:
self.browser.quit()
except:
pass
pass

View File

@@ -1,6 +1,5 @@
import os
from dotenv import load_dotenv
import logging
# 加载.env文件中的环境变量
load_dotenv()
@@ -57,7 +56,9 @@ EMAIL_DOMAIN = os.getenv("EMAIL_DOMAIN", "mailto.plus")
# 临时邮箱PIN码如果需要
EMAIL_PIN = os.getenv("EMAIL_PIN", "")
# 可用于注册的邮箱域名列表(逗号分隔)
EMAIL_DOMAINS = [domain.strip() for domain in os.getenv("EMAIL_DOMAINS", "ddcat.store").split(",")]
EMAIL_DOMAINS = [
domain.strip() for domain in os.getenv("EMAIL_DOMAINS", "ddcat.store").split(",")
]
# 邮件验证码获取最大重试次数
EMAIL_VERIFICATION_RETRIES = int(os.getenv("EMAIL_VERIFICATION_RETRIES", 5))
# 邮件验证码获取重试间隔(秒)
@@ -65,7 +66,7 @@ EMAIL_VERIFICATION_WAIT = int(os.getenv("EMAIL_VERIFICATION_WAIT", 5))
# ===== 数据库配置 =====
# 数据库连接URL默认使用SQLite
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:////app/accounts.db")
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:////app/accounts.db")
# ===== Cursor main.js 配置 =====
# Cursor 主文件路径

View File

@@ -15,14 +15,16 @@ class CursorAuthManager:
self.db_path = os.path.join(
appdata, "Cursor", "User", "globalStorage", "state.vscdb"
)
elif sys.platform == "darwin": # macOS
self.db_path = os.path.abspath(os.path.expanduser(
"~/Library/Application Support/Cursor/User/globalStorage/state.vscdb"
))
elif sys.platform == "linux" : # Linux 和其他类Unix系统
self.db_path = os.path.abspath(os.path.expanduser(
"~/.config/Cursor/User/globalStorage/state.vscdb"
))
elif sys.platform == "darwin": # macOS
self.db_path = os.path.abspath(
os.path.expanduser(
"~/Library/Application Support/Cursor/User/globalStorage/state.vscdb"
)
)
elif sys.platform == "linux": # Linux 和其他类Unix系统
self.db_path = os.path.abspath(
os.path.expanduser("~/.config/Cursor/User/globalStorage/state.vscdb")
)
else:
raise NotImplementedError(f"不支持的操作系统: {sys.platform}")
@@ -55,10 +57,9 @@ class CursorAuthManager:
cursor = conn.cursor()
for key, value in updates:
# 如果没有更新任何行,说明key不存在,执行插入
# 检查 accessToken 是否存在
check_query = f"SELECT COUNT(*) FROM itemTable WHERE key = ?"
check_query = "SELECT COUNT(*) FROM itemTable WHERE key = ?"
cursor.execute(check_query, (key,))
if cursor.fetchone()[0] == 0:
insert_query = "INSERT INTO itemTable (key, value) VALUES (?, ?)"
@@ -83,4 +84,4 @@ class CursorAuthManager:
return False
finally:
if conn:
conn.close()
conn.close()

View File

@@ -1,13 +1,15 @@
import os
import sys
import psutil
import time
import random
from logger import info, warning, error
from logger import info, warning
import traceback
from config import (
LOGIN_URL, SIGN_UP_URL, SETTINGS_URL,
EMAIL_DOMAINS, REGISTRATION_MAX_RETRIES
LOGIN_URL,
SIGN_UP_URL,
SETTINGS_URL,
EMAIL_DOMAINS,
REGISTRATION_MAX_RETRIES,
)
@@ -61,7 +63,7 @@ def handle_turnstile(tab):
return True
except:
pass
# 如果页面已准备好且没有验证需要处理,则可以返回
if page_ready:
info("页面已准备好,没有检测到需要处理的验证")
@@ -115,7 +117,9 @@ def get_cursor_session_token(tab, max_attempts=5, retry_interval=3):
attempts += 1
if attempts < max_attempts:
warning(f"未找到Cursor会话Token重试中... ({attempts}/{max_attempts})")
warning(
f"未找到Cursor会话Token重试中... ({attempts}/{max_attempts})"
)
time.sleep(retry_interval)
else:
info("未找到Cursor会话Token已达到最大尝试次数")
@@ -124,7 +128,9 @@ def get_cursor_session_token(tab, max_attempts=5, retry_interval=3):
info(f"获取Token出错: {str(e)}")
attempts += 1
if attempts < max_attempts:
info(f"重试获取Token等待时间: {retry_interval}秒,尝试次数: {attempts}/{max_attempts}")
info(
f"重试获取Token等待时间: {retry_interval}秒,尝试次数: {attempts}/{max_attempts}"
)
time.sleep(retry_interval)
return False
@@ -133,6 +139,7 @@ def get_cursor_session_token(tab, max_attempts=5, retry_interval=3):
warning(f"获取Token过程出错: {str(e)}")
return False
def sign_up_account(browser, tab, account_info):
info("=============开始注册账号=============")
info(
@@ -206,7 +213,9 @@ def sign_up_account(browser, tab, account_info):
if tab.ele("@data-index=0"):
info("等待输入验证码...")
# 切换到邮箱标签页
code = email_handler.get_verification_code(source_email=account_info["email"])
code = email_handler.get_verification_code(
source_email=account_info["email"]
)
if code is None:
info("未获取到验证码...系统异常,正在退出....")
return "EMAIL_GET_CODE_FAILED"
@@ -218,7 +227,7 @@ def sign_up_account(browser, tab, account_info):
i += 1
info("验证码输入完成")
time.sleep(random.uniform(3, 5))
# 在验证码输入完成后检测是否出现了Turnstile验证
info("检查是否出现了Turnstile验证...")
try:
@@ -228,7 +237,7 @@ def sign_up_account(browser, tab, account_info):
handle_turnstile(tab)
except:
info("未检测到Turnstile验证继续下一步")
break
except Exception as e:
info(f"验证码处理失败: {str(e)}")
@@ -248,7 +257,7 @@ class EmailGenerator:
# 将密码生成移到这里,避免类定义时执行随机密码生成
self.default_first_name = self.generate_random_name()
self.default_last_name = self.generate_random_name()
# 从配置文件获取域名配置
self.domains = EMAIL_DOMAINS
info(f"当前可用域名: {self.domains}")
@@ -262,18 +271,20 @@ class EmailGenerator:
upper_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
digits = "0123456789"
special = "!@#$%^&*"
# 确保密码包含至少一个大写字母、一个数字和一个特殊字符
password = [
random.choice(chars),
random.choice(upper_chars),
random.choice(digits),
random.choice(special)
random.choice(special),
]
# 添加剩余随机字符
password.extend(random.choices(chars + upper_chars + digits + special, k=length-4))
password.extend(
random.choices(chars + upper_chars + digits + special, k=length - 4)
)
# 打乱密码顺序
random.shuffle(password)
return "".join(password)
@@ -285,10 +296,12 @@ class EmailGenerator:
random.choices("abcdefghijklmnopqrstuvwxyz", k=length - 1)
)
return first_letter + rest_letters
def generate_email(self, length=8):
"""生成随机邮箱地址,使用随机域名"""
random_str = "".join(random.choices("abcdefghijklmnopqrstuvwxyz1234567890", k=length))
random_str = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz1234567890", k=length)
)
timestamp = str(int(time.time()))[-4:] # 使用时间戳后4位
# 随机选择一个域名
domain = random.choice(self.domains)
@@ -344,7 +357,7 @@ class EmailGenerator:
usage_limit=str(total_usage),
created_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
status="active", # 设置默认状态为活跃
id=timestamp_ms # 设置毫秒时间戳id
id=timestamp_ms, # 设置毫秒时间戳id
)
session.add(account)
@@ -394,8 +407,10 @@ def main():
while current_retry < max_retries:
try:
account_info = email_generator.get_account_info()
info(f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}")
info(
f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}"
)
signup_tab = browser.new_tab(LOGIN_URL)
browser.activate_tab(signup_tab)
@@ -412,7 +427,12 @@ def main():
else:
info("获取Cursor会话Token失败")
current_retry += 1
elif result in ["EMAIL_USED", "SIGNUP_RESTRICTED", "VERIFY_FAILED", "EMAIL_GET_CODE_FAILED"]:
elif result in [
"EMAIL_USED",
"SIGNUP_RESTRICTED",
"VERIFY_FAILED",
"EMAIL_GET_CODE_FAILED",
]:
info(f"遇到问题: {result},尝试切换邮箱...")
continue # 使用新邮箱重试注册
else: # ERROR
@@ -440,4 +460,4 @@ def main():
info(f"错误详情: {traceback.format_exc()}")
cleanup_and_exit(browser_manager, 1)
finally:
cleanup_and_exit(browser_manager, 1)
cleanup_and_exit(browser_manager, 1)

View File

@@ -5,12 +5,9 @@ import shutil
import pathlib
import platform
from uuid import uuid4
import sys
from logger import info, warning, error
from config import (
CURSOR_PATH
)
from config import CURSOR_PATH
# 颜色常量定义,保留用于日志输出
@@ -25,36 +22,49 @@ SYSTEM = platform.system()
if SYSTEM not in ("Windows", "Linux", "Darwin"):
raise OSError(f"不支持的操作系统: {SYSTEM}")
def uuid():
"""生成随机UUID"""
return str(uuid4())
def path(path_str):
"""获取绝对路径"""
return pathlib.Path(path_str).resolve()
def randomuuid(randomuuid_str):
"""获取随机UUID如果提供则使用提供的值"""
if not randomuuid_str:
randomuuid_str = uuid()
return randomuuid_str
def random_mac():
"""生成随机MAC地址"""
mac = [0x00, 0x16, 0x3e, random.randint(0x00, 0x7f),
random.randint(0x00, 0xff), random.randint(0x00, 0xff)]
return ':'.join(map(lambda x: "%02x" % x, mac))
mac = [
0x00,
0x16,
0x3E,
random.randint(0x00, 0x7F),
random.randint(0x00, 0xFF),
random.randint(0x00, 0xFF),
]
return ":".join(map(lambda x: "%02x" % x, mac))
def load(file_path: pathlib.Path):
"""加载文件内容"""
with open(file_path, "rb") as f:
return f.read()
def save(file_path: pathlib.Path, data: bytes):
"""保存文件内容"""
with open(file_path, "wb") as f:
f.write(data)
def backup(file_path: pathlib.Path):
"""备份文件"""
backup_path = file_path.with_suffix(file_path.suffix + ".bak")
@@ -62,22 +72,26 @@ def backup(file_path: pathlib.Path):
shutil.copy2(file_path, backup_path)
print(f"已备份 {file_path} -> {backup_path}")
def replace(data: bytes, pattern: str, replace_str: str, probe: str = None) -> bytes:
"""替换文件内容"""
pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern
replace_bytes = replace_str.encode() if isinstance(replace_str, str) else replace_str
replace_bytes = (
replace_str.encode() if isinstance(replace_str, str) else replace_str
)
if probe:
probe_bytes = probe.encode() if isinstance(probe, str) else probe
if re.search(probe_bytes, data):
print(f"检测到已经被修补的代码,跳过...")
print("检测到已经被修补的代码,跳过...")
return data
return re.sub(pattern_bytes, replace_bytes, data)
def find_main_js():
"""查找Cursor的main.js文件"""
error(f'SYSTEM: {SYSTEM}')
error(f"SYSTEM: {SYSTEM}")
if SYSTEM == "Windows":
localappdata = os.getenv("LOCALAPPDATA")
if not localappdata:
@@ -86,18 +100,32 @@ def find_main_js():
# 使用本地变量保存路径
cursor_path = CURSOR_PATH
if not cursor_path:
error(f'当前windows系统, 环境变量 CURSOR_PATH 不存在,使用默认路径')
error("当前windows系统, 环境变量 CURSOR_PATH 不存在,使用默认路径")
cursor_path = os.getenv("LOCALAPPDATA", "")
else:
info(f'当前windows系统, CURSOR_PATH: {cursor_path}')
info(f"当前windows系统, CURSOR_PATH: {cursor_path}")
# 常见的Cursor安装路径
paths = [
path(os.path.join(cursor_path, "resources", "app", "out", "main.js")),
path(os.path.join(localappdata, "Programs", "cursor", "resources", "app", "out", "main.js")),
path(os.path.join(localappdata, "cursor", "resources", "app", "out", "main.js")),
path(
os.path.join(
localappdata,
"Programs",
"cursor",
"resources",
"app",
"out",
"main.js",
)
),
path(
os.path.join(
localappdata, "cursor", "resources", "app", "out", "main.js"
)
),
]
for p in paths:
info(f"检查路径: {p}")
if p.exists():
@@ -105,41 +133,48 @@ def find_main_js():
return p
else:
warning(f"路径不存在: {p}")
elif SYSTEM == "Darwin": # macOS
paths = [
path("/Applications/Cursor.app/Contents/Resources/app/out/main.js"),
path(os.path.expanduser("~/Applications/Cursor.app/Contents/Resources/app/out/main.js"))
path(
os.path.expanduser(
"~/Applications/Cursor.app/Contents/Resources/app/out/main.js"
)
),
]
for p in paths:
if p.exists():
return p
elif SYSTEM == "Linux":
# Linux上常见的安装路径
paths = [
path("/usr/share/cursor/resources/app/out/main.js"),
path(os.path.expanduser("~/.local/share/cursor/resources/app/out/main.js"))
path(os.path.expanduser("~/.local/share/cursor/resources/app/out/main.js")),
]
for p in paths:
if p.exists():
return p
raise FileNotFoundError("无法找到Cursor的main.js文件请手动指定路径")
def patch_cursor(js_path=None, machine_id=None, mac_addr=None, sqm_id=None, dev_id=None):
def patch_cursor(
js_path=None, machine_id=None, mac_addr=None, sqm_id=None, dev_id=None
):
"""
修补Cursor的main.js文件替换机器ID等识别信息
参数:
js_path: main.js文件路径如果为None则自动查找
machine_id: 机器ID如果为None则随机生成
mac_addr: MAC地址如果为None则随机生成
sqm_id: Windows SQM ID如果为None则使用空字符串
dev_id: 设备ID如果为None则随机生成
返回:
bool: 是否成功
"""
@@ -149,102 +184,105 @@ def patch_cursor(js_path=None, machine_id=None, mac_addr=None, sqm_id=None, dev_
js_path = find_main_js()
else:
js_path = path(js_path)
# 如果找不到main.js文件
if not js_path.exists():
print(f"错误: 找不到文件 {js_path}")
return False
print(f"找到main.js文件: {js_path}")
# 随机生成ID
machine_id = randomuuid(machine_id)
mac_addr = mac_addr or random_mac()
sqm_id = sqm_id or ""
dev_id = randomuuid(dev_id)
# 加载文件内容
data = load(js_path)
# 备份文件
backup(js_path)
# 替换机器ID
data = replace(
data,
r"=.{0,50}timeout.{0,10}5e3.*?,",
f'=/*csp1*/"{machine_id}"/*1csp*/,',
r"=/\*csp1\*/.*?/\*1csp\*/,"
r"=/\*csp1\*/.*?/\*1csp\*/,",
)
# 替换MAC地址
data = replace(
data,
r"(function .{0,50}\{).{0,300}Unable to retrieve mac address.*?(\})",
f'\\1return/*csp2*/"{mac_addr}"/*2csp*/;\\2',
r"()return/\*csp2\*/.*?/\*2csp\*/;()"
r"()return/\*csp2\*/.*?/\*2csp\*/;()",
)
# 替换SQM ID
data = replace(
data,
r'return.{0,50}\.GetStringRegKey.*?HKEY_LOCAL_MACHINE.*?MachineId.*?\|\|.*?""',
f'return/*csp3*/"{sqm_id}"/*3csp*/',
r"return/\*csp3\*/.*?/\*3csp\*/"
r"return/\*csp3\*/.*?/\*3csp\*/",
)
# 替换设备ID
data = replace(
data,
r"return.{0,50}vscode\/deviceid.*?getDeviceId\(\)",
f'return/*csp4*/"{dev_id}"/*4csp*/',
r"return/\*csp4\*/.*?/\*4csp\*/"
r"return/\*csp4\*/.*?/\*4csp\*/",
)
# 保存修改后的文件
save(js_path, data)
print(f"成功修补 {js_path}")
print(f"机器ID: {machine_id}")
print(f"MAC地址: {mac_addr}")
print(f"SQM ID: {sqm_id}")
print(f"设备ID: {dev_id}")
return True
except Exception as e:
print(f"错误: {str(e)}")
import traceback
traceback.print_exc()
return False
class CursorShadowPatcher:
"""Cursor机器标识修改器"""
@staticmethod
def reset_machine_ids():
"""重置所有机器标识"""
return patch_cursor()
if __name__ == "__main__":
# 作为独立脚本运行时,执行交互式修补
print(f"\n{'='*50}")
print(f"Cursor 机器标识重置工具 (Shadow Patch 增强版)")
print(f"{'='*50}")
js_path = input(f"请输入main.js路径 (留空=自动检测): ")
machine_id = input(f"机器ID (留空=随机生成): ")
mac_addr = input(f"MAC地址 (留空=随机生成): ")
sqm_id = input(f"Windows SQM ID (留空=使用空值): ")
dev_id = input(f"设备ID (留空=随机生成): ")
print(f"\n{'=' * 50}")
print("Cursor 机器标识重置工具 (Shadow Patch 增强版)")
print(f"{'=' * 50}")
js_path = input("请输入main.js路径 (留空=自动检测): ")
machine_id = input("机器ID (留空=随机生成): ")
mac_addr = input("MAC地址 (留空=随机生成): ")
sqm_id = input("Windows SQM ID (留空=使用空值): ")
dev_id = input("设备ID (留空=随机生成): ")
success = patch_cursor(js_path, machine_id, mac_addr, sqm_id, dev_id)
if success:
print(f"\n{'='*50}")
print(f"修补成功!")
print(f"\n{'=' * 50}")
print("修补成功!")
else:
print(f"\n{'='*50}")
print(f"修补失败!")
input("按回车键退出...")
print(f"\n{'=' * 50}")
print("修补失败!")
input("按回车键退出...")

View File

@@ -1,13 +1,11 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import Column, String, DateTime, Text, text, BigInteger, Index
from datetime import datetime
import os
import pathlib
from sqlalchemy import Column, String, Text, text, BigInteger
from contextlib import asynccontextmanager
from logger import info, error
from config import DATABASE_URL
# 基础模型类
class Base(DeclarativeBase):
pass

View File

@@ -1,8 +1,15 @@
from logger import info, warning, error
from logger import info, error
import time
import re
import requests
from config import EMAIL_USERNAME, EMAIL_DOMAIN, EMAIL_PIN, EMAIL_VERIFICATION_RETRIES, EMAIL_VERIFICATION_WAIT
from config import (
EMAIL_USERNAME,
EMAIL_DOMAIN,
EMAIL_PIN,
EMAIL_VERIFICATION_RETRIES,
EMAIL_VERIFICATION_WAIT,
)
class EmailVerificationHandler:
def __init__(self, username=None, domain=None, pin=None):
@@ -11,22 +18,26 @@ class EmailVerificationHandler:
self.session = requests.Session()
self.emailExtension = f"@{self.domain}"
self.pin = pin or EMAIL_PIN
info(f"初始化邮箱验证器成功: {self.username}{self.emailExtension} pin: {self.pin}")
info(
f"初始化邮箱验证器成功: {self.username}{self.emailExtension} pin: {self.pin}"
)
def get_verification_code(self, source_email=None, max_retries=None, wait_time=None):
def get_verification_code(
self, source_email=None, max_retries=None, wait_time=None
):
"""
获取验证码,增加了重试机制
Args:
max_retries: 最大重试次数
wait_time: 每次重试间隔时间(秒)
Returns:
str: 验证码或None
"""
max_retries = max_retries or EMAIL_VERIFICATION_RETRIES
wait_time = wait_time or EMAIL_VERIFICATION_WAIT
info(f'开始获取邮箱验证码=>最大重试次数:{max_retries}, 等待时间:{wait_time}')
info(f"开始获取邮箱验证码=>最大重试次数:{max_retries}, 等待时间:{wait_time}")
for attempt in range(max_retries):
try:
code, mail_id = self._get_latest_mail_code(source_email)
@@ -34,32 +45,36 @@ class EmailVerificationHandler:
info(f"成功获取验证码: {code}")
return code
if attempt < max_retries - 1:
info(f"未找到验证码,{wait_time}秒后重试 ({attempt+1}/{max_retries})...")
info(
f"未找到验证码,{wait_time}秒后重试 ({attempt + 1}/{max_retries})..."
)
time.sleep(wait_time)
except Exception as e:
error(f"获取验证码失败: {str(e)}")
if attempt < max_retries - 1:
info(f"将在{wait_time}秒后重试...")
time.sleep(wait_time)
return None
# 手动输入验证码
def _get_latest_mail_code(self, source_email=None):
info(f"开始获取邮件列表")
info("开始获取邮件列表")
# 获取邮件列表
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin={self.pin}"
try:
mail_list_response = self.session.get(mail_list_url, timeout=10) # 添加超时参数
mail_list_response = self.session.get(
mail_list_url, timeout=10
) # 添加超时参数
mail_list_data = mail_list_response.json()
time.sleep(0.5)
if not mail_list_data.get("result"):
return None, None
except requests.exceptions.Timeout:
error(f"获取邮件列表超时")
error("获取邮件列表超时")
return None, None
except requests.exceptions.ConnectionError:
error(f"获取邮件列表连接错误")
error("获取邮件列表连接错误")
return None, None
except Exception as e:
error(f"获取邮件列表发生错误: {str(e)}")
@@ -73,16 +88,18 @@ class EmailVerificationHandler:
# 获取具体邮件内容
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}&epin={self.pin}"
try:
mail_detail_response = self.session.get(mail_detail_url, timeout=10) # 添加超时参数
mail_detail_response = self.session.get(
mail_detail_url, timeout=10
) # 添加超时参数
mail_detail_data = mail_detail_response.json()
time.sleep(0.5)
if not mail_detail_data.get("result"):
return None, None
except requests.exceptions.Timeout:
error(f"获取邮件详情超时")
error("获取邮件详情超时")
return None, None
except requests.exceptions.ConnectionError:
error(f"获取邮件详情连接错误")
error("获取邮件详情连接错误")
return None, None
except Exception as e:
error(f"获取邮件详情发生错误: {str(e)}")
@@ -90,7 +107,7 @@ class EmailVerificationHandler:
# 从邮件文本中提取6位数字验证码
mail_text = mail_detail_data.get("text", "")
# 如果提供了source_email确保邮件内容中包含该邮箱地址
if source_email and source_email.lower() not in mail_text.lower():
error(f"邮件内容不包含指定的邮箱地址: {source_email}")

View File

@@ -1,5 +1,4 @@
import logging
import os
import sys
from config import LOG_LEVEL, LOG_FORMAT, LOG_DATE_FORMAT
@@ -16,14 +15,18 @@ logging.basicConfig(
logger = logging.getLogger()
def info(message):
logger.info(message)
def warning(message):
logger.warning(message)
def error(message):
logger.error(message)
def debug(message):
logger.debug(message)
logger.debug(message)

View File

@@ -1,30 +1,32 @@
import asyncio
import time
from sqlalchemy import select, update
from sqlalchemy import select
from database import init_db, get_session, AccountModel
async def migrate_add_id():
"""为现有记录添加ID字段"""
await init_db()
async with get_session() as session:
# 查询所有没有id的记录
result = await session.execute(
select(AccountModel).where(AccountModel.id == None)
)
accounts = result.scalars().all()
print(f"找到 {len(accounts)} 条需要更新的记录")
# 为每条记录添加id
for i, account in enumerate(accounts):
# 生成基于索引的间隔时间戳,避免所有记录使用同一时间戳
timestamp_ms = int(time.time() * 1000) - (len(accounts) - i) * 1000
account.id = timestamp_ms
print(f"更新记录 {account.email} 的ID为 {timestamp_ms}")
await session.commit()
print("迁移完成")
if __name__ == "__main__":
asyncio.run(migrate_add_id())
asyncio.run(migrate_add_id())

View File

@@ -3,7 +3,6 @@ import sys
import json
import uuid
import hashlib
import shutil
from colorama import Fore, Style, init
# 初始化colorama
@@ -123,12 +122,12 @@ class MachineIDResetter:
if __name__ == "__main__":
print(f"\n{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
print(f"\n{Fore.CYAN}{'=' * 50}{Style.RESET_ALL}")
print(f"{Fore.CYAN}{EMOJI['RESET']} Cursor 机器标识重置工具{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'=' * 50}{Style.RESET_ALL}")
resetter = MachineIDResetter()
resetter.reset_machine_ids()
print(f"\n{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
input(f"{EMOJI['INFO']} 按回车键退出...")
print(f"\n{Fore.CYAN}{'=' * 50}{Style.RESET_ALL}")
input(f"{EMOJI['INFO']} 按回车键退出...")

View File

@@ -2,7 +2,6 @@ import requests
class Cursor:
models = [
"claude-3-5-sonnet-20241022",
"claude-3-opus",
@@ -46,7 +45,7 @@ class Cursor:
@classmethod
def get_trial_remaining_days(cls, user, token):
url = f"https://www.cursor.com/api/auth/stripe"
url = "https://www.cursor.com/api/auth/stripe"
headers = {
"Content-Type": "application/json",

View File

@@ -45,7 +45,7 @@ def handle_oneapi_cursor_channel(
)
if None in [remaining_balance, remaining_days]:
print(f"[OneAPI] Invalid resposne")
print("[OneAPI] Invalid resposne")
return None
if remaining_balance < low_balance_threshold or (
@@ -64,7 +64,6 @@ def handle_oneapi_cursor_channel(
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--oneapi_url", type=str, required=False, help="URL link for One-API website"

View File

@@ -1,13 +1,13 @@
import requests
class OneAPIManager:
def __init__(self, url, access_token):
self.base_url = url
self.access_token = access_token
self.headers = {
"Content-Type": "application/json",
"Authorization": self.access_token
"Authorization": self.access_token,
}
def get_channel(self, id):
@@ -23,74 +23,69 @@ class OneAPIManager:
return response
# Support multiple keys separated by '\n'
def add_channel(self, name, base_url, key, models, rate_limit_count = 0):
def add_channel(self, name, base_url, key, models, rate_limit_count=0):
url = self.base_url + "/api/channel"
data = {"name": name,
"type": 1,
"key": key,
"openai_organization": "",
"base_url": base_url,
"other": "",
"model_mapping":"",
"status_code_mapping":"",
"headers":"",
"models": ','.join(models),
"auto_ban":0,
"is_image_url_enabled": 0,
"model_test": models[0],
"tested_time": 0,
"priority": 0,
"weight": 0,
"groups": ["default"],
"proxy_url": "",
"region": "",
"sk": "",
"ak": "",
"project_id": "",
"client_id": "",
"client_secret": "",
"refresh_token": "",
"gcp_account": "",
"rate_limit_count":rate_limit_count,
"gemini_model":"",
"tags":"",
"rate_limited":rate_limit_count>0,
"is_tools": False,
"claude_original_request": False,
"group":"default"
data = {
"name": name,
"type": 1,
"key": key,
"openai_organization": "",
"base_url": base_url,
"other": "",
"model_mapping": "",
"status_code_mapping": "",
"headers": "",
"models": ",".join(models),
"auto_ban": 0,
"is_image_url_enabled": 0,
"model_test": models[0],
"tested_time": 0,
"priority": 0,
"weight": 0,
"groups": ["default"],
"proxy_url": "",
"region": "",
"sk": "",
"ak": "",
"project_id": "",
"client_id": "",
"client_secret": "",
"refresh_token": "",
"gcp_account": "",
"rate_limit_count": rate_limit_count,
"gemini_model": "",
"tags": "",
"rate_limited": rate_limit_count > 0,
"is_tools": False,
"claude_original_request": False,
"group": "default",
}
response = requests.post(url, json=data, headers=self.headers)
return response
def delete_channel(self, id):
url = self.base_url + f"/api/channel/{id}"
response = requests.delete(url, headers=self.headers)
return response
def enable_channel(self, id):
url = self.base_url + f"/api/channel"
data = {
"id": id,
"status": 1
}
url = self.base_url + "/api/channel"
data = {"id": id, "status": 1}
response = requests.put(url, json=data, headers=self.headers)
return response
def disable_channel(self, id):
url = self.base_url + f"/api/channel"
data = {
"id": id,
"status": 2
}
url = self.base_url + "/api/channel"
data = {"id": id, "status": 2}
response = requests.put(url, json=data, headers=self.headers)
return response
def test_channel(self, id, model = ""):
def test_channel(self, id, model=""):
url = self.base_url + f"/api/channel/test/{id}?model={model}"
response = requests.get(url, headers=self.headers)