summaryrefslogtreecommitdiffstats
path: root/quora/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'quora/api.py')
-rw-r--r--quora/api.py258
1 files changed, 152 insertions, 106 deletions
diff --git a/quora/api.py b/quora/api.py
index 9436f869..b28c124b 100644
--- a/quora/api.py
+++ b/quora/api.py
@@ -55,10 +55,7 @@ def load_queries():
def generate_payload(query_name, variables):
- return {
- "query": queries[query_name],
- "variables": variables
- }
+ return {"query": queries[query_name], "variables": variables}
def request_with_retries(method, *args, **kwargs):
@@ -69,7 +66,8 @@ def request_with_retries(method, *args, **kwargs):
if r.status_code == 200:
return r
logger.warn(
- f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})...")
+ f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})..."
+ )
raise RuntimeError(f"Failed to download {url} too many times.")
@@ -84,15 +82,13 @@ class Client:
self.proxy = proxy
self.session = requests.Session()
self.adapter = requests.adapters.HTTPAdapter(
- pool_connections=100, pool_maxsize=100)
+ pool_connections=100, pool_maxsize=100
+ )
self.session.mount("http://", self.adapter)
self.session.mount("https://", self.adapter)
if proxy:
- self.session.proxies = {
- "http": self.proxy,
- "https": self.proxy
- }
+ self.session.proxies = {"http": self.proxy, "https": self.proxy}
logger.info(f"Proxy enabled: {self.proxy}")
self.active_messages = {}
@@ -124,11 +120,11 @@ class Client:
self.subscribe()
def extract_formkey(self, html):
- script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>'
+ script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>"
script_text = re.search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = re.search(key_regex, script_text).group(1)
- cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]'
+ cipher_regex = r".\[(\d+)\]=.\[(\d+)\]"
cipher_pairs = re.findall(cipher_regex, script_text)
formkey_list = [""] * len(cipher_pairs)
@@ -143,7 +139,9 @@ class Client:
logger.info("Downloading next_data...")
r = request_with_retries(self.session.get, self.home_url)
- json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
+ json_regex = (
+ r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
+ )
json_text = re.search(json_regex, r.text).group(1)
next_data = json.loads(json_text)
@@ -181,8 +179,7 @@ class Client:
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data
for bot in bot_list:
- thread = threading.Thread(
- target=get_bot_thread, args=(bot,), daemon=True)
+ thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True)
threads.append(thread)
for thread in threads:
@@ -216,50 +213,59 @@ class Client:
if channel is None:
channel = self.channel
query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}'
- return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query
+ return (
+ f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'
+ + query
+ )
def send_query(self, query_name, variables):
for i in range(20):
json_data = generate_payload(query_name, variables)
payload = json.dumps(json_data, separators=(",", ":"))
- base_string = payload + \
- self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
+ base_string = (
+ payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
+ )
headers = {
"content-type": "application/json",
- "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest()
+ "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(),
}
headers = {**self.gql_headers, **headers}
r = request_with_retries(
- self.session.post, self.gql_url, data=payload, headers=headers)
+ self.session.post, self.gql_url, data=payload, headers=headers
+ )
data = r.json()
if data["data"] == None:
logger.warn(
- f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)')
+ f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)'
+ )
time.sleep(2)
continue
return r.json()
- raise RuntimeError(f'{query_name} failed too many times.')
+ raise RuntimeError(f"{query_name} failed too many times.")
def subscribe(self):
logger.info("Subscribing to mutations")
- result = self.send_query("SubscriptionsMutation", {
- "subscriptions": [
- {
- "subscriptionName": "messageAdded",
- "query": queries["MessageAddedSubscription"]
- },
- {
- "subscriptionName": "viewerStateUpdated",
- "query": queries["ViewerStateUpdatedSubscription"]
- }
- ]
- })
+ result = self.send_query(
+ "SubscriptionsMutation",
+ {
+ "subscriptions": [
+ {
+ "subscriptionName": "messageAdded",
+ "query": queries["MessageAddedSubscription"],
+ },
+ {
+ "subscriptionName": "viewerStateUpdated",
+ "query": queries["ViewerStateUpdatedSubscription"],
+ },
+ ]
+ },
+ )
def ws_run_thread(self):
kwargs = {}
@@ -268,7 +274,7 @@ class Client:
kwargs = {
"proxy_type": proxy_parsed.scheme,
"http_proxy_host": proxy_parsed.hostname,
- "http_proxy_port": proxy_parsed.port
+ "http_proxy_port": proxy_parsed.port,
}
self.ws.run_forever(**kwargs)
@@ -281,7 +287,7 @@ class Client:
on_message=self.on_message,
on_open=self.on_ws_connect,
on_error=self.on_ws_error,
- on_close=self.on_ws_close
+ on_close=self.on_ws_close,
)
t = threading.Thread(target=self.ws_run_thread, daemon=True)
t.start()
@@ -299,7 +305,8 @@ class Client:
def on_ws_close(self, ws, close_status_code, close_message):
self.ws_connected = False
logger.warn(
- f"Websocket closed with status {close_status_code}: {close_message}")
+ f"Websocket closed with status {close_status_code}: {close_message}"
+ )
def on_ws_error(self, ws, error):
self.disconnect_ws()
@@ -326,7 +333,11 @@ class Client:
return
# indicate that the response id is tied to the human message id
- elif key != "pending" and value == None and message["state"] != "complete":
+ elif (
+ key != "pending"
+ and value == None
+ and message["state"] != "complete"
+ ):
self.active_messages[key] = message["messageId"]
self.message_queues[key].put(message)
return
@@ -352,13 +363,16 @@ class Client:
self.setup_connection()
self.connect_ws()
- message_data = self.send_query("SendMessageMutation", {
- "bot": chatbot,
- "query": message,
- "chatId": self.bots[chatbot]["chatId"],
- "source": None,
- "withChatBreak": with_chat_break
- })
+ message_data = self.send_query(
+ "SendMessageMutation",
+ {
+ "bot": chatbot,
+ "query": message,
+ "chatId": self.bots[chatbot]["chatId"],
+ "source": None,
+ "withChatBreak": with_chat_break,
+ },
+ )
del self.active_messages["pending"]
if not message_data["data"]["messageEdgeCreate"]["message"]:
@@ -368,7 +382,8 @@ class Client:
human_message_id = human_message["node"]["messageId"]
except TypeError:
raise RuntimeError(
- f"An unknown error occurred. Raw response data: {message_data}")
+ f"An unknown error occurred. Raw response data: {message_data}"
+ )
# indicate that the current message is waiting for a response
self.active_messages[human_message_id] = None
@@ -378,8 +393,7 @@ class Client:
message_id = None
while True:
try:
- message = self.message_queues[human_message_id].get(
- timeout=timeout)
+ message = self.message_queues[human_message_id].get(timeout=timeout)
except queue.Empty:
del self.active_messages[human_message_id]
del self.message_queues[human_message_id]
@@ -393,7 +407,7 @@ class Client:
continue
# update info about response
- message["text_new"] = message["text"][len(last_text):]
+ message["text_new"] = message["text"][len(last_text) :]
last_text = message["text"]
message_id = message["messageId"]
@@ -404,9 +418,9 @@ class Client:
def send_chat_break(self, chatbot):
logger.info(f"Sending chat break to {chatbot}")
- result = self.send_query("AddMessageBreakMutation", {
- "chatId": self.bots[chatbot]["chatId"]
- })
+ result = self.send_query(
+ "AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]}
+ )
return result["data"]["messageBreakCreate"]["message"]
def get_message_history(self, chatbot, count=25, cursor=None):
@@ -423,23 +437,24 @@ class Client:
cursor = str(cursor)
if count > 50:
- messages = self.get_message_history(
- chatbot, count=50, cursor=cursor) + messages
+ messages = (
+ self.get_message_history(chatbot, count=50, cursor=cursor) + messages
+ )
while count > 0:
count -= 50
new_cursor = messages[0]["cursor"]
new_messages = self.get_message_history(
- chatbot, min(50, count), cursor=new_cursor)
+ chatbot, min(50, count), cursor=new_cursor
+ )
messages = new_messages + messages
return messages
elif count <= 0:
return messages
- result = self.send_query("ChatListPaginationQuery", {
- "count": count,
- "cursor": cursor,
- "id": self.bots[chatbot]["id"]
- })
+ result = self.send_query(
+ "ChatListPaginationQuery",
+ {"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]},
+ )
query_messages = result["data"]["node"]["messagesConnection"]["edges"]
messages = query_messages + messages
return messages
@@ -449,9 +464,7 @@ class Client:
if not type(message_ids) is list:
message_ids = [int(message_ids)]
- result = self.send_query("DeleteMessageMutation", {
- "messageIds": message_ids
- })
+ result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids})
def purge_conversation(self, chatbot, count=-1):
logger.info(f"Purging messages from {chatbot}")
@@ -471,60 +484,93 @@ class Client:
last_messages = self.get_message_history(chatbot, count=50)[::-1]
logger.info(f"No more messages left to delete.")
- def create_bot(self, handle, prompt="", base_model="chinchilla", description="",
- intro_message="", api_key=None, api_bot=False, api_url=None,
- prompt_public=True, pfp_url=None, linkification=False,
- markdown_rendering=True, suggested_replies=False, private=False):
- result = self.send_query("PoeBotCreateMutation", {
- "model": base_model,
- "handle": handle,
- "prompt": prompt,
- "isPromptPublic": prompt_public,
- "introduction": intro_message,
- "description": description,
- "profilePictureUrl": pfp_url,
- "apiUrl": api_url,
- "apiKey": api_key,
- "isApiBot": api_bot,
- "hasLinkification": linkification,
- "hasMarkdownRendering": markdown_rendering,
- "hasSuggestedReplies": suggested_replies,
- "isPrivateBot": private
- })
+ def create_bot(
+ self,
+ handle,
+ prompt="",
+ base_model="chinchilla",
+ description="",
+ intro_message="",
+ api_key=None,
+ api_bot=False,
+ api_url=None,
+ prompt_public=True,
+ pfp_url=None,
+ linkification=False,
+ markdown_rendering=True,
+ suggested_replies=False,
+ private=False,
+ ):
+ result = self.send_query(
+ "PoeBotCreateMutation",
+ {
+ "model": base_model,
+ "handle": handle,
+ "prompt": prompt,
+ "isPromptPublic": prompt_public,
+ "introduction": intro_message,
+ "description": description,
+ "profilePictureUrl": pfp_url,
+ "apiUrl": api_url,
+ "apiKey": api_key,
+ "isApiBot": api_bot,
+ "hasLinkification": linkification,
+ "hasMarkdownRendering": markdown_rendering,
+ "hasSuggestedReplies": suggested_replies,
+ "isPrivateBot": private,
+ },
+ )
data = result["data"]["poeBotCreate"]
if data["status"] != "success":
raise RuntimeError(
- f"Poe returned an error while trying to create a bot: {data['status']}")
+ f"Poe returned an error while trying to create a bot: {data['status']}"
+ )
self.get_bots()
return data
- def edit_bot(self, bot_id, handle, prompt="", base_model="chinchilla", description="",
- intro_message="", api_key=None, api_url=None, private=False,
- prompt_public=True, pfp_url=None, linkification=False,
- markdown_rendering=True, suggested_replies=False):
-
- result = self.send_query("PoeBotEditMutation", {
- "baseBot": base_model,
- "botId": bot_id,
- "handle": handle,
- "prompt": prompt,
- "isPromptPublic": prompt_public,
- "introduction": intro_message,
- "description": description,
- "profilePictureUrl": pfp_url,
- "apiUrl": api_url,
- "apiKey": api_key,
- "hasLinkification": linkification,
- "hasMarkdownRendering": markdown_rendering,
- "hasSuggestedReplies": suggested_replies,
- "isPrivateBot": private
- })
+ def edit_bot(
+ self,
+ bot_id,
+ handle,
+ prompt="",
+ base_model="chinchilla",
+ description="",
+ intro_message="",
+ api_key=None,
+ api_url=None,
+ private=False,
+ prompt_public=True,
+ pfp_url=None,
+ linkification=False,
+ markdown_rendering=True,
+ suggested_replies=False,
+ ):
+ result = self.send_query(
+ "PoeBotEditMutation",
+ {
+ "baseBot": base_model,
+ "botId": bot_id,
+ "handle": handle,
+ "prompt": prompt,
+ "isPromptPublic": prompt_public,
+ "introduction": intro_message,
+ "description": description,
+ "profilePictureUrl": pfp_url,
+ "apiUrl": api_url,
+ "apiKey": api_key,
+ "hasLinkification": linkification,
+ "hasMarkdownRendering": markdown_rendering,
+ "hasSuggestedReplies": suggested_replies,
+ "isPrivateBot": private,
+ },
+ )
data = result["data"]["poeBotEdit"]
if data["status"] != "success":
raise RuntimeError(
- f"Poe returned an error while trying to edit a bot: {data['status']}")
+ f"Poe returned an error while trying to edit a bot: {data['status']}"
+ )
self.get_bots()
return data