summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/bing
diff options
context:
space:
mode:
Diffstat (limited to 'g4f/Provider/bing')
-rw-r--r--g4f/Provider/bing/conversation.py44
-rw-r--r--g4f/Provider/bing/create_images.py224
-rw-r--r--g4f/Provider/bing/upload_image.py188
3 files changed, 327 insertions, 129 deletions
diff --git a/g4f/Provider/bing/conversation.py b/g4f/Provider/bing/conversation.py
index 9e011c26..36ada3b0 100644
--- a/g4f/Provider/bing/conversation.py
+++ b/g4f/Provider/bing/conversation.py
@@ -1,13 +1,33 @@
from aiohttp import ClientSession
-
-class Conversation():
+class Conversation:
+ """
+ Represents a conversation with specific attributes.
+ """
def __init__(self, conversationId: str, clientId: str, conversationSignature: str) -> None:
+ """
+ Initialize a new conversation instance.
+
+ Args:
+ conversationId (str): Unique identifier for the conversation.
+ clientId (str): Client identifier.
+ conversationSignature (str): Signature for the conversation.
+ """
self.conversationId = conversationId
self.clientId = clientId
self.conversationSignature = conversationSignature
async def create_conversation(session: ClientSession, proxy: str = None) -> Conversation:
+ """
+ Create a new conversation asynchronously.
+
+ Args:
+ session (ClientSession): An instance of aiohttp's ClientSession.
+ proxy (str, optional): Proxy URL. Defaults to None.
+
+ Returns:
+ Conversation: An instance representing the created conversation.
+ """
url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4'
async with session.get(url, proxy=proxy) as response:
try:
@@ -24,12 +44,32 @@ async def create_conversation(session: ClientSession, proxy: str = None) -> Conv
return Conversation(conversationId, clientId, conversationSignature)
async def list_conversations(session: ClientSession) -> list:
+ """
+ List all conversations asynchronously.
+
+ Args:
+ session (ClientSession): An instance of aiohttp's ClientSession.
+
+ Returns:
+ list: A list of conversations.
+ """
url = "https://www.bing.com/turing/conversation/chats"
async with session.get(url) as response:
response = await response.json()
return response["chats"]
async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> bool:
+ """
+ Delete a conversation asynchronously.
+
+ Args:
+ session (ClientSession): An instance of aiohttp's ClientSession.
+ conversation (Conversation): The conversation to delete.
+ proxy (str, optional): Proxy URL. Defaults to None.
+
+ Returns:
+ bool: True if deletion was successful, False otherwise.
+ """
url = "https://sydney.bing.com/sydney/DeleteSingleConversation"
json = {
"conversationId": conversation.conversationId,
diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py
index a1ecace3..29daccbd 100644
--- a/g4f/Provider/bing/create_images.py
+++ b/g4f/Provider/bing/create_images.py
@@ -1,9 +1,16 @@
+"""
+This module provides functionalities for creating and managing images using Bing's service.
+It includes functions for user login, session creation, image creation, and processing.
+"""
+
import asyncio
-import time, json, os
+import time
+import json
+import os
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from urllib.parse import quote
-from typing import Generator
+from typing import Generator, List, Dict
from ..create_images import CreateImagesProvider
from ..helper import get_cookies, get_event_loop
@@ -12,23 +19,47 @@ from ...base_provider import ProviderType
from ...image import format_images_markdown
BING_URL = "https://www.bing.com"
+TIMEOUT_LOGIN = 1200
+TIMEOUT_IMAGE_CREATION = 300
+ERRORS = [
+ "this prompt is being reviewed",
+ "this prompt has been blocked",
+ "we're working hard to offer image creator in more languages",
+ "we can't create your images right now"
+]
+BAD_IMAGES = [
+ "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
+ "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
+]
+
+def wait_for_login(driver: WebDriver, timeout: int = TIMEOUT_LOGIN) -> None:
+ """
+ Waits for the user to log in within a given timeout period.
-def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None:
+ Args:
+ driver (WebDriver): Webdriver for browser automation.
+ timeout (int): Maximum waiting time in seconds.
+
+ Raises:
+ RuntimeError: If the login process exceeds the timeout.
+ """
driver.get(f"{BING_URL}/")
- value = driver.get_cookie("_U")
- if value:
- return
start_time = time.time()
- while True:
+ while not driver.get_cookie("_U"):
if time.time() - start_time > timeout:
raise RuntimeError("Timeout error")
- value = driver.get_cookie("_U")
- if value:
- time.sleep(1)
- return
time.sleep(0.5)
-def create_session(cookies: dict) -> ClientSession:
+def create_session(cookies: Dict[str, str]) -> ClientSession:
+ """
+ Creates a new client session with specified cookies and headers.
+
+ Args:
+ cookies (Dict[str, str]): Cookies to be used for the session.
+
+ Returns:
+ ClientSession: The created client session.
+ """
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-encoding": "gzip, deflate, br",
@@ -47,28 +78,32 @@ def create_session(cookies: dict) -> ClientSession:
"upgrade-insecure-requests": "1",
}
if cookies:
- headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
+ headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
return ClientSession(headers=headers)
-async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 300) -> list:
- url_encoded_prompt = quote(prompt)
+async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]:
+ """
+ Creates images based on a given prompt using Bing's service.
+
+ Args:
+ session (ClientSession): Active client session.
+ prompt (str): Prompt to generate images.
+ proxy (str, optional): Proxy configuration.
+ timeout (int): Timeout for the request.
+
+ Returns:
+ List[str]: A list of URLs to the created images.
+
+ Raises:
+ RuntimeError: If image creation fails or times out.
+ """
+ url_encoded_prompt = quote(prompt)
payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
- async with session.post(
- url,
- allow_redirects=False,
- data=payload,
- timeout=timeout,
- ) as response:
+ async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response:
response.raise_for_status()
- errors = [
- "this prompt is being reviewed",
- "this prompt has been blocked",
- "we're working hard to offer image creator in more languages",
- "we can't create your images right now"
- ]
text = (await response.text()).lower()
- for error in errors:
+ for error in ERRORS:
if error in text:
raise RuntimeError(f"Create images failed: {error}")
if response.status != 302:
@@ -107,54 +142,109 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None,
raise RuntimeError(error)
return read_images(text)
-def read_images(text: str) -> list:
- html_soup = BeautifulSoup(text, "html.parser")
- tags = html_soup.find_all("img")
- image_links = [img["src"] for img in tags if "mimg" in img["class"]]
- images = [link.split("?w=")[0] for link in image_links]
- bad_images = [
- "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
- "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
- ]
- if any(im in bad_images for im in images):
+def read_images(html_content: str) -> List[str]:
+ """
+ Extracts image URLs from the HTML content.
+
+ Args:
+ html_content (str): HTML content containing image URLs.
+
+ Returns:
+ List[str]: A list of image URLs.
+ """
+ soup = BeautifulSoup(html_content, "html.parser")
+ tags = soup.find_all("img", class_="mimg")
+ images = [img["src"].split("?w=")[0] for img in tags]
+ if any(im in BAD_IMAGES for im in images):
raise RuntimeError("Bad images found")
if not images:
raise RuntimeError("No images found")
return images
-async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str:
- session = create_session(cookies)
- try:
+async def create_images_markdown(cookies: Dict[str, str], prompt: str, proxy: str = None) -> str:
+ """
+ Creates markdown formatted string with images based on the prompt.
+
+ Args:
+ cookies (Dict[str, str]): Cookies to be used for the session.
+ prompt (str): Prompt to generate images.
+ proxy (str, optional): Proxy configuration.
+
+ Returns:
+ str: Markdown formatted string with images.
+ """
+ async with create_session(cookies) as session:
images = await create_images(session, prompt, proxy)
return format_images_markdown(images, prompt)
- finally:
- await session.close()
-def get_cookies_from_browser(proxy: str = None) -> dict:
- driver = get_browser(proxy=proxy)
- try:
+def get_cookies_from_browser(proxy: str = None) -> Dict[str, str]:
+ """
+ Retrieves cookies from the browser using webdriver.
+
+ Args:
+ proxy (str, optional): Proxy configuration.
+
+ Returns:
+ Dict[str, str]: Retrieved cookies.
+ """
+ with get_browser(proxy=proxy) as driver:
wait_for_login(driver)
+ time.sleep(1)
return get_driver_cookies(driver)
- finally:
- driver.quit()
-
-def create_completion(prompt: str, cookies: dict = None, proxy: str = None) -> Generator:
- loop = get_event_loop()
- if not cookies:
- cookies = get_cookies(".bing.com")
- if "_U" not in cookies:
- login_url = os.environ.get("G4F_LOGIN_URL")
- if login_url:
- yield f"Please login: [Bing]({login_url})\n\n"
- cookies = get_cookies_from_browser(proxy)
- yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy))
-
-async def create_async(prompt: str, cookies: dict = None, proxy: str = None) -> str:
- if not cookies:
- cookies = get_cookies(".bing.com")
- if "_U" not in cookies:
- cookies = get_cookies_from_browser(proxy)
- return await create_images_markdown(cookies, prompt, proxy)
+
+class CreateImagesBing:
+ """A class for creating images using Bing."""
+
+ _cookies: Dict[str, str] = {}
+
+ @classmethod
+ def create_completion(cls, prompt: str, cookies: Dict[str, str] = None, proxy: str = None) -> Generator[str]:
+ """
+ Generator for creating imagecompletion based on a prompt.
+
+ Args:
+ prompt (str): Prompt to generate images.
+ cookies (Dict[str, str], optional): Cookies for the session. If None, cookies are retrieved automatically.
+ proxy (str, optional): Proxy configuration.
+
+ Yields:
+ Generator[str, None, None]: The final output as markdown formatted string with images.
+ """
+ loop = get_event_loop()
+ cookies = cookies or cls._cookies or get_cookies(".bing.com")
+ if "_U" not in cookies:
+ login_url = os.environ.get("G4F_LOGIN_URL")
+ if login_url:
+ yield f"Please login: [Bing]({login_url})\n\n"
+ cls._cookies = cookies = get_cookies_from_browser(proxy)
+ yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy))
+
+ @classmethod
+ async def create_async(cls, prompt: str, cookies: Dict[str, str] = None, proxy: str = None) -> str:
+ """
+ Asynchronously creates a markdown formatted string with images based on the prompt.
+
+ Args:
+ prompt (str): Prompt to generate images.
+ cookies (Dict[str, str], optional): Cookies for the session. If None, cookies are retrieved automatically.
+ proxy (str, optional): Proxy configuration.
+
+ Returns:
+ str: Markdown formatted string with images.
+ """
+ cookies = cookies or cls._cookies or get_cookies(".bing.com")
+ if "_U" not in cookies:
+ cls._cookies = cookies = get_cookies_from_browser(proxy)
+ return await create_images_markdown(cookies, prompt, proxy)
def patch_provider(provider: ProviderType) -> CreateImagesProvider:
- return CreateImagesProvider(provider, create_completion, create_async) \ No newline at end of file
+ """
+ Patches a provider to include image creation capabilities.
+
+ Args:
+ provider (ProviderType): The provider to be patched.
+
+ Returns:
+ CreateImagesProvider: The patched provider with image creation capabilities.
+ """
+ return CreateImagesProvider(provider, CreateImagesBing.create_completion, CreateImagesBing.create_async) \ No newline at end of file
diff --git a/g4f/Provider/bing/upload_image.py b/g4f/Provider/bing/upload_image.py
index 1af902ef..4d70659f 100644
--- a/g4f/Provider/bing/upload_image.py
+++ b/g4f/Provider/bing/upload_image.py
@@ -1,64 +1,107 @@
-from __future__ import annotations
+"""
+Module to handle image uploading and processing for Bing AI integrations.
+"""
+from __future__ import annotations
import string
import random
import json
import math
-from ...typing import ImageType
from aiohttp import ClientSession
+from PIL import Image
+
+from ...typing import ImageType, Tuple
from ...image import to_image, process_image, to_base64, ImageResponse
-image_config = {
+IMAGE_CONFIG = {
"maxImagePixels": 360000,
"imageCompressionRate": 0.7,
- "enableFaceBlurDebug": 0,
+ "enableFaceBlurDebug": False,
}
async def upload_image(
- session: ClientSession,
- image: ImageType,
- tone: str,
+ session: ClientSession,
+ image_data: ImageType,
+ tone: str,
proxy: str = None
) -> ImageResponse:
- image = to_image(image)
- width, height = image.size
- max_image_pixels = image_config['maxImagePixels']
- if max_image_pixels / (width * height) < 1:
- new_width = int(width * math.sqrt(max_image_pixels / (width * height)))
- new_height = int(height * math.sqrt(max_image_pixels / (width * height)))
- else:
- new_width = width
- new_height = height
- new_img = process_image(image, new_width, new_height)
- new_img_binary_data = to_base64(new_img, image_config['imageCompressionRate'])
- data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
- headers = session.headers.copy()
- headers["content-type"] = f'multipart/form-data; boundary={boundary}'
- headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
- headers["origin"] = 'https://www.bing.com'
+ """
+ Uploads an image to Bing's AI service and returns the image response.
+
+ Args:
+ session (ClientSession): The active session.
+ image_data (bytes): The image data to be uploaded.
+ tone (str): The tone of the conversation.
+ proxy (str, optional): Proxy if any. Defaults to None.
+
+ Raises:
+ RuntimeError: If the image upload fails.
+
+ Returns:
+ ImageResponse: The response from the image upload.
+ """
+ image = to_image(image_data)
+ new_width, new_height = calculate_new_dimensions(image)
+ processed_img = process_image(image, new_width, new_height)
+ img_binary_data = to_base64(processed_img, IMAGE_CONFIG['imageCompressionRate'])
+
+ data, boundary = build_image_upload_payload(img_binary_data, tone)
+ headers = prepare_headers(session, boundary)
+
async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
if response.status != 200:
raise RuntimeError("Failed to upload image.")
- image_info = await response.json()
- if not image_info.get('blobId'):
- raise RuntimeError("Failed to parse image info.")
- result = {'bcid': image_info.get('blobId', "")}
- result['blurredBcid'] = image_info.get('processedBlobId', "")
- if result['blurredBcid'] != "":
- result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
- elif result['bcid'] != "":
- result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
- result['originalImageUrl'] = (
- "https://www.bing.com/images/blob?bcid="
- + result['blurredBcid']
- if image_config["enableFaceBlurDebug"]
- else "https://www.bing.com/images/blob?bcid="
- + result['bcid']
- )
- return ImageResponse(result["imageUrl"], "", result)
-
-def build_image_upload_api_payload(image_bin: str, tone: str):
- payload = {
+ return parse_image_response(await response.json())
+
+def calculate_new_dimensions(image: Image.Image) -> Tuple[int, int]:
+ """
+ Calculates the new dimensions for the image based on the maximum allowed pixels.
+
+ Args:
+ image (Image): The PIL Image object.
+
+ Returns:
+ Tuple[int, int]: The new width and height for the image.
+ """
+ width, height = image.size
+ max_image_pixels = IMAGE_CONFIG['maxImagePixels']
+ if max_image_pixels / (width * height) < 1:
+ scale_factor = math.sqrt(max_image_pixels / (width * height))
+ return int(width * scale_factor), int(height * scale_factor)
+ return width, height
+
+def build_image_upload_payload(image_bin: str, tone: str) -> Tuple[str, str]:
+ """
+ Builds the payload for image uploading.
+
+ Args:
+ image_bin (str): Base64 encoded image binary data.
+ tone (str): The tone of the conversation.
+
+ Returns:
+ Tuple[str, str]: The data and boundary for the payload.
+ """
+ boundary = "----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
+ data = f"--{boundary}\r\n" \
+ f"Content-Disposition: form-data; name=\"knowledgeRequest\"\r\n\r\n" \
+ f"{json.dumps(build_knowledge_request(tone), ensure_ascii=False)}\r\n" \
+ f"--{boundary}\r\n" \
+ f"Content-Disposition: form-data; name=\"imageBase64\"\r\n\r\n" \
+ f"{image_bin}\r\n" \
+ f"--{boundary}--\r\n"
+ return data, boundary
+
+def build_knowledge_request(tone: str) -> dict:
+ """
+ Builds the knowledge request payload.
+
+ Args:
+ tone (str): The tone of the conversation.
+
+ Returns:
+ dict: The knowledge request payload.
+ """
+ return {
'invokedSkills': ["ImageById"],
'subscriptionId': "Bing.Chat.Multimodal",
'invokedSkillsRequestData': {
@@ -69,21 +112,46 @@ def build_image_upload_api_payload(image_bin: str, tone: str):
'convotone': tone
}
}
- knowledge_request = {
- 'imageInfo': {},
- 'knowledgeRequest': payload
- }
- boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
- data = (
- f'--{boundary}'
- + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
- + json.dumps(knowledge_request, ensure_ascii=False)
- + "\r\n--"
- + boundary
- + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
- + image_bin
- + "\r\n--"
- + boundary
- + "--\r\n"
+
+def prepare_headers(session: ClientSession, boundary: str) -> dict:
+ """
+ Prepares the headers for the image upload request.
+
+ Args:
+ session (ClientSession): The active session.
+ boundary (str): The boundary string for the multipart/form-data.
+
+ Returns:
+ dict: The headers for the request.
+ """
+ headers = session.headers.copy()
+ headers["Content-Type"] = f'multipart/form-data; boundary={boundary}'
+ headers["Referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
+ headers["Origin"] = 'https://www.bing.com'
+ return headers
+
+def parse_image_response(response: dict) -> ImageResponse:
+ """
+ Parses the response from the image upload.
+
+ Args:
+ response (dict): The response dictionary.
+
+ Raises:
+ RuntimeError: If parsing the image info fails.
+
+ Returns:
+ ImageResponse: The parsed image response.
+ """
+ if not response.get('blobId'):
+ raise RuntimeError("Failed to parse image info.")
+
+ result = {'bcid': response.get('blobId', ""), 'blurredBcid': response.get('processedBlobId', "")}
+ result["imageUrl"] = f"https://www.bing.com/images/blob?bcid={result['blurredBcid'] or result['bcid']}"
+
+ result['originalImageUrl'] = (
+ f"https://www.bing.com/images/blob?bcid={result['blurredBcid']}"
+ if IMAGE_CONFIG["enableFaceBlurDebug"] else
+ f"https://www.bing.com/images/blob?bcid={result['bcid']}"
)
- return data, boundary \ No newline at end of file
+ return ImageResponse(result["imageUrl"], "", result) \ No newline at end of file