This commit is contained in:
ddCat
2025-03-25 20:47:16 +08:00
commit 72ed581f3a
23 changed files with 4261 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

34
.env Normal file
View File

@@ -0,0 +1,34 @@
# 浏览器是否无头模式为True时为无头模式无界面为False时为有头模式有界面
BROWSER_HEADLESS=True
# 浏览器用户代理
BROWSER_USER_AGENT="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
# 浏览器代理
BROWSER_PROXY=
# 多个域名使用逗号分隔
EMAIL_DOMAINS=xxx.xxx
# 临时邮箱用户名
EMAIL_USERNAME=xxxx
# 临时邮箱PIN码如果需要
EMAIL_PIN=
# ===== 账号管理配置 =====
# 系统最大已激活的账号数量,如果达到这个数量,则停止注册
# so 要么在页面维护好当前已激活的账号,要么在页面删除账号,或者直接增大该值
MAX_ACCOUNTS=10
# 数据库URL
DATABASE_URL="sqlite+aiosqlite:///./accounts.db"
# ===== API服务配置 =====
# API服务监听主机地址0.0.0.0 允许非本机访问
API_HOST="0.0.0.0"
# API服务端口号
API_PORT=8000
# 是否启用调试模式
API_DEBUG=True
# API服务工作进程数量Windows下建议使用1
API_WORKERS=1
# 是否启用UI
ENABLE_UI=True

85
README.md Normal file
View File

