summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/MagickPen.py
blob: eab705360a322a2a8a98e7c8e720b6b2e98feb03 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14













                                                                     

                                                             



                           

                            
    



                              
















































































                                                                                                                                  





















                                                                                                   
from __future__ import annotations

import time
import random
import hashlib
import re
from aiohttp import ClientSession

from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt

class MagickPen(AsyncGeneratorProvider, ProviderModelMixin):
    url = "https://magickpen.com"
    api_endpoint_free = "https://api.magickpen.com/chat/free"
    api_endpoint_ask = "https://api.magickpen.com/ask"
    working = True
    supports_gpt_4 = True
    supports_stream = False
    
    default_model = 'free'
    models = ['free', 'ask']
    
    model_aliases = {
        "gpt-4o-mini": "free",
        "gpt-4o-mini": "ask",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        if model in cls.models:
            return model
        elif model in cls.model_aliases:
            return cls.model_aliases[model]
        else:
            return cls.default_model

    @classmethod
    async def get_secrets(cls):
        url = 'https://magickpen.com/_nuxt/02c76dc.js'
        async with ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    text = await response.text()
                    x_api_secret_match = re.search(r'"X-API-Secret":"([^"]+)"', text)
                    secret_match = re.search(r'secret:\s*"([^"]+)"', text)
                    
                    x_api_secret = x_api_secret_match.group(1) if x_api_secret_match else None
                    secret = secret_match.group(1) if secret_match else None
                    
                    # Generate timestamp and nonce dynamically
                    timestamp = str(int(time.time() * 1000))
                    nonce = str(random.random())
                    
                    # Generate signature
                    signature_parts = ["TGDBU9zCgM", timestamp, nonce]
                    signature_string = "".join(sorted(signature_parts))
                    signature = hashlib.md5(signature_string.encode()).hexdigest()
                    
                    return {
                        'X-API-Secret': x_api_secret,
                        'signature': signature,
                        'timestamp': timestamp,
                        'nonce': nonce,
                        'secret': secret
                    }
                else:
                    print(f"Error while fetching the file: {response.status}")
                    return None

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        model = cls.get_model(model)
        
        secrets = await cls.get_secrets()
        if not secrets:
            raise Exception("Failed to obtain necessary secrets")

        headers = {
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "nonce": secrets['nonce'],
            "origin": "https://magickpen.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://magickpen.com/",
            "sec-ch-ua": '"Chromium";v="127", "Not)A;Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "secret": secrets['secret'],
            "signature": secrets['signature'],
            "timestamp": secrets['timestamp'],
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
            "x-api-secret": secrets['X-API-Secret']
        }
        
        async with ClientSession(headers=headers) as session:
            if model == 'free':
                data = {
                    "history": [{"role": "user", "content": format_prompt(messages)}]
                }
                async with session.post(cls.api_endpoint_free, json=data, proxy=proxy) as response:
                    response.raise_for_status()
                    result = await response.text()
                    yield result
            
            elif model == 'ask':
                data = {
                    "query": format_prompt(messages),
                    "plan": "Pay as you go"
                }
                async with session.post(cls.api_endpoint_ask, json=data, proxy=proxy) as response:
                    response.raise_for_status()
                    async for chunk in response.content:
                        if chunk:
                            yield chunk.decode()
            
            else:
                raise ValueError(f"Unknown model: {model}")