diff options
Diffstat (limited to 'g4f/Provider/needs_auth')
-rw-r--r-- | g4f/Provider/needs_auth/Bard.py | 143 |
1 files changed, 72 insertions, 71 deletions
diff --git a/g4f/Provider/needs_auth/Bard.py b/g4f/Provider/needs_auth/Bard.py index 092268f8..6cb40c90 100644 --- a/g4f/Provider/needs_auth/Bard.py +++ b/g4f/Provider/needs_auth/Bard.py @@ -1,90 +1,91 @@ from __future__ import annotations -import json -import random -import re +import time -from aiohttp import ClientSession +from ...typing import CreateResult, Messages +from ..base_provider import BaseProvider +from ..helper import WebDriver, format_prompt, get_browser -from ...typing import Messages -from ..base_provider import AsyncProvider -from ..helper import format_prompt, get_cookies - - -class Bard(AsyncProvider): +class Bard(BaseProvider): url = "https://bard.google.com" - needs_auth = True working = True - _snlm0e = None + needs_auth = True @classmethod - async def create_async( + def create_completion( cls, model: str, messages: Messages, + stream: bool, proxy: str = None, - cookies: dict = None, + browser: WebDriver = None, + hidden_display: bool = True, **kwargs - ) -> str: + ) -> CreateResult: prompt = format_prompt(messages) - if not cookies: - cookies = get_cookies(".google.com") + if browser: + driver = browser + else: + if hidden_display: + driver, display = get_browser(None, True, proxy) + else: + driver = get_browser(None, False, proxy) - headers = { - 'authority': 'bard.google.com', - 'origin': cls.url, - 'referer': f'{cls.url}/', - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36', - 'x-same-domain': '1', - } - async with ClientSession( - cookies=cookies, - headers=headers - ) as session: - if not cls._snlm0e: - async with session.get(cls.url, proxy=proxy) as response: - text = await response.text() + from selenium.webdriver.common.by import By + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC - if match := re.search(r'SNlM0e\":\"(.*?)\"', text): - cls._snlm0e = match.group(1) + try: + driver.get(f"{cls.url}/chat") + wait = WebDriverWait(driver, 10) + wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))) + except: + # Reopen browser for login + if not browser: + driver.quit() + # New browser should be visible + if hidden_display: + display.stop() + driver = get_browser(None, False, proxy) + driver.get(f"{cls.url}/chat") + wait = WebDriverWait(driver, 240) + wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))) + else: + raise RuntimeError("Prompt textarea not found. You may not be logged in.") - else: - raise RuntimeError("No snlm0e value.") - params = { - 'bl': 'boq_assistant-bard-web-server_20230326.21_p0', - '_reqid': random.randint(1111, 9999), - 'rt': 'c' - } - - data = { - 'at': cls._snlm0e, - 'f.req': json.dumps([None, json.dumps([[prompt]])]) - } + try: + # Add hook in XMLHttpRequest + script = """ +const _http_request_open = XMLHttpRequest.prototype.open; +window._message = ""; +XMLHttpRequest.prototype.open = function(method, url) { + if (url.includes("/assistant.lamda.BardFrontendService/StreamGenerate")) { + this.addEventListener("load", (event) => { + window._message = JSON.parse(JSON.parse(this.responseText.split("\\n")[3])[0][2])[4][0][1][0]; + }); + } + return _http_request_open.call(this, method, url); +} +""" + driver.execute_script(script) - intents = '.'.join([ - 'assistant', - 'lamda', - 'BardFrontendService' - ]) - async with session.post( - f'{cls.url}/_/BardChatUi/data/{intents}/StreamGenerate', - data=data, - params=params, - proxy=proxy - ) as response: - response = await response.text() - response = json.loads(response.splitlines()[3])[0][2] - response = json.loads(response)[4][0][1][0] - return response + # Input and submit prompt + driver.find_element(By.CSS_SELECTOR, "div.ql-editor.ql-blank.textarea").send_keys(prompt) + driver.find_element(By.CSS_SELECTOR, "button.send-button").click() - @classmethod - @property - def params(cls): - params = [ - ("model", "str"), - ("messages", "list[dict[str, str]]"), - ("stream", "bool"), - ("proxy", "str"), - ] - param = ", ".join([": ".join(p) for p in params]) - return f"g4f.provider.{cls.__name__} supports: ({param})" + # Yield response + script = "return window._message;" + while True: + chunk = driver.execute_script(script) + if chunk: + yield chunk + return + else: + time.sleep(0.1) + finally: + driver.close() + if not browser: + time.sleep(0.1) + driver.quit() + if hidden_display: + display.stop()
\ No newline at end of file |