@@ -0,0 +1,85 @@
# Cursor 本地开发指南
本项目是建在巨人肩膀上的一个开源项目,不做收费,仅供学习参考。
参考项目:
[chengazhen/cursor-auto-free](https://github.com/chengazhen/cursor-auto-free)
[cursor-account-api](https://github.com/Elawen-Carl/cursor-account-api)
## 环境要求
- Python 3.8+
- pip (Python包管理器)
## 本地开发设置步骤
1. 安装 Python 依赖
```bash
pip install -r requirements.txt
```
2. 配置环境变量
- 按照 `config.py` 配置 `.env` 环境参数
## 使用说明
### 1. 环境变量配置:
在项目根目录创建 .env 文件:
```
# 多个域名使用逗号分隔
EMAIL_DOMAINS=xxx.xx
# 临时邮箱用户名
EMAIL_USERNAME=test
# 临时邮箱PIN码如果需要
EMAIL_PIN=
# 数据库URL
DATABASE_URL="sqlite+aiosqlite:///./accounts.db"
# ===== API服务配置 =====
# API服务监听主机地址0.0.0.0 允许非本机访问
API_HOST="0.0.0.0"
# API服务端口号
API_PORT=8000
# 是否启用UI
ENABLE_UI=True
# 最大注册账号数量
MAX_ACCOUNTS=1
```
### 3. 数据持久化:
数据库文件会保存在 `accounts.db` 文件
日志文件会保存在容器内的 `api.log`
*注意事项:*
确保 `.env` 文件中的配置正确
数据目录 `accounts.db`需要适当的权限
容器内使用无头模式运行Chrome浏览器
API服务默认在8000端口运行
### 检查API服务是否正常运行
```
curl http://localhost:8000/health
```
## API 端点
- `GET /accounts` - 获取所有账号
- `GET /account/random` - 随机获取一个账号
- `POST /account` - 创建新账号
## 可视化页面
运行服务器后,访问:
- UI: http://localhost:8000/
## API 文档
运行服务器后,访问:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## 开发工具建议
- Cursor 或 PyCharm
## 调试提示
1. 查看日志
```bash
tail -f app.log
```

949
api.py Normal file
View File

@@ -0,0 +1,949 @@
from fastapi import FastAPI, HTTPException, Depends, status, Body
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, delete, desc
from pathlib import Path
from database import get_session, AccountModel, init_db
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime, timedelta
import uvicorn
import asyncio
import os
import traceback
from fastapi.responses import JSONResponse, FileResponse
from cursor_pro_keep_alive import main as register_account
from browser_utils import BrowserManager
from logger import info, error
from tokenManager.oneapi_cursor_cleaner import handle_oneapi_cursor_channel
from tokenManager.oneapi_manager import OneAPIManager
from contextlib import asynccontextmanager
from tokenManager.cursor import Cursor # 添加这个导入
import concurrent.futures
from functools import lru_cache
from config import MAX_ACCOUNTS, REGISTRATION_INTERVAL, API_HOST, API_PORT, API_DEBUG, API_WORKERS
from fastapi.staticfiles import StaticFiles
# 全局状态追踪
registration_status = {
"is_running": False,
"last_run": None,
"last_status": None,
"next_run": None,
"total_runs": 0,
"successful_runs": 0,
"failed_runs": 0,
}
# 定义静态文件目录
static_path = Path(__file__).parent / "static"
static_path.mkdir(exist_ok=True) # 确保目录存在
# 全局任务存储
background_tasks = {"registration_task": None}
# 添加lifespan管理器在应用启动时初始化数据库
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化数据库
await init_db()
info("数据库已初始化")
yield
# 关闭时的清理操作
info("应用程序关闭")
app = FastAPI(
title="Cursor Account API",
description="API for managing Cursor accounts",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
debug=API_DEBUG,
)
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
# 使用startup事件初始化数据库
@app.on_event("startup")
async def startup_event():
await init_db()
info("数据库已初始化")
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Account(BaseModel):
email: str
password: Optional[str] = None
token: str
user: str
usage_limit: Optional[str] = None
created_at: Optional[str] = None
status: str = "active" # 默认为"active"
id: Optional[int] = None # 添加id字段可选
class Config:
from_attributes = True
class AccountResponse(BaseModel):
success: bool
data: Optional[Account] = None
message: str = ""
async def get_active_account_count() -> int:
"""获取当前账号总数"""
async with get_session() as session:
result = await session.execute(select(func.count()).select_from(AccountModel).where(AccountModel.status == "active"))
return result.scalar()
async def get_account_count() -> int:
"""获取当前账号总数"""
async with get_session() as session:
result = await session.execute(select(func.count()).select_from(AccountModel))
return result.scalar()
async def run_registration():
"""运行注册脚本"""
global registration_status
browser_manager = None
try:
info("注册任务开始运行")
while registration_status["is_running"]:
try:
count = await get_active_account_count()
info(f"当前数据库已激活账号数: {count}")
if count >= MAX_ACCOUNTS:
info(f"已达到最大账号数量 ({count}/{MAX_ACCOUNTS})")
registration_status["last_status"] = "completed"
registration_status["is_running"] = False
break
info(f"开始注册尝试 (当前账号数: {count}/{MAX_ACCOUNTS})")
registration_status["last_run"] = datetime.now().isoformat()
registration_status["total_runs"] += 1
# 初始化浏览器管理器
if not browser_manager:
browser_manager = BrowserManager()
if not browser_manager.init_browser():
error("浏览器初始化失败,终止注册任务")
registration_status["failed_runs"] += 1
registration_status["last_status"] = "error"
registration_status["is_running"] = False
break
# 调用注册函数
try:
success = await asyncio.get_event_loop().run_in_executor(
None, register_account
)
if success:
registration_status["successful_runs"] += 1
registration_status["last_status"] = "success"
info("注册成功")
else:
registration_status["failed_runs"] += 1
registration_status["last_status"] = "failed"
info("注册失败")
except SystemExit:
# 捕获 SystemExit 异常,这是注册脚本正常退出的方式
info("注册脚本正常退出")
if registration_status["last_status"] != "error":
registration_status["last_status"] = "completed"
except Exception as e:
error(f"注册过程执行出错: {str(e)}")
error(traceback.format_exc())
registration_status["failed_runs"] += 1
registration_status["last_status"] = "error"
# 更新下次运行时间
next_run = datetime.now().timestamp() + REGISTRATION_INTERVAL
registration_status["next_run"] = next_run
info(f"等待 {REGISTRATION_INTERVAL} 秒后进行下一次尝试")
await asyncio.sleep(REGISTRATION_INTERVAL)
except asyncio.CancelledError:
info("注册迭代被取消")
raise
except Exception as e:
registration_status["failed_runs"] += 1
registration_status["last_status"] = "error"
error(f"注册过程出错: {str(e)}")
error(traceback.format_exc())
if not registration_status["is_running"]:
break
await asyncio.sleep(REGISTRATION_INTERVAL)
except asyncio.CancelledError:
info("注册任务被取消")
raise
except Exception as e:
error(f"注册任务致命错误: {str(e)}")
error(traceback.format_exc())
raise
finally:
registration_status["is_running"] = False
if browser_manager:
try:
browser_manager.quit()
except Exception as e:
error(f"清理浏览器资源时出错: {str(e)}")
error(traceback.format_exc())
@app.get("/", tags=["UI"])
async def serve_index():
"""提供Web UI界面"""
index_path = Path(__file__).parent / "index.html"
return FileResponse(index_path)
@app.get("/general", tags=["General"])
async def root():
"""API根路径返回API信息"""
try:
# 获取当前账号数量和使用情况
async with get_session() as session:
result = await session.execute(select(AccountModel))
accounts = result.scalars().all()
usage_info = []
total_balance = 0
active_accounts = 0
for acc in accounts:
remaining_balance = Cursor.get_remaining_balance(acc.user, acc.token)
remaining_days = Cursor.get_trial_remaining_days(acc.user, acc.token)
if remaining_balance is not None and remaining_balance > 0:
active_accounts += 1
total_balance += remaining_balance
usage_info.append(
{
"email": acc.email,
"balance": remaining_balance,
"days": remaining_days,
"status": (
"active"
if remaining_balance is not None and remaining_balance > 0
else "inactive"
),
}
)
return {
"service": {
"name": "Cursor Account API",
"version": "1.0.0",
"status": "running",
"description": "API for managing Cursor Pro accounts and automatic registration",
},
"statistics": {
"total_accounts": len(accounts),
"active_accounts": active_accounts,
"total_remaining_balance": total_balance,
"max_accounts": MAX_ACCOUNTS,
"remaining_slots": MAX_ACCOUNTS - len(accounts),
"registration_interval": f"{REGISTRATION_INTERVAL} seconds",
},
"accounts_info": usage_info, # 添加账号详细信息
"registration_status": {
"is_running": registration_status["is_running"],
"last_run": registration_status["last_run"],
"last_status": registration_status["last_status"],
"next_run": registration_status["next_run"],
"statistics": {
"total_runs": registration_status["total_runs"],
"successful_runs": registration_status["successful_runs"],
"failed_runs": registration_status["failed_runs"],
"success_rate": (
f"{(registration_status['successful_runs'] / registration_status['total_runs'] * 100):.1f}%"
if registration_status["total_runs"] > 0
else "N/A"
),
},
},
"endpoints": {
"documentation": {"swagger": "/docs", "redoc": "/redoc"},
"health": {
"check": "/health",
"registration_status": "/registration/status",
},
"accounts": {
"list_all": "/accounts",
"random": "/account/random",
"create": {"path": "/account", "method": "POST"},
"delete": {"path": "/account/{email}", "method": "DELETE"},
"usage": {
"path": "/account/{email}/usage",
"method": "GET",
"description": "Get account usage by email",
},
},
"registration": {
"start": {"path": "/registration/start", "method": "GET"},
"stop": {"path": "/registration/stop", "method": "POST"},
"status": {"path": "/registration/status", "method": "GET"},
},
"usage": {"check": {"path": "/usage", "method": "GET"}},
"clean": {
"run": {
"path": "/clean",
"method": "POST",
"params": {"clean_type": ["check", "disable", "delete"]},
}
},
},
"support": {
"github": "https://github.com/Elawen-Carl/cursor-account-api",
"author": "Elawen Carl",
"contact": "elawencarl@gmail.com",
},
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
error(f"根端点错误: {str(e)}")
error(traceback.format_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error fetching API information",
)
@app.get("/health", tags=["General"])
async def health_check():
"""健康检查端点"""
return {"status": "healthy"}
@app.get("/accounts", response_model=List[Account], tags=["Accounts"])
async def get_accounts(status: Optional[str] = None):
"""获取所有可用的账号和token
可选参数 status 用于过滤账号状态:
- active: 只返回正常账号
- disabled: 只返回停用账号
- deleted: 只返回已删除账号
- 不提供参数则返回所有账号
结果按id降序排列最新的账号排在前面
"""
try:
async with get_session() as session:
# 构建基本查询添加按id降序排序
query = select(AccountModel).order_by(desc(AccountModel.id))
# 根据状态过滤
if status:
query = query.where(AccountModel.status == status)
result = await session.execute(query)
accounts = result.scalars().all()
if not accounts:
raise HTTPException(status_code=404, detail="没有找到符合条件的账号")
return accounts
except Exception as e:
error(f"获取账号失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(status_code=500, detail="服务器内部错误")
@app.get("/account/random", response_model=AccountResponse, tags=["Accounts"])
async def get_random_account():
"""随机获取一个可用的账号和token"""
try:
async with get_session() as session:
result = await session.execute(
select(AccountModel).order_by(func.random()).limit(1)
)
account = result.scalar_one_or_none()
if not account:
return AccountResponse(success=False, message="No accounts available")
return AccountResponse(success=True, data=Account.from_orm(account))
except Exception as e:
error(f"获取随机账号失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/account", response_model=AccountResponse, tags=["Accounts"])
async def create_account(account: Account):
"""创建新账号"""
try:
async with get_session() as session:
db_account = AccountModel(
email=account.email,
password=account.password,
token=account.token,
usage_limit=account.usage_limit,
created_at=account.created_at,
)
session.add(db_account)
await session.commit()
return AccountResponse(
success=True, data=account, message="Account created successfully"
)
except Exception as e:
error(f"创建账号失败: {str(e)}")
error(traceback.format_exc())
return AccountResponse(
success=False, message=f"Failed to create account: {str(e)}"
)
@app.delete("/account/{email}", response_model=AccountResponse, tags=["Accounts"])
async def delete_account(email: str, hard_delete: bool = False):
"""删除或停用指定邮箱的账号
如果 hard_delete=True则物理删除账号
否则仅将状态设置为'deleted'
"""
try:
async with get_session() as session:
# 先检查账号是否存在
result = await session.execute(
select(AccountModel).where(AccountModel.email == email)
)
account = result.scalar_one_or_none()
if not account:
return AccountResponse(
success=False, message=f"账号 {email} 不存在"
)
if hard_delete:
# 物理删除账号
await session.execute(
delete(AccountModel).where(AccountModel.email == email)
)
delete_message = f"账号 {email} 已永久删除"
else:
# 逻辑删除:将状态更新为'deleted'
account.status = "deleted"
delete_message = f"账号 {email} 已标记为删除状态"
await session.commit()
return AccountResponse(
success=True, message=delete_message
)
except Exception as e:
error(f"删除账号失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"删除账号失败: {str(e)}",
)
# 添加状态更新的请求体模型
class StatusUpdate(BaseModel):
status: str
@app.put("/account/id/{id}/status", response_model=AccountResponse, tags=["Accounts"])
async def update_account_status(id: str, update: StatusUpdate):
"""更新账号状态
可选状态: active (正常), disabled (停用), deleted (已删除)
"""
# 使用update.status替代原先的status参数
try:
# 验证状态值
valid_statuses = ["active", "disabled", "deleted"]
if update.status not in valid_statuses:
return AccountResponse(
success=False,
message=f"无效的状态值。允许的值: {', '.join(valid_statuses)}"
)
async with get_session() as session:
# 通过邮箱查询账号
result = await session.execute(
select(AccountModel).where(AccountModel.id == id)
)
account = result.scalar_one_or_none()
if not account:
return AccountResponse(
success=False, message=f"邮箱为 {email} 的账号不存在"
)
# 更新状态
account.status = update.status
await session.commit()
return AccountResponse(
success=True, message=f"账号 {account.email} 状态已更新为 '{update.status}'"
)
except Exception as e:
error(f"通过邮箱更新账号状态失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"更新账号状态失败: {str(e)}",
)
@app.get("/registration/start", tags=["Registration"])
async def start_registration():
"""手动启动注册任务"""
info("手动启动注册任务")
global background_tasks, registration_status
try:
# 检查是否已达到最大账号数
count = await get_active_account_count()
if count >= MAX_ACCOUNTS:
info(f"拒绝注册请求 - 已达到最大账号数 ({count}/{MAX_ACCOUNTS})")
return {
"success": False,
"message": f"Already have maximum number of accounts ({MAX_ACCOUNTS})",
}
# 如果任务已在运行,返回相应消息
if (
background_tasks["registration_task"]
and not background_tasks["registration_task"].done()
):
info("注册请求被忽略 - 任务已在运行")
return {
"success": True,
"message": "Registration task is already running",
"status": {
"is_running": registration_status["is_running"],
"last_run": registration_status["last_run"],
"next_run": (
datetime.fromtimestamp(
registration_status["next_run"]
).isoformat()
if registration_status["next_run"]
else None
),
"last_status": registration_status["last_status"],
},
}
# 重置注册状态
registration_status.update(
{
"is_running": True,
"last_status": "starting",
"last_run": datetime.now().isoformat(),
"next_run": datetime.now().timestamp() + REGISTRATION_INTERVAL,
"total_runs": 0,
"successful_runs": 0,
"failed_runs": 0,
}
)
# 创建并启动新任务
loop = asyncio.get_running_loop()
task = loop.create_task(run_registration())
background_tasks["registration_task"] = task
# 添加任务完成回调
def task_done_callback(task):
try:
task.result() # 这将重新引发任何未处理的异常
except asyncio.CancelledError:
info("注册任务被取消")
registration_status["last_status"] = "cancelled"
except Exception as e:
error(f"注册任务失败: {str(e)}")
error(traceback.format_exc())
registration_status["last_status"] = "error"
finally:
if registration_status["is_running"]: # 只有在任务仍在运行时才更新状态
registration_status["is_running"] = False
background_tasks["registration_task"] = None
task.add_done_callback(task_done_callback)
info("手动启动注册任务")
# 等待任务实际开始运行
await asyncio.sleep(1)
# 检查任务是否成功启动
if task.done():
try:
task.result() # 如果任务已完成,检查是否有异常
except Exception as e:
error(f"注册任务启动失败: {str(e)}")
error(traceback.format_exc())
registration_status["is_running"] = False
registration_status["last_status"] = "error"
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to start registration task: {str(e)}",
)
return {
"success": True,
"message": "Registration task started successfully",
"status": {
"is_running": registration_status["is_running"],
"last_run": registration_status["last_run"],
"next_run": datetime.fromtimestamp(
registration_status["next_run"]
).isoformat(),
"last_status": registration_status["last_status"],
},
}
except Exception as e:
error(f"启动注册任务失败: {str(e)}")
error(traceback.format_exc())
registration_status["is_running"] = False
registration_status["last_status"] = "error"
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to start registration task: {str(e)}",
)
@app.get("/registration/stop", tags=["Registration"])
async def stop_registration():
"""手动停止注册任务"""
global background_tasks
try:
if (
not background_tasks["registration_task"]
or background_tasks["registration_task"].done()
):
return {"success": False, "message": "No running registration task found"}
background_tasks["registration_task"].cancel()
try:
await background_tasks["registration_task"]
except asyncio.CancelledError:
info("注册任务被取消")
background_tasks["registration_task"] = None
registration_status["is_running"] = False
registration_status["last_status"] = "manually stopped"
return {"success": True, "message": "Registration task stopped successfully"}
except Exception as e:
error(f"停止注册任务失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to stop registration task: {str(e)}",
)
@app.get("/registration/status", tags=["Registration"])
async def get_registration_status():
"""获取注册状态"""
try:
count = await get_account_count()
task_status = (
"running"
if (
background_tasks["registration_task"]
and not background_tasks["registration_task"].done()
)
else "stopped"
)
status_info = {
"current_count": count,
"max_accounts": MAX_ACCOUNTS,
"is_registration_active": count < MAX_ACCOUNTS,
"remaining_slots": MAX_ACCOUNTS - count,
"task_status": task_status,
"registration_details": {
"is_running": registration_status["is_running"],
"last_run": registration_status["last_run"],
"last_status": registration_status["last_status"],
"next_run": registration_status["next_run"],
"statistics": {
"total_runs": registration_status["total_runs"],
"successful_runs": registration_status["successful_runs"],
"failed_runs": registration_status["failed_runs"],
"success_rate": (
f"{(registration_status['successful_runs'] / registration_status['total_runs'] * 100):.1f}%"
if registration_status["total_runs"] > 0
else "N/A"
),
},
},
}
info(f"请求注册状态 (当前账号数: {count}, 状态: {task_status})")
return status_info
except Exception as e:
error(f"获取注册状态失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get registration status: {str(e)}",
)
# 自定义异常处理
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
error(f"HTTP错误发生: {exc.detail}")
return JSONResponse(
status_code=exc.status_code, content={"success": False, "message": exc.detail}
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
error(f"意外错误发生: {str(exc)}")
error(f"错误详情: {traceback.format_exc()}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"message": "Internal server error occurred",
"detail": str(exc) if app.debug else None,
},
)
# 添加缓存装饰器
@lru_cache(maxsize=100)
def get_account_status(user: str, token: str, timestamp: int):
"""缓存10分钟内的账号状态"""
balance = Cursor.get_remaining_balance(user, token)
days = Cursor.get_trial_remaining_days(user, token)
return {
"balance": balance,
"days": days,
"status": "active" if balance is not None and balance > 0 else "inactive",
}
# 修改 check_usage 接口
@app.get("/usage")
async def check_usage():
try:
async with get_session() as session:
result = await session.execute(select(AccountModel))
accounts = result.scalars().all()
# 使用当前时间的10分钟间隔作为缓存key
cache_timestamp = int(datetime.now().timestamp() / 600)
# 使用线程池并发获取账号状态
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(
get_account_status, acc.user, acc.token, cache_timestamp
)
for acc in accounts
]
usage_info = []
for acc, future in zip(accounts, futures):
status = future.result()
usage_info.append(
{
"email": acc.email,
"usage_limit": status["balance"],
"remaining_days": status["days"],
"status": status["status"],
}
)
return {
"total_accounts": len(accounts),
"usage_info": usage_info,
"summary": {
"active_accounts": sum(
1 for info in usage_info if info["status"] == "active"
),
"inactive_accounts": sum(
1 for info in usage_info if info["status"] == "inactive"
),
"total_remaining_balance": sum(
info["usage_limit"] or 0 for info in usage_info
),
},
}
except Exception as e:
error(f"检查使用量失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.get("/account/{email}/usage", tags=["Accounts"])
async def get_account_usage(email: str):
"""根据邮箱查询账户使用量并更新数据库"""
try:
async with get_session() as session:
# 查询指定邮箱的账号
result = await session.execute(
select(AccountModel).where(AccountModel.email == email)
)
account = result.scalar_one_or_none()
if not account:
raise HTTPException(
status_code=404, detail=f"Account with email {email} not found"
)
# 获取账号使用量
remaining_balance = Cursor.get_remaining_balance(
account.user, account.token
)
remaining_days = Cursor.get_trial_remaining_days(
account.user, account.token
)
# 计算总额度和已使用额度
total_limit = 150 # 默认总额度
used_limit = 0
if remaining_balance is not None:
used_limit = total_limit - remaining_balance
if remaining_days is not None and remaining_days == 0:
account.status = "disabled"
# 更新数据库中的usage_limit字段
account.usage_limit = str(remaining_balance)
await session.commit()
db_updated = True
else:
db_updated = False
return {
"success": True,
"email": account.email,
"usage": {
"remaining_balance": remaining_balance,
"total_limit": total_limit,
"used_limit": used_limit,
"remaining_days": remaining_days,
"status": (
"active"
if remaining_balance is not None and remaining_balance > 0
else "inactive"
),
},
"db_updated": db_updated,
"timestamp": datetime.now().isoformat(),
}
except HTTPException:
raise
except Exception as e:
error(f"查询账号使用量失败: {str(e)}")
error(traceback.format_exc())
return {
"success": False,
"message": f"Failed to get account usage: {str(e)}"
}
# 添加通过ID删除账号的API
@app.delete("/account/id/{id}", response_model=AccountResponse, tags=["Accounts"])
async def delete_account_by_id(id: int, hard_delete: bool = False):
"""通过ID删除或停用账号
如果 hard_delete=True则物理删除账号
否则仅将状态设置为'deleted'
"""
try:
async with get_session() as session:
# 通过ID查询账号
result = await session.execute(
select(AccountModel).where(AccountModel.id == id)
)
account = result.scalar_one_or_none()
if not account:
return AccountResponse(
success=False, message=f"ID为 {id} 的账号不存在"
)
email = account.email # 保存邮箱以在响应中显示
if hard_delete:
# 物理删除账号
await session.execute(
delete(AccountModel).where(AccountModel.id == id)
)
delete_message = f"账号 {email} (ID: {id}) 已永久删除"
else:
# 逻辑删除:将状态更新为'deleted'
account.status = "deleted"
delete_message = f"账号 {email} (ID: {id}) 已标记为删除状态"
await session.commit()
return AccountResponse(
success=True, message=delete_message
)
except Exception as e:
error(f"通过ID删除账号失败: {str(e)}")
error(traceback.format_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"删除账号失败: {str(e)}",
)
# 添加"使用Token"功能
@app.post("/account/use-token/{id}", tags=["Accounts"])
async def use_account_token(id: int):
"""使用指定账号的Token更新Cursor认证"""
try:
async with get_session() as session:
# 通过ID查询账号
result = await session.execute(
select(AccountModel).where(AccountModel.id == id)
)
account = result.scalar_one_or_none()
if not account:
return {"success": False, "message": f"ID为 {id} 的账号不存在"}
# 调用CursorAuthManager更新认证
from cursor_auth_manager import CursorAuthManager
auth_manager = CursorAuthManager()
success = auth_manager.update_auth(email=account.email, access_token=account.token, refresh_token=account.token)
# 重置Cursor的机器ID
from reset_machine import MachineIDResetter
resetter = MachineIDResetter()
resetter.reset_machine_ids()
if success:
return {"success": True, "message": f"成功使用账号 {account.email} 的Token"}
else:
return {"success": False, "message": "Token更新失败"}
except Exception as e:
error(f"使用账号Token失败: {str(e)}")
error(traceback.format_exc())
return {"success": False, "message": f"使用Token失败: {str(e)}"}
if __name__ == "__main__":
uvicorn.run(
"api:app",
host=API_HOST,
port=API_PORT,
reload=API_DEBUG,
access_log=True,
log_level="info",
workers=API_WORKERS,
loop="asyncio", # Windows下使用默认的asyncio
)

98
browser_utils.py Normal file
View File

@@ -0,0 +1,98 @@
from DrissionPage import ChromiumOptions, Chromium
import sys
import os
from dotenv import load_dotenv
from logger import info, warning
load_dotenv()
class BrowserManager:
def __init__(self):
self.browser = None
def init_browser(self):
"""初始化浏览器"""
co = self._get_browser_options()
self.browser = Chromium(co)
return self.browser
def _get_browser_options(self):
"""获取浏览器配置"""
co = ChromiumOptions()
try:
extension_path = self._get_extension_path()
co.add_extension(extension_path)
except FileNotFoundError as e:
info(f"警告: {e}")
co.set_user_agent(
os.getenv(
"BROWSER_USER_AGENT",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
)
)
co.set_pref("credentials_enable_service", False)
co.set_argument("--hide-crash-restore-bubble")
proxy = os.getenv("BROWSER_PROXY")
if proxy:
co.set_proxy(proxy)
# 禁用自动化特征(关键参数)
co.set_argument("--disable-blink-features=AutomationControlled")
co.set_argument("--disable-features=AutomationControlled")
co.set_argument("--disable-automation-extension")
# 随机化指纹参数
co.set_pref("webgl.vendor", "NVIDIA Corporation")
co.set_pref(
"webgl.renderer",
"ANGLE (NVIDIA, NVIDIA GeForce RTX 3070 Direct3D11 vs_5_0 ps_5_0)",
)
co.set_pref("navigator.plugins.length", 5)
co.set_pref("navigator.hardwareConcurrency", 8)
# 覆盖自动化特征(关键)
co.set_pref("dom.webdriver.enabled", False)
co.set_pref("useAutomationExtension", False)
# 设置时区参数
co.set_argument("--timezone=Asia/Shanghai")
co.set_pref("timezone.override", "Asia/Shanghai")
# 设置更真实的屏幕参数
co.set_pref("screen.width", 1920)
co.set_pref("screen.height", 1080)
co.set_pref("screen.pixelDepth", 24)
co.auto_port()
co.headless(
os.getenv("BROWSER_HEADLESS", "True").lower() == "true"
) # 生产环境使用无头模式
# Mac 系统特殊处理
if sys.platform == "darwin" or sys.platform == "linux":
co.set_argument("--no-sandbox")
co.set_argument("--disable-gpu")
return co
def _get_extension_path(self):
"""获取插件路径"""
root_dir = os.getcwd()
extension_path = os.path.join(root_dir, "turnstilePatch")
if hasattr(sys, "_MEIPASS"):
extension_path = os.path.join(sys._MEIPASS, "turnstilePatch")
if not os.path.exists(extension_path):
raise FileNotFoundError(f"插件不存在: {extension_path}")
info(f"插件路径: {extension_path}")
return extension_path
def quit(self):
"""关闭浏览器"""
if self.browser:
try:
self.browser.quit()
except:
pass

68
config.py Normal file
View File

@@ -0,0 +1,68 @@
import os
from dotenv import load_dotenv
import logging
# 加载.env文件中的环境变量
load_dotenv()
# ===== 日志配置 =====
# 日志级别DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
# 日志格式:时间戳 - 日志级别 - 消息内容
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
# 日志日期格式:年-月-日 时:分:秒
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# ===== API服务配置 =====
# API服务监听主机地址
API_HOST = os.getenv("API_HOST", "127.0.0.1")
# API服务端口号
API_PORT = int(os.getenv("API_PORT", 8000))
# 是否启用调试模式
API_DEBUG = os.getenv("API_DEBUG", "false").lower() == "true"
# API服务工作进程数量Windows下建议使用1
API_WORKERS = int(os.getenv("API_WORKERS", 1))
# ===== 账号管理配置 =====
# 系统最大已激活的账号数量
MAX_ACCOUNTS = int(os.getenv("MAX_ACCOUNTS", 10))
# 每次注册间隔时间(秒)
REGISTRATION_INTERVAL = int(os.getenv("REGISTRATION_INTERVAL", 60))
# 注册失败时的最大重试次数
REGISTRATION_MAX_RETRIES = int(os.getenv("REGISTRATION_MAX_RETRIES", 3))
# 注册重试间隔时间(秒)
REGISTRATION_RETRY_INTERVAL = int(os.getenv("REGISTRATION_RETRY_INTERVAL", 5))
# ===== 浏览器配置 =====
# 是否以无头模式运行浏览器(不显示界面)
BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true"
# 浏览器可执行文件路径(为空则使用默认路径)
BROWSER_EXECUTABLE_PATH = os.getenv("BROWSER_EXECUTABLE_PATH", None)
# 浏览器下载文件保存路径
BROWSER_DOWNLOAD_PATH = os.getenv("BROWSER_DOWNLOAD_PATH", None)
# ===== Cursor URL配置 =====
# Cursor登录页面URL
LOGIN_URL = "https://authenticator.cursor.sh"
# Cursor注册页面URL
SIGN_UP_URL = "https://authenticator.cursor.sh/sign-up"
# Cursor设置页面URL
SETTINGS_URL = "https://www.cursor.com/settings"
# ===== 邮箱配置 =====
# 临时邮箱用户名
EMAIL_USERNAME = os.getenv("EMAIL_USERNAME", "ddcat666")
# 临时邮箱域名
EMAIL_DOMAIN = os.getenv("EMAIL_DOMAIN", "mailto.plus")
# 临时邮箱PIN码如果需要
EMAIL_PIN = os.getenv("EMAIL_PIN", "")
# 可用于注册的邮箱域名列表(逗号分隔)
EMAIL_DOMAINS = [domain.strip() for domain in os.getenv("EMAIL_DOMAINS", "ddcat.store").split(",")]
# 邮件验证码获取最大重试次数
EMAIL_VERIFICATION_RETRIES = int(os.getenv("EMAIL_VERIFICATION_RETRIES", 5))
# 邮件验证码获取重试间隔(秒)
EMAIL_VERIFICATION_WAIT = int(os.getenv("EMAIL_VERIFICATION_WAIT", 5))
# ===== 数据库配置 =====
# 数据库连接URL默认使用SQLite
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:////app/accounts.db")

86
cursor_auth_manager.py Normal file
View File

@@ -0,0 +1,86 @@
import sqlite3
import os
import sys
class CursorAuthManager:
"""Cursor认证信息管理器"""
def __init__(self):
# 判断操作系统
if sys.platform == "win32": # Windows
appdata = os.getenv("APPDATA")
if appdata is None:
raise EnvironmentError("APPDATA 环境变量未设置")
self.db_path = os.path.join(
appdata, "Cursor", "User", "globalStorage", "state.vscdb"
)
elif sys.platform == "darwin": # macOS
self.db_path = os.path.abspath(os.path.expanduser(
"~/Library/Application Support/Cursor/User/globalStorage/state.vscdb"
))
elif sys.platform == "linux" : # Linux 和其他类Unix系统
self.db_path = os.path.abspath(os.path.expanduser(
"~/.config/Cursor/User/globalStorage/state.vscdb"
))
else:
raise NotImplementedError(f"不支持的操作系统: {sys.platform}")
def update_auth(self, email=None, access_token=None, refresh_token=None):
"""
更新Cursor的认证信息
:param email: 新的邮箱地址
:param access_token: 新的访问令牌
:param refresh_token: 新的刷新令牌
:return: bool 是否成功更新
"""
updates = []
# 登录状态
updates.append(("cursorAuth/cachedSignUpType", "Auth_0"))
if email is not None:
updates.append(("cursorAuth/cachedEmail", email))
if access_token is not None:
updates.append(("cursorAuth/accessToken", access_token))
if refresh_token is not None:
updates.append(("cursorAuth/refreshToken", refresh_token))
if not updates:
print("没有提供任何要更新的值")
return False
conn = None
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for key, value in updates:
# 如果没有更新任何行,说明key不存在,执行插入
# 检查 accessToken 是否存在
check_query = f"SELECT COUNT(*) FROM itemTable WHERE key = ?"
cursor.execute(check_query, (key,))
if cursor.fetchone()[0] == 0:
insert_query = "INSERT INTO itemTable (key, value) VALUES (?, ?)"
cursor.execute(insert_query, (key, value))
else:
update_query = "UPDATE itemTable SET value = ? WHERE key = ?"
cursor.execute(update_query, (value, key))
if cursor.rowcount > 0:
print(f"成功更新 {key.split('/')[-1]} {value}")
else:
print(f"未找到 {key.split('/')[-1]} 或值未变化")
conn.commit()
return True
except sqlite3.Error as e:
print("数据库错误:", str(e))
return False
except Exception as e:
print("发生错误:", str(e))
return False
finally:
if conn:
conn.close()

415
cursor_pro_keep_alive.py Normal file
View File

@@ -0,0 +1,415 @@
import os
import sys
import psutil
import time
import random
from logger import info, warning, error
import traceback
from config import (
LOGIN_URL, SIGN_UP_URL, SETTINGS_URL,
EMAIL_DOMAINS, REGISTRATION_MAX_RETRIES
)
if sys.stdout.encoding != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr.encoding != "utf-8":
sys.stderr.reconfigure(encoding="utf-8")
from browser_utils import BrowserManager
from get_email_code import EmailVerificationHandler
from datetime import datetime # 添加这行导入
TOTAL_USAGE = 0
def handle_turnstile(tab):
info("=============正在检测 Turnstile 验证=============")
try:
while True:
try:
if tab.ele("@name=password"):
info("验证成功 - 已到达密码输入页面")
break
if tab.ele("@data-index=0"):
info("验证成功 - 已到达验证码输入页面")
break
if tab.ele("Account Settings"):
info("验证成功 - 已到达账户设置页面")
break
info("检测 Turnstile 验证...")
challengeCheck = (
tab.ele("@id=cf-turnstile", timeout=2)
.child()
.shadow_root.ele("tag:iframe")
.ele("tag:body")
.sr("tag:input")
)
if challengeCheck:
info("检测到 Turnstile 验证,正在处理...")
time.sleep(random.uniform(1, 3))
challengeCheck.click()
time.sleep(2)
info("Turnstile 验证通过")
return True
except:
pass
time.sleep(random.uniform(1, 2))
except Exception as e:
info(f"Turnstile 验证失败: {str(e)}")
return False
def get_cursor_session_token(tab, max_attempts=5, retry_interval=3):
try:
tab.get(SETTINGS_URL)
time.sleep(5)
try:
usage_selector = (
"css:div.col-span-2 > div > div > div > div > "
"div:nth-child(1) > div.flex.items-center.justify-between.gap-2 > "
"span.font-mono.text-sm\\/\\[0\\.875rem\\]"
)
usage_ele = tab.ele(usage_selector)
total_usage = "unknown"
if usage_ele:
total_usage = usage_ele.text.split("/")[-1].strip()
global TOTAL_USAGE
TOTAL_USAGE = total_usage
info(f"使用限制: {total_usage}")
else:
warning("未能找到使用量元素")
except Exception as e:
warning(f"获取使用量信息失败: {str(e)}")
# 继续执行,不要因为获取使用量失败而中断整个流程
info("获取Cookie中...")
attempts = 0
while attempts < max_attempts:
try:
cookies = tab.cookies()
for cookie in cookies:
if cookie.get("name") == "WorkosCursorSessionToken":
user = cookie["value"].split("%3A%3A")[0]
token = cookie["value"].split("%3A%3A")[1]
info(f"获取到账号Token: {token}, 用户: {user}")
return token, user
attempts += 1
if attempts < max_attempts:
warning(f"未找到Cursor会话Token重试中... ({attempts}/{max_attempts})")
time.sleep(retry_interval)
else:
info("未找到Cursor会话Token已达到最大尝试次数")
except Exception as e:
info(f"获取Token出错: {str(e)}")
attempts += 1
if attempts < max_attempts:
info(f"重试获取Token等待时间: {retry_interval}秒,尝试次数: {attempts}/{max_attempts}")
time.sleep(retry_interval)
return False
except Exception as e:
warning(f"获取Token过程出错: {str(e)}")
return False
def sign_up_account(browser, tab, account_info):
info("=============开始注册账号=============")
info(
f"账号信息: 邮箱: {account_info['email']}, 密码: {account_info['password']}, 姓名: {account_info['first_name']} {account_info['last_name']}"
)
tab.get(SIGN_UP_URL)
try:
if tab.ele("@name=first_name"):
info("=============正在填写个人信息=============")
tab.actions.click("@name=first_name").input(account_info["first_name"])
info(f"已输入名字: {account_info['first_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=last_name").input(account_info["last_name"])
info(f"已输入姓氏: {account_info['last_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=email").input(account_info["email"])
info(f"已输入邮箱: {account_info['email']}")
time.sleep(random.uniform(1, 3))
info("=============提交个人信息=============")
tab.actions.click("@type=submit")
except Exception as e:
info(f"填写个人信息失败: {str(e)}")
return "ERROR"
handle_turnstile(tab)
if tab.ele("Can't verify the user is human. Please try again.") or tab.ele(
"Can't verify the user is human. Please try again."
):
info("检测到turnstile验证失败正在重试...")
return "EMAIL_USED"
try:
if tab.ele("@name=password"):
info(f"设置密码:{account_info['password']}")
tab.ele("@name=password").input(account_info["password"])
time.sleep(random.uniform(1, 2))
info("提交密码...")
tab.ele("@type=submit").click()
info("密码设置成功,等待系统响应....")
except Exception as e:
info(f"密码设置失败: {str(e)}")
return "ERROR"
info("处理最终验证...")
handle_turnstile(tab)
if tab.ele("This email is not available."):
info("邮箱已被使用")
return "EMAIL_USED"
if tab.ele("Sign up is restricted."):
info("注册限制")
return "SIGNUP_RESTRICTED"
# 创建邮件处理器
email_handler = EmailVerificationHandler()
i = 0
while i < 5:
try:
if tab.ele("Account Settings"):
info("注册成功,已进入账号设置页面")
break
if tab.ele("@data-index=0"):
info("等待输入验证码...")
# 切换到邮箱标签页
code = email_handler.get_verification_code(source_email=account_info["email"])
info(f"输入验证码: {code}")
i = 0
for digit in code:
tab.ele(f"@data-index={i}").input(digit)
time.sleep(random.uniform(0.3, 0.6))
i += 1
info("验证码输入完成")
time.sleep(random.uniform(3, 5))
break
except Exception as e:
info(f"验证码处理失败: {str(e)}")
return "ERROR"
info("完成最终验证...")
handle_turnstile(tab)
time.sleep(random.uniform(3, 5))
info("账号注册流程完成")
return "SUCCESS"
class EmailGenerator:
def __init__(
self,
):
# 将密码生成移到这里,避免类定义时执行随机密码生成
self.default_first_name = self.generate_random_name()
self.default_last_name = self.generate_random_name()
# 从配置文件获取域名配置
self.domains = EMAIL_DOMAINS
info(f"当前可用域名: {self.domains}")
self.email = None
self.password = None
def generate_random_password(self, length=12):
"""生成随机密码 - 改进密码生成算法,确保包含各类字符"""
chars = "abcdefghijklmnopqrstuvwxyz"
upper_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
digits = "0123456789"
special = "!@#$%^&*"
# 确保密码包含至少一个大写字母、一个数字和一个特殊字符
password = [
random.choice(chars),
random.choice(upper_chars),
random.choice(digits),
random.choice(special)
]
# 添加剩余随机字符
password.extend(random.choices(chars + upper_chars + digits + special, k=length-4))
# 打乱密码顺序
random.shuffle(password)
return "".join(password)
def generate_random_name(self, length=6):
"""生成随机用户名"""
first_letter = random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
rest_letters = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz", k=length - 1)
)
return first_letter + rest_letters
def generate_email(self, length=6):
"""生成随机邮箱地址,使用随机域名"""
random_str = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=length))
timestamp = str(int(time.time()))[-4:] # 使用时间戳后4位
# 随机选择一个域名
domain = random.choice(self.domains)
return f"{random_str}{timestamp}@{domain}"
def get_account_info(self):
"""获取账号信息,确保每次调用都生成新的邮箱和密码"""
self.email = self.generate_email()
self.password = self.generate_random_password()
return {
"email": self.email,
"password": self.password,
"first_name": self.default_first_name.capitalize(),
"last_name": self.default_last_name.capitalize(),
}
def _save_account_info(self, user, token, total_usage):
try:
from database import get_session, AccountModel
import asyncio
import time
async def save_to_db():
info(f"开始保存账号信息: {self.email}")
async with get_session() as session:
# 检查账号是否已存在
from sqlalchemy import select
result = await session.execute(
select(AccountModel).where(AccountModel.email == self.email)
)
existing_account = result.scalar_one_or_none()
if existing_account:
info(f"更新现有账号信息 (ID: {existing_account.id})")
existing_account.token = token
existing_account.user = user
existing_account.password = self.password
existing_account.usage_limit = str(total_usage)
# 如果账号状态是删除,更新为活跃
if existing_account.status == "deleted":
existing_account.status = "active"
# 不更新id保留原始id值
else:
info("创建新账号记录")
# 生成毫秒级时间戳作为id
timestamp_ms = int(time.time() * 1000)
account = AccountModel(
email=self.email,
password=self.password,
token=token,
user=user,
usage_limit=str(total_usage),
created_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
status="active", # 设置默认状态为活跃
id=timestamp_ms # 设置毫秒时间戳id
)
session.add(account)
await session.commit()
info(f"账号 {self.email} 信息保存成功")
return True
return asyncio.run(save_to_db())
except Exception as e:
info(f"保存账号信息失败: {str(e)}")
return False
def cleanup_and_exit(browser_manager=None, exit_code=0):
"""清理资源并退出程序"""
try:
if browser_manager:
info("正在关闭浏览器")
if hasattr(browser_manager, "browser"):
browser_manager.browser.quit()
current_process = psutil.Process()
children = current_process.children(recursive=True)
for child in children:
try:
child.terminate()
except:
pass
info("程序正常退出")
sys.exit(exit_code)
except Exception as e:
info(f"清理退出时发生错误: {str(e)}")
sys.exit(1)
def main():
browser_manager = None
max_retries = REGISTRATION_MAX_RETRIES # 从配置文件获取
current_retry = 0
try:
email_generator = EmailGenerator()
browser_manager = BrowserManager()
browser = browser_manager.init_browser()
while current_retry < max_retries:
try:
account_info = email_generator.get_account_info()
info(f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}")
signup_tab = browser.new_tab(LOGIN_URL)
browser.activate_tab(signup_tab)
signup_tab.run_js("try { turnstile.reset() } catch(e) { }")
result = sign_up_account(browser, signup_tab, account_info)
if result == "SUCCESS":
token, user = get_cursor_session_token(signup_tab)
info(f"获取到账号Token: {token}, 用户: {user}")
if token:
email_generator._save_account_info(user, token, TOTAL_USAGE)
info("注册流程完成")
cleanup_and_exit(browser_manager, 0)
else:
info("获取Cursor会话Token失败")
current_retry += 1
elif result in ["EMAIL_USED", "SIGNUP_RESTRICTED", "VERIFY_FAILED"]:
info(f"遇到问题: {result},尝试切换邮箱...")
continue # 使用新邮箱重试注册
else: # ERROR
info("遇到错误,准备重试...")
current_retry += 1
# 关闭标签页,准备下一次尝试
signup_tab.close()
time.sleep(2)
except Exception as e:
info(f"当前尝试发生错误: {str(e)}")
current_retry += 1
time.sleep(2)
try:
# 尝试关闭可能存在的标签页
if "signup_tab" in locals():
signup_tab.close()
except:
pass
info(f"达到最大重试次数 {max_retries},注册失败")
except Exception as e:
info(f"主程序错误: {str(e)}")
info(f"错误详情: {traceback.format_exc()}")
cleanup_and_exit(browser_manager, 1)
finally:
cleanup_and_exit(browser_manager, 1)

84
database.py Normal file
View File

@@ -0,0 +1,84 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import Column, String, DateTime, Text, text, BigInteger, Index
from datetime import datetime
import os
import pathlib
from contextlib import asynccontextmanager
from logger import info, error
from config import DATABASE_URL
# 基础模型类
class Base(DeclarativeBase):
pass
# 账号模型
class AccountModel(Base):
__tablename__ = "accounts"
email = Column(String, primary_key=True)
user = Column(String, nullable=False)
password = Column(String, nullable=True)
token = Column(String, nullable=False)
usage_limit = Column(Text, nullable=True)
created_at = Column(Text, nullable=True)
status = Column(String, default="active", nullable=False)
id = Column(BigInteger, nullable=False, index=True) # 添加毫秒时间戳列并创建索引
def create_engine():
"""创建数据库引擎"""
# 直接使用配置文件中的数据库URL
engine = create_async_engine(
DATABASE_URL,
echo=False,
connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {},
future=True,
)
# info(f"数据库引擎创建成功: {DATABASE_URL}")
return engine
@asynccontextmanager
async def get_session() -> AsyncSession:
"""创建数据库会话的异步上下文管理器"""
# 为每个请求创建新的引擎和会话
engine = create_engine()
async_session = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False, future=True
)
session = async_session()
try:
# 确保连接有效
await session.execute(text("SELECT 1"))
yield session
except Exception as e:
error(f"数据库会话错误: {str(e)}")
try:
await session.rollback()
except Exception as rollback_error:
error(f"回滚过程中出错: {str(rollback_error)}")
raise
finally:
try:
await session.close()
except Exception as e:
error(f"关闭会话时出错: {str(e)}")
try:
await engine.dispose()
except Exception as e:
error(f"释放引擎时出错: {str(e)}")
async def init_db():
"""初始化数据库表结构"""
try:
engine = create_engine()
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
await engine.dispose()
info("数据库初始化成功")
except Exception as e:
error(f"数据库初始化失败: {str(e)}")
raise

