diff options
Diffstat (limited to '')
-rw-r--r-- | g4f/Provider/Blackbox.py | 439 |
1 files changed, 155 insertions, 284 deletions
diff --git a/g4f/Provider/Blackbox.py b/g4f/Provider/Blackbox.py index 4052893a..8d820344 100644 --- a/g4f/Provider/Blackbox.py +++ b/g4f/Provider/Blackbox.py @@ -1,21 +1,16 @@ from __future__ import annotations -import asyncio -import aiohttp +from aiohttp import ClientSession import random import string import json -import uuid import re -from typing import Optional, AsyncGenerator, Union - -from aiohttp import ClientSession, ClientResponseError +import aiohttp from ..typing import AsyncResult, Messages, ImageType from .base_provider import AsyncGeneratorProvider, ProviderModelMixin from ..image import ImageResponse, to_data_uri - class Blackbox(AsyncGeneratorProvider, ProviderModelMixin): label = "Blackbox AI" url = "https://www.blackbox.ai" @@ -24,102 +19,127 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin): supports_stream = True supports_system_message = True supports_message_history = True - + _last_validated_value = None + default_model = 'blackboxai' - image_models = ['ImageGeneration'] - models = [ - default_model, - 'blackboxai-pro', - *image_models, - "llama-3.1-8b", - 'llama-3.1-70b', - 'llama-3.1-405b', - 'gpt-4o', - 'gemini-pro', - 'gemini-1.5-flash', - 'claude-sonnet-3.5', - 'PythonAgent', - 'JavaAgent', - 'JavaScriptAgent', - 'HTMLAgent', - 'GoogleCloudAgent', - 'AndroidDeveloper', - 'SwiftDeveloper', - 'Next.jsAgent', - 'MongoDBAgent', - 'PyTorchAgent', - 'ReactAgent', - 'XcodeAgent', - 'AngularJSAgent', - ] - + + image_models = ['Image Generation', 'repomap'] + + userSelectedModel = ['gpt-4o', 'gemini-pro', 'claude-sonnet-3.5', 'blackboxai-pro'] + agentMode = { - 'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"}, + 'Image Generation': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"}, } - + trendingAgentMode = { - "blackboxai": {}, "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'}, "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"}, 'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"}, - 'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"}, + 'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405"}, + # + 'Python Agent': {'mode': True, 'id': "Python Agent"}, + 'Java Agent': {'mode': True, 'id': "Java Agent"}, + 'JavaScript Agent': {'mode': True, 'id': "JavaScript Agent"}, + 'HTML Agent': {'mode': True, 'id': "HTML Agent"}, + 'Google Cloud Agent': {'mode': True, 'id': "Google Cloud Agent"}, + 'Android Developer': {'mode': True, 'id': "Android Developer"}, + 'Swift Developer': {'mode': True, 'id': "Swift Developer"}, + 'Next.js Agent': {'mode': True, 'id': "Next.js Agent"}, + 'MongoDB Agent': {'mode': True, 'id': "MongoDB Agent"}, + 'PyTorch Agent': {'mode': True, 'id': "PyTorch Agent"}, + 'React Agent': {'mode': True, 'id': "React Agent"}, + 'Xcode Agent': {'mode': True, 'id': "Xcode Agent"}, + 'AngularJS Agent': {'mode': True, 'id': "AngularJS Agent"}, 'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"}, - 'PythonAgent': {'mode': True, 'id': "Python Agent"}, - 'JavaAgent': {'mode': True, 'id': "Java Agent"}, - 'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"}, - 'HTMLAgent': {'mode': True, 'id': "HTML Agent"}, - 'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"}, - 'AndroidDeveloper': {'mode': True, 'id': "Android Developer"}, - 'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"}, - 'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"}, - 'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"}, - 'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"}, - 'ReactAgent': {'mode': True, 'id': "React Agent"}, - 'XcodeAgent': {'mode': True, 'id': "Xcode Agent"}, - 'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"}, - } - - userSelectedModel = { - "gpt-4o": "gpt-4o", - "gemini-pro": "gemini-pro", - 'claude-sonnet-3.5': "claude-sonnet-3.5", - } - - model_prefixes = { - 'gpt-4o': '@GPT-4o', - 'gemini-pro': '@Gemini-PRO', - 'claude-sonnet-3.5': '@Claude-Sonnet-3.5', - 'PythonAgent': '@Python Agent', - 'JavaAgent': '@Java Agent', - 'JavaScriptAgent': '@JavaScript Agent', - 'HTMLAgent': '@HTML Agent', - 'GoogleCloudAgent': '@Google Cloud Agent', - 'AndroidDeveloper': '@Android Developer', - 'SwiftDeveloper': '@Swift Developer', - 'Next.jsAgent': '@Next.js Agent', - 'MongoDBAgent': '@MongoDB Agent', - 'PyTorchAgent': '@PyTorch Agent', - 'ReactAgent': '@React Agent', - 'XcodeAgent': '@Xcode Agent', - 'AngularJSAgent': '@AngularJS Agent', - 'blackboxai-pro': '@BLACKBOXAI-PRO', - 'ImageGeneration': '@Image Generation', - } - - model_referers = { - "blackboxai": "/?model=blackboxai", - "gpt-4o": "/?model=gpt-4o", - "gemini-pro": "/?model=gemini-pro", - "claude-sonnet-3.5": "/?model=claude-sonnet-3.5" + # + 'repomap': {'mode': True, 'id': "repomap"}, + # + 'Heroku Agent': {'mode': True, 'id': "Heroku Agent"}, + 'Godot Agent': {'mode': True, 'id': "Godot Agent"}, + 'Go Agent': {'mode': True, 'id': "Go Agent"}, + 'Gitlab Agent': {'mode': True, 'id': "Gitlab Agent"}, + 'Git Agent': {'mode': True, 'id': "Git Agent"}, + 'Flask Agent': {'mode': True, 'id': "Flask Agent"}, + 'Firebase Agent': {'mode': True, 'id': "Firebase Agent"}, + 'FastAPI Agent': {'mode': True, 'id': "FastAPI Agent"}, + 'Erlang Agent': {'mode': True, 'id': "Erlang Agent"}, + 'Electron Agent': {'mode': True, 'id': "Electron Agent"}, + 'Docker Agent': {'mode': True, 'id': "Docker Agent"}, + 'DigitalOcean Agent': {'mode': True, 'id': "DigitalOcean Agent"}, + 'Bitbucket Agent': {'mode': True, 'id': "Bitbucket Agent"}, + 'Azure Agent': {'mode': True, 'id': "Azure Agent"}, + 'Flutter Agent': {'mode': True, 'id': "Flutter Agent"}, + 'Youtube Agent': {'mode': True, 'id': "Youtube Agent"}, + 'builder Agent': {'mode': True, 'id': "builder Agent"}, } + + model_prefixes = {mode: f"@{value['id']}" for mode, value in trendingAgentMode.items() if mode not in ["gemini-1.5-flash", "llama-3.1-8b", "llama-3.1-70b", "llama-3.1-405b", "repomap"]} + + models = [default_model, *userSelectedModel, *list(agentMode.keys()), *list(trendingAgentMode.keys())] + model_aliases = { "gemini-flash": "gemini-1.5-flash", "claude-3.5-sonnet": "claude-sonnet-3.5", - "flux": "ImageGeneration", + "flux": "Image Generation", } @classmethod + async def fetch_validated(cls): + # If the key is already stored in memory, return it + if cls._last_validated_value: + return cls._last_validated_value + + # If the key is not found, perform a search + async with aiohttp.ClientSession() as session: + try: + async with session.get(cls.url) as response: + if response.status != 200: + print("Failed to load the page.") + return cls._last_validated_value + + page_content = await response.text() + js_files = re.findall(r'static/chunks/\d{4}-[a-fA-F0-9]+\.js', page_content) + + key_pattern = re.compile(r'w="([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"') + + for js_file in js_files: + js_url = f"{cls.url}/_next/{js_file}" + async with session.get(js_url) as js_response: + if js_response.status == 200: + js_content = await js_response.text() + match = key_pattern.search(js_content) + if match: + validated_value = match.group(1) + cls._last_validated_value = validated_value # Keep in mind + return validated_value + except Exception as e: + print(f"Error fetching validated value: {e}") + + return cls._last_validated_value + + + @staticmethod + def generate_id(length=7): + characters = string.ascii_letters + string.digits + return ''.join(random.choice(characters) for _ in range(length)) + + @classmethod + def add_prefix_to_messages(cls, messages: Messages, model: str) -> Messages: + prefix = cls.model_prefixes.get(model, "") + if not prefix: + return messages + + new_messages = [] + for message in messages: + new_message = message.copy() + if message['role'] == 'user': + new_message['content'] = (prefix + " " + message['content']).strip() + new_messages.append(new_message) + + return new_messages + + @classmethod def get_model(cls, model: str) -> str: if model in cls.models: return model @@ -128,140 +148,55 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin): else: return cls.default_model - @staticmethod - def generate_random_string(length: int = 7) -> str: - characters = string.ascii_letters + string.digits - return ''.join(random.choices(characters, k=length)) - - @staticmethod - def generate_next_action() -> str: - return uuid.uuid4().hex - - @staticmethod - def generate_next_router_state_tree() -> str: - router_state = [ - "", - { - "children": [ - "(chat)", - { - "children": [ - "__PAGE__", - {} - ] - } - ] - }, - None, - None, - True - ] - return json.dumps(router_state) - - @staticmethod - def clean_response(text: str) -> str: - pattern = r'^\$\@\$v=undefined-rv1\$\@\$' - cleaned_text = re.sub(pattern, '', text) - return cleaned_text - @classmethod async def create_async_generator( cls, model: str, messages: Messages, - proxy: Optional[str] = None, + proxy: str = None, + web_search: bool = False, image: ImageType = None, image_name: str = None, - web_search: bool = False, **kwargs - ) -> AsyncGenerator[Union[str, ImageResponse], None]: - """ - Creates an asynchronous generator for streaming responses from Blackbox AI. - - Parameters: - model (str): Model to use for generating responses. - messages (Messages): Message history. - proxy (Optional[str]): Proxy URL, if needed. - image (ImageType): Image data to be processed, if any. - image_name (str): Name of the image file, if an image is provided. - web_search (bool): Enables or disables web search mode. - **kwargs: Additional keyword arguments. + ) -> AsyncResult: + model = cls.get_model(model) + message_id = cls.generate_id() + messages_with_prefix = cls.add_prefix_to_messages(messages, model) + validated_value = await cls.fetch_validated() - Yields: - Union[str, ImageResponse]: Segments of the generated response or ImageResponse objects. - """ - if image is not None: - messages[-1]['data'] = { + messages_with_prefix[-1]['data'] = { 'fileText': '', 'imageBase64': to_data_uri(image), 'title': image_name } - messages[-1]['content'] = 'FILE:BB\n$#$\n\n$#$\n' + messages[-1]['content'] - - model = cls.get_model(model) - - chat_id = cls.generate_random_string() - next_action = cls.generate_next_action() - next_router_state_tree = cls.generate_next_router_state_tree() - - agent_mode = cls.agentMode.get(model, {}) - trending_agent_mode = cls.trendingAgentMode.get(model, {}) - - prefix = cls.model_prefixes.get(model, "") - - formatted_prompt = "" - for message in messages: - role = message.get('role', '').capitalize() - content = message.get('content', '') - if role and content: - formatted_prompt += f"{role}: {content}\n" - - if prefix: - formatted_prompt = f"{prefix} {formatted_prompt}".strip() - referer_path = cls.model_referers.get(model, f"/?model={model}") - referer_url = f"{cls.url}{referer_path}" - - common_headers = { + headers = { 'accept': '*/*', 'accept-language': 'en-US,en;q=0.9', 'cache-control': 'no-cache', + 'content-type': 'application/json', 'origin': cls.url, 'pragma': 'no-cache', 'priority': 'u=1, i', - 'sec-ch-ua': '"Chromium";v="129", "Not=A?Brand";v="8"', + 'referer': f'{cls.url}/', + 'sec-ch-ua': '"Not?A_Brand";v="99", "Chromium";v="130"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Linux"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', - 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/129.0.0.0 Safari/537.36' - } - - headers_api_chat = { - 'Content-Type': 'application/json', - 'Referer': referer_url + 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36' } - headers_api_chat_combined = {**common_headers, **headers_api_chat} - - payload_api_chat = { - "messages": [ - { - "id": chat_id, - "content": formatted_prompt, - "role": "user", - "data": messages[-1].get('data') - } - ], - "id": chat_id, + + data = { + "messages": messages_with_prefix, + "id": message_id, "previewToken": None, "userId": None, "codeModelMode": True, - "agentMode": agent_mode, - "trendingAgentMode": trending_agent_mode, + "agentMode": cls.agentMode.get(model, {}) if model in cls.agentMode else {}, + "trendingAgentMode": cls.trendingAgentMode.get(model, {}) if model in cls.trendingAgentMode else {}, "isMicMode": False, "userSystemPrompt": None, "maxTokens": 1024, @@ -274,99 +209,35 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin): "clickedForceWebSearch": False, "visitFromDelta": False, "mobileClient": False, + "userSelectedModel": model if model in cls.userSelectedModel else None, "webSearchMode": web_search, - "userSelectedModel": cls.userSelectedModel.get(model, model) + "validated": validated_value, } - headers_chat = { - 'Accept': 'text/x-component', - 'Content-Type': 'text/plain;charset=UTF-8', - 'Referer': f'{cls.url}/chat/{chat_id}?model={model}', - 'next-action': next_action, - 'next-router-state-tree': next_router_state_tree, - 'next-url': '/' - } - headers_chat_combined = {**common_headers, **headers_chat} - - data_chat = '[]' - - async with ClientSession(headers=common_headers) as session: - try: - async with session.post( - cls.api_endpoint, - headers=headers_api_chat_combined, - json=payload_api_chat, - proxy=proxy - ) as response_api_chat: - response_api_chat.raise_for_status() - text = await response_api_chat.text() - cleaned_response = cls.clean_response(text) - - if model in cls.image_models: - match = re.search(r'!\[.*?\]\((https?://[^\)]+)\)', cleaned_response) - if match: - image_url = match.group(1) - image_response = ImageResponse(images=image_url, alt="Generated Image") - yield image_response - else: - yield cleaned_response - else: - if web_search: - match = re.search(r'\$~~~\$(.*?)\$~~~\$', cleaned_response, re.DOTALL) - if match: - source_part = match.group(1).strip() - answer_part = cleaned_response[match.end():].strip() - try: - sources = json.loads(source_part) - source_formatted = "**Source:**\n" - for item in sources: - title = item.get('title', 'No Title') - link = item.get('link', '#') - position = item.get('position', '') - source_formatted += f"{position}. [{title}]({link})\n" - final_response = f"{answer_part}\n\n{source_formatted}" - except json.JSONDecodeError: - final_response = f"{answer_part}\n\nSource information is unavailable." - else: - final_response = cleaned_response - else: - if '$~~~$' in cleaned_response: - final_response = cleaned_response.split('$~~~$')[0].strip() - else: - final_response = cleaned_response - - yield final_response - except ClientResponseError as e: - error_text = f"Error {e.status}: {e.message}" - try: - error_response = await e.response.text() - cleaned_error = cls.clean_response(error_response) - error_text += f" - {cleaned_error}" - except Exception: - pass - yield error_text - except Exception as e: - yield f"Unexpected error during /api/chat request: {str(e)}" - - chat_url = f'{cls.url}/chat/{chat_id}?model={model}' - - try: - async with session.post( - chat_url, - headers=headers_chat_combined, - data=data_chat, - proxy=proxy - ) as response_chat: - response_chat.raise_for_status() - pass - except ClientResponseError as e: - error_text = f"Error {e.status}: {e.message}" - try: - error_response = await e.response.text() - cleaned_error = cls.clean_response(error_response) - error_text += f" - {cleaned_error}" - except Exception: - pass - yield error_text - except Exception as e: - yield f"Unexpected error during /chat/{chat_id} request: {str(e)}" + async with ClientSession(headers=headers) as session: + async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response: + response.raise_for_status() + response_text = await response.text() + + if model in cls.image_models: + image_matches = re.findall(r'!\[.*?\]\((https?://[^\)]+)\)', response_text) + if image_matches: + image_url = image_matches[0] + image_response = ImageResponse(images=[image_url], alt="Generated Image") + yield image_response + return + + response_text = re.sub(r'Generated by BLACKBOX.AI, try unlimited chat https://www.blackbox.ai', '', response_text, flags=re.DOTALL) + + json_match = re.search(r'\$~~~\$(.*?)\$~~~\$', response_text, re.DOTALL) + if json_match: + search_results = json.loads(json_match.group(1)) + answer = response_text.split('$~~~$')[-1].strip() + + formatted_response = f"{answer}\n\n**Source:**" + for i, result in enumerate(search_results, 1): + formatted_response += f"\n{i}. {result['title']}: {result['link']}" + + yield formatted_response + else: + yield response_text.strip() |