Add OpenaiChat and Hugchat Provider

Add tests for providers with auth
Improve async support / 2x faster
Shared get_cookies by domain  function
This commit is contained in:
Heiner Lohaus
2023-08-25 06:41:32 +02:00
parent d1b6921b5f
commit 126496d3ca
7 changed files with 354 additions and 79 deletions

View File

@@ -1,7 +1,11 @@
from abc import ABC, abstractmethod
from ..typing import Any, CreateResult
from ..typing import Any, CreateResult, AsyncGenerator, Union
import browser_cookie3
import asyncio
from time import time
import math
class BaseProvider(ABC):
url: str
@@ -30,4 +34,81 @@ class BaseProvider(ABC):
("stream", "bool"),
]
param = ", ".join([": ".join(p) for p in params])
return f"g4f.provider.{cls.__name__} supports: ({param})"
return f"g4f.provider.{cls.__name__} supports: ({param})"
_cookies = {}
def get_cookies(cookie_domain: str) -> dict:
if cookie_domain not in _cookies:
_cookies[cookie_domain] = {}
for cookie in browser_cookie3.load(cookie_domain):
_cookies[cookie_domain][cookie.name] = cookie.value
return _cookies[cookie_domain]
class AsyncProvider(BaseProvider):
@classmethod
def create_completion(
cls,
model: str,
messages: list[dict[str, str]],
stream: bool = False,
**kwargs: Any
) -> CreateResult:
yield asyncio.run(cls.create_async(model, messages, **kwargs))
@staticmethod
@abstractmethod
async def create_async(
model: str,
messages: list[dict[str, str]],
**kwargs: Any,
) -> str:
raise NotImplementedError()
class AsyncGeneratorProvider(AsyncProvider):
@classmethod
def create_completion(
cls,
model: str,
messages: list[dict[str, str]],
stream: bool = True,
**kwargs: Any
) -> CreateResult:
if stream:
yield from run_generator(cls.create_async_generator(model, messages, **kwargs))
else:
yield from AsyncProvider.create_completion(cls=cls, model=model, messages=messages, **kwargs)
@classmethod
async def create_async(
cls,
model: str,
messages: list[dict[str, str]],
**kwargs: Any,
) -> str:
chunks = [chunk async for chunk in cls.create_async_generator(model, messages, **kwargs)]
if chunks:
return "".join(chunks)
@staticmethod
@abstractmethod
def create_async_generator(
model: str,
messages: list[dict[str, str]],
) -> AsyncGenerator:
raise NotImplementedError()
def run_generator(generator: AsyncGenerator[Union[Any, str], Any]):
loop = asyncio.new_event_loop()
gen = generator.__aiter__()
while True:
try:
yield loop.run_until_complete(gen.__anext__())
except StopAsyncIteration:
break