From f8e403a745c5caff31d7edb854dcba40eba3166d Mon Sep 17 00:00:00 2001 From: kqlio67 Date: Tue, 24 Sep 2024 13:23:53 +0300 Subject: Resolved merge conflicts --- g4f/Provider/MagickPen.py | 155 +++++++++++++++++----------------------------- 1 file changed, 58 insertions(+), 97 deletions(-) (limited to 'g4f/Provider/MagickPen.py') diff --git a/g4f/Provider/MagickPen.py b/g4f/Provider/MagickPen.py index eab70536..b6a47417 100644 --- a/g4f/Provider/MagickPen.py +++ b/g4f/Provider/MagickPen.py @@ -1,72 +1,57 @@ from __future__ import annotations +from aiohttp import ClientSession +import hashlib import time import random -import hashlib import re -from aiohttp import ClientSession - +import json from ..typing import AsyncResult, Messages from .base_provider import AsyncGeneratorProvider, ProviderModelMixin from .helper import format_prompt class MagickPen(AsyncGeneratorProvider, ProviderModelMixin): url = "https://magickpen.com" - api_endpoint_free = "https://api.magickpen.com/chat/free" - api_endpoint_ask = "https://api.magickpen.com/ask" + api_endpoint = "https://api.magickpen.com/ask" working = True supports_gpt_4 = True - supports_stream = False - - default_model = 'free' - models = ['free', 'ask'] + supports_stream = True + supports_system_message = True + supports_message_history = True - model_aliases = { - "gpt-4o-mini": "free", - "gpt-4o-mini": "ask", - } - - @classmethod - def get_model(cls, model: str) -> str: - if model in cls.models: - return model - elif model in cls.model_aliases: - return cls.model_aliases[model] - else: - return cls.default_model + default_model = 'gpt-4o-mini' + models = ['gpt-4o-mini'] @classmethod - async def get_secrets(cls): - url = 'https://magickpen.com/_nuxt/02c76dc.js' + async def fetch_api_credentials(cls) -> tuple: + url = "https://magickpen.com/_nuxt/9e47cd7579e60a9d1f13.js" async with ClientSession() as session: async with session.get(url) as response: - if response.status == 200: - text = await response.text() - x_api_secret_match = re.search(r'"X-API-Secret":"([^"]+)"', text) - secret_match = re.search(r'secret:\s*"([^"]+)"', text) - - x_api_secret = x_api_secret_match.group(1) if x_api_secret_match else None - secret = secret_match.group(1) if secret_match else None - - # Generate timestamp and nonce dynamically - timestamp = str(int(time.time() * 1000)) - nonce = str(random.random()) - - # Generate signature - signature_parts = ["TGDBU9zCgM", timestamp, nonce] - signature_string = "".join(sorted(signature_parts)) - signature = hashlib.md5(signature_string.encode()).hexdigest() - - return { - 'X-API-Secret': x_api_secret, - 'signature': signature, - 'timestamp': timestamp, - 'nonce': nonce, - 'secret': secret - } - else: - print(f"Error while fetching the file: {response.status}") - return None + text = await response.text() + + # Extract the necessary values from the file + pattern = r'"X-API-Secret":"(\w+)"' + match = re.search(pattern, text) + X_API_SECRET = match.group(1) if match else None + + # Generate timestamp and nonce + timestamp = str(int(time.time() * 1000)) # in milliseconds + nonce = str(random.random()) + + # Generate the signature + s = ["TGDBU9zCgM", timestamp, nonce] + s.sort() + signature_string = ''.join(s) + signature = hashlib.md5(signature_string.encode()).hexdigest() + + pattern = r'secret:"(\w+)"' + match = re.search(pattern, text) + secret = match.group(1) if match else None + + if X_API_SECRET and timestamp and nonce and secret: + return X_API_SECRET, signature, timestamp, nonce, secret + else: + raise Exception("Unable to extract all the necessary data from the JavaScript file.") @classmethod async def create_async_generator( @@ -77,54 +62,30 @@ class MagickPen(AsyncGeneratorProvider, ProviderModelMixin): **kwargs ) -> AsyncResult: model = cls.get_model(model) + X_API_SECRET, signature, timestamp, nonce, secret = await cls.fetch_api_credentials() - secrets = await cls.get_secrets() - if not secrets: - raise Exception("Failed to obtain necessary secrets") - headers = { - "accept": "application/json, text/plain, */*", - "accept-language": "en-US,en;q=0.9", - "cache-control": "no-cache", - "content-type": "application/json", - "nonce": secrets['nonce'], - "origin": "https://magickpen.com", - "pragma": "no-cache", - "priority": "u=1, i", - "referer": "https://magickpen.com/", - "sec-ch-ua": '"Chromium";v="127", "Not)A;Brand";v="99"', - "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": '"Linux"', - "sec-fetch-dest": "empty", - "sec-fetch-mode": "cors", - "sec-fetch-site": "same-site", - "secret": secrets['secret'], - "signature": secrets['signature'], - "timestamp": secrets['timestamp'], - "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36", - "x-api-secret": secrets['X-API-Secret'] + 'accept': 'application/json, text/plain, */*', + 'accept-language': 'en-US,en;q=0.9', + 'content-type': 'application/json', + 'nonce': nonce, + 'origin': cls.url, + 'referer': f"{cls.url}/", + 'secret': secret, + 'signature': signature, + 'timestamp': timestamp, + 'x-api-secret': X_API_SECRET, } async with ClientSession(headers=headers) as session: - if model == 'free': - data = { - "history": [{"role": "user", "content": format_prompt(messages)}] - } - async with session.post(cls.api_endpoint_free, json=data, proxy=proxy) as response: - response.raise_for_status() - result = await response.text() - yield result - - elif model == 'ask': - data = { - "query": format_prompt(messages), - "plan": "Pay as you go" - } - async with session.post(cls.api_endpoint_ask, json=data, proxy=proxy) as response: - response.raise_for_status() - async for chunk in response.content: - if chunk: - yield chunk.decode() - - else: - raise ValueError(f"Unknown model: {model}") + prompt = format_prompt(messages) + payload = { + 'query': prompt, + 'turnstileResponse': '', + 'action': 'verify' + } + async with session.post(cls.api_endpoint, json=payload, proxy=proxy) as response: + response.raise_for_status() + async for chunk in response.content: + if chunk: + yield chunk.decode() -- cgit v1.2.3