summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/bing/create_images.py
blob: 7a08ddfe7349ea796b26f4d3e473334f279f803c (plain) (tree)
1
2
3
4
5
6
7
8
9
                                  
 
              

           
                                                
                              
                             
 





                                 
                                  
                                                              
                                                                   

                                 
                    











                                                                  
































                                                                            
                                                                                                                 








                                                                     

















                                                                                                                                                                
                                                                             
                                                                                    
 
                                                                                                                 














                                                                

                                                                          
                                      

                                                                             
                                                                                                   
                                   
                                              

                                                                                                  
                            



                                                                                 
                                                                                         
                                      
                                                                                    










                                                                                               
                                                                    

                                                        
                                                                                    
                                        
                                                                













                                                    











                                                               

                                                       

                                                         


                                              
                
from __future__ import annotations

import asyncio
import time
import json
from aiohttp import ClientSession, BaseConnector
from urllib.parse import quote
from typing import List, Dict

try:
    from bs4 import BeautifulSoup
    has_requirements = True
except ImportError:
    has_requirements = False

from ..helper import get_connector
from ...errors import MissingRequirementsError, RateLimitError
from ...webdriver import WebDriver, get_driver_cookies, get_browser

BING_URL = "https://www.bing.com"
TIMEOUT_LOGIN = 1200
TIMEOUT_IMAGE_CREATION = 300
ERRORS = [
    "this prompt is being reviewed",
    "this prompt has been blocked",
    "we're working hard to offer image creator in more languages",
    "we can't create your images right now"
]
BAD_IMAGES = [
    "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
    "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
]

def wait_for_login(driver: WebDriver, timeout: int = TIMEOUT_LOGIN) -> None:
    """
    Waits for the user to log in within a given timeout period.

    Args:
        driver (WebDriver): Webdriver for browser automation.
        timeout (int): Maximum waiting time in seconds.

    Raises:
        RuntimeError: If the login process exceeds the timeout.
    """
    driver.get(f"{BING_URL}/")
    start_time = time.time()
    while not driver.get_cookie("_U"):
        if time.time() - start_time > timeout:
            raise RuntimeError("Timeout error")
        time.sleep(0.5)

def get_cookies_from_browser(proxy: str = None) -> dict[str, str]:
    """
    Retrieves cookies from the browser using webdriver.

    Args:
        proxy (str, optional): Proxy configuration.

    Returns:
        dict[str, str]: Retrieved cookies.
    """
    with get_browser(proxy=proxy) as driver:
        wait_for_login(driver)
        time.sleep(1)
        return get_driver_cookies(driver)

def create_session(cookies: Dict[str, str], proxy: str = None, connector: BaseConnector = None) -> ClientSession:
    """
    Creates a new client session with specified cookies and headers.

    Args:
        cookies (Dict[str, str]): Cookies to be used for the session.

    Returns:
        ClientSession: The created client session.
    """
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
        "content-type": "application/x-www-form-urlencoded",
        "referrer-policy": "origin-when-cross-origin",
        "referrer": "https://www.bing.com/images/create/",
        "origin": "https://www.bing.com",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
        "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
        "sec-ch-ua-mobile": "?0",
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "same-origin",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
    }
    if cookies:
        headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
    return ClientSession(headers=headers, connector=get_connector(connector, proxy))

async def create_images(session: ClientSession, prompt: str, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]:
    """
    Creates images based on a given prompt using Bing's service.

    Args:
        session (ClientSession): Active client session.
        prompt (str): Prompt to generate images.
        proxy (str, optional): Proxy configuration.
        timeout (int): Timeout for the request.

    Returns:
        List[str]: A list of URLs to the created images.

    Raises:
        RuntimeError: If image creation fails or times out.
    """
    if not has_requirements:
        raise MissingRequirementsError('Install "beautifulsoup4" package')
    url_encoded_prompt = quote(prompt)
    payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
    url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
    async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response:
        response.raise_for_status()
        text = (await response.text()).lower()
        if "0 coins available" in text:
            raise RateLimitError("No coins left. Log in with a different account or wait a while")
        for error in ERRORS:
            if error in text:
                raise RuntimeError(f"Create images failed: {error}")
    if response.status != 302:
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
        async with session.post(url, allow_redirects=False, timeout=timeout) as response:
            if response.status != 302:
                raise RuntimeError(f"Create images failed. Code: {response.status}")

    redirect_url = response.headers["Location"].replace("&nfy=1", "")
    redirect_url = f"{BING_URL}{redirect_url}"
    request_id = redirect_url.split("id=")[1]
    async with session.get(redirect_url) as response:
        response.raise_for_status()

    polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
    start_time = time.time()
    while True:
        if time.time() - start_time > timeout:
            raise RuntimeError(f"Timeout error after {timeout} sec")
        async with session.get(polling_url) as response:
            if response.status != 200:
                raise RuntimeError(f"Polling images faild. Code: {response.status}")
            text = await response.text()
            if not text or "GenerativeImagesStatusPage" in text:
                await asyncio.sleep(1)
            else:
                break
    error = None
    try:
        error = json.loads(text).get("errorMessage")
    except:
        pass
    if error == "Pending":
        raise RuntimeError("Prompt is been blocked")
    elif error:
        raise RuntimeError(error)
    return read_images(text)

def read_images(html_content: str) -> List[str]:
    """
    Extracts image URLs from the HTML content.

    Args:
        html_content (str): HTML content containing image URLs.

    Returns:
        List[str]: A list of image URLs.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    tags = soup.find_all("img", class_="mimg")
    if not tags:
        tags = soup.find_all("img", class_="gir_mmimg")
    images = [img["src"].split("?w=")[0] for img in tags]
    if any(im in BAD_IMAGES for im in images):
        raise RuntimeError("Bad images found")
    if not images:
        raise RuntimeError("No images found")
    return images