diff options
Diffstat (limited to '')
-rw-r--r-- | g4f/Provider/needs_auth/Gemini.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/g4f/Provider/needs_auth/Gemini.py b/g4f/Provider/needs_auth/Gemini.py new file mode 100644 index 00000000..402fc02f --- /dev/null +++ b/g4f/Provider/needs_auth/Gemini.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +import os +import json +import random +import re + +from aiohttp import ClientSession + +try: + from selenium.webdriver.common.by import By + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC +except ImportError: + pass + +from ...typing import Messages, Cookies, ImageType, AsyncResult +from ..base_provider import AsyncGeneratorProvider +from ..helper import format_prompt, get_cookies +from ...errors import MissingAuthError, MissingRequirementsError +from ...image import to_bytes, ImageResponse +from ...webdriver import get_browser, get_driver_cookies + +REQUEST_HEADERS = { + "authority": "gemini.google.com", + "origin": "https://gemini.google.com", + "referer": "https://gemini.google.com/", + 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36', + 'x-same-domain': '1', +} +REQUEST_BL_PARAM = "boq_assistant-bard-web-server_20240201.08_p8" +REQUEST_URL = "https://gemini.google.com/_/BardChatUi/data/assistant.lamda.BardFrontendService/StreamGenerate" +UPLOAD_IMAGE_URL = "https://content-push.googleapis.com/upload/" +UPLOAD_IMAGE_HEADERS = { + "authority": "content-push.googleapis.com", + "accept": "*/*", + "accept-language": "en-US,en;q=0.7", + "authorization": "Basic c2F2ZXM6cyNMdGhlNmxzd2F2b0RsN3J1d1U=", + "content-type": "application/x-www-form-urlencoded;charset=UTF-8", + "origin": "https://gemini.google.com", + "push-id": "feeds/mcudyrk2a4khkz", + "referer": "https://gemini.google.com/", + "x-goog-upload-command": "start", + "x-goog-upload-header-content-length": "", + "x-goog-upload-protocol": "resumable", + "x-tenant-id": "bard-storage", +} + +class Gemini(AsyncGeneratorProvider): + url = "https://gemini.google.com" + needs_auth = True + working = True + supports_stream = False + + @classmethod + async def create_async_generator( + cls, + model: str, + messages: Messages, + proxy: str = None, + cookies: Cookies = None, + image: ImageType = None, + image_name: str = None, + **kwargs + ) -> AsyncResult: + prompt = format_prompt(messages) + + if not cookies: + driver = None + try: + driver = get_browser(proxy=proxy) + try: + driver.get(f"{cls.url}/app") + WebDriverWait(driver, 5).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea")) + ) + except: + login_url = os.environ.get("G4F_LOGIN_URL") + if login_url: + yield f"Please login: [Google Gemini]({login_url})\n\n" + WebDriverWait(driver, 240).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea")) + ) + cookies = get_driver_cookies(driver) + except MissingRequirementsError: + pass + finally: + if driver: + driver.close() + + if not cookies: + cookies = get_cookies(".google.com", False) + if "__Secure-1PSID" not in cookies: + raise MissingAuthError('Missing "__Secure-1PSID" cookie') + + image_url = await cls.upload_image(to_bytes(image), image_name, proxy) if image else None + + async with ClientSession( + cookies=cookies, + headers=REQUEST_HEADERS + ) as session: + async with session.get(cls.url, proxy=proxy) as response: + text = await response.text() + match = re.search(r'SNlM0e\":\"(.*?)\"', text) + if match: + snlm0e = match.group(1) + else: + raise RuntimeError("SNlM0e not found") + + params = { + 'bl': REQUEST_BL_PARAM, + '_reqid': random.randint(1111, 9999), + 'rt': 'c' + } + data = { + 'at': snlm0e, + 'f.req': json.dumps([None, json.dumps(cls.build_request( + prompt, + image_url=image_url, + image_name=image_name + ))]) + } + async with session.post( + REQUEST_URL, + data=data, + params=params, + proxy=proxy + ) as response: + response = await response.text() + response_part = json.loads(json.loads(response.splitlines()[-5])[0][2]) + if response_part[4] is None: + response_part = json.loads(json.loads(response.splitlines()[-7])[0][2]) + + content = response_part[4][0][1][0] + image_prompt = None + match = re.search(r'\[Imagen of (.*?)\]', content) + if match: + image_prompt = match.group(1) + content = content.replace(match.group(0), '') + + yield content + if image_prompt: + images = [image[0][3][3] for image in response_part[4][0][12][7][0]] + resolved_images = [] + for image in images: + async with session.get(image, allow_redirects=False) as fetch: + image = fetch.headers["location"] + async with session.get(image, allow_redirects=False) as fetch: + image = fetch.headers["location"] + resolved_images.append(image) + yield ImageResponse(resolved_images, image_prompt, {"orginal_links": images}) + + def build_request( + prompt: str, + conversation_id: str = "", + response_id: str = "", + choice_id: str = "", + image_url: str = None, + image_name: str = None, + tools: list[list[str]] = [] + ) -> list: + image_list = [[[image_url, 1], image_name]] if image_url else [] + return [ + [prompt, 0, None, image_list, None, None, 0], + ["en"], + [conversation_id, response_id, choice_id, None, None, []], + None, + None, + None, + [1], + 0, + [], + tools, + 1, + 0, + ] + + async def upload_image(image: bytes, image_name: str = None, proxy: str = None): + async with ClientSession( + headers=UPLOAD_IMAGE_HEADERS + ) as session: + async with session.options(UPLOAD_IMAGE_URL, proxy=proxy) as reponse: + reponse.raise_for_status() + + headers = { + "size": str(len(image)), + "x-goog-upload-command": "start" + } + data = f"File name: {image_name}" if image_name else None + async with session.post( + UPLOAD_IMAGE_URL, headers=headers, data=data, proxy=proxy + ) as response: + response.raise_for_status() + upload_url = response.headers["X-Goog-Upload-Url"] + + async with session.options(upload_url, headers=headers) as response: + response.raise_for_status() + + headers["x-goog-upload-command"] = "upload, finalize" + headers["X-Goog-Upload-Offset"] = "0" + async with session.post( + upload_url, headers=headers, data=image, proxy=proxy + ) as response: + response.raise_for_status() + return await response.text()
\ No newline at end of file |