refactor: restructure core utilities, typing, and request handling

- In `g4f/__init__.py`, changed logger setup to use fixed "g4f" name and refactored `ChatCompletion.create` and `create_async` to share `_prepare_request` logic for preprocessing arguments
- In `g4f/config.py`, added `__future__.annotations`, `lru_cache` import, wrapped `get_config_dir` with `@lru_cache`, and simplified platform branch logic
- In `g4f/cookies.py`, added typing imports, renamed `browsers` to `BROWSERS`, reformatted `DOMAINS`, updated docstrings, improved loop logic in `load_cookies_from_browsers` with additional exception handling, split HAR/JSON parsing into `_parse_har_file` and `_parse_json_cookie_file`, and enhanced `read_cookie_files` with optional filters and `.env` loading
- In `g4f/debug.py`, added enable/disable logging functions, updated log handler typing, appended messages to `logs` in `log()`, and improved `error()` formatting
- In `g4f/errors.py`, introduced base `G4FError` and updated all exception classes to inherit from it or relevant subclasses, with descriptive docstrings for each
- In `g4f/files.py`, added `max_length` parameter to `secure_filename`, adjusted regex formatting, and added docstring; updated `get_bucket_dir` to sanitize parts inline with docstring
- In `g4f/typing.py`, added `__future__.annotations`, reorganized imports, restricted PIL import to type-checking, defined `ContentPart` and `Message` TypedDicts, updated type aliases and `__all__` to include new types
- In `g4f/version.py`, added `lru_cache` and request timeout constant, applied caching to `get_pypi_version` and `get_github_version`, added response validation and explicit exceptions, refactored `VersionUtils.current_version` with clearer sources and error on miss, changed `check_version` to return a boolean with optional silent mode, and improved error handling outputs
This commit is contained in:
hlohaus
2025-08-09 03:29:44 +02:00
parent b6dd7b39be
commit b6f51f00d8
8 changed files with 441 additions and 282 deletions

View File

