Refactor model handling in PollinationsAI and related providers; update API endpoints, enhance model loading logic, and improve error handling in get_models method

This commit is contained in:
hlohaus
2025-12-09 18:08:35 +01:00
parent aa0aade9c4
commit 0366acbcd1
6 changed files with 104 additions and 46 deletions

View File

@@ -4,9 +4,12 @@ import time
import random
import requests
import asyncio
import json
from urllib.parse import quote, quote_plus
from datetime import datetime
from typing import Optional
from aiohttp import ClientSession, ClientTimeout
from pathlib import Path
from .helper import filter_none, format_media_prompt
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
@@ -16,9 +19,11 @@ from ..errors import MissingAuthError
from ..requests.raise_for_status import raise_for_status
from ..requests.aiohttp import get_connector
from ..image import use_aspect_ratio
from ..providers.response import ImageResponse, Reasoning, TitleGeneration, SuggestedFollowups, JsonRequest
from ..providers.response import ImageResponse, Reasoning, VideoResponse, JsonRequest
from ..tools.media import render_messages
from ..config import REFFERER_URL
from ..tools.run_tools import AuthManager
from ..cookies import get_cookies_dir
from .template.OpenaiTemplate import read_response
from .. import debug
@@ -40,9 +45,16 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
supports_message_history = True
# API endpoints
text_api_endpoint = "https://text.pollinations.ai"
openai_endpoint = "https://text.pollinations.ai/openai"
image_api_endpoint = "https://image.pollinations.ai/"
text_api_endpoint = "https://text.pollinations.ai/openai"
image_api_endpoint = "https://image.pollinations.ai/prompt/{}"
gen_image_api_endpoint = "https://gen.pollinations.ai/image/{}"
gen_text_api_endpoint = "https://gen.pollinations.ai/v1/chat/completions"
image_models_endpoint = "https://image.pollinations.ai/models"
text_models_endpoint = "https://text.pollinations.ai/models"
gen_image_models_endpoint = "https://gen.pollinations.ai/image/models"
gen_text_models_endpoint = "https://gen.pollinations.ai/text/models"
g4f_text_models_endpoint = "https://g4f.dev/api/pollinations/models"
nectar_text_models_endpoint = "https://g4f.dev/api/nectar/models"
# Models configuration
default_model = "openai"
@@ -51,11 +63,12 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
default_vision_model = default_model
default_audio_model = "openai-audio"
default_voice = "alloy"
text_models = [default_model, "evil"]
text_models = [default_model]
image_models = [default_image_model, "turbo", "kontext"]
audio_models = {default_audio_model: []}
vision_models = [default_vision_model]
_models_loaded = False
_gen_models_loaded = False
_free_models_loaded = False
model_aliases = {
"gpt-4.1-nano": "openai-fast",
"llama-4-scout": "llamascout",
@@ -71,19 +84,34 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
swap_model_aliases = {v: k for k, v in model_aliases.items()}
@classmethod
def get_models(cls, **kwargs):
def get_models(cls, api_key: Optional[str] = None, timeout: Optional[float] = None, **kwargs):
def get_alias(model: dict) -> str:
alias = model.get("name")
if (model.get("aliases")):
alias = model.get("aliases")[0]
elif alias in cls.swap_model_aliases:
alias = cls.swap_model_aliases[alias]
if alias == "searchgpt":
return model.get("name")
return str(alias).replace("-instruct", "").replace("qwen-", "qwen").replace("qwen", "qwen-")
if not api_key:
api_key = AuthManager.load_api_key(cls)
if not cls._models_loaded:
if not cls._free_models_loaded or api_key and not cls._gen_models_loaded:
path = Path(get_cookies_dir()) / "models" / datetime.today().strftime('%Y-%m-%d') / f"{cls.__name__}{'-auth' if api_key else ''}.json"
if path.exists():
try:
data = path.read_text()
models_data = json.loads(data)
for key, value in models_data.items():
setattr(cls, key, value)
return cls.models
except Exception as e:
debug.error(f"Failed to load cached models from {path}: {e}")
try:
# Update of image models
image_response = requests.get("https://image.pollinations.ai/models")
image_response = requests.get(cls.gen_image_models_endpoint if api_key else cls.image_models_endpoint, timeout=timeout)
if image_response.ok:
new_image_models = image_response.json()
else:
@@ -94,14 +122,19 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
# Add extra image models if not already in the list
for model in new_image_models:
alias = get_alias(model) if isinstance(model, dict) else model
if model not in image_models:
image_models.append(model)
if isinstance(model, str) or "image" in model.get("output_modalities", []):
image_models.append(alias)
if isinstance(model, dict) and alias != model.get("name"):
cls.model_aliases[alias] = model.get("name") if isinstance(model, dict) else model
cls.image_models = image_models
cls.video_models = [get_alias(model) for model in new_image_models if "video" in model.get("output_modalities", [])]
text_response = requests.get("https://g4f.dev/api/pollinations.ai/models")
text_response = requests.get(cls.nectar_text_models_endpoint if api_key else cls.g4f_text_models_endpoint, timeout=timeout)
if not text_response.ok:
text_response = requests.get("https://text.pollinations.ai/models")
text_response = requests.get(cls.gen_text_models_endpoint if api_key else cls.text_models_endpoint, timeout=timeout)
text_response.raise_for_status()
models = text_response.json()
@@ -130,6 +163,7 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
elif model.get("name") not in cls.text_models:
cls.text_models.append(model.get("name"))
cls.live += 1
cls.swap_model_aliases = {v: k for k, v in cls.model_aliases.items()}
except Exception as e:
# Save default models in case of an error
@@ -140,22 +174,41 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
debug.error(f"Failed to fetch models: {e}")
finally:
cls._models_loaded = True
# Return unique models across all categories
all_models = cls.text_models.copy()
all_models.extend(cls.image_models)
all_models.extend(cls.audio_models.keys())
if cls.default_audio_model in cls.audio_models:
all_models.extend(cls.audio_models[cls.default_audio_model])
return list(dict.fromkeys(all_models))
if api_key:
cls._gen_models_loaded = True
else:
cls._free_models_loaded = True
# Return unique models across all categories
all_models = cls.text_models.copy()
all_models.extend(cls.image_models)
all_models.extend(cls.audio_models.keys())
if cls.default_audio_model in cls.audio_models:
all_models.extend(cls.audio_models[cls.default_audio_model])
cls.models = all_models
# Cache the models to a file
try:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump({
"text_models": cls.text_models,
"image_models": cls.image_models,
"video_models": cls.video_models,
"audio_models": cls.audio_models,
"vision_models": cls.vision_models,
"model_aliases": cls.model_aliases,
"models": cls.models
}, f, indent=4)
except Exception as e:
debug.error(f"Failed to cache models to {path}: {e}")
return cls.models
@classmethod
def get_grouped_models(cls) -> dict[str, list[str]]:
cls.get_models()
def get_grouped_models(cls, **kwargs) -> dict[str, list[str]]:
cls.get_models(**kwargs)
return [
{"group": "Text Generation", "models": cls.text_models},
{"group": "Image Generation", "models": cls.image_models},
{"group": "Video Generation", "models": cls.video_models},
{"group": "Audio Generation", "models": list(cls.audio_models.keys())},
{"group": "Audio Voices", "models": cls.audio_models.get(cls.default_audio_model, [])},
]
@@ -206,13 +259,15 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
has_audio = True
break
model = cls.default_audio_model if has_audio else cls.default_model
elif cls._models_loaded or cls.get_models():
elif (cls._gen_models_loaded if api_key else cls._free_models_loaded) or cls.get_models(api_key=api_key, timeout=kwargs.get("timeout")):
if model in cls.model_aliases:
model = cls.model_aliases[model]
debug.log(f"Using model: {model}")
if model in cls.image_models:
alias = cls.swap_model_aliases.get(model, model)
if alias in cls.image_models or alias in cls.video_models:
async for chunk in cls._generate_image(
model="gptimage" if model == "transparent" else model,
alias=alias,
prompt=format_media_prompt(messages, prompt),
media=media,
proxy=proxy,
@@ -267,6 +322,7 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
async def _generate_image(
cls,
model: str,
alias: str,
prompt: str,
media: MediaListType,
proxy: str,
@@ -311,11 +367,9 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
if model == "gptimage" and aspect_ratio is not None:
encoded_prompt = f"{encoded_prompt} aspect-ratio: {aspect_ratio}"
encoded_prompt = quote_plus(encoded_prompt)[:4096 - len(cls.image_api_endpoint) - len(query) - 8].rstrip("%")
url = f"{cls.image_api_endpoint}prompt/{encoded_prompt}?{query}"
url = (cls.gen_image_api_endpoint if api_key else cls.image_api_endpoint).format(f"{encoded_prompt}?{query}")
def get_url_with_seed(i: int, seed: Optional[int] = None):
if model == "gptimage":
return url
if i == 0:
if not cache and seed is None:
seed = random.randint(0, 2 ** 32)
@@ -323,16 +377,16 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
seed = random.randint(0, 2 ** 32)
return f"{url}&seed={seed}" if seed else url
headers = {"referer": referrer}
headers = None
if api_key:
headers["authorization"] = f"Bearer {api_key}"
headers = {"authorization": f"Bearer {api_key}"}
async with ClientSession(
headers=DEFAULT_HEADERS,
connector=get_connector(proxy=proxy),
timeout=ClientTimeout(timeout)
headers=DEFAULT_HEADERS,
connector=get_connector(proxy=proxy),
timeout=ClientTimeout(timeout)
) as session:
responses = set()
yield Reasoning(label=f"Generating {n} {'image' if n == 1 else 'images'}")
yield Reasoning(label=f"Generating {n} {('video' if alias in cls.video_models else 'image') + '' if n == 1 else 's'}")
finished = 0
start = time.time()
@@ -344,12 +398,12 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
except Exception as e:
responses.add(e)
debug.error(f"Error fetching image:", e)
if response.headers['content-type'].startswith("image/"):
if response.headers.get('content-type', '').startswith("image/"):
responses.add(ImageResponse(str(response.url), prompt, {"headers": headers}))
elif response.headers.get('content-type', '').startswith("video/"):
responses.add(VideoResponse(str(response.url), prompt, {"headers": headers}))
else:
t_ = await response.text()
debug.error(f"UnHandel Error fetching image:", t_)
responses.add(t_)
responses.add(Exception(f"Unexpected content type: {response.headers.get('content-type')}"))
tasks: list[asyncio.Task] = []
for i in range(int(n)):
@@ -426,11 +480,11 @@ class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
referrer=referrer,
**extra_body
)
headers = {"referer": referrer}
headers = None
if api_key:
headers["authorization"] = f"Bearer {api_key}"
headers = {"authorization": f"Bearer {api_key}"}
yield JsonRequest.from_dict(data)
async with session.post(cls.openai_endpoint, json=data, headers=headers) as response:
async with session.post(cls.gen_text_api_endpoint if api_key else cls.text_api_endpoint, json=data, headers=headers) as response:
if response.status in (400, 500):
debug.error(f"Error: {response.status} - Bad Request: {data}")
async for chunk in read_response(response, stream, format_media_prompt(messages), cls.get_dict(),

View File

@@ -55,8 +55,12 @@ class PollinationsImage(PollinationsAI):
) -> AsyncResult:
# Calling model updates before creating a generator
cls.get_models()
alias = cls.swap_model_aliases.get(model, model)
if model in cls.model_aliases:
model = cls.model_aliases[model]
async for chunk in cls._generate_image(
model=model,
alias=alias,
prompt=format_media_prompt(messages, prompt),
media=media,
proxy=proxy,

View File

@@ -18,7 +18,7 @@ class Azure(OpenaiTemplate):
working = True
needs_auth = True
models_needs_auth = True
active_by_default = True
active_by_default = False
login_url = "https://discord.gg/qXA4Wf4Fsm"
routes: dict[str, str] = {}
audio_models = ["gpt-4o-mini-audio-preview"]

View File

@@ -47,7 +47,7 @@ class HuggingChat(AsyncAuthedProvider, ProviderModelMixin):
text_models = fallback_models
@classmethod
def get_models(cls):
def get_models(cls, **kwargs) -> list[str]:
if not cls.models:
try:
models = requests.get(f"{cls.url}/api/v2/models").json().get("json")

View File

@@ -115,7 +115,7 @@ class Api:
def get_all_models(self) -> dict[str, list]:
def safe_get_provider_models(provider: ProviderModelMixin) -> list[str]:
try:
return list(provider.get_models())
return list(provider.get_models(timeout=10))
except Exception as e:
debug.error(f"{provider.__name__}: get_models error:", e)
return []
@@ -223,7 +223,7 @@ class Api:
yield self._format_json("preview", chunk.to_string(), urls=chunk.urls, alt=chunk.alt)
elif isinstance(chunk, MediaResponse):
media = chunk
if download_media or chunk.get("cookies"):
if download_media or chunk.get("cookies") or chunk.get("headers"):
chunk.alt = format_media_prompt(kwargs.get("messages"), chunk.alt)
width, height = get_width_height(chunk.get("width"), chunk.get("height"))
tags = [model, kwargs.get("aspect_ratio"), kwargs.get("resolution")]

View File

@@ -76,7 +76,7 @@ class AnyModelProviderMixin(ProviderModelMixin):
return ignored
@classmethod
def get_models(cls, ignored: list[str] = []) -> list[str]:
def get_models(cls, ignored: list[str] = [], **kwargs) -> list[str]:
if not cls.models:
cls.update_model_map()
if not ignored: