From 920fe19608ba06ed8c2b4c9a23944af35cf24e56 Mon Sep 17 00:00:00 2001 From: Raju Komati Date: Fri, 28 Apr 2023 00:40:43 +0530 Subject: added main module for accessing all services --- openai_rev/quora/api.py | 545 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 545 insertions(+) create mode 100644 openai_rev/quora/api.py (limited to 'openai_rev/quora/api.py') diff --git a/openai_rev/quora/api.py b/openai_rev/quora/api.py new file mode 100644 index 00000000..42814f2c --- /dev/null +++ b/openai_rev/quora/api.py @@ -0,0 +1,545 @@ +# This file was taken from the repository poe-api https://github.com/ading2210/poe-api and is unmodified +# This file is licensed under the GNU GPL v3 and written by @ading2210 + +# license: +# ading2210/poe-api: a reverse engineered Python API wrapepr for Quora's Poe +# Copyright (C) 2023 ading2210 + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import hashlib +import json +import logging +import queue +import random +import re +import threading +import time +import traceback +from pathlib import Path +from urllib.parse import urlparse + +import requests +import requests.adapters +import websocket + +parent_path = Path(__file__).resolve().parent +queries_path = parent_path / "graphql" +queries = {} + +logging.basicConfig() +logger = logging.getLogger() + +user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0" + + +def load_queries(): + for path in queries_path.iterdir(): + if path.suffix != ".graphql": + continue + with open(path) as f: + queries[path.stem] = f.read() + + +def generate_payload(query_name, variables): + return {"query": queries[query_name], "variables": variables} + + +def request_with_retries(method, *args, **kwargs): + attempts = kwargs.get("attempts") or 10 + url = args[0] + for i in range(attempts): + r = method(*args, **kwargs) + if r.status_code == 200: + return r + logger.warn( + f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i + 1}/{attempts})..." + ) + + raise RuntimeError(f"Failed to download {url} too many times.") + + +class Client: + gql_url = "https://poe.com/api/gql_POST" + gql_recv_url = "https://poe.com/api/receive_POST" + home_url = "https://poe.com" + settings_url = "https://poe.com/api/settings" + + def __init__(self, token, proxy=None): + self.proxy = proxy + self.session = requests.Session() + self.adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) + self.session.mount("http://", self.adapter) + self.session.mount("https://", self.adapter) + + if proxy: + self.session.proxies = {"http": self.proxy, "https": self.proxy} + logger.info(f"Proxy enabled: {self.proxy}") + + self.active_messages = {} + self.message_queues = {} + + self.session.cookies.set("p-b", token, domain="poe.com") + self.headers = { + "User-Agent": user_agent, + "Referrer": "https://poe.com/", + "Origin": "https://poe.com", + } + self.session.headers.update(self.headers) + + self.setup_connection() + self.connect_ws() + + def setup_connection(self): + self.ws_domain = f"tch{random.randint(1, 1e6)}" + self.next_data = self.get_next_data(overwrite_vars=True) + self.channel = self.get_channel_data() + self.bots = self.get_bots(download_next_data=False) + self.bot_names = self.get_bot_names() + + self.gql_headers = { + "poe-formkey": self.formkey, + "poe-tchannel": self.channel["channel"], + } + self.gql_headers = {**self.gql_headers, **self.headers} + self.subscribe() + + def extract_formkey(self, html): + script_regex = r"" + script_text = re.search(script_regex, html).group(1) + key_regex = r'var .="([0-9a-f]+)",' + key_text = re.search(key_regex, script_text).group(1) + cipher_regex = r".\[(\d+)\]=.\[(\d+)\]" + cipher_pairs = re.findall(cipher_regex, script_text) + + formkey_list = [""] * len(cipher_pairs) + for pair in cipher_pairs: + formkey_index, key_index = map(int, pair) + formkey_list[formkey_index] = key_text[key_index] + formkey = "".join(formkey_list) + + return formkey + + def get_next_data(self, overwrite_vars=False): + logger.info("Downloading next_data...") + + r = request_with_retries(self.session.get, self.home_url) + json_regex = r'' + json_text = re.search(json_regex, r.text).group(1) + next_data = json.loads(json_text) + + if overwrite_vars: + self.formkey = self.extract_formkey(r.text) + self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"] + self.next_data = next_data + + return next_data + + def get_bot(self, display_name): + url = f'https://poe.com/_next/data/{self.next_data["buildId"]}/{display_name}.json' + + r = request_with_retries(self.session.get, url) + + chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"] + return chat_data + + def get_bots(self, download_next_data=True): + logger.info("Downloading all bots...") + if download_next_data: + next_data = self.get_next_data(overwrite_vars=True) + else: + next_data = self.next_data + + if not "availableBots" in self.viewer: + raise RuntimeError("Invalid token or no bots are available.") + bot_list = self.viewer["availableBots"] + + threads = [] + bots = {} + + def get_bot_thread(bot): + chat_data = self.get_bot(bot["displayName"]) + bots[chat_data["defaultBotObject"]["nickname"]] = chat_data + + for bot in bot_list: + thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True) + threads.append(thread) + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + self.bots = bots + self.bot_names = self.get_bot_names() + return bots + + def get_bot_names(self): + bot_names = {} + for bot_nickname in self.bots: + bot_obj = self.bots[bot_nickname]["defaultBotObject"] + bot_names[bot_nickname] = bot_obj["displayName"] + return bot_names + + def get_remaining_messages(self, chatbot): + chat_data = self.get_bot(self.bot_names[chatbot]) + return chat_data["defaultBotObject"]["messageLimit"]["numMessagesRemaining"] + + def get_channel_data(self, channel=None): + logger.info("Downloading channel data...") + r = request_with_retries(self.session.get, self.settings_url) + data = r.json() + + return data["tchannelData"] + + def get_websocket_url(self, channel=None): + if channel is None: + channel = self.channel + query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' + return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates' + query + + def send_query(self, query_name, variables): + for i in range(20): + json_data = generate_payload(query_name, variables) + payload = json.dumps(json_data, separators=(",", ":")) + + base_string = payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + + headers = { + "content-type": "application/json", + "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(), + } + headers = {**self.gql_headers, **headers} + + r = request_with_retries(self.session.post, self.gql_url, data=payload, headers=headers) + + data = r.json() + if data["data"] == None: + logger.warn(f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i + 1}/20)') + time.sleep(2) + continue + + return r.json() + + raise RuntimeError(f"{query_name} failed too many times.") + + def subscribe(self): + logger.info("Subscribing to mutations") + result = self.send_query( + "SubscriptionsMutation", + { + "subscriptions": [ + { + "subscriptionName": "messageAdded", + "query": queries["MessageAddedSubscription"], + }, + { + "subscriptionName": "viewerStateUpdated", + "query": queries["ViewerStateUpdatedSubscription"], + }, + ] + }, + ) + + def ws_run_thread(self): + kwargs = {} + if self.proxy: + proxy_parsed = urlparse(self.proxy) + kwargs = { + "proxy_type": proxy_parsed.scheme, + "http_proxy_host": proxy_parsed.hostname, + "http_proxy_port": proxy_parsed.port, + } + + self.ws.run_forever(**kwargs) + + def connect_ws(self): + self.ws_connected = False + self.ws = websocket.WebSocketApp( + self.get_websocket_url(), + header={"User-Agent": user_agent}, + on_message=self.on_message, + on_open=self.on_ws_connect, + on_error=self.on_ws_error, + on_close=self.on_ws_close, + ) + t = threading.Thread(target=self.ws_run_thread, daemon=True) + t.start() + while not self.ws_connected: + time.sleep(0.01) + + def disconnect_ws(self): + if self.ws: + self.ws.close() + self.ws_connected = False + + def on_ws_connect(self, ws): + self.ws_connected = True + + def on_ws_close(self, ws, close_status_code, close_message): + self.ws_connected = False + logger.warn(f"Websocket closed with status {close_status_code}: {close_message}") + + def on_ws_error(self, ws, error): + self.disconnect_ws() + self.connect_ws() + + def on_message(self, ws, msg): + try: + data = json.loads(msg) + + if not "messages" in data: + return + + for message_str in data["messages"]: + message_data = json.loads(message_str) + if message_data["message_type"] != "subscriptionUpdate": + continue + message = message_data["payload"]["data"]["messageAdded"] + + copied_dict = self.active_messages.copy() + for key, value in copied_dict.items(): + # add the message to the appropriate queue + if value == message["messageId"] and key in self.message_queues: + self.message_queues[key].put(message) + return + + # indicate that the response id is tied to the human message id + elif key != "pending" and value == None and message["state"] != "complete": + self.active_messages[key] = message["messageId"] + self.message_queues[key].put(message) + return + + except Exception: + logger.error(traceback.format_exc()) + self.disconnect_ws() + self.connect_ws() + + def send_message(self, chatbot, message, with_chat_break=False, timeout=20): + # if there is another active message, wait until it has finished sending + while None in self.active_messages.values(): + time.sleep(0.01) + + # None indicates that a message is still in progress + self.active_messages["pending"] = None + + logger.info(f"Sending message to {chatbot}: {message}") + + # reconnect websocket + if not self.ws_connected: + self.disconnect_ws() + self.setup_connection() + self.connect_ws() + + message_data = self.send_query( + "SendMessageMutation", + { + "bot": chatbot, + "query": message, + "chatId": self.bots[chatbot]["chatId"], + "source": None, + "withChatBreak": with_chat_break, + }, + ) + del self.active_messages["pending"] + + if not message_data["data"]["messageEdgeCreate"]["message"]: + raise RuntimeError(f"Daily limit reached for {chatbot}.") + try: + human_message = message_data["data"]["messageEdgeCreate"]["message"] + human_message_id = human_message["node"]["messageId"] + except TypeError: + raise RuntimeError(f"An unknown error occurred. Raw response data: {message_data}") + + # indicate that the current message is waiting for a response + self.active_messages[human_message_id] = None + self.message_queues[human_message_id] = queue.Queue() + + last_text = "" + message_id = None + while True: + try: + message = self.message_queues[human_message_id].get(timeout=timeout) + except queue.Empty: + del self.active_messages[human_message_id] + del self.message_queues[human_message_id] + raise RuntimeError("Response timed out.") + + # only break when the message is marked as complete + if message["state"] == "complete": + if last_text and message["messageId"] == message_id: + break + else: + continue + + # update info about response + message["text_new"] = message["text"][len(last_text) :] + last_text = message["text"] + message_id = message["messageId"] + + yield message + + del self.active_messages[human_message_id] + del self.message_queues[human_message_id] + + def send_chat_break(self, chatbot): + logger.info(f"Sending chat break to {chatbot}") + result = self.send_query("AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]}) + return result["data"]["messageBreakCreate"]["message"] + + def get_message_history(self, chatbot, count=25, cursor=None): + logger.info(f"Downloading {count} messages from {chatbot}") + + messages = [] + if cursor == None: + chat_data = self.get_bot(self.bot_names[chatbot]) + if not chat_data["messagesConnection"]["edges"]: + return [] + messages = chat_data["messagesConnection"]["edges"][:count] + cursor = chat_data["messagesConnection"]["pageInfo"]["startCursor"] + count -= len(messages) + + cursor = str(cursor) + if count > 50: + messages = self.get_message_history(chatbot, count=50, cursor=cursor) + messages + while count > 0: + count -= 50 + new_cursor = messages[0]["cursor"] + new_messages = self.get_message_history(chatbot, min(50, count), cursor=new_cursor) + messages = new_messages + messages + return messages + elif count <= 0: + return messages + + result = self.send_query( + "ChatListPaginationQuery", + {"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]}, + ) + query_messages = result["data"]["node"]["messagesConnection"]["edges"] + messages = query_messages + messages + return messages + + def delete_message(self, message_ids): + logger.info(f"Deleting messages: {message_ids}") + if not type(message_ids) is list: + message_ids = [int(message_ids)] + + result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids}) + + def purge_conversation(self, chatbot, count=-1): + logger.info(f"Purging messages from {chatbot}") + last_messages = self.get_message_history(chatbot, count=50)[::-1] + while last_messages: + message_ids = [] + for message in last_messages: + if count == 0: + break + count -= 1 + message_ids.append(message["node"]["messageId"]) + + self.delete_message(message_ids) + + if count == 0: + return + last_messages = self.get_message_history(chatbot, count=50)[::-1] + logger.info(f"No more messages left to delete.") + + def create_bot( + self, + handle, + prompt="", + base_model="chinchilla", + description="", + intro_message="", + api_key=None, + api_bot=False, + api_url=None, + prompt_public=True, + pfp_url=None, + linkification=False, + markdown_rendering=True, + suggested_replies=False, + private=False, + ): + result = self.send_query( + "PoeBotCreateMutation", + { + "model": base_model, + "handle": handle, + "prompt": prompt, + "isPromptPublic": prompt_public, + "introduction": intro_message, + "description": description, + "profilePictureUrl": pfp_url, + "apiUrl": api_url, + "apiKey": api_key, + "isApiBot": api_bot, + "hasLinkification": linkification, + "hasMarkdownRendering": markdown_rendering, + "hasSuggestedReplies": suggested_replies, + "isPrivateBot": private, + }, + ) + + data = result["data"]["poeBotCreate"] + if data["status"] != "success": + raise RuntimeError(f"Poe returned an error while trying to create a bot: {data['status']}") + self.get_bots() + return data + + def edit_bot( + self, + bot_id, + handle, + prompt="", + base_model="chinchilla", + description="", + intro_message="", + api_key=None, + api_url=None, + private=False, + prompt_public=True, + pfp_url=None, + linkification=False, + markdown_rendering=True, + suggested_replies=False, + ): + result = self.send_query( + "PoeBotEditMutation", + { + "baseBot": base_model, + "botId": bot_id, + "handle": handle, + "prompt": prompt, + "isPromptPublic": prompt_public, + "introduction": intro_message, + "description": description, + "profilePictureUrl": pfp_url, + "apiUrl": api_url, + "apiKey": api_key, + "hasLinkification": linkification, + "hasMarkdownRendering": markdown_rendering, + "hasSuggestedReplies": suggested_replies, + "isPrivateBot": private, + }, + ) + + data = result["data"]["poeBotEdit"] + if data["status"] != "success": + raise RuntimeError(f"Poe returned an error while trying to edit a bot: {data['status']}") + self.get_bots() + return data + + +load_queries() -- cgit v1.2.3