summaryrefslogtreecommitdiffstats
path: root/g4f/__init__.py
blob: d77fe76097a8c9d6e76b039326c9f1033f6da791 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from __future__ import annotations

import os

from . import debug, version
from .models import Model
from .client import Client, AsyncClient
from .typing import Messages, CreateResult, AsyncResult, Union
from .errors import StreamNotSupportedError, ModelNotAllowedError
from .cookies import get_cookies, set_cookies
from .providers.types import ProviderType
from .providers.base_provider import AsyncGeneratorProvider
from .client.service import get_model_and_provider, get_last_provider

class ChatCompletion:
    @staticmethod
    def create(model    : Union[Model, str],
               messages : Messages,
               provider : Union[ProviderType, str, None] = None,
               stream   : bool = False,
               auth     : Union[str, None] = None,
               ignored  : list[str] = None, 
               ignore_working: bool = False,
               ignore_stream: bool = False,
               patch_provider: callable = None,
               **kwargs) -> Union[CreateResult, str]:
        model, provider = get_model_and_provider(
            model, provider, stream,
            ignored, ignore_working,
            ignore_stream or kwargs.get("ignore_stream_and_auth")
        )

        if auth is not None:
            kwargs['auth'] = auth
        
        if "proxy" not in kwargs:
            proxy = os.environ.get("G4F_PROXY")
            if proxy:
                kwargs['proxy'] = proxy

        if patch_provider:
            provider = patch_provider(provider)

        result = provider.create_completion(model, messages, stream=stream, **kwargs)

        return result if stream else ''.join([str(chunk) for chunk in result])

    @staticmethod
    def create_async(model    : Union[Model, str],
                     messages : Messages,
                     provider : Union[ProviderType, str, None] = None,
                     stream   : bool = False,
                     ignored  : list[str] = None,
                     ignore_working: bool = False,
                     patch_provider: callable = None,
                     **kwargs) -> Union[AsyncResult, str]:
        model, provider = get_model_and_provider(model, provider, False, ignored, ignore_working)

        if stream:
            if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider):
                return provider.create_async_generator(model, messages, **kwargs)
            raise StreamNotSupportedError(f'{provider.__name__} does not support "stream" argument in "create_async"')

        if patch_provider:
            provider = patch_provider(provider)

        return provider.create_async(model, messages, **kwargs)

class Completion:
    @staticmethod
    def create(model    : Union[Model, str],
               prompt   : str,
               provider : Union[ProviderType, None] = None,
               stream   : bool = False,
               ignored  : list[str] = None, **kwargs) -> Union[CreateResult, str]:
        allowed_models = [
            'code-davinci-002',
            'text-ada-001',
            'text-babbage-001',
            'text-curie-001',
            'text-davinci-002',
            'text-davinci-003'
        ]
        if model not in allowed_models:
            raise ModelNotAllowedError(f'Can\'t use {model} with Completion.create()')

        model, provider = get_model_and_provider(model, provider, stream, ignored)

        result = provider.create_completion(model, [{"role": "user", "content": prompt}], stream=stream, **kwargs)

        return result if stream else ''.join(result)