# summary | refs | log | tree | commit | diff | stats
# path: root/g4f/Provider/GigaChat.py
# blob: 699353b13ea668718cd5671d0938647e06b7ceaa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from __future__ import annotations

import base64
import os
import ssl
import time
import uuid

import json
from aiohttp import ClientSession, BaseConnector, TCPConnector
from g4f.requests import raise_for_status

from ..typing import AsyncResult, Messages, ImageType
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..image import to_bytes, is_accepted_format
from ..errors import MissingAuthError
from .helper import get_connector

# OAuth access token cached at module level; the GigaChat provider rebinds it
# through a `global` statement so later requests in this process can reuse it.
access_token = ''
# Expiry of the cached token. Starting at 0 makes the first request fetch a
# fresh token.
token_expires_at = 0

# TLS context built from the CA certificate shipped in the `gigachat_crt`
# directory next to this module. The path is assembled with os.path.join so it
# stays relative to the module directory even when dirname() returns an empty
# string (plain concatenation would produce a root-anchored "/gigachat_crt/...").
# NOTE: this runs at import time and raises if the certificate file is missing.
ssl_ctx = ssl.create_default_context(
    cafile=os.path.join(os.path.dirname(__file__),
                        'gigachat_crt', 'russian_trusted_root_ca_pem.crt'))


class GigaChat(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that talks to the GigaChat chat-completions HTTP API.

    An OAuth access token is requested with the caller's ``api_key`` and kept
    in the module-level ``access_token`` / ``token_expires_at`` globals, so it
    is reused by later calls until it is about to expire.
    """

    url = "https://developers.sber.ru/gigachat"
    working = True
    supports_message_history = True
    supports_system_message = True
    supports_stream = True

    needs_auth = True
    default_model = "GigaChat:latest"
    models = ["GigaChat:latest", "GigaChat-Plus", "GigaChat-Pro"]

    @classmethod
    async def create_async_generator(
            cls,
            model: str,
            messages: Messages,
            stream: bool = True,
            proxy: str = None,
            api_key: str = None,
            scope: str = "GIGACHAT_API_PERS",
            update_interval: float = 0,
            **kwargs
    ) -> AsyncResult:
        """Send ``messages`` to GigaChat and yield the generated text.

        Args:
            model: Model name; resolved through ``cls.get_model``.
            messages: Conversation sent as the ``messages`` field of the request.
            stream: When true, yield each non-empty content delta as it
                arrives; otherwise yield the complete answer once.
            proxy: Forwarded to ``get_connector`` together with the TLS connector.
            api_key: Credential sent in the ``Authorization`` header of the
                OAuth token request. Required.
            scope: OAuth scope sent in the token request body.
            update_interval: Sent as ``update_interval`` in the request body.
            **kwargs: Merged verbatim into the JSON request body.

        Yields:
            Text content returned by the API.

        Raises:
            MissingAuthError: If ``api_key`` is empty or ``None``.
        """
        global access_token, token_expires_at
        model = cls.get_model(model)

        if not api_key:
            raise MissingAuthError('Missing "api_key"')

        # `ssl=` is the supported keyword for passing an SSLContext; the older
        # `ssl_context=` spelling is deprecated in aiohttp 3.x.
        connector = TCPConnector(ssl=ssl_ctx)

        # NOTE(review): get_connector always receives a non-None connector
        # here - confirm that `proxy` is still honoured in that case.
        async with ClientSession(connector=get_connector(connector, proxy)) as session:
            # Refresh the cached token when it has less than 60 seconds left;
            # the stored expiry is compared against the current time in ms.
            if token_expires_at - int(time.time() * 1000) < 60000:
                async with session.post(url="https://ngw.devices.sberbank.ru:9443/api/v2/oauth",
                                        headers={"Authorization": f"Bearer {api_key}",
                                                 "RqUID": str(uuid.uuid4()),
                                                 "Content-Type": "application/x-www-form-urlencoded"},
                                        data={"scope": scope}) as response:
                    await raise_for_status(response)
                    data = await response.json()
                access_token = data['access_token']
                token_expires_at = data['expires_at']

            async with session.post(url="https://gigachat.devices.sberbank.ru/api/v1/chat/completions",
                                    headers={"Authorization": f"Bearer {access_token}"},
                                    json={
                                        "model": model,
                                        "messages": messages,
                                        "stream": stream,
                                        "update_interval": update_interval,
                                        **kwargs
                                    }) as response:
                await raise_for_status(response)

                if not stream:
                    # Parse the whole body instead of only its first line, so a
                    # JSON document spread over several lines is still decoded.
                    body = await response.read()
                    if body.strip():
                        yield json.loads(body.decode("utf-8"))['choices'][0]['message']['content']
                    return

                async for line in response.content:
                    if not line.startswith(b"data:"):
                        continue

                    # Drop the "data:" field name and any surrounding
                    # whitespace / line terminator, whatever its exact form.
                    payload = line[len(b"data:"):].strip()
                    if not payload:
                        continue
                    if payload == b"[DONE]":
                        return

                    choice = json.loads(payload.decode("utf-8"))['choices'][0]
                    # A chunk may carry no content (or no delta at all).
                    content = (choice.get('delta') or {}).get('content')
                    if content:
                        yield content

                    # Stop only on an actual finish reason, not on a null one.
                    if choice.get('finish_reason'):
                        return