diff options
Diffstat (limited to 'g4f/__init__.py')
-rw-r--r-- | g4f/__init__.py | 73 |
1 files changed, 36 insertions, 37 deletions
diff --git a/g4f/__init__.py b/g4f/__init__.py index a5cee80b..5d0b47d8 100644 --- a/g4f/__init__.py +++ b/g4f/__init__.py @@ -1,31 +1,30 @@ from __future__ import annotations -from g4f import models -from .Provider import BaseProvider -from .typing import CreateResult, Union -from .debug import logging -from requests import get +from requests import get +from g4f.models import Model, ModelUtils +from .Provider import BaseProvider +from .typing import CreateResult, Union +from .debug import logging -logging = False version = '0.1.5.4' -def check_pypi_version(): +def check_pypi_version() -> None: try: - response = get(f"https://pypi.org/pypi/g4f/json").json() + response = get("https://pypi.org/pypi/g4f/json").json() latest_version = response["info"]["version"] - + if version != latest_version: print(f'New pypi version: {latest_version} (current: {version}) | pip install -U g4f') - + except Exception as e: print(f'Failed to check g4f pypi version: {e}') check_pypi_version() -def get_model_and_provider(model: Union[models.Model, str], provider: type[BaseProvider], stream: bool): +def get_model_and_provider(model: Union[Model, str], provider: Union[type[BaseProvider], None], stream: bool) -> tuple[Model, type[BaseProvider]]: if isinstance(model, str): - if model in models.ModelUtils.convert: - model = models.ModelUtils.convert[model] + if model in ModelUtils.convert: + model = ModelUtils.convert[model] else: raise Exception(f'The model: {model} does not exist') @@ -34,14 +33,13 @@ def get_model_and_provider(model: Union[models.Model, str], provider: type[BaseP if not provider: raise Exception(f'No provider found for model: {model}') - + if not provider.working: raise Exception(f'{provider.__name__} is not working') - + if not provider.supports_stream and stream: - raise Exception( - f'ValueError: {provider.__name__} does not support "stream" argument') - + raise Exception(f'ValueError: {provider.__name__} does not support "stream" argument') + if logging: print(f'Using {provider.__name__} provider') @@ -50,11 +48,11 @@ def get_model_and_provider(model: Union[models.Model, str], provider: type[BaseP class ChatCompletion: @staticmethod def create( - model : Union[models.Model, str], - messages : list[dict[str, str]], - provider : Union[type[BaseProvider], None] = None, - stream : bool = False, - auth : Union[str, None] = None, + model: Union[Model, str], + messages: list[dict[str, str]], + provider: Union[type[BaseProvider], None] = None, + stream: bool = False, + auth: Union[str, None] = None, **kwargs ) -> Union[CreateResult, str]: @@ -63,7 +61,7 @@ class ChatCompletion: if provider.needs_auth and not auth: raise Exception( f'ValueError: {provider.__name__} requires authentication (use auth=\'cookie or token or jwt ...\' param)') - + if provider.needs_auth: kwargs['auth'] = auth @@ -72,9 +70,9 @@ class ChatCompletion: @staticmethod async def create_async( - model : Union[models.Model, str], - messages : list[dict[str, str]], - provider : Union[type[BaseProvider], None] = None, + model: Union[Model, str], + messages: list[dict[str, str]], + provider: Union[type[BaseProvider], None] = None, **kwargs ) -> str: model, provider = get_model_and_provider(model, provider, False) @@ -84,11 +82,13 @@ class ChatCompletion: class Completion: @staticmethod def create( - model : Union[models.Model, str], - prompt : str, - provider : Union[type[BaseProvider], None] = None, - stream : bool = False, **kwargs) -> Union[CreateResult, str]: - + model: str, + prompt: str, + provider: Union[type[BaseProvider], None] = None, + stream: bool = False, + **kwargs + ) -> Union[CreateResult, str]: + allowed_models = [ 'code-davinci-002', 'text-ada-001', @@ -97,13 +97,12 @@ class Completion: 'text-davinci-002', 'text-davinci-003' ] - + if model not in allowed_models: raise Exception(f'ValueError: Can\'t use {model} with Completion.create()') - + model, provider = get_model_and_provider(model, provider, stream) - result = provider.create_completion(model.name, - [{"role": "user", "content": prompt}], stream, **kwargs) + result = provider.create_completion(model.name, [{"role": "user", "content": prompt}], stream, **kwargs) - return result if stream else ''.join(result) + return result if stream else ''.join(result)
\ No newline at end of file |