from __future__ import annotations
import os
from .errors import *
from .models import Model, ModelUtils, _all_models
from .Provider import AsyncGeneratorProvider, ProviderUtils
from .typing import Messages, CreateResult, AsyncResult, Union
from . import debug, version
from .base_provider import BaseRetryProvider, ProviderType
def get_model_and_provider(model : Union[Model, str],
provider : Union[ProviderType, str, None],
stream : bool,
ignored : list[str] = None,
ignore_working: bool = False,
ignore_stream: bool = False) -> tuple[str, ProviderType]:
if debug.version_check:
debug.version_check = False
version.utils.check_version()
if isinstance(provider, str):
if provider in ProviderUtils.convert:
provider = ProviderUtils.convert[provider]
else:
raise ProviderNotFoundError(f'Provider not found: {provider}')
if not provider:
if isinstance(model, str):
if model in ModelUtils.convert:
model = ModelUtils.convert[model]
else:
raise ModelNotFoundError(f'Model not found: {model}')
provider = model.best_provider
if not provider:
raise ProviderNotFoundError(f'No provider found for model: {model}')
if isinstance(model, Model):
model = model.name
if ignored and isinstance(provider, BaseRetryProvider):
provider.providers = [p for p in provider.providers if p.__name__ not in ignored]
if not ignore_working and not provider.working:
raise ProviderNotWorkingError(f'{provider.__name__} is not working')
if not ignore_stream and not provider.supports_stream and stream:
raise StreamNotSupportedError(f'{provider.__name__} does not support "stream" argument')
if debug.logging:
if model:
print(f'Using {provider.__name__} provider and {model} model')
else:
print(f'Using {provider.__name__} provider')
debug.last_provider = provider
return model, provider
class ChatCompletion:
@staticmethod
def create(model : Union[Model, str],
messages : Messages,
provider : Union[ProviderType, str, None] = None,
stream : bool = False,
auth : Union[str, None] = None,
ignored : list[str] = None,
ignore_working: bool = False,
ignore_stream_and_auth: bool = False,
patch_provider: callable = None,
**kwargs) -> Union[CreateResult, str]:
model, provider = get_model_and_provider(model, provider, stream, ignored, ignore_working, ignore_stream_and_auth)
if not ignore_stream_and_auth and provider.needs_auth and not auth:
raise AuthenticationRequiredError(f'{provider.__name__} requires authentication (use auth=\'cookie or token or jwt ...\' param)')
if auth:
kwargs['auth'] = auth
if "proxy" not in kwargs:
proxy = os.environ.get("G4F_PROXY")
if proxy:
kwargs['proxy'] = proxy
if patch_provider:
provider = patch_provider(provider)
result = provider.create_completion(model, messages, stream, **kwargs)
return result if stream else ''.join(result)
@staticmethod
def create_async(model : Union[Model, str],
messages : Messages,
provider : Union[ProviderType, str, None] = None,
stream : bool = False,
ignored : list[str] = None,
patch_provider: callable = None,
**kwargs) -> Union[AsyncResult, str]:
model, provider = get_model_and_provider(model, provider, False, ignored)
if stream:
if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider):
return provider.create_async_generator(model, messages, **kwargs)
raise StreamNotSupportedError(f'{provider.__name__} does not support "stream" argument in "create_async"')
if patch_provider:
provider = patch_provider(provider)
return provider.create_async(model, messages, **kwargs)
class Completion:
@staticmethod
def create(model : Union[Model, str],
prompt : str,
provider : Union[ProviderType, None] = None,
stream : bool = False,
ignored : list[str] = None, **kwargs) -> Union[CreateResult, str]:
allowed_models = [
'code-davinci-002',
'text-ada-001',
'text-babbage-001',
'text-curie-001',
'text-davinci-002',
'text-davinci-003'
]
if model not in allowed_models:
raise ModelNotAllowedError(f'Can\'t use {model} with Completion.create()')
model, provider = get_model_and_provider(model, provider, stream, ignored)
result = provider.create_completion(model, [{"role": "user", "content": prompt}], stream, **kwargs)
return result if stream else ''.join(result)
def get_last_provider(as_dict: bool = False) -> Union[ProviderType, dict[str, str]]:
last = debug.last_provider
if isinstance(last, BaseRetryProvider):
last = last.last_provider
if last and as_dict:
return {"name": last.__name__, "url": last.url}
return last