109
get_email_code.py Normal file
View File

@@ -0,0 +1,109 @@
from logger import info, warning, error
import time
import re
import requests
from config import EMAIL_USERNAME, EMAIL_DOMAIN, EMAIL_PIN, EMAIL_VERIFICATION_RETRIES, EMAIL_VERIFICATION_WAIT
class EmailVerificationHandler:
def __init__(self, username=None, domain=None, pin=None):
self.username = username or EMAIL_USERNAME
self.domain = domain or EMAIL_DOMAIN
self.session = requests.Session()
self.emailExtension = f"@{self.domain}"
self.pin = pin or EMAIL_PIN
info(f"初始化邮箱验证器成功: {self.username}{self.emailExtension} pin: {self.pin}")
def get_verification_code(self, source_email=None, max_retries=None, wait_time=None):
"""
获取验证码,增加了重试机制
Args:
max_retries: 最大重试次数
wait_time: 每次重试间隔时间(秒)
Returns:
str: 验证码或None
"""
max_retries = max_retries or EMAIL_VERIFICATION_RETRIES
wait_time = wait_time or EMAIL_VERIFICATION_WAIT
for attempt in range(max_retries):
try:
code, mail_id = self._get_latest_mail_code(source_email)
if code:
info(f"成功获取验证码: {code}")
return code
if attempt < max_retries - 1:
info(f"未找到验证码,{wait_time}秒后重试 ({attempt+1}/{max_retries})...")
time.sleep(wait_time)
except Exception as e:
error(f"获取验证码失败: {str(e)}")
if attempt < max_retries - 1:
info(f"将在{wait_time}秒后重试...")
time.sleep(wait_time)
return None
# 手动输入验证码
def _get_latest_mail_code(self, source_email=None):
info(f"开始获取邮件列表")
# 获取邮件列表
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin={self.pin}"
mail_list_response = self.session.get(mail_list_url)
mail_list_data = mail_list_response.json()
time.sleep(0.5)
if not mail_list_data.get("result"):
return None, None
# 获取最新邮件的ID
first_id = mail_list_data.get("first_id")
if not first_id:
return None, None
info(f"开始获取邮件详情: {first_id}")
# 获取具体邮件内容
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}&epin={self.pin}"
mail_detail_response = self.session.get(mail_detail_url)
mail_detail_data = mail_detail_response.json()
time.sleep(0.5)
if not mail_detail_data.get("result"):
return None, None
# 从邮件文本中提取6位数字验证码
mail_text = mail_detail_data.get("text", "")
# 如果提供了source_email确保邮件内容中包含该邮箱地址
if source_email and source_email.lower() not in mail_text.lower():
error(f"邮件内容不包含指定的邮箱地址: {source_email}")
else:
info(f"邮件内容包含指定的邮箱地址: {source_email}")
code_match = re.search(r"(?<![a-zA-Z@.])\b\d{6}\b", mail_text)
if code_match:
return code_match.group(), first_id
return None, None
def _cleanup_mail(self, first_id):
# 构造删除请求的URL和数据
delete_url = "https://tempmail.plus/api/mails/"
payload = {
"email": f"{self.username}{self.emailExtension}",
"first_id": first_id,
"epin": self.pin,
}
# 最多尝试5次
for _ in range(5):
response = self.session.delete(delete_url, data=payload)
try:
result = response.json().get("result")
if result is True:
return True
except:
pass
# 如果失败,等待0.5秒后重试
time.sleep(0.5)
return False

1804
index.html Normal file

File diff suppressed because it is too large Load Diff

29
logger.py Normal file
View File

@@ -0,0 +1,29 @@
import logging
import os
import sys
from config import LOG_LEVEL, LOG_FORMAT, LOG_DATE_FORMAT
# 配置日志
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format=LOG_FORMAT,
datefmt=LOG_DATE_FORMAT,
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("app.log", encoding="utf-8"),
],
)
logger = logging.getLogger()
def info(message):
logger.info(message)
def warning(message):
logger.warning(message)
def error(message):
logger.error(message)
def debug(message):
logger.debug(message)

30
migrate_add_id.py Normal file
View File

@@ -0,0 +1,30 @@
import asyncio
import time
from sqlalchemy import select, update
from database import init_db, get_session, AccountModel
async def migrate_add_id():
"""为现有记录添加ID字段"""
await init_db()
async with get_session() as session:
# 查询所有没有id的记录
result = await session.execute(
select(AccountModel).where(AccountModel.id == None)
)
accounts = result.scalars().all()
print(f"找到 {len(accounts)} 条需要更新的记录")
# 为每条记录添加id
for i, account in enumerate(accounts):
# 生成基于索引的间隔时间戳,避免所有记录使用同一时间戳
timestamp_ms = int(time.time() * 1000) - (len(accounts) - i) * 1000
account.id = timestamp_ms
print(f"更新记录 {account.email} 的ID为 {timestamp_ms}")
await session.commit()
print("迁移完成")
if __name__ == "__main__":
asyncio.run(migrate_add_id())

