from __future__ import annotations import logging import os import asyncio from typing import Iterator from flask import send_from_directory, request from inspect import signature try: from PIL import Image has_pillow = True except ImportError: has_pillow = False from ...errors import VersionNotFoundError, MissingAuthError from ...image.copy_images import copy_media, ensure_media_dir, get_media_dir from ...image import get_width_height from ...tools.run_tools import iter_run_tools from ... import Provider from ...providers.base_provider import ProviderModelMixin from ...providers.retry_provider import BaseRetryProvider from ...providers.helper import format_media_prompt from ...providers.response import * from ...providers.any_model_map import model_map from ...providers.any_provider import AnyProvider from ...client.service import get_model_and_provider from ... import version, models from ... import debug logger = logging.getLogger(__name__) class Api: @staticmethod def get_models(): return [{ "name": model.name, "image": isinstance(model, models.ImageModel), "vision": isinstance(model, models.VisionModel), "audio": isinstance(model, models.AudioModel), "video": isinstance(model, models.VideoModel), "providers": [ getattr(provider, "parent", provider.__name__) for provider in providers if provider.working ] } for model, providers in models.__models__.values()] @staticmethod def get_provider_models(provider: str, api_key: str = None, api_base: str = None, ignored: list = None): def get_model_data(provider: ProviderModelMixin, model: str, default: bool = False) -> dict: return { "model": model, "label": model.split(":")[-1] if provider.__name__ == "AnyProvider" and not model.startswith("openrouter:") else model, "default": default or model == provider.default_model, "vision": model in provider.vision_models, "audio": False if provider.audio_models is None else model in provider.audio_models, "video": model in provider.video_models, "image": model in provider.image_models, "count": False if provider.models_count is None else provider.models_count.get(model), } if provider in Provider.__map__: provider = Provider.__map__[provider] if issubclass(provider, ProviderModelMixin): has_grouped_models = hasattr(provider, "get_grouped_models") method = provider.get_grouped_models if has_grouped_models else provider.get_models if "api_key" in signature(provider.get_models).parameters: models = method(api_key=api_key, api_base=api_base) elif "ignored" in signature(provider.get_models).parameters: models = method(ignored=ignored) else: models = method() if has_grouped_models: return [{ "group": model["group"], "models": [get_model_data(provider, name) for name in model["models"]] } for model in models] return [ get_model_data(provider, model) for model in models ] elif provider in model_map: return [get_model_data(AnyProvider, provider, True)] return [] @staticmethod def get_providers() -> dict[str, str]: def safe_get_models(provider: ProviderModelMixin): if not isinstance(provider, ProviderModelMixin): return True try: return provider.get_models() except Exception as e: logger.exception(e) return True return [{ "name": provider.__name__, "label": provider.label if hasattr(provider, "label") else provider.__name__, "parent": getattr(provider, "parent", None), "image": len(getattr(provider, "image_models", [])), "audio": len(getattr(provider, "audio_models", [])), "video": len(getattr(provider, "video_models", [])), "vision": getattr(provider, "default_vision_model", None) is not None, "nodriver": getattr(provider, "use_nodriver", False), "hf_space": getattr(provider, "hf_space", False), "active_by_default": False if provider.active_by_default is None else provider.active_by_default, "auth": provider.needs_auth, "login_url": getattr(provider, "login_url", None), "live": provider.live } for provider in Provider.__providers__ if provider.working and safe_get_models(provider)] def get_all_models(self) -> dict[str, list]: def safe_get_provider_models(provider: ProviderModelMixin) -> list[str]: try: return list(provider.get_models()) except Exception as e: debug.error("{provider.__name__} get_models error:", e) return [] return { provider.__name__: safe_get_provider_models(provider) for provider in Provider.__providers__ if provider.working and hasattr(provider, "get_models") } @staticmethod def get_version() -> dict: current_version = None latest_version = None try: current_version = version.utils.current_version try: if request.args.get("cache"): latest_version = version.utils.latest_version_cached except RuntimeError: pass if latest_version is None: latest_version = version.utils.latest_version except VersionNotFoundError: pass return { "version": current_version, "latest_version": latest_version, } def serve_images(self, name): ensure_media_dir() return send_from_directory(os.path.abspath(get_media_dir()), name) def _prepare_conversation_kwargs(self, json_data: dict): kwargs = {**json_data} model = kwargs.pop('model', None) provider = kwargs.pop('provider', None) messages = kwargs.pop('messages', None) action = kwargs.get('action') if action == "continue": kwargs["tool_calls"].append({ "function": { "name": "continue_tool" }, "type": "function" }) conversation = kwargs.pop("conversation", None) if isinstance(conversation, dict): kwargs["conversation"] = JsonConversation(**conversation) return { "model": model, "provider": provider, "messages": messages, "ignore_stream": True, **kwargs } def _create_response_stream(self, kwargs: dict, provider: str, download_media: bool = True, tempfiles: list[str] = []) -> Iterator: def decorated_log(*values: str, file = None): debug.logs.append(" ".join([str(value) for value in values])) if debug.logging: debug.log_handler(*values, file=file) if "user" not in kwargs: debug.log = decorated_log proxy = os.environ.get("G4F_PROXY") try: model, provider_handler = get_model_and_provider( kwargs.get("model"), provider or AnyProvider, has_images="media" in kwargs, ) if "user" in kwargs: debug.error("User:", kwargs.get("user", "Unknown")) debug.error("Referrer:", kwargs.get("referer", "")) debug.error("User-Agent:", kwargs.get("user-agent", "")) except Exception as e: logger.exception(e) yield self._format_json('error', type(e).__name__, message=get_error_message(e)) return if not isinstance(provider_handler, BaseRetryProvider): if not provider: provider = provider_handler.__name__ yield self.handle_provider(provider_handler, model) if hasattr(provider_handler, "get_parameters"): yield self._format_json("parameters", provider_handler.get_parameters(as_json=True)) try: result = iter_run_tools(provider_handler, **{**kwargs, "model": model, "download_media": download_media}) for chunk in result: if isinstance(chunk, ProviderInfo): yield self.handle_provider(chunk, model) elif isinstance(chunk, JsonConversation): if provider is not None: yield self._format_json("conversation", chunk.get_dict() if provider == "AnyProvider" else { provider: chunk.get_dict() }) elif isinstance(chunk, Exception): logger.exception(chunk) yield self._format_json('message', get_error_message(chunk), error=type(chunk).__name__) elif isinstance(chunk, RequestLogin): yield self._format_json("preview", chunk.to_string()) elif isinstance(chunk, PreviewResponse): yield self._format_json("preview", chunk.to_string()) elif isinstance(chunk, ImagePreview): yield self._format_json("preview", chunk.to_string(), urls=chunk.urls, alt=chunk.alt) elif isinstance(chunk, MediaResponse): media = chunk if download_media or chunk.get("cookies"): chunk.alt = format_media_prompt(kwargs.get("messages"), chunk.alt) width, height = get_width_height(chunk.get("width"), chunk.get("height")) tags = [model, kwargs.get("aspect_ratio"), kwargs.get("resolution")] media = asyncio.run(copy_media( chunk.get_list(), chunk.get("cookies"), chunk.get("headers"), proxy=proxy, alt=chunk.alt, tags=tags, add_url=True, timeout=kwargs.get("timeout"), return_target=True if isinstance(chunk, ImageResponse) else False, )) options = {} target_paths, urls = get_target_paths_and_urls(media) if target_paths: if has_pillow: try: with Image.open(target_paths[0]) as img: width, height = img.size options = {"width": width, "height": height} except Exception as e: logger.exception(e) options["target_paths"] = target_paths media = ImageResponse(urls, chunk.alt, options) if isinstance(chunk, ImageResponse) else VideoResponse(media, chunk.alt) yield self._format_json("content", str(media), urls=media.urls, alt=media.alt) elif isinstance(chunk, SynthesizeData): yield self._format_json("synthesize", chunk.get_dict()) elif isinstance(chunk, TitleGeneration): yield self._format_json("title", chunk.title) elif isinstance(chunk, RequestLogin): yield self._format_json("login", str(chunk)) elif isinstance(chunk, Parameters): yield self._format_json("parameters", chunk.get_dict()) elif isinstance(chunk, FinishReason): yield self._format_json("finish", chunk.get_dict()) elif isinstance(chunk, Usage): yield self._format_json("usage", chunk.get_dict()) elif isinstance(chunk, Reasoning): yield self._format_json("reasoning", **chunk.get_dict()) elif isinstance(chunk, YouTubeResponse): yield self._format_json("content", chunk.to_string()) elif isinstance(chunk, AudioResponse): yield self._format_json("content", str(chunk), data=chunk.data) elif isinstance(chunk, SuggestedFollowups): yield self._format_json("suggestions", chunk.suggestions) elif isinstance(chunk, DebugResponse): yield self._format_json("log", chunk.log) elif isinstance(chunk, ContinueResponse): yield self._format_json("continue", chunk.log) elif isinstance(chunk, RawResponse): yield self._format_json(chunk.type, **chunk.get_dict()) else: yield self._format_json("content", str(chunk)) except MissingAuthError as e: yield self._format_json('auth', type(e).__name__, message=get_error_message(e)) except (TimeoutError, asyncio.exceptions.CancelledError) as e: if "user" in kwargs: debug.error(e, "User:", kwargs.get("user", "Unknown")) yield self._format_json('error', type(e).__name__, message=get_error_message(e)) except Exception as e: if "user" in kwargs: debug.error(e, "User:", kwargs.get("user", "Unknown")) logger.exception(e) yield self._format_json('error', type(e).__name__, message=get_error_message(e)) finally: yield from self._yield_logs() for tempfile in tempfiles: try: os.remove(tempfile) except Exception as e: logger.exception(e) def _yield_logs(self): if debug.logs: for log in debug.logs: yield self._format_json("log", log) debug.logs = [] def _format_json(self, response_type: str, content = None, **kwargs): if content is not None and isinstance(response_type, str): return { 'type': response_type, response_type: content, **kwargs } return { 'type': response_type, **kwargs } def handle_provider(self, provider_handler, model): if not getattr(provider_handler, "model", False): return self._format_json("provider", {**provider_handler.get_dict(), "model": model}) return self._format_json("provider", provider_handler.get_dict()) def get_error_message(exception: Exception) -> str: return f"{type(exception).__name__}: {exception}" def get_target_paths_and_urls(media: list[Union[str, tuple[str, str]]]) -> tuple[list[str], list[str]]: target_paths = [] urls = [] for item in media: if isinstance(item, tuple): item, target_path = item target_paths.append(target_path) urls.append(item) return target_paths, urls