diff options
Diffstat (limited to '')
-rw-r--r-- | g4f/Provider/bing/create_images.py | 224 |
1 files changed, 157 insertions, 67 deletions
diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py index a1ecace3..29daccbd 100644 --- a/g4f/Provider/bing/create_images.py +++ b/g4f/Provider/bing/create_images.py @@ -1,9 +1,16 @@ +""" +This module provides functionalities for creating and managing images using Bing's service. +It includes functions for user login, session creation, image creation, and processing. +""" + import asyncio -import time, json, os +import time +import json +import os from aiohttp import ClientSession from bs4 import BeautifulSoup from urllib.parse import quote -from typing import Generator +from typing import Generator, List, Dict from ..create_images import CreateImagesProvider from ..helper import get_cookies, get_event_loop @@ -12,23 +19,47 @@ from ...base_provider import ProviderType from ...image import format_images_markdown BING_URL = "https://www.bing.com" +TIMEOUT_LOGIN = 1200 +TIMEOUT_IMAGE_CREATION = 300 +ERRORS = [ + "this prompt is being reviewed", + "this prompt has been blocked", + "we're working hard to offer image creator in more languages", + "we can't create your images right now" +] +BAD_IMAGES = [ + "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", + "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", +] + +def wait_for_login(driver: WebDriver, timeout: int = TIMEOUT_LOGIN) -> None: + """ + Waits for the user to log in within a given timeout period. -def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None: + Args: + driver (WebDriver): Webdriver for browser automation. + timeout (int): Maximum waiting time in seconds. + + Raises: + RuntimeError: If the login process exceeds the timeout. + """ driver.get(f"{BING_URL}/") - value = driver.get_cookie("_U") - if value: - return start_time = time.time() - while True: + while not driver.get_cookie("_U"): if time.time() - start_time > timeout: raise RuntimeError("Timeout error") - value = driver.get_cookie("_U") - if value: - time.sleep(1) - return time.sleep(0.5) -def create_session(cookies: dict) -> ClientSession: +def create_session(cookies: Dict[str, str]) -> ClientSession: + """ + Creates a new client session with specified cookies and headers. + + Args: + cookies (Dict[str, str]): Cookies to be used for the session. + + Returns: + ClientSession: The created client session. + """ headers = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "accept-encoding": "gzip, deflate, br", @@ -47,28 +78,32 @@ def create_session(cookies: dict) -> ClientSession: "upgrade-insecure-requests": "1", } if cookies: - headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) + headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) return ClientSession(headers=headers) -async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 300) -> list: - url_encoded_prompt = quote(prompt) +async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]: + """ + Creates images based on a given prompt using Bing's service. + + Args: + session (ClientSession): Active client session. + prompt (str): Prompt to generate images. + proxy (str, optional): Proxy configuration. + timeout (int): Timeout for the request. + + Returns: + List[str]: A list of URLs to the created images. + + Raises: + RuntimeError: If image creation fails or times out. + """ + url_encoded_prompt = quote(prompt) payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE" url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" - async with session.post( - url, - allow_redirects=False, - data=payload, - timeout=timeout, - ) as response: + async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response: response.raise_for_status() - errors = [ - "this prompt is being reviewed", - "this prompt has been blocked", - "we're working hard to offer image creator in more languages", - "we can't create your images right now" - ] text = (await response.text()).lower() - for error in errors: + for error in ERRORS: if error in text: raise RuntimeError(f"Create images failed: {error}") if response.status != 302: @@ -107,54 +142,109 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None, raise RuntimeError(error) return read_images(text) -def read_images(text: str) -> list: - html_soup = BeautifulSoup(text, "html.parser") - tags = html_soup.find_all("img") - image_links = [img["src"] for img in tags if "mimg" in img["class"]] - images = [link.split("?w=")[0] for link in image_links] - bad_images = [ - "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", - "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", - ] - if any(im in bad_images for im in images): +def read_images(html_content: str) -> List[str]: + """ + Extracts image URLs from the HTML content. + + Args: + html_content (str): HTML content containing image URLs. + + Returns: + List[str]: A list of image URLs. + """ + soup = BeautifulSoup(html_content, "html.parser") + tags = soup.find_all("img", class_="mimg") + images = [img["src"].split("?w=")[0] for img in tags] + if any(im in BAD_IMAGES for im in images): raise RuntimeError("Bad images found") if not images: raise RuntimeError("No images found") return images -async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str: - session = create_session(cookies) - try: +async def create_images_markdown(cookies: Dict[str, str], prompt: str, proxy: str = None) -> str: + """ + Creates markdown formatted string with images based on the prompt. + + Args: + cookies (Dict[str, str]): Cookies to be used for the session. + prompt (str): Prompt to generate images. + proxy (str, optional): Proxy configuration. + + Returns: + str: Markdown formatted string with images. + """ + async with create_session(cookies) as session: images = await create_images(session, prompt, proxy) return format_images_markdown(images, prompt) - finally: - await session.close() -def get_cookies_from_browser(proxy: str = None) -> dict: - driver = get_browser(proxy=proxy) - try: +def get_cookies_from_browser(proxy: str = None) -> Dict[str, str]: + """ + Retrieves cookies from the browser using webdriver. + + Args: + proxy (str, optional): Proxy configuration. + + Returns: + Dict[str, str]: Retrieved cookies. + """ + with get_browser(proxy=proxy) as driver: wait_for_login(driver) + time.sleep(1) return get_driver_cookies(driver) - finally: - driver.quit() - -def create_completion(prompt: str, cookies: dict = None, proxy: str = None) -> Generator: - loop = get_event_loop() - if not cookies: - cookies = get_cookies(".bing.com") - if "_U" not in cookies: - login_url = os.environ.get("G4F_LOGIN_URL") - if login_url: - yield f"Please login: [Bing]({login_url})\n\n" - cookies = get_cookies_from_browser(proxy) - yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy)) - -async def create_async(prompt: str, cookies: dict = None, proxy: str = None) -> str: - if not cookies: - cookies = get_cookies(".bing.com") - if "_U" not in cookies: - cookies = get_cookies_from_browser(proxy) - return await create_images_markdown(cookies, prompt, proxy) + +class CreateImagesBing: + """A class for creating images using Bing.""" + + _cookies: Dict[str, str] = {} + + @classmethod + def create_completion(cls, prompt: str, cookies: Dict[str, str] = None, proxy: str = None) -> Generator[str]: + """ + Generator for creating imagecompletion based on a prompt. + + Args: + prompt (str): Prompt to generate images. + cookies (Dict[str, str], optional): Cookies for the session. If None, cookies are retrieved automatically. + proxy (str, optional): Proxy configuration. + + Yields: + Generator[str, None, None]: The final output as markdown formatted string with images. + """ + loop = get_event_loop() + cookies = cookies or cls._cookies or get_cookies(".bing.com") + if "_U" not in cookies: + login_url = os.environ.get("G4F_LOGIN_URL") + if login_url: + yield f"Please login: [Bing]({login_url})\n\n" + cls._cookies = cookies = get_cookies_from_browser(proxy) + yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy)) + + @classmethod + async def create_async(cls, prompt: str, cookies: Dict[str, str] = None, proxy: str = None) -> str: + """ + Asynchronously creates a markdown formatted string with images based on the prompt. + + Args: + prompt (str): Prompt to generate images. + cookies (Dict[str, str], optional): Cookies for the session. If None, cookies are retrieved automatically. + proxy (str, optional): Proxy configuration. + + Returns: + str: Markdown formatted string with images. + """ + cookies = cookies or cls._cookies or get_cookies(".bing.com") + if "_U" not in cookies: + cls._cookies = cookies = get_cookies_from_browser(proxy) + return await create_images_markdown(cookies, prompt, proxy) def patch_provider(provider: ProviderType) -> CreateImagesProvider: - return CreateImagesProvider(provider, create_completion, create_async)
\ No newline at end of file + """ + Patches a provider to include image creation capabilities. + + Args: + provider (ProviderType): The provider to be patched. + + Returns: + CreateImagesProvider: The patched provider with image creation capabilities. + """ + return CreateImagesProvider(provider, CreateImagesBing.create_completion, CreateImagesBing.create_async)
\ No newline at end of file |