11
requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
DrissionPage==4.1.0.9
psutil==6.1.0
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
sqlalchemy==2.0.25
asyncpg==0.29.0
python-dotenv==1.0.0
psycopg2-binary==2.9.9
playwright==1.41.2
aiosqlite==0.21.0

134
reset_machine.py Normal file
View File

@@ -0,0 +1,134 @@
import os
import sys
import json
import uuid
import hashlib
import shutil
from colorama import Fore, Style, init
# 初始化colorama
init()
# 定义emoji和颜色常量
EMOJI = {
"FILE": "📄",
"BACKUP": "💾",
"SUCCESS": "",
"ERROR": "",
"INFO": "",
"RESET": "🔄",
}
class MachineIDResetter:
def __init__(self):
# 判断操作系统
if sys.platform == "win32": # Windows
appdata = os.getenv("APPDATA")
if appdata is None:
raise EnvironmentError("APPDATA 环境变量未设置")
self.db_path = os.path.join(
appdata, "Cursor", "User", "globalStorage", "storage.json"
)
elif sys.platform == "darwin": # macOS
self.db_path = os.path.abspath(
os.path.expanduser(
"~/Library/Application Support/Cursor/User/globalStorage/storage.json"
)
)
elif sys.platform == "linux": # Linux 和其他类Unix系统
self.db_path = os.path.abspath(
os.path.expanduser("~/.config/Cursor/User/globalStorage/storage.json")
)
else:
raise NotImplementedError(f"不支持的操作系统: {sys.platform}")
def generate_new_ids(self):
"""生成新的机器ID"""
# 生成新的UUID
dev_device_id = str(uuid.uuid4())
# 生成新的machineId (64个字符的十六进制)
machine_id = hashlib.sha256(os.urandom(32)).hexdigest()
# 生成新的macMachineId (128个字符的十六进制)
mac_machine_id = hashlib.sha512(os.urandom(64)).hexdigest()
# 生成新的sqmId
sqm_id = "{" + str(uuid.uuid4()).upper() + "}"
return {
"telemetry.devDeviceId": dev_device_id,
"telemetry.macMachineId": mac_machine_id,
"telemetry.machineId": machine_id,
"telemetry.sqmId": sqm_id,
}
def reset_machine_ids(self):
"""重置机器ID并备份原文件"""
try:
print(f"{Fore.CYAN}{EMOJI['INFO']} 正在检查配置文件...{Style.RESET_ALL}")
# 检查文件是否存在
if not os.path.exists(self.db_path):
print(
f"{Fore.RED}{EMOJI['ERROR']} 配置文件不存在: {self.db_path}{Style.RESET_ALL}"
)
return False
# 检查文件权限
if not os.access(self.db_path, os.R_OK | os.W_OK):
print(
f"{Fore.RED}{EMOJI['ERROR']} 无法读写配置文件,请检查文件权限!{Style.RESET_ALL}"
)
print(
f"{Fore.RED}{EMOJI['ERROR']} 如果你使用过 go-cursor-help 来修改 ID; 请修改文件只读权限 {self.db_path} {Style.RESET_ALL}"
)
return False
# 读取现有配置
print(f"{Fore.CYAN}{EMOJI['FILE']} 读取当前配置...{Style.RESET_ALL}")
with open(self.db_path, "r", encoding="utf-8") as f:
config = json.load(f)
# 生成新的ID
print(f"{Fore.CYAN}{EMOJI['RESET']} 生成新的机器标识...{Style.RESET_ALL}")
new_ids = self.generate_new_ids()
# 更新配置
config.update(new_ids)
# 保存新配置
print(f"{Fore.CYAN}{EMOJI['FILE']} 保存新配置...{Style.RESET_ALL}")
with open(self.db_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4)
print(f"{Fore.GREEN}{EMOJI['SUCCESS']} 机器标识重置成功!{Style.RESET_ALL}")
print(f"\n{Fore.CYAN}新的机器标识:{Style.RESET_ALL}")
for key, value in new_ids.items():
print(f"{EMOJI['INFO']} {key}: {Fore.GREEN}{value}{Style.RESET_ALL}")
return True
except PermissionError as e:
print(f"{Fore.RED}{EMOJI['ERROR']} 权限错误: {str(e)}{Style.RESET_ALL}")
print(
f"{Fore.YELLOW}{EMOJI['INFO']} 请尝试以管理员身份运行此程序{Style.RESET_ALL}"
)
return False
except Exception as e:
print(f"{Fore.RED}{EMOJI['ERROR']} 重置过程出错: {str(e)}{Style.RESET_ALL}")
return False
if __name__ == "__main__":
print(f"\n{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
print(f"{Fore.CYAN}{EMOJI['RESET']} Cursor 机器标识重置工具{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
resetter = MachineIDResetter()
resetter.reset_machine_ids()
print(f"\n{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
input(f"{EMOJI['INFO']} 按回车键退出...")

BIN
tokenManager/.DS_Store vendored Normal file

Binary file not shown.

57
tokenManager/cursor.py Normal file
View File

@@ -0,0 +1,57 @@
import requests
class Cursor:
models = [
"claude-3-5-sonnet-20241022",
"claude-3-opus",
"claude-3.5-haiku",
"claude-3.5-sonnet",
"cursor-fast",
"cursor-small",
"deepseek-r1",
"deepseek-v3",
"gemini-2.0-flash-exp",
"gemini-2.0-flash-thinking-exp",
"gemini-exp-1206",
"gpt-3.5-turbo",
"gpt-4",
"gpt-4-turbo-2024-04-09",
"gpt-4o",
"gpt-4o-mini",
"o1",
"o1-mini",
"o1-preview",
"o3-mini",
]
@classmethod
def get_remaining_balance(cls, user, token):
url = f"https://www.cursor.com/api/usage?user={user}"
headers = {
"Content-Type": "application/json",
"Cookie": f"WorkosCursorSessionToken={user}%3A%3A{token}",
}
response = requests.get(url, headers=headers)
usage = response.json().get("gpt-4", None)
if (
usage is None
or "maxRequestUsage" not in usage
or "numRequests" not in usage
):
return None
return usage["maxRequestUsage"] - usage["numRequests"]
@classmethod
def get_trial_remaining_days(cls, user, token):
url = f"https://www.cursor.com/api/auth/stripe"
headers = {
"Content-Type": "application/json",
"Cookie": f"WorkosCursorSessionToken={user}%3A%3A{token}",
}
response = requests.get(url, headers=headers)
remaining_days = response.json().get("daysRemainingOnTrial", None)
return remaining_days

View File

@@ -0,0 +1,125 @@
#####################################
#
# If you meet 429 when running this script, please increase the `GLOBAL_API_RATE_LIMIT` in your Chat-API service.
# See more details in https://github.com/ai365vip/chat-api?tab=readme-ov-file#%E7%8E%AF%E5%A2%83%E5%8F%98%E9%87%8F
#
#####################################
import argparse
import concurrent.futures
from .oneapi_manager import OneAPIManager
from .cursor import Cursor
def handle_oneapi_cursor_channel(
oneapi: OneAPIManager,
channel_id,
test_channel: bool,
disable_low_balance_channel: bool,
delete_low_balance_channel: bool,
low_balance_threshold=10,
):
if test_channel:
test_response = oneapi.test_channel(channel_id)
if test_response.status_code != 200:
print(
f"Fail to test channel {channel_id}. Status Code: {response.status_code}"
)
return None
response = oneapi.get_channel(channel_id)
if response.status_code != 200:
print(f"Fail to get channel {channel_id}. Status Code: {response.status_code}")
return None
data = response.json()["data"]
key = data["key"]
status = data["status"] # 1 for enable, 2 for disbale
test_time = data["test_time"]
response_time = data["response_time"]
remaining_balance = Cursor.get_remaining_balance(key)
remaining_days = Cursor.get_trial_remaining_days(key)
print(
f"[OneAPI] Channel {channel_id} Info: Balance = {remaining_balance}. Trial Remaining Days = {remaining_days}. Response Time = {response_time}"
)
if None in [remaining_balance, remaining_days]:
print(f"[OneAPI] Invalid resposne")
return None
if remaining_balance < low_balance_threshold or (
test_time != 0 and response_time < 1000
): # or remaining_days <= 0:
if delete_low_balance_channel:
response = oneapi.delete_channel(channel_id)
print(
f"[OneAPI] Delete Channel {channel_id}. Status Coue: {response.status_code}"
)
elif disable_low_balance_channel and status == 1:
response = oneapi.disable_channel(channel_id)
print(
f"[OneAPI] Disable Channel {channel_id}. Status Code: {response.status_code}"
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--oneapi_url", type=str, required=False, help="URL link for One-API website"
)
parser.add_argument(
"--oneapi_token", type=str, required=False, help="Token for One-API website"
)
parser.add_argument(
"--test_channel", default=False, type=lambda x: (str(x).lower() == "true")
)
parser.add_argument(
"--disable_low_balance_accounts",
default=False,
type=lambda x: (str(x).lower() == "true"),
)
parser.add_argument(
"--delete_low_balance_accounts",
default=False,
type=lambda x: (str(x).lower() == "true"),
)
parser.add_argument(
"--max_workers",
type=int,
default=10,
help="How many workers in multi-threading",
)
args = parser.parse_args()
oneapi_url = args.oneapi_url
oneapi_token = args.oneapi_token
test_channel = args.test_channel
disable_low_balance_accounts = args.disable_low_balance_accounts
delete_low_balance_accounts = args.delete_low_balance_accounts
max_workers = args.max_workers
oneapi = OneAPIManager(oneapi_url, oneapi_token)
response_channels = oneapi.get_channels(0, 2147483647)
channels = response_channels.json()["data"]
channels_ids = [channel["id"] for channel in channels]
channels_ids = sorted(channels_ids, key=int)
print(f"[OneAPI] Channel Count: {len(channels_ids)}")
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(
handle_oneapi_cursor_channel,
oneapi,
id,
test_channel,
disable_low_balance_accounts,
delete_low_balance_accounts,
)
for id in channels_ids
]
for future in concurrent.futures.as_completed(futures):
result = future.result()

View File

@@ -0,0 +1,97 @@
import requests
class OneAPIManager:
def __init__(self, url, access_token):
self.base_url = url
self.access_token = access_token
self.headers = {
"Content-Type": "application/json",
"Authorization": self.access_token
}
def get_channel(self, id):
url = self.base_url + f"/api/channel/{id}"
response = requests.get(url, headers=self.headers)
return response
def get_channels(self, page, pagesize):
url = self.base_url + f"/api/channel/?p={page}&page_size={pagesize}"
response = requests.get(url, headers=self.headers)
return response
# Support multiple keys separated by '\n'
def add_channel(self, name, base_url, key, models, rate_limit_count = 0):
url = self.base_url + "/api/channel"
data = {"name": name,
"type": 1,
"key": key,
"openai_organization": "",
"base_url": base_url,
"other": "",
"model_mapping":"",
"status_code_mapping":"",
"headers":"",
"models": ','.join(models),
"auto_ban":0,
"is_image_url_enabled": 0,
"model_test": models[0],
"tested_time": 0,
"priority": 0,
"weight": 0,
"groups": ["default"],
"proxy_url": "",
"region": "",
"sk": "",
"ak": "",
"project_id": "",
"client_id": "",
"client_secret": "",
"refresh_token": "",
"gcp_account": "",
"rate_limit_count":rate_limit_count,
"gemini_model":"",
"tags":"",
"rate_limited":rate_limit_count>0,
"is_tools": False,
"claude_original_request": False,
"group":"default"
}
response = requests.post(url, json=data, headers=self.headers)
return response
def delete_channel(self, id):
url = self.base_url + f"/api/channel/{id}"
response = requests.delete(url, headers=self.headers)
return response
def enable_channel(self, id):
url = self.base_url + f"/api/channel"
data = {
"id": id,
"status": 1
}
response = requests.put(url, json=data, headers=self.headers)
return response
def disable_channel(self, id):
url = self.base_url + f"/api/channel"
data = {
"id": id,
"status": 2
}
response = requests.put(url, json=data, headers=self.headers)
return response
def test_channel(self, id, model = ""):
url = self.base_url + f"/api/channel/test/{id}?model={model}"
response = requests.get(url, headers=self.headers)
return response

View File

@@ -0,0 +1,18 @@
{
"manifest_version": 3,
"name": "Turnstile Patcher",
"version": "2.1",
"content_scripts": [
{
"js": [
"./script.js"
],
"matches": [
"<all_urls>"
],
"run_at": "document_start",
"all_frames": true,
"world": "MAIN"
}
]
}

View File

@@ -0,0 +1 @@

12
turnstilePatch/script.js Normal file
View File

@@ -0,0 +1,12 @@
function getRandomInt(min, max) {
return Math.floor(Math.random() * (max - min + 1)) + min;
}
// old method wouldn't work on 4k screens
let screenX = getRandomInt(800, 1200);
let screenY = getRandomInt(400, 600);
Object.defineProperty(MouseEvent.prototype, 'screenX', { value: screenX });
Object.defineProperty(MouseEvent.prototype, 'screenY', { value: screenY });

15
vercel.json Normal file
View File

@@ -0,0 +1,15 @@
{
"version": 2,
"builds": [
{
"src": "api.py",
"use": "@vercel/python"
}
],
"routes": [
{
"src": "/(.*)",
"dest": "api.py"
}
]
}