summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/PI.py
blob: 1e2edde81848651da0c3ad2b6a8eb69d3743f1ee (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from __future__ import annotations

from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider

import json
import cloudscraper

class PI(AsyncGeneratorProvider):
    url                   = "https://chat-gpt.com"
    working               = True

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        Conversation = kwargs['conversation']
        UserPrompt = messages[-1]
        if UserPrompt['role'] == 'user':
            UserPrompt = UserPrompt['content']
        else:
            UserPrompt = messages[-2]['content']
        if Conversation == None:
            Conversation = PI.Start_Conversation()
        Answer = Ask_PI(UserPrompt,Conversation['sid'],Conversation['cookies'])

        yield Answer[0]['text']

    def Start_Conversation():
        scraper.headers = {
            'accept-type': 'application/json'
        }
        response = scraper.post('https://pi.ai/api/chat/start', data="{}",headers={'x-api-version': '3'})
        cookies = response.cookies

        if 'Just a moment' in response.text:
            return {
                'error': 'cloudflare detected',
                'sid': None,
                'cookies': None,
            }
        return {
            'sid': response.json()['conversations'][0]['sid'],
            'cookies': cookies
        }
        
    def GetConversationTitle(Conversation):
        response = scraper.post('https://pi.ai/api/chat/start', data="{}",headers={'x-api-version': '3'}, cookies=Conversation['cookies'])
        if 'Just a moment' in response.text:
            return {
                'error': 'cloudflare detected',
                'title': 'Couldnt get the title',
            }
        return {
            'title': response.json()['conversations'][0]['title']
        }
        
    def GetChatHistory(Conversation):
        params = {
            'conversation': Conversation['sid'],
        }
        response = scraper.get('https://pi.ai/api/chat/history', params=params, cookies=Conversation['cookies'])
        if 'Just a moment' in response.text:
            return {
                'error': 'cloudflare detected',
                'traceback': 'Couldnt get the chat history'
            }
        return response.json()

session = cloudscraper.session()

scraper = cloudscraper.create_scraper(
    browser={
        'browser': 'chrome',
        'platform': 'windows',
        'desktop': True
    },
    sess=session
)

scraper.headers = {
    'Accept': '*/*',
    'Accept-Encoding': 'deflate,gzip,br',
}

def Ask_PI(message,sid,cookies):
    json_data = {
        'text': message,
        'conversation': sid,
        'mode': 'BASE',
    }
    response = scraper.post('https://pi.ai/api/chat', json=json_data, cookies=cookies)
    
    if 'Just a moment' in response.text:
        return [{
            'error': 'cloudflare detected',
            'text': 'Couldnt generate the answer because we got detected by cloudflare please try again later'
        }
        ]
    result = []
    for line in response.iter_lines(chunk_size=1024, decode_unicode=True):
        if line.startswith('data: {"text":'):
            result.append(json.loads(line.split('data: ')[1].encode('utf-8')))
        if line.startswith('data: {"title":'):
            result.append(json.loads(line.split('data: ')[1].encode('utf-8')))
            
    return result