summaryrefslogtreecommitdiffstats
path: root/g4f/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'g4f/__init__.py')
-rw-r--r--g4f/__init__.py77
1 files changed, 40 insertions, 37 deletions
diff --git a/g4f/__init__.py b/g4f/__init__.py
index 11920356..47d2a7a3 100644
--- a/g4f/__init__.py
+++ b/g4f/__init__.py
@@ -1,42 +1,45 @@
-import sys
-from . import Provider
-from g4f import models
+from . import models
+from .Provider import BaseProvider
+from .typing import Any, CreateResult, Union
logging = False
+
class ChatCompletion:
@staticmethod
- def create(model: models.Model or str, messages: list, provider: Provider.Provider = None, stream: bool = False, auth: str = False, **kwargs):
- kwargs['auth'] = auth
- if provider and provider.working == False:
- return f'{provider.__name__} is not working'
-
- if provider and provider.needs_auth and not auth:
- print(
- f'ValueError: {provider.__name__} requires authentication (use auth="cookie or token or jwt ..." param)', file=sys.stderr)
- sys.exit(1)
-
- try:
- if isinstance(model, str):
- try:
- model = models.ModelUtils.convert[model]
- except KeyError:
- raise Exception(f'The model: {model} does not exist')
-
- engine = model.best_provider if not provider else provider
-
- if not engine.supports_stream and stream == True:
- print(
- f"ValueError: {engine.__name__} does not support 'stream' argument", file=sys.stderr)
- sys.exit(1)
-
- if logging: print(f'Using {engine.__name__} provider')
-
- return (engine._create_completion(model.name, messages, stream, **kwargs)
- if stream else ''.join(engine._create_completion(model.name, messages, stream, **kwargs)))
- except TypeError as e:
- print(e)
- arg: str = str(e).split("'")[1]
- print(
- f"ValueError: {engine.__name__} does not support '{arg}' argument", file=sys.stderr)
- sys.exit(1) \ No newline at end of file
+ def create(
+ model: Union[models.Model, str],
+ messages: list[dict[str, str]],
+ provider: Union[type[BaseProvider], None] = None,
+ stream: bool = False,
+ auth: Union[str, None] = None,
+ **kwargs: Any,
+ ) -> Union[CreateResult, str]:
+ if isinstance(model, str):
+ try:
+ model = models.ModelUtils.convert[model]
+ except KeyError:
+ raise Exception(f"The model: {model} does not exist")
+
+ provider = model.best_provider if provider == None else provider
+
+ if not provider.working:
+ raise Exception(f"{provider.__name__} is not working")
+
+ if provider.needs_auth and not auth:
+ raise Exception(
+ f'ValueError: {provider.__name__} requires authentication (use auth="cookie or token or jwt ..." param)'
+ )
+ if provider.needs_auth:
+ kwargs["auth"] = auth
+
+ if not provider.supports_stream and stream:
+ raise Exception(
+ f"ValueError: {provider.__name__} does not support 'stream' argument"
+ )
+
+ if logging:
+ print(f"Using {provider.__name__} provider")
+
+ result = provider.create_completion(model.name, messages, stream, **kwargs)
+ return result if stream else "".join(result)