From bee75be8e38d25c4568c641412a49b576d425b24 Mon Sep 17 00:00:00 2001 From: H Lohaus Date: Wed, 10 Jan 2024 10:34:56 +0100 Subject: Add create images to Bing (#1426) Add create images from Bing Add FreeChatgpt Provider Fix Bard Provider --- g4f/Provider/bing/conversation.py | 43 ++++++++++ g4f/Provider/bing/create_images.py | 164 +++++++++++++++++++++++++++++++++++++ g4f/Provider/bing/upload_image.py | 162 ++++++++++++++++++++++++++++++++++++ 3 files changed, 369 insertions(+) create mode 100644 g4f/Provider/bing/conversation.py create mode 100644 g4f/Provider/bing/create_images.py create mode 100644 g4f/Provider/bing/upload_image.py (limited to 'g4f/Provider/bing') diff --git a/g4f/Provider/bing/conversation.py b/g4f/Provider/bing/conversation.py new file mode 100644 index 00000000..ef45cd82 --- /dev/null +++ b/g4f/Provider/bing/conversation.py @@ -0,0 +1,43 @@ +from aiohttp import ClientSession + + +class Conversation(): + def __init__(self, conversationId: str, clientId: str, conversationSignature: str) -> None: + self.conversationId = conversationId + self.clientId = clientId + self.conversationSignature = conversationSignature + +async def create_conversation(session: ClientSession, proxy: str = None) -> Conversation: + url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4' + async with session.get(url, proxy=proxy) as response: + data = await response.json() + + conversationId = data.get('conversationId') + clientId = data.get('clientId') + conversationSignature = response.headers.get('X-Sydney-Encryptedconversationsignature') + + if not conversationId or not clientId or not conversationSignature: + raise Exception('Failed to create conversation.') + return Conversation(conversationId, clientId, conversationSignature) + +async def list_conversations(session: ClientSession) -> list: + url = "https://www.bing.com/turing/conversation/chats" + async with session.get(url) as response: + response = await response.json() + return response["chats"] + +async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> list: + url = "https://sydney.bing.com/sydney/DeleteSingleConversation" + json = { + "conversationId": conversation.conversationId, + "conversationSignature": conversation.conversationSignature, + "participant": {"id": conversation.clientId}, + "source": "cib", + "optionsSets": ["autosave"] + } + try: + async with session.post(url, json=json, proxy=proxy) as response: + response = await response.json() + return response["result"]["value"] == "Success" + except: + return False \ No newline at end of file diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py new file mode 100644 index 00000000..b203a0dc --- /dev/null +++ b/g4f/Provider/bing/create_images.py @@ -0,0 +1,164 @@ +import asyncio +import time, json, os +from aiohttp import ClientSession +from bs4 import BeautifulSoup +from urllib.parse import quote +from typing import Generator + +from ..create_images import CreateImagesProvider +from ..helper import get_cookies, get_event_loop +from ...webdriver import WebDriver, get_driver_cookies, get_browser +from ...base_provider import ProviderType + +BING_URL = "https://www.bing.com" + +def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None: + driver.get(f"{BING_URL}/") + value = driver.get_cookie("_U") + if value: + return + start_time = time.time() + while True: + if time.time() - start_time > timeout: + raise RuntimeError("Timeout error") + value = driver.get_cookie("_U") + if value: + return + time.sleep(0.5) + +def create_session(cookies: dict) -> ClientSession: + headers = { + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "accept-encoding": "gzip, deflate, br", + "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6", + "content-type": "application/x-www-form-urlencoded", + "referrer-policy": "origin-when-cross-origin", + "referrer": "https://www.bing.com/images/create/", + "origin": "https://www.bing.com", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54", + "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"", + "sec-ch-ua-mobile": "?0", + "sec-fetch-dest": "document", + "sec-fetch-mode": "navigate", + "sec-fetch-site": "same-origin", + "sec-fetch-user": "?1", + "upgrade-insecure-requests": "1", + } + if cookies: + headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) + return ClientSession(headers=headers) + +async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 300) -> list: + url_encoded_prompt = quote(prompt) + payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE" + url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" + async with session.post( + url, + allow_redirects=False, + data=payload, + timeout=timeout, + ) as response: + response.raise_for_status() + errors = [ + "this prompt is being reviewed", + "this prompt has been blocked", + "we're working hard to offer image creator in more languages" + ] + text = (await response.text()).lower() + for error in errors: + if error in text: + raise RuntimeError(f"Create images failed: {error}") + if response.status != 302: + url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE" + async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response: + if response.status != 302: + raise RuntimeError(f"Create images failed. Status Code: {response.status}") + + redirect_url = response.headers["Location"].replace("&nfy=1", "") + redirect_url = f"{BING_URL}{redirect_url}" + request_id = redirect_url.split("id=")[1] + async with session.get(redirect_url) as response: + response.raise_for_status() + + polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}" + start_time = time.time() + while True: + if time.time() - start_time > timeout: + raise RuntimeError(f"Timeout error after {timeout} seconds") + async with session.get(polling_url) as response: + if response.status != 200: + raise RuntimeError(f"Polling images faild. Status Code: {response.status}") + text = await response.text() + if not text: + await asyncio.sleep(1) + else: + break + error = None + try: + error = json.loads(text).get("errorMessage") + except: + pass + if error == "Pending": + raise RuntimeError("Prompt is been blocked") + elif error: + raise RuntimeError(error) + return read_images(text) + +def read_images(text: str) -> list: + html_soup = BeautifulSoup(text, "html.parser") + tags = html_soup.find_all("img") + image_links = [img["src"] for img in tags if "mimg" in img["class"]] + images = [link.split("?w=")[0] for link in image_links] + bad_images = [ + "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", + "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", + ] + if any(im in bad_images for im in images): + raise RuntimeError("Bad images found") + if not images: + raise RuntimeError("No images found") + return images + +def format_images_markdown(images: list, prompt: str) -> str: + images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)] + images = "\n".join(images) + start_flag = "\n" + end_flag = "\n" + return f"\n{start_flag}{images}\n{end_flag}\n" + +async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str: + session = create_session(cookies) + try: + images = await create_images(session, prompt, proxy) + return format_images_markdown(images, prompt) + finally: + await session.close() + +def get_cookies_from_browser(proxy: str = None) -> dict: + driver = get_browser(proxy=proxy) + try: + wait_for_login(driver) + return get_driver_cookies(driver) + finally: + driver.quit() + +def create_completion(prompt: str, cookies: dict = None, proxy: str = None) -> Generator: + loop = get_event_loop() + if not cookies: + cookies = get_cookies(".bing.com") + if "_U" not in cookies: + login_url = os.environ.get("G4F_LOGIN_URL") + if login_url: + yield f"Please login: [Bing]({login_url})\n\n" + cookies = get_cookies_from_browser(proxy) + yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy)) + +async def create_async(prompt: str, cookies: dict = None, proxy: str = None) -> str: + if not cookies: + cookies = get_cookies(".bing.com") + if "_U" not in cookies: + cookies = get_cookies_from_browser(proxy) + return await create_images_markdown(cookies, prompt, proxy) + +def patch_provider(provider: ProviderType) -> CreateImagesProvider: + return CreateImagesProvider(provider, create_completion, create_async) \ No newline at end of file diff --git a/g4f/Provider/bing/upload_image.py b/g4f/Provider/bing/upload_image.py new file mode 100644 index 00000000..329e6df4 --- /dev/null +++ b/g4f/Provider/bing/upload_image.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import string +import random +import json +import re +import io +import base64 +import numpy as np +from PIL import Image +from aiohttp import ClientSession + +async def upload_image( + session: ClientSession, + image: str, + tone: str, + proxy: str = None +): + try: + image_config = { + "maxImagePixels": 360000, + "imageCompressionRate": 0.7, + "enableFaceBlurDebug": 0, + } + is_data_uri_an_image(image) + img_binary_data = extract_data_uri(image) + is_accepted_format(img_binary_data) + img = Image.open(io.BytesIO(img_binary_data)) + width, height = img.size + max_image_pixels = image_config['maxImagePixels'] + if max_image_pixels / (width * height) < 1: + new_width = int(width * np.sqrt(max_image_pixels / (width * height))) + new_height = int(height * np.sqrt(max_image_pixels / (width * height))) + else: + new_width = width + new_height = height + try: + orientation = get_orientation(img) + except Exception: + orientation = None + new_img = process_image(orientation, img, new_width, new_height) + new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate']) + data, boundary = build_image_upload_api_payload(new_img_binary_data, tone) + headers = session.headers.copy() + headers["content-type"] = f'multipart/form-data; boundary={boundary}' + headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx' + headers["origin"] = 'https://www.bing.com' + async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response: + if response.status != 200: + raise RuntimeError("Failed to upload image.") + image_info = await response.json() + if not image_info.get('blobId'): + raise RuntimeError("Failed to parse image info.") + result = {'bcid': image_info.get('blobId', "")} + result['blurredBcid'] = image_info.get('processedBlobId', "") + if result['blurredBcid'] != "": + result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid'] + elif result['bcid'] != "": + result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid'] + result['originalImageUrl'] = ( + "https://www.bing.com/images/blob?bcid=" + + result['blurredBcid'] + if image_config["enableFaceBlurDebug"] + else "https://www.bing.com/images/blob?bcid=" + + result['bcid'] + ) + return result + except Exception as e: + raise RuntimeError(f"Upload image failed: {e}") + + +def build_image_upload_api_payload(image_bin: str, tone: str): + payload = { + 'invokedSkills': ["ImageById"], + 'subscriptionId': "Bing.Chat.Multimodal", + 'invokedSkillsRequestData': { + 'enableFaceBlur': True + }, + 'convoData': { + 'convoid': "", + 'convotone': tone + } + } + knowledge_request = { + 'imageInfo': {}, + 'knowledgeRequest': payload + } + boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + data = ( + f'--{boundary}' + + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n' + + json.dumps(knowledge_request, ensure_ascii=False) + + "\r\n--" + + boundary + + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n' + + image_bin + + "\r\n--" + + boundary + + "--\r\n" + ) + return data, boundary + +def is_data_uri_an_image(data_uri: str): + # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif) + if not re.match(r'data:image/(\w+);base64,', data_uri): + raise ValueError("Invalid data URI image.") + # Extract the image format from the data URI + image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1) + # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif) + if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']: + raise ValueError("Invalid image format (from mime file type).") + +def is_accepted_format(binary_data: bytes) -> bool: + if binary_data.startswith(b'\xFF\xD8\xFF'): + pass # It's a JPEG image + elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'): + pass # It's a PNG image + elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'): + pass # It's a GIF image + elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'): + pass # It's a JPEG image + elif binary_data.startswith(b'\xFF\xD8'): + pass # It's a JPEG image + elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP': + pass # It's a WebP image + else: + raise ValueError("Invalid image format (from magic code).") + +def extract_data_uri(data_uri: str) -> bytes: + data = data_uri.split(",")[1] + data = base64.b64decode(data) + return data + +def get_orientation(data: bytes) -> int: + if data[:2] != b'\xFF\xD8': + raise Exception('NotJpeg') + with Image.open(data) as img: + exif_data = img._getexif() + if exif_data is not None: + orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF + if orientation is not None: + return orientation + +def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image: + # Initialize the canvas + new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF") + if orientation: + if orientation > 4: + img = img.transpose(Image.FLIP_LEFT_RIGHT) + if orientation in [3, 4]: + img = img.transpose(Image.ROTATE_180) + if orientation in [5, 6]: + img = img.transpose(Image.ROTATE_270) + if orientation in [7, 8]: + img = img.transpose(Image.ROTATE_90) + new_img.paste(img, (0, 0)) + return new_img + +def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str: + output_buffer = io.BytesIO() + image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100)) + return base64.b64encode(output_buffer.getvalue()).decode('utf-8') \ No newline at end of file -- cgit v1.2.3