summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorezerinz <edwinnnzx@gmail.com>2023-04-28 10:38:05 +0200
committerezerinz <edwinnnzx@gmail.com>2023-04-28 10:38:05 +0200
commitbe04fcd7f326a02ace84ea66d614298bee15f9e0 (patch)
treede6bd83fca8ad68fee05e83858d4a8d7d1ec160b
parentupdate readme/usage example (diff)
downloadgpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.tar
gpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.tar.gz
gpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.tar.bz2
gpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.tar.lz
gpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.tar.xz
gpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.tar.zst
gpt4free-be04fcd7f326a02ace84ea66d614298bee15f9e0.zip
-rw-r--r--openaihosted/__init__.py96
1 files changed, 49 insertions, 47 deletions
diff --git a/openaihosted/__init__.py b/openaihosted/__init__.py
index c773b3f9..4f25a2be 100644
--- a/openaihosted/__init__.py
+++ b/openaihosted/__init__.py
@@ -1,60 +1,62 @@
import json
import re
-from fake_useragent import UserAgent
-
import requests
+
class Completion:
@staticmethod
- def create(
- systemprompt:str,
- text:str,
- assistantprompt:str
- ):
-
- data = [
- {"role": "system", "content": systemprompt},
- {"role": "user", "content": "hi"},
- {"role": "assistant", "content": assistantprompt},
- {"role": "user", "content": text},
- ]
- url = f'https://openai.a2hosted.com/chat?q={Completion.__get_query_param(data)}'
-
- try:
- response = requests.get(url, headers=Completion.__get_headers(), stream=True)
- except:
+ def create(messages):
+ headers = {
+ "authority": "openai.a2hosted.com",
+ "accept": "text/event-stream",
+ "accept-language": "en-US,en;q=0.9,id;q=0.8,ja;q=0.7",
+ "cache-control": "no-cache",
+ "sec-fetch-dest": "empty",
+ "sec-fetch-mode": "cors",
+ "sec-fetch-site": "cross-site",
+ "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/112.0",
+ }
+
+ query_param = Completion.__create_query_param(messages)
+ url = f"https://openai.a2hosted.com/chat?q={query_param}"
+ request = requests.get(url, headers=headers, stream=True)
+ if request.status_code != 200:
return Completion.__get_failure_response()
- sentence = ""
+ content = request.content
+ response = Completion.__join_response(content)
+
+ return {"responses": response}
- for message in response.iter_content(chunk_size=1024):
- message = message.decode('utf-8')
- msg_match, num_match = re.search(r'"msg":"([^"]+)"', message), re.search(r'\[DONE\] (\d+)', message)
- if msg_match:
- # Put the captured group into a sentence
- sentence += msg_match.group(1)
- return {
- 'response': sentence
- }
-
- @classmethod
- def __get_headers(cls) -> dict:
- return {
- 'authority': 'openai.a2hosted.com',
- 'accept': 'text/event-stream',
- 'accept-language': 'en-US,en;q=0.9,id;q=0.8,ja;q=0.7',
- 'cache-control': 'no-cache',
- 'sec-fetch-dest': 'empty',
- 'sec-fetch-mode': 'cors',
- 'sec-fetch-site': 'cross-site',
- 'user-agent': UserAgent().random
- }
-
@classmethod
def __get_failure_response(cls) -> dict:
- return dict(response='Unable to fetch the response, Please try again.', links=[], extra={})
-
+ return dict(
+ response="Unable to fetch the response, Please try again.",
+ links=[],
+ extra={},
+ )
+
+ @classmethod
+ def __multiple_replace(cls, string, reps) -> str:
+ for original, replacement in reps.items():
+ string = string.replace(original, replacement)
+ return string
+
@classmethod
- def __get_query_param(cls, conversation) -> str:
+ def __create_query_param(cls, conversation) -> str:
encoded_conversation = json.dumps(conversation)
- return encoded_conversation.replace(" ", "%20").replace('"', '%22').replace("'", "%27") \ No newline at end of file
+ replacement = {" ": "%20", '"': "%22", "'": "%27"}
+ return Completion.__multiple_replace(encoded_conversation, replacement)
+
+ @classmethod
+ def __convert_escape_codes(cls, text) -> str:
+ replacement = {'\\\\"': '"', '\\"': '"', "\\n": "\n", "\\'": "'"}
+ return Completion.__multiple_replace(text, replacement)
+
+ @classmethod
+ def __join_response(cls, data) -> str:
+ data = data.decode("utf-8")
+ find_ans = re.findall(r'(?<={"msg":)[^}]*', str(data))
+ ans = [Completion.__convert_escape_codes(x[1:-1]) for x in find_ans]
+ response = "".join(ans)
+ return response