From 920fe19608ba06ed8c2b4c9a23944af35cf24e56 Mon Sep 17 00:00:00 2001 From: Raju Komati Date: Fri, 28 Apr 2023 00:40:43 +0530 Subject: added main module for accessing all services --- quora/api.py | 545 ----------------------------------------------------------- 1 file changed, 545 deletions(-) delete mode 100644 quora/api.py (limited to 'quora/api.py') diff --git a/quora/api.py b/quora/api.py deleted file mode 100644 index 697f6663..00000000 --- a/quora/api.py +++ /dev/null @@ -1,545 +0,0 @@ -# This file was taken from the repository poe-api https://github.com/ading2210/poe-api and is unmodified -# This file is licensed under the GNU GPL v3 and written by @ading2210 - -# license: -# ading2210/poe-api: a reverse engineered Python API wrapepr for Quora's Poe -# Copyright (C) 2023 ading2210 - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import hashlib -import json -import logging -import queue -import random -import re -import threading -import time -import traceback -from pathlib import Path -from urllib.parse import urlparse - -import requests -import requests.adapters -import websocket - -parent_path = Path(__file__).resolve().parent -queries_path = parent_path / "graphql" -queries = {} - -logging.basicConfig() -logger = logging.getLogger() - -user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0" - - -def load_queries(): - for path in queries_path.iterdir(): - if path.suffix != ".graphql": - continue - with open(path) as f: - queries[path.stem] = f.read() - - -def generate_payload(query_name, variables): - return {"query": queries[query_name], "variables": variables} - - -def request_with_retries(method, *args, **kwargs): - attempts = kwargs.get("attempts") or 10 - url = args[0] - for i in range(attempts): - r = method(*args, **kwargs) - if r.status_code == 200: - return r - logger.warn( - f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i + 1}/{attempts})..." - ) - - raise RuntimeError(f"Failed to download {url} too many times.") - - -class Client: - gql_url = "https://poe.com/api/gql_POST" - gql_recv_url = "https://poe.com/api/receive_POST" - home_url = "https://poe.com" - settings_url = "https://poe.com/api/settings" - - def __init__(self, token, proxy=None): - self.proxy = proxy - self.session = requests.Session() - self.adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) - self.session.mount("http://", self.adapter) - self.session.mount("https://", self.adapter) - - if proxy: - self.session.proxies = {"http": self.proxy, "https": self.proxy} - logger.info(f"Proxy enabled: {self.proxy}") - - self.active_messages = {} - self.message_queues = {} - - self.session.cookies.set("p-b", token, domain="poe.com") - self.headers = { - "User-Agent": user_agent, - "Referrer": "https://poe.com/", - "Origin": "https://poe.com", - } - self.session.headers.update(self.headers) - - self.setup_connection() - self.connect_ws() - - def setup_connection(self): - self.ws_domain = f"tch{random.randint(1, 1e6)}" - self.next_data = self.get_next_data(overwrite_vars=True) - self.channel = self.get_channel_data() - self.bots = self.get_bots(download_next_data=False) - self.bot_names = self.get_bot_names() - - self.gql_headers = { - "poe-formkey": self.formkey, - "poe-tchannel": self.channel["channel"], - } - self.gql_headers = {**self.gql_headers, **self.headers} - self.subscribe() - - def extract_formkey(self, html): - script_regex = r"" - script_text = re.search(script_regex, html).group(1) - key_regex = r'var .="([0-9a-f]+)",' - key_text = re.search(key_regex, script_text).group(1) - cipher_regex = r".\[(\d+)\]=.\[(\d+)\]" - cipher_pairs = re.findall(cipher_regex, script_text) - - formkey_list = [""] * len(cipher_pairs) - for pair in cipher_pairs: - formkey_index, key_index = map(int, pair) - formkey_list[formkey_index] = key_text[key_index] - formkey = "".join(formkey_list) - - return formkey - - def get_next_data(self, overwrite_vars=False): - logger.info("Downloading next_data...") - - r = request_with_retries(self.session.get, self.home_url) - json_regex = r'' - json_text = re.search(json_regex, r.text).group(1) - next_data = json.loads(json_text) - - if overwrite_vars: - self.formkey = self.extract_formkey(r.text) - self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"] - self.next_data = next_data - - return next_data - - def get_bot(self, display_name): - url = f'https://poe.com/_next/data/{self.next_data["buildId"]}/{display_name}.json' - - r = request_with_retries(self.session.get, url) - - chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"] - return chat_data - - def get_bots(self, download_next_data=True): - logger.info("Downloading all bots...") - if download_next_data: - next_data = self.get_next_data(overwrite_vars=True) - else: - next_data = self.next_data - - if not "availableBots" in self.viewer: - raise RuntimeError("Invalid token or no bots are available.") - bot_list = self.viewer["availableBots"] - - threads = [] - bots = {} - - def get_bot_thread(bot): - chat_data = self.get_bot(bot["displayName"]) - bots[chat_data["defaultBotObject"]["nickname"]] = chat_data - - for bot in bot_list: - thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True) - threads.append(thread) - - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - self.bots = bots - self.bot_names = self.get_bot_names() - return bots - - def get_bot_names(self): - bot_names = {} - for bot_nickname in self.bots: - bot_obj = self.bots[bot_nickname]["defaultBotObject"] - bot_names[bot_nickname] = bot_obj["displayName"] - return bot_names - - def get_remaining_messages(self, chatbot): - chat_data = self.get_bot(self.bot_names[chatbot]) - return chat_data["defaultBotObject"]["messageLimit"]["numMessagesRemaining"] - - def get_channel_data(self, channel=None): - logger.info("Downloading channel data...") - r = request_with_retries(self.session.get, self.settings_url) - data = r.json() - - return data["tchannelData"] - - def get_websocket_url(self, channel=None): - if channel is None: - channel = self.channel - query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' - return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates' + query - - def send_query(self, query_name, variables): - for i in range(20): - json_data = generate_payload(query_name, variables) - payload = json.dumps(json_data, separators=(",", ":")) - - base_string = payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" - - headers = { - "content-type": "application/json", - "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(), - } - headers = {**self.gql_headers, **headers} - - r = request_with_retries(self.session.post, self.gql_url, data=payload, headers=headers) - - data = r.json() - if data["data"] == None: - logger.warn(f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i + 1}/20)') - time.sleep(2) - continue - - return r.json() - - raise RuntimeError(f"{query_name} failed too many times.") - - def subscribe(self): - logger.info("Subscribing to mutations") - result = self.send_query( - "SubscriptionsMutation", - { - "subscriptions": [ - { - "subscriptionName": "messageAdded", - "query": queries["MessageAddedSubscription"], - }, - { - "subscriptionName": "viewerStateUpdated", - "query": queries["ViewerStateUpdatedSubscription"], - }, - ] - }, - ) - - def ws_run_thread(self): - kwargs = {} - if self.proxy: - proxy_parsed = urlparse(self.proxy) - kwargs = { - "proxy_type": proxy_parsed.scheme, - "http_proxy_host": proxy_parsed.hostname, - "http_proxy_port": proxy_parsed.port, - } - - self.ws.run_forever(**kwargs) - - def connect_ws(self): - self.ws_connected = False - self.ws = websocket.WebSocketApp( - self.get_websocket_url(), - header={"User-Agent": user_agent}, - on_message=self.on_message, - on_open=self.on_ws_connect, - on_error=self.on_ws_error, - on_close=self.on_ws_close, - ) - t = threading.Thread(target=self.ws_run_thread, daemon=True) - t.start() - while not self.ws_connected: - time.sleep(0.01) - - def disconnect_ws(self): - if self.ws: - self.ws.close() - self.ws_connected = False - - def on_ws_connect(self, ws): - self.ws_connected = True - - def on_ws_close(self, ws, close_status_code, close_message): - self.ws_connected = False - logger.warn(f"Websocket closed with status {close_status_code}: {close_message}") - - def on_ws_error(self, ws, error): - self.disconnect_ws() - self.connect_ws() - - def on_message(self, ws, msg): - try: - data = json.loads(msg) - - if not "messages" in data: - return - - for message_str in data["messages"]: - message_data = json.loads(message_str) - if message_data["message_type"] != "subscriptionUpdate": - continue - message = message_data["payload"]["data"]["messageAdded"] - - copied_dict = self.active_messages.copy() - for key, value in copied_dict.items(): - # add the message to the appropriate queue - if value == message["messageId"] and key in self.message_queues: - self.message_queues[key].put(message) - return - - # indicate that the response id is tied to the human message id - elif key != "pending" and value == None and message["state"] != "complete": - self.active_messages[key] = message["messageId"] - self.message_queues[key].put(message) - return - - except Exception: - logger.error(traceback.format_exc()) - self.disconnect_ws() - self.connect_ws() - - def send_message(self, chatbot, message, with_chat_break=False, timeout=20): - # if there is another active message, wait until it has finished sending - while None in self.active_messages.values(): - time.sleep(0.01) - - # None indicates that a message is still in progress - self.active_messages["pending"] = None - - logger.info(f"Sending message to {chatbot}: {message}") - - # reconnect websocket - if not self.ws_connected: - self.disconnect_ws() - self.setup_connection() - self.connect_ws() - - message_data = self.send_query( - "SendMessageMutation", - { - "bot": chatbot, - "query": message, - "chatId": self.bots[chatbot]["chatId"], - "source": None, - "withChatBreak": with_chat_break, - }, - ) - del self.active_messages["pending"] - - if not message_data["data"]["messageEdgeCreate"]["message"]: - raise RuntimeError(f"Daily limit reached for {chatbot}.") - try: - human_message = message_data["data"]["messageEdgeCreate"]["message"] - human_message_id = human_message["node"]["messageId"] - except TypeError: - raise RuntimeError(f"An unknown error occurred. Raw response data: {message_data}") - - # indicate that the current message is waiting for a response - self.active_messages[human_message_id] = None - self.message_queues[human_message_id] = queue.Queue() - - last_text = "" - message_id = None - while True: - try: - message = self.message_queues[human_message_id].get(timeout=timeout) - except queue.Empty: - del self.active_messages[human_message_id] - del self.message_queues[human_message_id] - raise RuntimeError("Response timed out.") - - # only break when the message is marked as complete - if message["state"] == "complete": - if last_text and message["messageId"] == message_id: - break - else: - continue - - # update info about response - message["text_new"] = message["text"][len(last_text):] - last_text = message["text"] - message_id = message["messageId"] - - yield message - - del self.active_messages[human_message_id] - del self.message_queues[human_message_id] - - def send_chat_break(self, chatbot): - logger.info(f"Sending chat break to {chatbot}") - result = self.send_query("AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]}) - return result["data"]["messageBreakCreate"]["message"] - - def get_message_history(self, chatbot, count=25, cursor=None): - logger.info(f"Downloading {count} messages from {chatbot}") - - messages = [] - if cursor == None: - chat_data = self.get_bot(self.bot_names[chatbot]) - if not chat_data["messagesConnection"]["edges"]: - return [] - messages = chat_data["messagesConnection"]["edges"][:count] - cursor = chat_data["messagesConnection"]["pageInfo"]["startCursor"] - count -= len(messages) - - cursor = str(cursor) - if count > 50: - messages = self.get_message_history(chatbot, count=50, cursor=cursor) + messages - while count > 0: - count -= 50 - new_cursor = messages[0]["cursor"] - new_messages = self.get_message_history(chatbot, min(50, count), cursor=new_cursor) - messages = new_messages + messages - return messages - elif count <= 0: - return messages - - result = self.send_query( - "ChatListPaginationQuery", - {"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]}, - ) - query_messages = result["data"]["node"]["messagesConnection"]["edges"] - messages = query_messages + messages - return messages - - def delete_message(self, message_ids): - logger.info(f"Deleting messages: {message_ids}") - if not type(message_ids) is list: - message_ids = [int(message_ids)] - - result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids}) - - def purge_conversation(self, chatbot, count=-1): - logger.info(f"Purging messages from {chatbot}") - last_messages = self.get_message_history(chatbot, count=50)[::-1] - while last_messages: - message_ids = [] - for message in last_messages: - if count == 0: - break - count -= 1 - message_ids.append(message["node"]["messageId"]) - - self.delete_message(message_ids) - - if count == 0: - return - last_messages = self.get_message_history(chatbot, count=50)[::-1] - logger.info(f"No more messages left to delete.") - - def create_bot( - self, - handle, - prompt="", - base_model="chinchilla", - description="", - intro_message="", - api_key=None, - api_bot=False, - api_url=None, - prompt_public=True, - pfp_url=None, - linkification=False, - markdown_rendering=True, - suggested_replies=False, - private=False, - ): - result = self.send_query( - "PoeBotCreateMutation", - { - "model": base_model, - "handle": handle, - "prompt": prompt, - "isPromptPublic": prompt_public, - "introduction": intro_message, - "description": description, - "profilePictureUrl": pfp_url, - "apiUrl": api_url, - "apiKey": api_key, - "isApiBot": api_bot, - "hasLinkification": linkification, - "hasMarkdownRendering": markdown_rendering, - "hasSuggestedReplies": suggested_replies, - "isPrivateBot": private, - }, - ) - - data = result["data"]["poeBotCreate"] - if data["status"] != "success": - raise RuntimeError(f"Poe returned an error while trying to create a bot: {data['status']}") - self.get_bots() - return data - - def edit_bot( - self, - bot_id, - handle, - prompt="", - base_model="chinchilla", - description="", - intro_message="", - api_key=None, - api_url=None, - private=False, - prompt_public=True, - pfp_url=None, - linkification=False, - markdown_rendering=True, - suggested_replies=False, - ): - result = self.send_query( - "PoeBotEditMutation", - { - "baseBot": base_model, - "botId": bot_id, - "handle": handle, - "prompt": prompt, - "isPromptPublic": prompt_public, - "introduction": intro_message, - "description": description, - "profilePictureUrl": pfp_url, - "apiUrl": api_url, - "apiKey": api_key, - "hasLinkification": linkification, - "hasMarkdownRendering": markdown_rendering, - "hasSuggestedReplies": suggested_replies, - "isPrivateBot": private, - }, - ) - - data = result["data"]["poeBotEdit"] - if data["status"] != "success": - raise RuntimeError(f"Poe returned an error while trying to edit a bot: {data['status']}") - self.get_bots() - return data - - -load_queries() -- cgit v1.2.3