Files
gpt4free/g4f/Provider/needs_auth/Video.py
2025-06-19 11:29:54 +02:00

223 lines
9.3 KiB
Python

from __future__ import annotations
import time
import asyncio
import random
from aiohttp import ClientSession, ClientTimeout
from urllib.parse import quote, quote_plus
from aiohttp import ClientSession
try:
import nodriver
from nodriver.core.connection import ProtocolException
except:
pass
from ...typing import Messages, AsyncResult
from ...providers.response import VideoResponse, Reasoning, ContinueResponse, ProviderInfo
from ...requests import get_nodriver
from ...errors import MissingRequirementsError
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..helper import format_media_prompt
from ... import debug
PUBLIC_URL = "https://home.g4f.dev"
SEARCH_URL = f"{PUBLIC_URL}/search/video+"
class RequestConfig:
urls: dict[str, list[str]] = {}
headers: dict = {}
@classmethod
async def get_response(cls, prompt: str) -> VideoResponse | None:
if prompt in cls.urls and cls.urls[prompt]:
cls.urls[prompt] = list(set(cls.urls[prompt]))
return VideoResponse(cls.urls[prompt], prompt, {
"headers": {"authorization": cls.headers.get("authorization")} if cls.headers.get("authorization") else {},
"preview": [url.replace("md.mp4", "thumb.webp") for url in cls.urls[prompt]]
})
async with ClientSession() as session:
found_urls = []
for skip in range(0, 9):
async with session.get(SEARCH_URL + quote_plus(prompt) + f"?skip={skip}", timeout=ClientTimeout(total=10)) as response:
if response.ok:
found_urls.append(str(response.url))
else:
break
if found_urls:
return VideoResponse(found_urls, prompt)
class Video(AsyncGeneratorProvider, ProviderModelMixin):
urls = {
"search": "https://sora.chatgpt.com/explore?query={0}",
"sora": "https://sora.chatgpt.com/explore",
#"veo": "https://aistudio.google.com/generate-video"
}
api_url = f"{PUBLIC_URL}/backend-api/v2/create?provider=Video&cache=true&prompt="
drive_url = "https://www.googleapis.com/drive/v3/"
active_by_default = True
default_model = "search"
models = list(urls.keys())
video_models = models
needs_auth = True
working = True
browser = None
stop_browser = None
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
prompt: str = None,
aspect_ratio: str = None,
**kwargs
) -> AsyncResult:
if not model:
model = cls.default_model
if model not in cls.video_models:
raise ValueError(f"Model '{model}' is not supported by {cls.__name__}. Supported models: {cls.models}")
yield ProviderInfo(**cls.get_dict(), model="sora")
prompt = format_media_prompt(messages, prompt)[:100]
if not prompt:
raise ValueError("Prompt cannot be empty.")
response = await RequestConfig.get_response(prompt)
if response:
yield Reasoning(label=f"Found {len(response.urls)} Video(s)", status="")
yield response
return
try:
yield Reasoning(label="Open browser")
browser, stop_browser = await get_nodriver(proxy=proxy, user_data_dir="gemini")
except Exception as e:
debug.error(f"Error getting nodriver:", e)
async with ClientSession() as session:
yield Reasoning(label="Generating")
async with session.post(cls.api_url + quote(prompt)) as response:
if not response.ok:
debug.error(f"Failed to generate Video: {response.status}")
else:
yield Reasoning(label="Finished", status="")
if response.headers.get("content-type", "text/plain").startswith("text/plain"):
data = (await response.text()).split("\n")
yield VideoResponse([f"{PUBLIC_URL}{url}" if url.startswith("/") else url for url in data], prompt)
return
yield VideoResponse(str(response.url), prompt)
return
raise MissingRequirementsError("Video provider requires a browser to be installed.")
try:
yield ContinueResponse("Timeout waiting for Video URL")
cls.page = await browser.get(cls.urls[model].format(quote(prompt)))
except Exception as e:
debug.error(f"Error opening page:", e)
if prompt not in RequestConfig.urls:
RequestConfig.urls[prompt] = []
def on_request(event: nodriver.cdp.network.RequestWillBeSent, page=None):
if ".mp4" in event.request.url:
RequestConfig.headers = {}
for key, value in event.request.headers.items():
RequestConfig.headers[key.lower()] = value
RequestConfig.urls[prompt].append(event.request.url)
elif event.request.url.startswith(cls.drive_url):
RequestConfig.headers = {}
for key, value in event.request.headers.items():
RequestConfig.headers[key.lower()] = value
RequestConfig.urls[prompt].append(event.request.url)
await page.send(nodriver.cdp.network.enable())
page.add_handler(nodriver.cdp.network.RequestWillBeSent, on_request)
if model == "search":
asyncio.sleep(5)
response = await RequestConfig.get_response(prompt)
if response:
yield Reasoning(label="Found", status="")
yield response
return
try:
page = cls.page
await asyncio.sleep(3)
await page.select("textarea", 240)
try:
button = await page.find("Image")
if button:
await button.click()
else:
debug.error("No 'Image' button found.")
button = await page.find("Video")
if button:
await button.click()
yield Reasoning(label=f"Clicked 'Video' button")
else:
debug.error("No 'Video' button found.")
except Exception as e:
debug.error(f"Error clicking button:", e)
try:
if aspect_ratio:
button = await page.find(":")
if button:
await button.click()
else:
debug.error("No 'x:x' button found.")
await asyncio.sleep(1)
button = await page.find(aspect_ratio)
if button:
await button.click()
yield Reasoning(label=f"Clicked '{aspect_ratio}' button")
else:
debug.error(f"No '{aspect_ratio}' button found.")
except Exception as e:
debug.error(f"Error clicking button:", e)
debug.log(f"Using prompt: {prompt}")
textarea = await page.select("textarea", 180)
await textarea.send_keys(prompt)
yield Reasoning(label=f"Sending prompt", token=prompt)
try:
button = await page.select('button[type="submit"]', 5)
if button:
await button.click()
except Exception as e:
debug.error(f"Error clicking submit button:", e)
try:
button = await page.find("Create")
if button:
await button.click()
yield Reasoning(label=f"Clicked 'Create' button")
except Exception as e:
debug.error(f"Error clicking 'Create' button:", e)
try:
button = await page.find("Activity")
if button:
await button.click()
yield Reasoning(label=f"Clicked 'Activity' button")
except Exception as e:
debug.error(f"Error clicking 'Activity' button:", e)
for idx in range(60):
await asyncio.sleep(1)
try:
button = await page.find("Queued")
if button:
await button.click()
yield Reasoning(label=f"Clicked 'Queued' button")
break
except ProtocolException as e:
if idx == 59:
debug.error(e)
raise RuntimeError("Failed to click 'Queued' button")
for idx in range(600):
yield Reasoning(label="Waiting for Video...", status=f"{idx+1}/600")
await asyncio.sleep(1)
if RequestConfig.urls[prompt]:
await asyncio.sleep(2)
response = await RequestConfig.get_response(prompt)
if response:
yield Reasoning(label="Finished", status="")
yield response
return
if idx == 599:
raise RuntimeError("Failed to get Video URL")
finally:
stop_browser()