Update Yupp and LMArena providers (#3291)

* Handle Cloudflare WAF errors in Qwen provider

Added detection and raising of CloudflareError when the response indicates an Aliyun WAF block, improving error handling for requests blocked by the WAF.
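A minimal sketch of the check (hypothetical helper name; in the provider this logic lives inside a `raise_for_status` override):

```python
class CloudflareError(Exception):
    """Raised when a request is blocked by the Cloudflare/Aliyun WAF."""

def check_waf_block(content_type: str, body: str) -> None:
    # The Aliyun WAF answers with a minified HTML page containing the
    # "aliyun_waf_aa" marker instead of the expected JSON/SSE payload.
    if content_type.startswith("text/html") and "aliyun_waf_aa" in body:
        raise CloudflareError(body)
```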

* Improve Qwen and LMArena provider authentication handling

Refactors Qwen provider to better manage authentication cookies and cache, including fallback and refresh logic for Cloudflare errors and rate limits. Adds use of AuthFileMixin to Qwen, improves argument retrieval from cache or nodriver, and ensures cookies are merged after requests. Updates LMArena to prioritize args from kwargs before reading from cache, improving flexibility and reliability in authentication.
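The fallback/refresh flow reduces to: use cached args first, and refetch them once when a Cloudflare block is hit. A sketch with hypothetical `request`/`fetch_args` callables, not the provider's exact control flow:

```python
import asyncio

class CloudflareError(Exception):
    """Raised when a request is blocked by the WAF."""

async def request_with_refresh(request, fetch_args, cache: dict):
    """Try cached auth args first; refresh them once on a Cloudflare block."""
    for use_cache in (True, False):
        args = cache.get("args") if use_cache else None
        if args is None:
            args = await fetch_args()  # e.g. via nodriver in the provider
            cache["args"] = args
        try:
            return await request(args)
        except CloudflareError:
            cache.pop("args", None)  # cached cookies are stale
    raise CloudflareError("still blocked after refreshing auth args")
```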

* Improve LMArena provider recaptcha handling and error logging

Refactors the LMArena provider to better handle recaptcha token acquisition and error cases, including new async methods for recaptcha retrieval and improved error logging. Updates dependencies and imports, and enhances the raise_for_status utility to detect LMArena-specific recaptcha validation failures.
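A sketch of the enhanced status check. The error type and marker string are illustrative assumptions, not LMArena's actual wording:

```python
import asyncio

class ResponseStatusError(Exception):
    """Hypothetical error type for provider-specific failure bodies."""

async def raise_for_status(response) -> None:
    # Assumption: a failed response whose body mentions reCAPTCHA is
    # surfaced with a dedicated message instead of a generic HTTP error.
    if response.status >= 400:
        body = await response.text()
        if "recaptcha" in body.lower():
            raise ResponseStatusError(f"reCAPTCHA validation failed: {body}")
        raise ResponseStatusError(f"HTTP {response.status}: {body}")
```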

* Add image upload and caching to LMArena provider

Introduces image upload support with caching in the LMArena provider by implementing a prepare_images method. Images are uploaded, cached by hash, and attached to user messages for models supporting vision. Refactors attachment handling to use the new upload logic and improves code formatting and error handling.
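The cache-by-hash idea can be sketched as follows (the hash function and `upload` callable are assumptions; the provider keys a module-level `ImagesCache` dict):

```python
import hashlib

ImagesCache: dict = {}

def upload_image_cached(data: bytes, upload) -> dict:
    """Upload an image once; identical bytes hit the cache afterwards."""
    image_hash = hashlib.sha256(data).hexdigest()  # assumption: sha256 keys
    if image_hash in ImagesCache:
        return ImagesCache[image_hash]
    file = upload(data)  # provider-specific upload returning file metadata
    ImagesCache[image_hash] = file
    return file
```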

* Update and expand model definitions in LMArena.py

Replaces the previous 'models' list with an updated and expanded set of model definitions, including new fields such as 'name', 'rank', and 'rankByModality'. This change adds new models, updates capabilities, and provides more detailed metadata for each model, improving model selection and feature support.
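An illustrative entry showing the shape of the richer metadata (field values are hypothetical; the keys mirror the new fields named above):

```python
model_entry = {
    "name": "example-model",
    "rank": 12,
    "rankByModality": {"text": 9, "vision": 15},
    "capabilities": {"vision": True, "image_generation": False},
}

def supports_vision(entry: dict) -> bool:
    """Feature checks can read the richer metadata directly."""
    return bool(entry.get("capabilities", {}).get("vision"))
```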

* Improve reCAPTCHA handling and set default timeout

Refactors the reCAPTCHA execution to use the enterprise.ready callback and adds error handling for token retrieval. Also sets a default timeout of 5 minutes for StreamSession if not provided.
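The timeout default matches the pattern visible later in the diff (`kwargs.get("timeout") or 5 * 60`):

```python
def resolve_timeout(kwargs: dict) -> float:
    # Fall back to 5 minutes when no (or a falsy) timeout is supplied.
    return kwargs.get("timeout") or 5 * 60
```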

* Update LMArena.py

* StreamSession

* Improve error logging for Qwen Cloudflare errors

Replaces a generic debug log with a more detailed error log that includes the exception message when a CloudflareError is caught in the Qwen provider. This enhances troubleshooting by providing more context in logs.

* Generate ssxmod cookies

* Improve error handling for Qwen provider responses

Adds checks for JSON error responses and raises RuntimeError when 'success' is false or a 'code' is present in the response data. Also refines HTML error detection logic in raise_for_status.
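Isolated from the diff, the added envelope check reduces to:

```python
def check_response_data(data: dict) -> None:
    """Raise when the Qwen API reports a failure in its JSON envelope."""
    if data.get("success") is False or data.get("data", {}).get("code"):
        raise RuntimeError(f"Response: {data}")
```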

* Update fingerprint.py

* Update Yupp.py

For testing only.

* Update Yupp.py

* Add Qwen bx-ua header generator and update Qwen provider

Introduces g4f/Provider/qwen/generate_ua.py for generating bx-ua headers, including AES encryption and fingerprinting logic. Updates Qwen provider to support dynamic UA/cookie handling and refactors image preparation in LMArena to handle empty media lists. Minor cleanup in cookie_generator.py and preparation for integrating bx-ua header in Qwen requests.
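The module's `__main__` block assembles the header value as a version prefix plus base64 of a zlib-compressed fingerprint; that final wrapping step, isolated:

```python
import base64
import zlib

def build_bx_ua(fingerprint: str, version: str = "231") -> str:
    """Wrap a '^'-separated fingerprint string into a bx-ua-style header value."""
    payload = base64.b64encode(zlib.compress(fingerprint.encode("utf-8")))
    return f"{version}!{payload.decode('ascii')}"
```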

* Update LMArena.py

* Update LMArena.py

* Update LMArena.py

* Add user_info method to Yupp provider

Introduces a new async class method user_info to fetch and parse user details, credits, and model information from Yupp. Updates create_async_generator to yield user_info at the start of the conversation flow. Also fixes a bug in get_last_user_message call by passing a boolean for the prompt argument.
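The ordering change can be sketched as below (stub data; the real method scrapes yupp.ai and parses Next.js flight payloads):

```python
import asyncio

async def fetch_user_info():
    # Stand-in for the real network call that scrapes yupp.ai.
    return {"name": "demo", "credits": 100}

async def create_async_generator(messages):
    """Yield account details first, then the normal chat stream."""
    yield await fetch_user_info()
    for message in messages:  # stand-in for the streamed model response
        yield message
```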

* Update Yupp.py

* Update models.py

* Update Yupp.py

* Enhance LMArena action ID handling and parsing

Refactored LMArena to dynamically extract and update action IDs from HTML/JS, replacing hardcoded values with a class-level dictionary. Added HTML parsing logic to load available actions and models, improving maintainability and adaptability to backend changes. Minor cleanup and improved code structure in Yupp and LMArena providers.
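Mapping action names to server action IDs can be sketched with the hash-then-name regex visible in the diff below (application to LMArena's bundles is an assumption here):

```python
import re

# Matches ("40+ hex chars", ..., "actionName") pairs emitted by the
# Next.js server-action bindings in the bundled JS.
ACTION_RE = re.compile(r'\("([a-f0-9]{40,})".*?"(\w+)"\)')

def extract_action_ids(js_text: str) -> dict:
    """Map action names (e.g. startNewChat) to their server action IDs."""
    return {name: action_id for action_id, name in ACTION_RE.findall(js_text)}
```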

* Update LMArena.py
This commit is contained in:
Ammar
2025-12-22 14:43:35 +02:00
committed by GitHub
parent 807b7c8b06
commit ddcdcef882
9 changed files with 1671 additions and 569 deletions

View File

@@ -13,15 +13,14 @@ from urllib.parse import quote
import aiohttp
from g4f.image import to_bytes, detect_file_type
from g4f.requests import raise_for_status
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import get_last_user_message
from .qwen.cookie_generator import generate_cookies
from .. import debug
from ..errors import RateLimitError, ResponseError
from ..errors import RateLimitError, ResponseError, CloudflareError
from ..image import to_bytes, detect_file_type
from ..providers.response import JsonConversation, Reasoning, Usage, ImageResponse, FinishReason
from ..requests import sse_stream
from ..requests.aiohttp import StreamSession
from ..requests import sse_stream, StreamSession, raise_for_status, get_args_from_nodriver
from ..tools.media import merge_media
from ..typing import AsyncResult, Messages, MediaListType
@@ -31,7 +30,12 @@ try:
has_curl_cffi = True
except ImportError:
has_curl_cffi = False
try:
import nodriver
has_nodriver = True
except ImportError:
has_nodriver = False
# Global variables to manage Qwen Image Cache
ImagesCache: Dict[str, dict] = {}
@@ -154,7 +158,7 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
return cls.models
@classmethod
async def prepare_files(cls, media, session: aiohttp.ClientSession, headers=None) -> list:
async def prepare_files(cls, media, session: StreamSession, headers=None) -> list:
if headers is None:
headers = {}
files = []
@@ -252,10 +256,38 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
"file_class": file_class,
"uploadTaskId": str(uuid.uuid4())
}
debug.log(f"Uploading file: {file_url}")
ImagesCache[image_hash] = file
files.append(file)
return files
@classmethod
async def get_args(cls, proxy, **kwargs):
grecaptcha = []
async def callback(page: nodriver.Tab):
while not await page.evaluate('window.__baxia__ && window.__baxia__.getFYModule'):
await asyncio.sleep(1)
captcha = await page.evaluate(
"""window.baxiaCommon.getUA()""",
await_promise=True)
if isinstance(captcha, str):
grecaptcha.append(captcha)
else:
raise Exception(captcha)
args = await get_args_from_nodriver(cls.url, proxy=proxy, callback=callback)
return args, next(iter(grecaptcha))
@classmethod
async def raise_for_status(cls, response, message=None):
await raise_for_status(response, message)
content_type = response.headers.get("content-type", "")
if content_type.startswith("text/html"):
html = (await response.text()).strip()
if html.startswith('<!doctypehtml>') and "aliyun_waf_aa" in html:
raise CloudflareError(message or html)
@classmethod
async def create_async_generator(
cls,
@@ -283,9 +315,29 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
Txt2Txt = "t2t"
WebDev = "web_dev"
"""
# cache_file = cls.get_cache_file()
# cookie: str = kwargs.get("cookie", "") # ssxmod_itna=1-...
# args = kwargs.get("qwen_args", {})
# args.setdefault("cookies", {})
token = kwargs.get("token")
# if not args and cache_file.exists():
# try:
# with cache_file.open("r") as f:
# args = json.load(f)
# except json.JSONDecodeError:
# debug.log(f"Cache file {cache_file} is corrupted, removing it.")
# cache_file.unlink()
# if not cookie:
# if not args:
# args = await cls.get_args(proxy, **kwargs)
# cookie = "; ".join([f"{k}={v}" for k, v in args["cookies"].items()])
model_name = cls.get_model(model)
cookie = kwargs.get("cookie", "") # ssxmod_itna=1-...
prompt = get_last_user_message(messages)
timeout = kwargs.get("timeout") or 5 * 60
# for _ in range(2):
# data = generate_cookies()
# args,ua = await cls.get_args(proxy, **kwargs)
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'Accept': '*/*',
@@ -297,24 +349,19 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Connection': 'keep-alive',
'Cookie': cookie,
# 'Cookie': f'ssxmod_itna={data["ssxmod_itna"]};ssxmod_itna2={data["ssxmod_itna2"]}',
'Authorization': f'Bearer {token}' if token else "Bearer",
'Source': 'web'
}
prompt = get_last_user_message(messages)
_timeout = kwargs.get("timeout")
if isinstance(_timeout, aiohttp.ClientTimeout):
timeout = _timeout
else:
total = float(_timeout) if isinstance(_timeout, (int, float)) else 5 * 60
timeout = aiohttp.ClientTimeout(total=total)
# try:
async with StreamSession(headers=headers) as session:
try:
async with session.get('https://chat.qwen.ai/api/v1/auths/', proxy=proxy) as user_info_res:
user_info_res.raise_for_status()
await cls.raise_for_status(user_info_res)
debug.log(await user_info_res.json())
except:
...
except Exception as e:
debug.error(e)
for attempt in range(5):
try:
if not cls._midtoken:
@@ -336,19 +383,21 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
req_headers = session.headers.copy()
req_headers['bx-umidtoken'] = cls._midtoken
req_headers['bx-v'] = '2.5.31'
# req_headers['bx-ua'] = ua
message_id = str(uuid.uuid4())
if conversation is None:
chat_payload = {
"title": "New Chat",
"models": [model_name],
"chat_mode": "normal",
"chat_mode": "normal",# local
"chat_type": chat_type,
"timestamp": int(time() * 1000)
}
async with session.post(
f'{cls.url}/api/v2/chats/new', json=chat_payload, headers=req_headers, proxy=proxy
f'{cls.url}/api/v2/chats/new', json=chat_payload, headers=req_headers,
proxy=proxy
) as resp:
resp.raise_for_status()
await cls.raise_for_status(resp)
data = await resp.json()
if not (data.get('success') and data['data'].get('id')):
raise RuntimeError(f"Failed to create chat: {data}")
@@ -367,7 +416,7 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
"stream": stream,
"incremental_output": stream,
"chat_id": conversation.chat_id,
"chat_mode": "normal",
"chat_mode": "normal",# local
"model": model_name,
"parent_id": conversation.parent_id,
"messages": [
@@ -400,22 +449,24 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
msg_payload["size"] = aspect_ratio
async with session.post(
f'{cls.url}/api/v2/chat/completions?chat_id={conversation.chat_id}', json=msg_payload,
f'{cls.url}/api/v2/chat/completions?chat_id={conversation.chat_id}',
json=msg_payload,
headers=req_headers, proxy=proxy, timeout=timeout, cookies=conversation.cookies
) as resp:
first_line = await resp.content.readline()
line_str = first_line.decode().strip()
if line_str.startswith('{'):
data = json.loads(line_str)
if data.get("data", {}).get("code"):
raise RuntimeError(f"Response: {data}")
conversation.parent_id = data.get("response.created", {}).get("response_id")
yield conversation
await cls.raise_for_status(resp)
if resp.headers.get("content-type", "").startswith("application/json"):
resp_json = await resp.json()
if resp_json.get("success") is False or resp_json.get("data", {}).get("code"):
raise RuntimeError(f"Response: {resp_json}")
# args["cookies"] = merge_cookies(args.get("cookies"), resp)
thinking_started = False
usage = None
async for chunk in sse_stream(resp):
try:
if "response.created" in chunk:
conversation.parent_id = chunk.get("response.created", {}).get(
"response_id")
yield conversation
error = chunk.get("error", {})
if error:
raise ResponseError(f'{error["code"]}: {error["details"]}')
@@ -457,5 +508,11 @@ class Qwen(AsyncGeneratorProvider, ProviderModelMixin):
continue
else:
raise e
raise RateLimitError("The Qwen provider reached the request limit after 5 attempts.")
# except CloudflareError as e:
# debug.error(f"{cls.__name__}: {e}")
# args = await cls.get_args(proxy, **kwargs)
# cookie = "; ".join([f"{k}={v}" for k, v in args["cookies"].items()])
# continue
raise RateLimitError("The Qwen provider reached the request limit due to Cloudflare blocks.")

View File

@@ -9,7 +9,7 @@ import uuid
import aiohttp
from .helper import get_last_user_message
from .yupp.models import YuppModelManager
from .yupp.models import YuppModelManager, ModelProcessor
from ..cookies import get_cookies
from ..debug import log
from ..errors import RateLimitError, ProviderException, MissingAuthError
@@ -300,6 +300,114 @@ class Yupp(AsyncGeneratorProvider, ProviderModelMixin):
files.append(file)
return files
@classmethod
async def user_info(cls, account: YUPP_ACCOUNT, kwargs: dict):
history: dict = {}
user_info = {}
def pars_children(data):
data = data["children"]
if len(data) < 4:
return
if data[1] in ["div", "defs", "style", "script"]:
return
pars_data(data[3])
def pars_data(data):
if not isinstance(data, (list, dict)):
return
if isinstance(data, dict):
json_data = data.get("json") or {}
elif data[0] == "$":
if data[1] in ["div", "defs", "style", "script"]:
return
json_data = data[3]
else:
return
if 'session' in json_data:
user_info.update(json_data['session']['user'])
elif "state" in json_data:
for query in json_data["state"]["queries"]:
if query["state"]["dataUpdateCount"] == 0:
continue
if "getCredits" in query["queryHash"]:
credits = query["state"]["data"]["json"]
user_info["credits"] = credits
elif "getSidebarChatsV2" in query["queryHash"]:
for page in query["state"]["data"]["json"]["pages"]:
for item in page["items"]:
history[item["id"]] = item
elif 'categories' in json_data:
...
elif 'children' in json_data:
pars_children(json_data)
elif isinstance(json_data, list):
if "supportedAttachmentMimeTypes" in json_data[0]:
models = json_data
cls.models_tags = {model.get("name"): ModelProcessor.generate_tags(model) for
model in models}
cls.models = [model.get("name") for model in models]
cls.image_models = [model.get("name") for model in models if
model.get("isImageGeneration")]
cls.vision_models = [model.get("name") for model in models if
"image/*" in model.get("supportedAttachmentMimeTypes", [])]
try:
async with StreamSession() as session:
headers = {
"content-type": "text/plain;charset=UTF-8",
"cookie": f"__Secure-yupp.session-token={account['token']}",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
}
async with session.get("https://yupp.ai", headers=headers) as response:
response.raise_for_status()
response.content._high_water = 10 * 1024 * 1024 # 10MB per line
line_pattern = re.compile("^([0-9a-fA-F]+):(.*)")
async for line in response.content:
line = line.decode()
# Pattern to match self.__next_f.push([...])
pattern = r'self\.__next_f\.push\((\[[\s\S]*?\])\)(?=<\/script>)'
matches = re.findall(pattern, line)
for match in matches:
# Parse the JSON array
data = json.loads(match)
for chunk in data[1].split("\n"):
match = line_pattern.match(chunk)
if not match:
continue
chunk_id, chunk_data = match.groups()
if chunk_data.startswith("I["):
data = json.loads(chunk_data[1:])
if data[2] == "HomePagePromptForm":
for js in data[1][::-1]:
js_url = f"{cls.url}{js}"
async with session.get(js_url, headers=headers) as js_response:
js_text = await js_response.text()
if "startNewChat" in js_text:
# changeStyle, continueChat, retryResponse, showMoreResponses, startNewChat
start_id = re.findall(r'\("([a-f0-9]{40,})".*?"(\w+)"\)', js_text)
for v, k in start_id:
kwargs[k] = v
break
elif chunk_data.startswith(("[", "{")):
try:
data = json.loads(chunk_data)
pars_data(data)
except json.decoder.JSONDecodeError:
...
except Exception as e:
log_debug(f"user_info error: {str(e)}")
except Exception as e:
log_debug(f"user_info error: {str(e)}")
if user_info:
log_debug(
f"user:{user_info.get('name')} credits:{user_info.get('credits')} onboardingStatus:{user_info.get('onboardingStatus')}")
return user_info
@classmethod
async def create_async_generator(
cls,
@@ -331,7 +439,7 @@ class Yupp(AsyncGeneratorProvider, ProviderModelMixin):
if is_new_conversation:
prompt = format_messages_for_yupp(messages)
else:
prompt = get_last_user_message(messages, prompt)
prompt = get_last_user_message(messages, bool(prompt))
log_debug(
f"Use url_uuid: {url_uuid}, Formatted prompt length: {len(prompt)}, Is new conversation: {is_new_conversation}")
@@ -342,11 +450,12 @@ class Yupp(AsyncGeneratorProvider, ProviderModelMixin):
account = await get_best_yupp_account()
if not account:
raise ProviderException("No valid Yupp accounts available")
# user_info, models, prev conversation, credits
user_info: dict = await cls.user_info(account, kwargs)
yield PlainTextResponse(str(user_info))
try:
async with StreamSession() as session:
turn_id = str(uuid.uuid4())
# Handle media attachments
media = kwargs.get("media")
if media:
@@ -375,7 +484,7 @@ class Yupp(AsyncGeneratorProvider, ProviderModelMixin):
url = f"https://yupp.ai/chat/{url_uuid}?stream=true"
# Yield the conversation info first
yield JsonConversation(url_uuid=url_uuid)
next_action = kwargs.get("next_action", "7f7de0a21bc8dc3cee8ba8b6de632ff16f769649dd")
next_action = kwargs.get("startNewChat") or kwargs.get("next_action", "7f7de0a21bc8dc3cee8ba8b6de632ff16f769649dd")
else:
# Continuing existing conversation
payload = [
@@ -389,8 +498,7 @@ class Yupp(AsyncGeneratorProvider, ProviderModelMixin):
files
]
url = f"https://yupp.ai/chat/{url_uuid}?stream=true"
next_action = kwargs.get("next_action", "7f9ec99a63cbb61f69ef18c0927689629bda07f1bf")
next_action = kwargs.get("continueChat") or kwargs.get("next_action", "7f9ec99a63cbb61f69ef18c0927689629bda07f1bf")
headers = {
"accept": "text/x-component",
"content-type": "text/plain;charset=UTF-8",
@@ -401,17 +509,13 @@ class Yupp(AsyncGeneratorProvider, ProviderModelMixin):
log_debug(f"Sending request to: {url}")
log_debug(f"Payload structure: {type(payload)}, length: {len(str(payload))}")
_timeout = kwargs.get("timeout")
if isinstance(_timeout, aiohttp.ClientTimeout):
timeout = _timeout
else:
total = float(_timeout) if isinstance(_timeout, (int, float)) else 5 * 60
timeout = aiohttp.ClientTimeout(total=total)
timeout = kwargs.get("timeout") or 5 * 60
# Send request
async with session.post(url, json=payload, headers=headers, proxy=proxy,
timeout=timeout) as response:
response.raise_for_status()
if response.status == 303:
...
# Make chat private in background
asyncio.create_task(make_chat_private(session, account, url_uuid))
# Solve ValueError: Chunk too big

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,475 @@
import random
import time
from typing import Any, Callable, Dict, List, Optional, Union
from g4f import debug
# Import fingerprint generator (from the Python-converted fingerprint module)
# Make sure you have fingerprint.py in the same folder.
from g4f.Provider.qwen.fingerprint import generate_fingerprint # noqa: F401
# ==================== Config ====================
CUSTOM_BASE64_CHARS = "DGi0YA7BemWnQjCl4_bR3f8SKIF9tUz/xhr2oEOgPpac=61ZqwTudLkM5vHyNXsVJ"
# Hash field positions (need random regeneration)
HASH_FIELDS: Dict[int, str] = {
16: "split", # plugin hash: "count|hash" (replace only hash part)
17: "full", # canvas hash
18: "full", # UA hash 1
31: "full", # UA hash 2
34: "full", # URL hash
36: "full", # doc attribute hash (10-100)
}
# ==================== LZW Compression (JS-faithful port) ====================
def lzw_compress(data: Optional[str], bits: int, char_func: Callable[[int], str]) -> str:
if data is None:
return ""
dictionary: Dict[str, int] = {}
dict_to_create: Dict[str, bool] = {}
c = ""
wc = ""
w = ""
enlarge_in = 2
dict_size = 3
num_bits = 2
result: List[str] = []
value = 0
position = 0
for i in range(len(data)):
c = data[i]
if c not in dictionary:
dictionary[c] = dict_size
dict_size += 1
dict_to_create[c] = True
wc = w + c
if wc in dictionary:
w = wc
else:
if w in dict_to_create:
# output "w" as a raw char (8-bit or 16-bit)
if ord(w[0]) < 256:
# write num_bits zeros
for _ in range(num_bits):
value = (value << 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code = ord(w[0])
for _ in range(8):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
else:
# write a 1 marker
char_code = 1
for _ in range(num_bits):
value = (value << 1) | char_code
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code = 0
char_code = ord(w[0])
for _ in range(16):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
enlarge_in -= 1
if enlarge_in == 0:
enlarge_in = 2 ** num_bits
num_bits += 1
del dict_to_create[w]
else:
# output dictionary code for w
char_code = dictionary[w]
for _ in range(num_bits):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
enlarge_in -= 1
if enlarge_in == 0:
enlarge_in = 2 ** num_bits
num_bits += 1
dictionary[wc] = dict_size
dict_size += 1
w = c
# flush remaining w
if w != "":
if w in dict_to_create:
if ord(w[0]) < 256:
for _ in range(num_bits):
value = (value << 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code = ord(w[0])
for _ in range(8):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
else:
char_code = 1
for _ in range(num_bits):
value = (value << 1) | char_code
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code = 0
char_code = ord(w[0])
for _ in range(16):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
enlarge_in -= 1
if enlarge_in == 0:
enlarge_in = 2 ** num_bits
num_bits += 1
del dict_to_create[w]
else:
char_code = dictionary[w]
for _ in range(num_bits):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
enlarge_in -= 1
if enlarge_in == 0:
enlarge_in = 2 ** num_bits
num_bits += 1
# end-of-stream marker (2)
char_code = 2
for _ in range(num_bits):
value = (value << 1) | (char_code & 1)
if position == bits - 1:
position = 0
result.append(char_func(value))
value = 0
else:
position += 1
char_code >>= 1
# pad to complete a char
while True:
value = (value << 1)
if position == bits - 1:
result.append(char_func(value))
break
position += 1
return "".join(result)
# ==================== Encoding ====================
def custom_encode(data: Optional[str], url_safe: bool) -> str:
if data is None:
return ""
base64_chars = CUSTOM_BASE64_CHARS
compressed = lzw_compress(
data,
6,
lambda index: base64_chars[index] # index should be 0..63
)
if not url_safe:
mod = len(compressed) % 4
if mod == 1:
return compressed + "==="
if mod == 2:
return compressed + "=="
if mod == 3:
return compressed + "="
return compressed
return compressed
# ==================== Helpers ====================
def random_hash() -> int:
return random.randint(0, 0xFFFFFFFF)
def generate_device_id() -> str:
return "".join(random.choice("0123456789abcdef") for _ in range(20))
# ==================== Data parse/process ====================
def parse_real_data(real_data: str) -> List[str]:
return real_data.split("^")
def process_fields(fields: List[str]) -> List[Union[str, int]]:
processed: List[Union[str, int]] = list(fields)
current_timestamp = int(time.time() * 1000)
for idx, typ in HASH_FIELDS.items():
if idx >= len(processed):
continue
if typ == "split":
# field 16: "count|hash" -> replace only hash
val = str(processed[idx])
parts = val.split("|")
if len(parts) == 2:
processed[idx] = f"{parts[0]}|{random_hash()}"
elif typ == "full":
if idx == 36:
processed[idx] = random.randint(10, 100) # 10-100
else:
processed[idx] = random_hash()
# field 33: current timestamp
if 33 < len(processed):
processed[33] = current_timestamp
return processed
# ==================== Cookie generation ====================
def generate_cookies(
real_data: Optional[str] = None,
fingerprint_options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
if fingerprint_options is None:
fingerprint_options = {}
fingerprint = real_data or generate_fingerprint(fingerprint_options)
fields = parse_real_data(fingerprint)
processed_fields = process_fields(fields)
# ssxmod_itna (37 fields)
ssxmod_itna_data = "^".join(map(str, processed_fields))
ssxmod_itna = "1-" + custom_encode(ssxmod_itna_data, True)
# ssxmod_itna2 (18 fields)
ssxmod_itna2_data = "^".join(map(str, [
processed_fields[0], # device id
processed_fields[1], # sdk version
processed_fields[23], # mode (P/M)
0, "", 0, "", "", 0, # event-related (empty in P mode)
0, 0,
processed_fields[32], # constant (11)
processed_fields[33], # current timestamp
0, 0, 0, 0, 0
]))
ssxmod_itna2 = "1-" + custom_encode(ssxmod_itna2_data, True)
return {
"ssxmod_itna": ssxmod_itna,
"ssxmod_itna2": ssxmod_itna2,
"timestamp": int(processed_fields[33]),
"rawData": ssxmod_itna_data,
"rawData2": ssxmod_itna2_data,
}
def generate_batch(
count: int = 10,
real_data: Optional[str] = None,
fingerprint_options: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
return [generate_cookies(real_data, fingerprint_options or {}) for _ in range(count)]
# ssxmod_manager.py
"""
SSXMOD Cookie Manager
Responsible for generating and periodically refreshing ssxmod_itna and ssxmod_itna2 cookies.
"""
# ssxmod_manager_async.py
"""
Async SSXMOD Cookie Manager (asyncio)
Generates and periodically refreshes ssxmod_itna and ssxmod_itna2 cookies.
"""
import asyncio
from typing import Any, Dict, Optional
# Global cookie store
_current_cookies: Dict[str, Any] = {
"ssxmod_itna": "",
"ssxmod_itna2": "",
"timestamp": 0,
}
# Refresh interval (15 minutes) in seconds
REFRESH_INTERVAL_SECONDS = 15 * 60
# Async state
_lock = asyncio.Lock()
_task: Optional[asyncio.Task] = None
_stop_event = asyncio.Event()
async def refresh_cookies():
"""Refresh SSXMOD cookies (async wrapper)."""
global _current_cookies
try:
# generate_cookies() is CPU-bound sync; run it off the event loop.
result = await asyncio.to_thread(generate_cookies)
async with _lock:
_current_cookies = {
"ssxmod_itna": result["ssxmod_itna"],
"ssxmod_itna2": result["ssxmod_itna2"],
"timestamp": result["timestamp"],
}
debug.log("SSXMOD cookies refreshed", "SSXMOD")
except Exception as e:
debug.error("SSXMOD cookie refresh failed", "SSXMOD", "", str(e))
return _current_cookies
async def _refresh_loop() -> None:
"""Background refresh loop."""
try:
# immediate refresh
await refresh_cookies()
while not _stop_event.is_set():
try:
await asyncio.wait_for(_stop_event.wait(), timeout=REFRESH_INTERVAL_SECONDS)
except asyncio.TimeoutError:
# timeout => refresh
await refresh_cookies()
finally:
# allow restart cleanly
_stop_event.clear()
def init_ssxmod_manager() -> None:
"""
Start the background refresh loop.
Call this AFTER an event loop is running (e.g., inside async main or FastAPI startup).
"""
global _task
if _task is not None and not _task.done():
# already running
return
_stop_event.clear()
_task = asyncio.create_task(_refresh_loop())
debug.log(
f"SSXMOD manager started, refresh interval: {REFRESH_INTERVAL_SECONDS / 60:.0f} minutes",
"SSXMOD",
)
async def stop_refresh() -> None:
"""Stop the background refresh loop."""
global _task
if _task is None:
return
_stop_event.set()
try:
await _task
finally:
_task = None
debug.log("SSXMOD periodic refresh stopped", "SSXMOD")
async def get_ssxmod_itna() -> str:
"""Get current ssxmod_itna."""
async with _lock:
return str(_current_cookies.get("ssxmod_itna", ""))
async def get_ssxmod_itna2() -> str:
"""Get current ssxmod_itna2."""
async with _lock:
return str(_current_cookies.get("ssxmod_itna2", ""))
async def get_cookies() -> Dict[str, Any]:
"""Get full cookie object."""
async with _lock:
return dict(_current_cookies)
# -----------------------
# Example usage
# -----------------------
if __name__ == "__main__":
raw = generate_fingerprint()
data = raw.encode("utf-8")
import zlib
compressed = zlib.compress(data)
import base64
b64_payload = base64.b64encode(compressed).decode("ascii")
header_value = f"231!{b64_payload}"
print(header_value)

View File

@@ -0,0 +1,207 @@
import random
import time
from typing import Dict, List, Any
# =========================
# DEFAULT TEMPLATE
# =========================
DEFAULT_TEMPLATE: Dict[str, Any] = {
"deviceId": "84985177a19a010dea49",
"sdkVersion": "websdk-2.3.15d",
"initTimestamp": "1765348410850",
"field3": "91",
"field4": "1|15",
"language": "zh-CN",
"timezoneOffset": "-480",
"colorDepth": "16705151|12791",
"screenInfo": "1470|956|283|797|158|0|1470|956|1470|798|0|0",
"field9": "5",
"platform": "MacIntel",
"field11": "10",
"webglRenderer": (
"ANGLE (Apple, ANGLE Metal Renderer: Apple M4, Unspecified Version)"
"|Google Inc. (Apple)"
),
"field13": "30|30",
"field14": "0",
"field15": "28",
"pluginCount": "5",
"vendor": "Google Inc.",
"field29": "8",
"touchInfo": "-1|0|0|0|0",
"field32": "11",
"field35": "0",
"mode": "P",
}
# =========================
# PRESETS
# =========================
SCREEN_PRESETS = {
"1920x1080": "1920|1080|283|1080|158|0|1920|1080|1920|922|0|0",
"2560x1440": "2560|1440|283|1440|158|0|2560|1440|2560|1282|0|0",
"1470x956": "1470|956|283|797|158|0|1470|956|1470|798|0|0",
"1440x900": "1440|900|283|900|158|0|1440|900|1440|742|0|0",
"1536x864": "1536|864|283|864|158|0|1536|864|1536|706|0|0",
}
PLATFORM_PRESETS = {
"macIntel": {
"platform": "MacIntel",
"webglRenderer": (
"ANGLE (Apple, ANGLE Metal Renderer: Apple M4, Unspecified Version)"
"|Google Inc. (Apple)"
),
"vendor": "Google Inc.",
},
"macM1": {
"platform": "MacIntel",
"webglRenderer": (
"ANGLE (Apple, ANGLE Metal Renderer: Apple M1, Unspecified Version)"
"|Google Inc. (Apple)"
),
"vendor": "Google Inc.",
},
"win64": {
"platform": "Win32",
"webglRenderer": (
"ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 "
"vs_5_0 ps_5_0, D3D11)|Google Inc. (NVIDIA)"
),
"vendor": "Google Inc.",
},
"linux": {
"platform": "Linux x86_64",
"webglRenderer": (
"ANGLE (Intel, Mesa Intel(R) UHD Graphics 630, OpenGL 4.6)"
"|Google Inc. (Intel)"
),
"vendor": "Google Inc.",
},
}
LANGUAGE_PRESETS = {
"zh-CN": {"language": "zh-CN", "timezoneOffset": "-480"},
"zh-TW": {"language": "zh-TW", "timezoneOffset": "-480"},
"en-US": {"language": "en-US", "timezoneOffset": "480"},
"ja-JP": {"language": "ja-JP", "timezoneOffset": "-540"},
"ko-KR": {"language": "ko-KR", "timezoneOffset": "-540"},
}
# =========================
# HELPERS
# =========================
def generate_device_id() -> str:
"""Generate a 20-character hex device ID"""
return "".join(random.choice("0123456789abcdef") for _ in range(20))
def generate_hash() -> int:
"""Generate a 32-bit unsigned random hash"""
return random.randint(0, 0xFFFFFFFF)
# =========================
# CORE LOGIC
# =========================
def generate_fingerprint(options: Dict[str, Any] = None) -> str:
if options is None:
options = {}
config = DEFAULT_TEMPLATE.copy()
# platform preset
platform = options.get("platform")
if platform in PLATFORM_PRESETS:
config.update(PLATFORM_PRESETS[platform])
# screen preset
screen = options.get("screen")
if screen in SCREEN_PRESETS:
config["screenInfo"] = SCREEN_PRESETS[screen]
# language preset
locale = options.get("locale")
if locale in LANGUAGE_PRESETS:
config.update(LANGUAGE_PRESETS[locale])
# custom overrides
if "custom" in options and isinstance(options["custom"], dict):
config.update(options["custom"])
device_id = options.get("deviceId") or generate_device_id()
current_timestamp = int(time.time() * 1000)
plugin_hash = generate_hash()
canvas_hash = generate_hash()
ua_hash1 = generate_hash()
ua_hash2 = generate_hash()
url_hash = generate_hash()
doc_hash = random.randint(10, 100)
fields: List[Any] = [
device_id,
config["sdkVersion"],
config["initTimestamp"],
config["field3"],
config["field4"],
config["language"],
config["timezoneOffset"],
config["colorDepth"],
config["screenInfo"],
config["field9"],
config["platform"],
config["field11"],
config["webglRenderer"],
config["field13"],
config["field14"],
config["field15"],
f'{config["pluginCount"]}|{plugin_hash}',
canvas_hash,
ua_hash1,
"1",
"0",
"1",
"0",
config["mode"],
"0",
"0",
"0",
"416",
config["vendor"],
config["field29"],
config["touchInfo"],
ua_hash2,
config["field32"],
current_timestamp,
url_hash,
config["field35"],
doc_hash,
]
return "^".join(map(str, fields))
def generate_fingerprint_batch(count: int, options: Dict[str, Any] = None) -> List[str]:
return [generate_fingerprint(options) for _ in range(count)]
def parse_fingerprint(fingerprint: str) -> Dict[str, Any]:
fields = fingerprint.split("^")
return {
"deviceId": fields[0],
"sdkVersion": fields[1],
"initTimestamp": fields[2],
"language": fields[5],
"timezoneOffset": fields[6],
"platform": fields[10],
"webglRenderer": fields[12],
"mode": fields[23],
"vendor": fields[28],
"timestamp": fields[33],
"raw": fields,
}


@@ -0,0 +1,348 @@
import random
import time
import base64
import hashlib
from typing import Dict, List, Any, Optional
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import json
# =========================
# DEFAULT TEMPLATE
# =========================
DEFAULT_TEMPLATE: Dict[str, Any] = {
"deviceId": "84985177a19a010dea49",
"sdkVersion": "websdk-2.3.15d",
"initTimestamp": "1765348410850",
"field3": "91",
"field4": "1|15",
"language": "zh-CN",
"timezoneOffset": "-480",
"colorDepth": "16705151|12791",
"screenInfo": "1470|956|283|797|158|0|1470|956|1470|798|0|0",
"field9": "5",
"platform": "MacIntel",
"field11": "10",
"webglRenderer": (
"ANGLE (Apple, ANGLE Metal Renderer: Apple M4, Unspecified Version)"
"|Google Inc. (Apple)"
),
"field13": "30|30",
"field14": "0",
"field15": "28",
"pluginCount": "5",
"vendor": "Google Inc.",
"field29": "8",
"touchInfo": "-1|0|0|0|0",
"field32": "11",
"field35": "0",
"mode": "P",
}
# =========================
# BX-UA GENERATOR
# =========================
class BXUAGenerator:
def __init__(self):
self.version = "231"
self.aes_key = None
self.aes_iv = None
def _generate_key_iv(self, seed_data: str) -> tuple:
"""Generate AES key and IV from seed data"""
# Create deterministic key/IV from seed
seed_hash = hashlib.sha256(seed_data.encode()).digest()
key = seed_hash[:16] # 128-bit key
iv = seed_hash[16:32] # 128-bit IV
return key, iv
def _encrypt_aes_cbc(self, data: bytes, key: bytes, iv: bytes) -> bytes:
"""Encrypt data using AES-CBC"""
cipher = AES.new(key, AES.MODE_CBC, iv)
padded_data = pad(data, AES.block_size)
encrypted = cipher.encrypt(padded_data)
return encrypted
def _create_payload(self, fingerprint: str, timestamp: Optional[int] = None) -> Dict[str, Any]:
"""Create the payload structure to be encrypted"""
if timestamp is None:
timestamp = int(time.time() * 1000)
# Extract components from fingerprint
fields = fingerprint.split("^")
payload = {
"v": self.version,
"ts": timestamp,
"fp": fingerprint,
"d": {
"deviceId": fields[0],
"sdkVer": fields[1],
"lang": fields[5],
"tz": fields[6],
"platform": fields[10],
"renderer": fields[12],
"mode": fields[23],
"vendor": fields[28],
},
"rnd": random.randint(1000, 9999),
"seq": 1,
}
# Add checksum
checksum_str = f"{fingerprint}{timestamp}{payload['rnd']}"
payload["cs"] = hashlib.md5(checksum_str.encode()).hexdigest()[:8]
return payload
def generate(self, fingerprint: str, options: Optional[Dict[str, Any]] = None) -> str:
"""
Generate bx-ua header value
Args:
fingerprint: The fingerprint string generated by generate_fingerprint()
options: Optional configuration
- timestamp: Custom timestamp (default: current time)
- seed: Custom seed for key generation
"""
if options is None:
options = {}
# Get timestamp
timestamp = options.get("timestamp")
if timestamp is None:
timestamp = int(time.time() * 1000)
# Create payload
payload = self._create_payload(fingerprint, timestamp)
# Convert to JSON
payload_json = json.dumps(payload, separators=(',', ':'))
# Generate key and IV
seed = options.get("seed", fingerprint)
key, iv = self._generate_key_iv(seed)
# Encrypt
encrypted = self._encrypt_aes_cbc(payload_json.encode(), key, iv)
# Base64 encode
encrypted_b64 = base64.b64encode(encrypted).decode()
# Return in format: version!base64_encoded_data
return f"{self.version}!{encrypted_b64}"
def batch_generate(self, fingerprints: List[str], options: Optional[Dict[str, Any]] = None) -> List[str]:
"""Generate multiple bx-ua values"""
return [self.generate(fp, options) for fp in fingerprints]
# =========================
# FINGERPRINT GENERATOR
# =========================
def generate_device_id() -> str:
"""Generate a 20-character hex device ID"""
return "".join(random.choice("0123456789abcdef") for _ in range(20))
def generate_hash() -> int:
"""Generate a 32-bit unsigned random hash"""
return random.randint(0, 0xFFFFFFFF)
def generate_fingerprint(options: Optional[Dict[str, Any]] = None) -> str:
if options is None:
options = {}
config = DEFAULT_TEMPLATE.copy()
    # platform / screen / locale presets are not defined in this file;
    # only explicit "custom" overrides are applied below
# custom overrides
if "custom" in options and isinstance(options["custom"], dict):
config.update(options["custom"])
device_id = options.get("deviceId") or generate_device_id()
current_timestamp = int(time.time() * 1000)
plugin_hash = generate_hash()
canvas_hash = generate_hash()
ua_hash1 = generate_hash()
ua_hash2 = generate_hash()
url_hash = generate_hash()
doc_hash = random.randint(10, 100)
fields: List[Any] = [
device_id,
config["sdkVersion"],
config["initTimestamp"],
config["field3"],
config["field4"],
config["language"],
config["timezoneOffset"],
config["colorDepth"],
config["screenInfo"],
config["field9"],
config["platform"],
config["field11"],
config["webglRenderer"],
config["field13"],
config["field14"],
config["field15"],
f'{config["pluginCount"]}|{plugin_hash}',
canvas_hash,
ua_hash1,
"1",
"0",
"1",
"0",
config["mode"],
"0",
"0",
"0",
"416",
config["vendor"],
config["field29"],
config["touchInfo"],
ua_hash2,
config["field32"],
current_timestamp,
url_hash,
config["field35"],
doc_hash,
]
return "^".join(map(str, fields))
# =========================
# USAGE EXAMPLE
# =========================
def example_usage():
"""Example of how to generate bx-ua headers"""
    # Initialize the bx-ua generator
    fp_gen = BXUAGenerator()
# Generate a fingerprint
fingerprint = generate_fingerprint({
"deviceId": "84985177a19a010dea49",
"custom": {
"language": "zh-CN",
"platform": "MacIntel",
}
})
print("Generated Fingerprint:")
print(fingerprint[:100] + "...")
print()
# Generate bx-ua header
bx_ua = fp_gen.generate(fingerprint, {
"timestamp": int(time.time() * 1000),
"seed": "consistent_seed_for_deterministic_output"
})
print("Generated bx-ua header:")
print(bx_ua[:100] + "...")
print(f"Total length: {len(bx_ua)}")
print()
# Parse the fingerprint (for debugging)
fields = fingerprint.split("^")
parsed = {
"deviceId": fields[0],
"sdkVersion": fields[1],
"language": fields[5],
"timezoneOffset": fields[6],
"platform": fields[10],
"webglRenderer": fields[12],
"mode": fields[23],
"vendor": fields[28],
"timestamp": fields[33],
}
print("Parsed fingerprint info:")
for key, value in parsed.items():
print(f" {key}: {value}")
return bx_ua
def batch_example():
"""Example of batch generation"""
fp_gen = BXUAGenerator()
# Generate multiple fingerprints
fingerprints = [
generate_fingerprint({"deviceId": generate_device_id()})
for _ in range(3)
]
# Generate bx-ua for each
bx_ua_values = fp_gen.batch_generate(fingerprints, {
"seed": "batch_seed"
})
print("Batch Generation Results:")
for i, (fp, bx_ua) in enumerate(zip(fingerprints, bx_ua_values)):
print(f"\n{i + 1}. Device ID: {fp.split('^')[0]}")
print(f" bx-ua: {bx_ua[:80]}...")
return bx_ua_values
if __name__ == "__main__":
print("=" * 60)
print("BX-UA Header Generator")
print("=" * 60)
# Run single example
print("\n1. Single Generation Example:")
print("-" * 40)
example_ua = example_usage()
print("\n" + "=" * 60)
# Run batch example
print("\n2. Batch Generation Example:")
print("-" * 40)
batch_ua = batch_example()
print("\n" + "=" * 60)
print("\nTo use in requests:")
print("```python")
print("import requests")
print("")
print("headers = {")
print(' "bx-ua": f"{example_ua}",')
print(' "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ...",')
print(' "Accept": "application/json, text/plain, */*",')
print(' "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",')
print(' "Accept-Encoding": "gzip, deflate, br",')
print(' "Connection": "keep-alive",')
print("}")
print('')
print('response = requests.get("https://example.com/api", headers=headers)')
print("```")


@@ -210,11 +210,11 @@ gpt_4o_mini = Model(
best_provider = IterListProvider([Chatai, OIVSCodeSer2, Startnest, OpenaiChat])
)
gpt_4o_mini_audio = AudioModel(
name = 'gpt-4o-mini-audio-preview',
base_provider = 'OpenAI',
best_provider = PollinationsAI
)
# gpt_4o_mini_audio = AudioModel(
# name = 'gpt-4o-mini-audio-preview',
# base_provider = 'OpenAI',
# best_provider = PollinationsAI
# )
gpt_4o_mini_tts = AudioModel(
name = 'gpt-4o-mini-tts',


@@ -1,25 +1,29 @@
from __future__ import annotations
import os
import time
import random
import asyncio
import json
from urllib.parse import urlparse
from typing import Iterator, AsyncIterator
import os
import random
import time
from contextlib import asynccontextmanager
from http.cookies import Morsel
from pathlib import Path
from contextlib import asynccontextmanager
import asyncio
from typing import Iterator, AsyncIterator
from urllib.parse import urlparse
try:
from curl_cffi.requests import Session, Response
from .curl_cffi import StreamResponse, StreamSession, FormData
has_curl_cffi = True
except ImportError:
from typing import Type as Response
from .aiohttp import StreamResponse, StreamSession, FormData
has_curl_cffi = False
try:
import webview
has_webview = True
except ImportError:
has_webview = False
@@ -28,13 +32,16 @@ try:
from nodriver.cdp.network import CookieParam
from nodriver.core.config import find_chrome_executable
from nodriver import Browser, Tab, util
has_nodriver = True
except ImportError:
from typing import Type as Browser
from typing import Type as Tab
has_nodriver = False
try:
from platformdirs import user_config_dir
has_platformdirs = True
except ImportError:
has_platformdirs = False
@@ -51,6 +58,7 @@ if not has_curl_cffi:
def __init__(self, **kwargs):
raise MissingRequirementsError('Install "curl_cffi" package | pip install -U curl_cffi')
async def get_args_from_webview(url: str) -> dict:
if not has_webview:
raise MissingRequirementsError('Install "webview" package')
@@ -74,6 +82,7 @@ async def get_args_from_webview(url: str) -> dict:
window.destroy()
return {"headers": headers, "cookies": cookies}
def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str = None) -> list[CookieParam]:
[CookieParam.from_json({
"name": key,
@@ -82,18 +91,20 @@ def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str =
"domain": domain
}) for key, value in cookies.items()]
async def get_args_from_nodriver(
url: str,
proxy: str = None,
timeout: int = 120,
wait_for: str = None,
callback: callable = None,
cookies: Cookies = None,
browser: Browser = None,
user_data_dir: str = "nodriver"
url: str,
proxy: str = None,
timeout: int = 120,
wait_for: str = None,
callback: callable = None,
cookies: Cookies = None,
browser: Browser = None,
user_data_dir: str = "nodriver",
browser_args: list = None
) -> dict:
if browser is None:
browser, stop_browser = await get_nodriver(proxy=proxy, timeout=timeout, user_data_dir=user_data_dir)
browser, stop_browser = await get_nodriver(proxy=proxy, timeout=timeout, user_data_dir=user_data_dir, browser_args=browser_args)
else:
def stop_browser():
pass
@@ -129,6 +140,7 @@ async def get_args_from_nodriver(
stop_browser()
raise
def merge_cookies(cookies: Iterator[Morsel], response: Response) -> Cookies:
if cookies is None:
cookies = {}
@@ -140,18 +152,21 @@ def merge_cookies(cookies: Iterator[Morsel], response: Response) -> Cookies:
cookies[key] = value
return cookies
def set_browser_executable_path(browser_executable_path: str):
BrowserConfig.browser_executable_path = browser_executable_path
async def get_nodriver(
proxy: str = None,
user_data_dir = "nodriver",
timeout: int = 300,
browser_executable_path: str = None,
**kwargs
proxy: str = None,
user_data_dir="nodriver",
timeout: int = 300,
browser_executable_path: str = None,
**kwargs
) -> tuple[Browser, callable]:
if not has_nodriver:
raise MissingRequirementsError('Install "nodriver" and "platformdirs" package | pip install -U nodriver platformdirs')
raise MissingRequirementsError(
'Install "nodriver" and "platformdirs" package | pip install -U nodriver platformdirs')
user_data_dir = user_config_dir(f"g4f-{user_data_dir}") if user_data_dir and has_platformdirs else None
if browser_executable_path is None:
browser_executable_path = BrowserConfig.browser_executable_path
@@ -190,7 +205,8 @@ async def get_nodriver(
lock_file.write_text(str(time.time()))
debug.log(f"Open nodriver with user_dir: {user_data_dir}")
try:
browser_args = ["--no-sandbox"]
browser_args = kwargs.pop("browser_args", None) or ["--no-sandbox"]
if BrowserConfig.port:
browser_executable_path = "/bin/google-chrome"
browser = await nodriver.start(
@@ -209,6 +225,7 @@ async def get_nodriver(
browser = util.get_registered_instances().pop()
else:
raise
def on_stop():
try:
if BrowserConfig.port is None and browser.connection:
@@ -218,15 +235,18 @@ async def get_nodriver(
finally:
if user_data_dir:
lock_file.unlink(missing_ok=True)
BrowserConfig.stop_browser = on_stop
return browser, on_stop
@asynccontextmanager
async def get_nodriver_session(**kwargs):
browser, stop_browser = await get_nodriver(**kwargs)
yield browser
stop_browser()
async def sse_stream(iter_lines: AsyncIterator[bytes]) -> AsyncIterator[dict]:
if hasattr(iter_lines, "content"):
iter_lines = iter_lines.content
@@ -244,6 +264,7 @@ async def sse_stream(iter_lines: AsyncIterator[bytes]) -> AsyncIterator[dict]:
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON data: {rest}")
async def iter_lines(iter_response: AsyncIterator[bytes], delimiter=None):
"""
iterate streaming content line by line, separated by ``\\n``.
@@ -267,4 +288,4 @@ async def iter_lines(iter_response: AsyncIterator[bytes], delimiter=None):
yield line
if pending is not None:
yield pending
yield pending
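The `iter_lines` helper above buffers partial lines across chunk boundaries. A minimal synchronous sketch of that buffering pattern (an illustration of the technique, not the exact async implementation):

```python
# Accumulate chunks, emit complete lines, hold back the trailing partial
# line until the next chunk (or end of stream) completes it.
def split_lines(chunks):
    pending = b""
    for chunk in chunks:
        pending += chunk
        *lines, pending = pending.split(b"\n")
        for line in lines:
            yield line
    if pending:
        yield pending

# "b" and "c" arrive in different chunks but form a single line.
out = list(split_lines([b"a\nb", b"c\nd"]))
assert out == [b"a", b"bc", b"d"]
```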


@@ -17,6 +17,8 @@ def is_cloudflare(text: str) -> bool:
def is_openai(text: str) -> bool:
return "<p>Unable to load site</p>" in text or 'id="challenge-error-text"' in text
def is_lmarena(text: str) -> bool:
return 'recaptcha validation failed' in text
async def raise_for_status_async(response: Union[StreamResponse, ClientResponse], message: str = None):
if response.ok:
@@ -38,7 +40,7 @@ async def raise_for_status_async(response: Union[StreamResponse, ClientResponse]
message = await response.text()
else:
message = (await response.text()).strip()
is_html = content_type.startswith("text/html") or message.startswith("<!DOCTYPE")
is_html = content_type.startswith("text/html") or message.lower().startswith("<!doctype")
if message is None or is_html:
if response.status == 520:
message = "Unknown error (Cloudflare)"
@@ -48,7 +50,7 @@ async def raise_for_status_async(response: Union[StreamResponse, ClientResponse]
raise MissingAuthError(f"Response {response.status}: {message}")
if response.status == 403 and is_cloudflare(message):
raise CloudflareError(f"Response {response.status}: Cloudflare detected")
elif response.status == 403 and is_openai(message):
elif response.status == 403 and (is_openai(message) or is_lmarena(message)):
raise MissingAuthError(f"Response {response.status}: OpenAI Bot detected")
elif response.status == 502:
raise ResponseStatusError(f"Response {response.status}: Bad Gateway")
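The new `is_lmarena` check keys off the response body. A minimal sketch of how the 403 dispatch picks it up (the predicate matches the diff; the dispatch and return labels are simplified stand-ins for the exception raising above):

```python
def is_lmarena(text: str) -> bool:
    # Same substring predicate added in the diff above.
    return "recaptcha validation failed" in text

def classify_403(body: str) -> str:
    # Simplified: in raise_for_status_async this path raises MissingAuthError,
    # the same branch as the OpenAI bot-detection case.
    if is_lmarena(body):
        return "auth-error"
    return "generic-403"

assert classify_403('{"detail": "recaptcha validation failed"}') == "auth-error"
assert classify_403("Forbidden") == "generic-403"
```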