@@ -13,35 +13,38 @@ from .providers.types import ProviderType
from .providers.helper import concat_chunks, async_concat_chunks
from .client.service import get_model_and_provider
#Configure "g4f" logger
logger = logging.getLogger(__name__)
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logger.addHandler(log_handler)
# Configure logger
logger = logging.getLogger("g4f")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
class ChatCompletion:
@staticmethod
def create(model : Union[Model, str],
messages : Messages,
provider : Union[ProviderType, str, None] = None,
stream : bool = False,
image : ImageType = None,
image_name: Optional[str] = None,
ignore_working: bool = False,
ignore_stream: bool = False,
**kwargs) -> Union[CreateResult, str]:
def _prepare_request(model: Union[Model, str],
messages: Messages,
provider: Union[ProviderType, str, None],
stream: bool,
image: ImageType,
image_name: Optional[str],
ignore_working: bool,
ignore_stream: bool,
**kwargs):
"""Shared pre-processing for sync/async create methods."""
if image is not None:
kwargs["media"] = [(image, image_name)]
elif "images" in kwargs:
kwargs["media"] = kwargs.pop("images")
model, provider = get_model_and_provider(
model, provider, stream,
ignore_working,
ignore_stream,
has_images="media" in kwargs,
)
if "proxy" not in kwargs:
proxy = os.environ.get("G4F_PROXY")
if proxy:
@@ -49,36 +52,40 @@ class ChatCompletion:
if ignore_stream:
kwargs["ignore_stream"] = True
result = provider.create_function(model, messages, stream=stream, **kwargs)
return model, provider, kwargs
@staticmethod
def create(model: Union[Model, str],
messages: Messages,
provider: Union[ProviderType, str, None] = None,
stream: bool = False,
image: ImageType = None,
image_name: Optional[str] = None,
ignore_working: bool = False,
ignore_stream: bool = False,
**kwargs) -> Union[CreateResult, str]:
model, provider, kwargs = ChatCompletion._prepare_request(
model, messages, provider, stream, image, image_name,
ignore_working, ignore_stream, **kwargs
)
result = provider.create_function(model, messages, stream=stream, **kwargs)
return result if stream or ignore_stream else concat_chunks(result)
@staticmethod
def create_async(model : Union[Model, str],
messages : Messages,
provider : Union[ProviderType, str, None] = None,
stream : bool = False,
image : ImageType = None,
def create_async(model: Union[Model, str],
messages: Messages,
provider: Union[ProviderType, str, None] = None,
stream: bool = False,
image: ImageType = None,
image_name: Optional[str] = None,
ignore_stream: bool = False,
ignore_working: bool = False,
ignore_stream: bool = False,
**kwargs) -> Union[AsyncResult, Coroutine[str]]:
if image is not None:
kwargs["media"] = [(image, image_name)]
elif "images" in kwargs:
kwargs["media"] = kwargs.pop("images")
model, provider = get_model_and_provider(model, provider, False, ignore_working, has_images="media" in kwargs)
if "proxy" not in kwargs:
proxy = os.environ.get("G4F_PROXY")
if proxy:
kwargs["proxy"] = proxy
if ignore_stream:
kwargs["ignore_stream"] = True
model, provider, kwargs = ChatCompletion._prepare_request(
model, messages, provider, stream, image, image_name,
ignore_working, ignore_stream, **kwargs
)
result = provider.async_create_function(model, messages, stream=stream, **kwargs)
if not stream and not ignore_stream:
if hasattr(result, "__aiter__"):
if not stream and not ignore_stream and hasattr(result, "__aiter__"):
result = async_concat_chunks(result)
return result

View File

@@ -1,17 +1,20 @@
from __future__ import annotations
import os
import sys
from pathlib import Path
from functools import lru_cache
# Platform-appropriate directories
@lru_cache(maxsize=1)
def get_config_dir() -> Path:
"""Get platform-appropriate config directory."""
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming"))
elif sys.platform == "darwin":
return Path.home() / "Library" / "Application Support"
else: # Linux and other UNIX-like
return Path.home() / ".config"
PACKAGE_NAME = "g4f"
CONFIG_DIR = get_config_dir() / PACKAGE_NAME
COOKIES_DIR = CONFIG_DIR / "cookies"

View File

@@ -3,12 +3,14 @@ from __future__ import annotations
import os
import time
import json
from typing import Optional, List
try:
from platformdirs import user_config_dir
has_platformdirs = True
except ImportError:
has_platformdirs = False
try:
from browser_cookie3 import (
chrome, chromium, opera, opera_gx,
@@ -19,12 +21,6 @@ try:
def g4f(domain_name: str) -> list:
"""
Load cookies from the 'g4f' browser (if exists).
Args:
domain_name (str): The domain for which to load cookies.
Returns:
list: List of cookies.
"""
if not has_platformdirs:
return []
@@ -32,7 +28,7 @@ try:
cookie_file = os.path.join(user_data_dir, "Default", "Cookies")
return [] if not os.path.exists(cookie_file) else chrome(cookie_file, domain_name)
browsers = [
BROWSERS = [
g4f, firefox,
chrome, chromium, opera, opera_gx,
brave, edge, vivaldi,
@@ -40,43 +36,38 @@ try:
has_browser_cookie3 = True
except ImportError:
has_browser_cookie3 = False
browsers = []
BROWSERS: List = []
from .typing import Dict, Cookies
from .errors import MissingRequirementsError
from .config import COOKIES_DIR, CUSTOM_COOKIES_DIR
from . import debug
class CookiesConfig():
class CookiesConfig:
cookies: Dict[str, Cookies] = {}
cookies_dir: str = CUSTOM_COOKIES_DIR if os.path.exists(CUSTOM_COOKIES_DIR) else str(COOKIES_DIR)
DOMAINS = [
DOMAINS = (
".bing.com",
".meta.ai",
".google.com",
"www.whiterabbitneo.com",
"huggingface.co",
".huggingface.co"
".huggingface.co",
"chat.reka.ai",
"chatgpt.com",
".cerebras.ai",
"github.com",
]
)
if has_browser_cookie3 and os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null":
if has_browser_cookie3 and os.environ.get("DBUS_SESSION_BUS_ADDRESS") == "/dev/null":
_LinuxPasswordManager.get_password = lambda a, b: b"secret"
def get_cookies(domain_name: str, raise_requirements_error: bool = True, single_browser: bool = False, cache_result: bool = True) -> Dict[str, str]:
"""
Load cookies for a given domain from all supported browsers and cache the results.
Args:
domain_name (str): The domain for which to load cookies.
Returns:
Dict[str, str]: A dictionary of cookie names and values.
"""
def get_cookies(domain_name: str, raise_requirements_error: bool = True,
single_browser: bool = False, cache_result: bool = True) -> Dict[str, str]:
"""Load cookies for a given domain from all supported browsers."""
if domain_name in CookiesConfig.cookies:
return CookiesConfig.cookies[domain_name]
@@ -85,120 +76,134 @@ def get_cookies(domain_name: str, raise_requirements_error: bool = True, single_
CookiesConfig.cookies[domain_name] = cookies
return cookies
def set_cookies(domain_name: str, cookies: Cookies = None) -> None:
"""Set or remove cookies for a given domain in the cache."""
if cookies:
CookiesConfig.cookies[domain_name] = cookies
elif domain_name in CookiesConfig.cookies:
CookiesConfig.cookies.pop(domain_name)
else:
CookiesConfig.cookies.pop(domain_name, None)
def load_cookies_from_browsers(domain_name: str, raise_requirements_error: bool = True, single_browser: bool = False) -> Cookies:
"""
Helper function to load cookies from various browsers.
Args:
domain_name (str): The domain for which to load cookies.
Returns:
Dict[str, str]: A dictionary of cookie names and values.
"""
def load_cookies_from_browsers(domain_name: str,
raise_requirements_error: bool = True,
single_browser: bool = False) -> Cookies:
"""Helper to load cookies from all supported browsers."""
if not has_browser_cookie3:
if raise_requirements_error:
raise MissingRequirementsError('Install "browser_cookie3" package')
return {}
cookies = {}
for cookie_fn in browsers:
for cookie_fn in BROWSERS:
try:
cookie_jar = cookie_fn(domain_name=domain_name)
if len(cookie_jar):
if cookie_jar:
debug.log(f"Read cookies from {cookie_fn.__name__} for {domain_name}")
for cookie in cookie_jar:
if cookie.name not in cookies:
if not cookie.expires or cookie.expires > time.time():
if cookie.name not in cookies and (not cookie.expires or cookie.expires > time.time()):
cookies[cookie.name] = cookie.value
if single_browser and len(cookie_jar):
if single_browser and cookie_jar:
break
except BrowserCookieError:
pass
except KeyboardInterrupt:
debug.error("Cookie loading interrupted by user.")
break
except Exception as e:
debug.error(f"Error reading cookies from {cookie_fn.__name__} for {domain_name}: {e}")
return cookies
def set_cookies_dir(dir: str) -> None:
CookiesConfig.cookies_dir = dir
def set_cookies_dir(dir_path: str) -> None:
CookiesConfig.cookies_dir = dir_path
def get_cookies_dir() -> str:
return CookiesConfig.cookies_dir
def read_cookie_files(dirPath: str = None):
dirPath = CookiesConfig.cookies_dir if dirPath is None else dirPath
if not os.access(dirPath, os.R_OK):
debug.log(f"Read cookies: {dirPath} dir is not readable")
def _parse_har_file(path: str) -> Dict[str, Dict[str, str]]:
"""Parse a HAR file and return cookies by domain."""
cookies_by_domain = {}
try:
with open(path, "rb") as file:
har_file = json.load(file)
debug.log(f"Read .har file: {path}")
def get_domain(entry: dict) -> Optional[str]:
headers = entry["request"].get("headers", [])
host_values = [h["value"] for h in headers if h["name"].lower() in ("host", ":authority")]
if not host_values:
return None
host = host_values.pop()
return next((d for d in DOMAINS if d in host), None)
for entry in har_file.get("log", {}).get("entries", []):
domain = get_domain(entry)
if domain:
v_cookies = {c["name"]: c["value"] for c in entry["request"].get("cookies", [])}
if v_cookies:
cookies_by_domain[domain] = v_cookies
except (json.JSONDecodeError, FileNotFoundError):
pass
return cookies_by_domain
def _parse_json_cookie_file(path: str) -> Dict[str, Dict[str, str]]:
"""Parse a JSON cookie export file."""
cookies_by_domain = {}
try:
with open(path, "rb") as file:
cookie_file = json.load(file)
if not isinstance(cookie_file, list):
return {}
debug.log(f"Read cookie file: {path}")
for c in cookie_file:
if isinstance(c, dict) and "domain" in c:
cookies_by_domain.setdefault(c["domain"], {})[c["name"]] = c["value"]
except (json.JSONDecodeError, FileNotFoundError):
pass
return cookies_by_domain
def read_cookie_files(dir_path: Optional[str] = None, domains_filter: Optional[List[str]] = None) -> None:
"""
Load cookies from .har and .json files in a directory.
"""
dir_path = dir_path or CookiesConfig.cookies_dir
if not os.access(dir_path, os.R_OK):
debug.log(f"Read cookies: {dir_path} dir is not readable")
return
# Optionally load environment variables
try:
from dotenv import load_dotenv
load_dotenv(os.path.join(dirPath, ".env"), override=True)
debug.log(f"Read cookies: Loaded environment variables from {dirPath}/.env")
load_dotenv(os.path.join(dir_path, ".env"), override=True)
debug.log(f"Read cookies: Loaded env vars from {dir_path}/.env")
except ImportError:
debug.error("Warning: 'python-dotenv' is not installed. Environment variables will not be loaded.")
debug.error("Warning: 'python-dotenv' is not installed. Env vars not loaded.")
def get_domain(v: dict) -> str:
host = [h["value"] for h in v['request']['headers'] if h["name"].lower() in ("host", ":authority")]
if not host:
return
host = host.pop()
for d in DOMAINS:
if d in host:
return d
harFiles = []
cookieFiles = []
for root, _, files in os.walk(dirPath):
har_files, json_files = [], []
for root, _, files in os.walk(dir_path):
for file in files:
if file.endswith(".har"):
harFiles.append(os.path.join(root, file))
har_files.append(os.path.join(root, file))
elif file.endswith(".json"):
cookieFiles.append(os.path.join(root, file))
break
json_files.append(os.path.join(root, file))
break # Do not recurse
CookiesConfig.cookies = {}
for path in harFiles:
with open(path, 'rb') as file:
try:
harFile = json.load(file)
except json.JSONDecodeError:
# Error: not a HAR file!
continue
debug.log(f"Read .har file: {path}")
new_cookies = {}
for v in harFile['log']['entries']:
domain = get_domain(v)
if domain is None:
continue
v_cookies = {}
for c in v['request']['cookies']:
v_cookies[c['name']] = c['value']
if len(v_cookies) > 0:
CookiesConfig.cookies[domain] = v_cookies
new_cookies[domain] = len(v_cookies)
for domain, new_values in new_cookies.items():
debug.log(f"Cookies added: {new_values} from {domain}")
for path in cookieFiles:
with open(path, 'rb') as file:
try:
cookieFile = json.load(file)
except json.JSONDecodeError:
# Error: not a json file!
continue
if not isinstance(cookieFile, list) or not isinstance(cookieFile[0], dict) or "domain" not in cookieFile[0]:
continue
debug.log(f"Read cookie file: {path}")
new_cookies = {}
for c in cookieFile:
if isinstance(c, dict) and "domain" in c:
if c["domain"] not in new_cookies:
new_cookies[c["domain"]] = {}
new_cookies[c["domain"]][c["name"]] = c["value"]
for domain, new_values in new_cookies.items():
CookiesConfig.cookies[domain] = new_values
debug.log(f"Cookies added: {len(new_values)} from {domain}")
CookiesConfig.cookies.clear()
# Load cookies from files
for path in har_files:
for domain, cookies in _parse_har_file(path).items():
if not domains_filter or domain in domains_filter:
CookiesConfig.cookies[domain] = cookies
debug.log(f"Cookies added: {len(cookies)} from {domain}")
for path in json_files:
for domain, cookies in _parse_json_cookie_file(path).items():
if not domains_filter or domain in domains_filter:
CookiesConfig.cookies[domain] = cookies
debug.log(f"Cookies added: {len(cookies)} from {domain}")

View File

@@ -4,15 +4,35 @@ from typing import Callable, List, Optional, Any
logging: bool = False
version_check: bool = True
version: Optional[str] = None
log_handler: Callable = print # More specifically: Callable[[Any, Optional[Any]], None]
log_handler: Callable[..., None] = print
logs: List[str] = []
def enable_logging(handler: Callable[..., None] = print) -> None:
"""Enable debug logging with optional handler."""
global logging, log_handler
logging = True
log_handler = handler
def disable_logging() -> None:
"""Disable debug logging."""
global logging
logging = False
def log(*text: Any, file: Optional[Any] = None) -> None:
"""Log a message if logging is enabled."""
if logging:
message = " ".join(map(str, text))
logs.append(message)
log_handler(*text, file=file)
def error(*error: Any, name: Optional[str] = None) -> None:
def error(*error_args: Any, name: Optional[str] = None) -> None:
"""Log an error message to stderr."""
error = [e if isinstance(e, str) else f"{type(e).__name__ if name is None else name}: {e}" for e in error]
log(*error, file=sys.stderr)
formatted_errors = [
e if isinstance(e, str) else f"{name or type(e).__name__}: {e}"
for e in error_args
]
log(*formatted_errors, file=sys.stderr)

View File

@@ -1,59 +1,103 @@
class ProviderNotFoundError(Exception):
...
class G4FError(Exception):
"""Base exception for all g4f-related errors."""
pass
class ProviderNotWorkingError(Exception):
...
class StreamNotSupportedError(Exception):
...
class ProviderNotFoundError(G4FError):
"""Raised when a provider is not found."""
pass
class ModelNotFoundError(Exception):
...
class ModelNotAllowedError(Exception):
...
class ProviderNotWorkingError(G4FError):
"""Raised when the provider is unavailable or failing."""
pass
class RetryProviderError(Exception):
...
class RetryNoProviderError(Exception):
...
class StreamNotSupportedError(G4FError):
"""Raised when the requested provider does not support streaming."""
pass
class VersionNotFoundError(Exception):
...
class MissingRequirementsError(Exception):
...
class ModelNotFoundError(G4FError):
"""Raised when a model is not found."""
pass
class ModelNotAllowedError(G4FError):
"""Raised when a model is not allowed by configuration or policy."""
pass
class RetryProviderError(G4FError):
"""Raised to retry with another provider."""
pass
class RetryNoProviderError(G4FError):
"""Raised when there are no providers left to retry."""
pass
class VersionNotFoundError(G4FError):
"""Raised when the version could not be determined."""
pass
class MissingRequirementsError(G4FError):
"""Raised when a required dependency is missing."""
pass
class NestAsyncioError(MissingRequirementsError):
...
"""Raised when 'nest_asyncio' is missing."""
pass
class MissingAuthError(Exception):
...
class PaymentRequiredError(Exception):
...
class MissingAuthError(G4FError):
"""Raised when authentication details are missing."""
pass
class NoMediaResponseError(Exception):
...
class ResponseError(Exception):
...
class PaymentRequiredError(G4FError):
"""Raised when a provider requires payment before access."""
pass
class NoMediaResponseError(G4FError):
"""Raised when a media request returns no response."""
pass
class ResponseError(G4FError):
"""Base class for response-related errors."""
pass
class ResponseStatusError(ResponseError):
"""Raised when an HTTP response returns a non-success status code."""
pass
class ResponseStatusError(Exception):
...
class CloudflareError(ResponseStatusError):
...
"""Raised when a request is blocked by Cloudflare."""
pass
class RateLimitError(ResponseStatusError):
...
"""Raised when the provider's rate limit has been exceeded."""
pass
class NoValidHarFileError(Exception):
...
class TimeoutError(Exception):
class NoValidHarFileError(G4FError):
"""Raised when no valid HAR file is found."""
pass
class TimeoutError(G4FError):
"""Raised for timeout errors during API requests."""
pass
class ConversationLimitError(Exception):
"""Raised for conversation limit during API requests to AI endpoint."""
class ConversationLimitError(G4FError):
"""Raised when a conversation limit is reached on the provider."""
pass

View File

@@ -1,26 +1,34 @@
from __future__ import annotations
import re
from urllib.parse import unquote
import os
from urllib.parse import unquote
from .cookies import get_cookies_dir
def secure_filename(filename: str) -> str:
def secure_filename(filename: str, max_length: int = 100) -> str:
"""Sanitize a filename for safe filesystem storage."""
if filename is None:
return None
# Keep letters, numbers, basic punctuation and all Unicode chars
# Keep letters, numbers, basic punctuation, underscores
filename = re.sub(
r'[^\w.,_+-]+',
'_',
r"[^\w.,_+\-]+",
"_",
unquote(filename).strip(),
flags=re.UNICODE
)
encoding = 'utf-8'
max_length = 100
encoding = "utf-8"
encoded = filename.encode(encoding)[:max_length]
decoded = encoded.decode(encoding, 'ignore')
decoded = encoded.decode(encoding, "ignore")
return decoded.strip(".,_+-")
def get_bucket_dir(*parts):
return os.path.join(get_cookies_dir(), "buckets", *[secure_filename(part) for part in parts if part])
def get_bucket_dir(*parts: str) -> str:
"""Return a path under the cookies 'buckets' directory with sanitized parts."""
return os.path.join(
get_cookies_dir(),
"buckets",
*[secure_filename(part) for part in parts if part]
)

View File

@@ -1,42 +1,89 @@
import os
from typing import Any, AsyncGenerator, Generator, AsyncIterator, Iterator, NewType, Tuple, Union, List, Dict, Type, IO, Optional, TypedDict
from __future__ import annotations
try:
from PIL.Image import Image
except ImportError:
class Image:
import os
from typing import (
Any,
AsyncGenerator,
Generator,
AsyncIterator,
Iterator,
NewType,
Tuple,
Union,
List,
Dict,
Type,
IO,
Optional,
TypedDict,
TYPE_CHECKING,
)
# Only import PIL for type-checkers; no runtime dependency required.
if TYPE_CHECKING:
from PIL.Image import Image as PILImage
else:
class PILImage: # minimal placeholder to avoid runtime import errors
pass
# Response chunk type from providers
from .providers.response import ResponseType
SHA256 = NewType('sha_256_hash', str)
# ---- Hashes & cookie aliases -------------------------------------------------
SHA256 = NewType("SHA256", str)
Cookies = Dict[str, str]
# ---- Streaming result types --------------------------------------------------
CreateResult = Iterator[Union[str, ResponseType]]
AsyncResult = AsyncIterator[Union[str, ResponseType]]
Messages = List[Dict[str, Union[str, List[Dict[str, Union[str, Dict[str, str]]]]]]]
Cookies = Dict[str, str]
ImageType = Union[str, bytes, IO, Image, os.PathLike]
# ---- Message schema ----------------------------------------------------------
# Typical message structure:
# {"role": "user" | "assistant" | "system" | "tool", "content": str | [ContentPart, ...]}
# where content parts can be text or (optionally) structured pieces like images.
class ContentPart(TypedDict, total=False):
type: str # e.g., "text", "image_url", etc.
text: str # present when type == "text"
image_url: Dict[str, str] # present when type == "image_url"
class Message(TypedDict):
role: str
content: Union[str, List[ContentPart]]
Messages = List[Message]
# ---- Media inputs ------------------------------------------------------------
# Paths, raw bytes, file-like objects, or PIL Image objects are accepted.
ImageType = Union[str, bytes, IO[bytes], PILImage, os.PathLike]
MediaListType = List[Tuple[ImageType, Optional[str]]]
__all__ = [
'Any',
'AsyncGenerator',
'Generator',
'AsyncIterator',
'Iterator'
'Tuple',
'Union',
'List',
'Dict',
'Type',
'IO',
'Optional',
'TypedDict',
'SHA256',
'CreateResult',
'AsyncResult',
'Messages',
'Cookies',
'Image',
'ImageType',
'MediaListType'
"Any",
"AsyncGenerator",
"Generator",
"AsyncIterator",
"Iterator",
"Tuple",
"Union",
"List",
"Dict",
"Type",
"IO",
"Optional",
"TypedDict",
"SHA256",
"CreateResult",
"AsyncResult",
"Messages",
"Message",
"ContentPart",
"Cookies",
"Image",
"ImageType",
"MediaListType",
"ResponseType",
]

View File

@@ -1,102 +1,114 @@
from __future__ import annotations
from os import environ
import requests
from functools import cached_property
from os import environ
from functools import cached_property, lru_cache
from importlib.metadata import version as get_package_version, PackageNotFoundError
from subprocess import check_output, CalledProcessError, PIPE
from .errors import VersionNotFoundError
from .config import PACKAGE_NAME, GITHUB_REPOSITORY
from . import debug
# Default request timeout (seconds)
REQUEST_TIMEOUT = 5
@lru_cache(maxsize=1)
def get_pypi_version(package_name: str) -> str:
"""
Retrieves the latest version of a package from PyPI.
Args:
package_name (str): The name of the package for which to retrieve the version.
Returns:
str: The latest version of the specified package from PyPI.
Raises:
VersionNotFoundError: If there is an error in fetching the version from PyPI.
VersionNotFoundError: If there is a network or parsing error.
"""
try:
response = requests.get(f"https://pypi.org/pypi/{package_name}/json").json()
return response["info"]["version"]
response = requests.get(
f"https://pypi.org/pypi/{package_name}/json",
timeout=REQUEST_TIMEOUT
)
response.raise_for_status()
return response.json()["info"]["version"]
except requests.RequestException as e:
raise VersionNotFoundError(f"Failed to get PyPI version: {e}")
raise VersionNotFoundError(
f"Failed to get PyPI version for '{package_name}'"
) from e
@lru_cache(maxsize=1)
def get_github_version(repo: str) -> str:
"""
Retrieves the latest release version from a GitHub repository.
Args:
repo (str): The name of the GitHub repository.
Returns:
str: The latest release version from the specified GitHub repository.
Raises:
VersionNotFoundError: If there is an error in fetching the version from GitHub.
VersionNotFoundError: If there is a network or parsing error.
"""
try:
response = requests.get(f"https://api.github.com/repos/{repo}/releases/latest")
response = requests.get(
f"https://api.github.com/repos/{repo}/releases/latest",
timeout=REQUEST_TIMEOUT
)
response.raise_for_status()
return response.json()["tag_name"]
data = response.json()
if "tag_name" not in data:
raise VersionNotFoundError(f"No tag_name found in latest GitHub release for '{repo}'")
return data["tag_name"]
except requests.RequestException as e:
raise VersionNotFoundError(f"Failed to get GitHub release version: {e}")
raise VersionNotFoundError(
f"Failed to get GitHub release version for '{repo}'"
) from e
def get_git_version() -> str:
# Read from git repository
def get_git_version() -> str | None:
"""Return latest Git tag if available, else None."""
try:
command = ["git", "describe", "--tags", "--abbrev=0"]
return check_output(command, text=True, stderr=PIPE).strip()
return check_output(
["git", "describe", "--tags", "--abbrev=0"],
text=True,
stderr=PIPE
).strip()
except CalledProcessError:
return None
class VersionUtils:
"""
Utility class for managing and comparing package versions of 'g4f'.
"""
@cached_property
def current_version(self) -> str:
"""
Retrieves the current version of the 'g4f' package.
Returns:
str: The current version of 'g4f'.
Raises:
VersionNotFoundError: If the version cannot be determined from the package manager,
Docker environment, or git repository.
Returns the current installed version of g4f from:
- debug override
- package metadata
- environment variable (Docker)
- git tags
"""
if debug.version:
return debug.version
# Read from package manager
try:
return get_package_version(PACKAGE_NAME)
except PackageNotFoundError:
pass
# Read from docker environment
version = environ.get("G4F_VERSION")
if version:
return version
version_env = environ.get("G4F_VERSION")
if version_env:
return version_env
return get_git_version()
git_version = get_git_version()
if git_version:
return git_version
raise VersionNotFoundError("Could not determine current g4f version.")
@property
def latest_version(self) -> str:
"""
Retrieves the latest version of the 'g4f' package.
Returns:
str: The latest version of 'g4f'.
Returns the latest available version of g4f.
If not installed via PyPI, falls back to GitHub releases.
"""
# Is installed via package manager?
try:
get_package_version(PACKAGE_NAME)
except PackageNotFoundError:
@@ -107,17 +119,30 @@ class VersionUtils:
def latest_version_cached(self) -> str:
return self.latest_version
def check_version(self) -> None:
def check_version(self, silent: bool = False) -> bool:
"""
Checks if the current version of 'g4f' is up to date with the latest version.
Note:
If a newer version is available, it prints a message with the new version and update instructions.
Checks if the current version is up-to-date.
Returns:
bool: True if current version is the latest, False otherwise.
"""
try:
if self.current_version != self.latest_version:
print(f'New g4f version: {self.latest_version} (current: {self.current_version}) | pip install -U g4f')
current = self.current_version
latest = self.latest_version
up_to_date = current == latest
if not silent:
if up_to_date:
print(f"g4f is up-to-date (version {current}).")
else:
print(
f"New g4f version available: {latest} "
f"(current: {current}) | pip install -U g4f"
)
return up_to_date
except Exception as e:
print(f'Failed to check g4f version: {e}')
if not silent:
print(f"Failed to check g4f version: {e}")
return True # Assume up-to-date if check fails
# Singleton instance
utils = VersionUtils()