summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--g4f/Provider/needs_auth/Gemini.py195
1 files changed, 100 insertions, 95 deletions
diff --git a/g4f/Provider/needs_auth/Gemini.py b/g4f/Provider/needs_auth/Gemini.py
index bd30d8b2..0621cacd 100644
--- a/g4f/Provider/needs_auth/Gemini.py
+++ b/g4f/Provider/needs_auth/Gemini.py
@@ -5,7 +5,9 @@ import json
import random
import re
-from aiohttp import ClientSession
+from aiohttp import ClientSession, BaseConnector
+
+from ..helper import get_connector
try:
from selenium.webdriver.common.by import By
@@ -58,93 +60,99 @@ class Gemini(AsyncGeneratorProvider):
messages: Messages,
proxy: str = None,
cookies: Cookies = None,
+ connector: BaseConnector = None,
image: ImageType = None,
image_name: str = None,
**kwargs
) -> AsyncResult:
prompt = format_prompt(messages)
cookies = cookies if cookies else get_cookies(".google.com", False, True)
- snlm0e = await cls.fetch_snlm0e(cookies, proxy) if cookies else None
- if not snlm0e:
- driver = None
- try:
- driver = get_browser(proxy=proxy)
- try:
- driver.get(f"{cls.url}/app")
- WebDriverWait(driver, 5).until(
- EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))
- )
- except:
- login_url = os.environ.get("G4F_LOGIN_URL")
- if login_url:
- yield f"Please login: [Google Gemini]({login_url})\n\n"
- WebDriverWait(driver, 240).until(
- EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))
- )
- cookies = get_driver_cookies(driver)
- except MissingRequirementsError:
- pass
- finally:
- if driver:
- driver.close()
-
- if not snlm0e:
- if "__Secure-1PSID" not in cookies:
- raise MissingAuthError('Missing "__Secure-1PSID" cookie')
- snlm0e = await cls.fetch_snlm0e(cookies, proxy)
- if not snlm0e:
- raise RuntimeError("Invalid auth. SNlM0e not found")
-
- image_url = await cls.upload_image(to_bytes(image), image_name, proxy) if image else None
-
+ base_connector = get_connector(connector, proxy)
async with ClientSession(
- cookies=cookies,
- headers=REQUEST_HEADERS
+ headers=REQUEST_HEADERS,
+ connector=base_connector
) as session:
- params = {
- 'bl': REQUEST_BL_PARAM,
- '_reqid': random.randint(1111, 9999),
- 'rt': 'c'
- }
- data = {
- 'at': snlm0e,
- 'f.req': json.dumps([None, json.dumps(cls.build_request(
- prompt,
- image_url=image_url,
- image_name=image_name
- ))])
- }
- async with session.post(
- REQUEST_URL,
- data=data,
- params=params,
- proxy=proxy
- ) as response:
- response = await response.text()
- response_part = json.loads(json.loads(response.splitlines()[-5])[0][2])
- if response_part[4] is None:
- response_part = json.loads(json.loads(response.splitlines()[-7])[0][2])
-
- content = response_part[4][0][1][0]
- image_prompt = None
- match = re.search(r'\[Imagen of (.*?)\]', content)
- if match:
- image_prompt = match.group(1)
- content = content.replace(match.group(0), '')
-
- yield content
- if image_prompt:
- images = [image[0][3][3] for image in response_part[4][0][12][7][0]]
- resolved_images = []
- preview = []
- for image in images:
- async with session.get(image, allow_redirects=False) as fetch:
- image = fetch.headers["location"]
- async with session.get(image, allow_redirects=False) as fetch:
- image = fetch.headers["location"]
- resolved_images.append(image)
- preview.append(image.replace('=s512', '=s200'))
- yield ImageResponse(resolved_images, image_prompt, {"orginal_links": images, "preview": preview})
+ snlm0e = await cls.fetch_snlm0e(session, cookies) if cookies else None
+ if not snlm0e:
+ driver = None
+ try:
+ driver = get_browser(proxy=proxy)
+ try:
+ driver.get(f"{cls.url}/app")
+ WebDriverWait(driver, 5).until(
+ EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))
+ )
+ except:
+ login_url = os.environ.get("G4F_LOGIN_URL")
+ if login_url:
+ yield f"Please login: [Google Gemini]({login_url})\n\n"
+ WebDriverWait(driver, 240).until(
+ EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))
+ )
+ cookies = get_driver_cookies(driver)
+ except MissingRequirementsError:
+ pass
+ finally:
+ if driver:
+ driver.close()
+
+ if not snlm0e:
+ if "__Secure-1PSID" not in cookies:
+ raise MissingAuthError('Missing "__Secure-1PSID" cookie')
+ snlm0e = await cls.fetch_snlm0e(session, cookies)
+ if not snlm0e:
+ raise RuntimeError("Invalid auth. SNlM0e not found")
+
+ image_url = await cls.upload_image(base_connector, to_bytes(image), image_name) if image else None
+
+ async with ClientSession(
+ cookies=cookies,
+ headers=REQUEST_HEADERS,
+ connector=base_connector,
+ ) as client:
+ params = {
+ 'bl': REQUEST_BL_PARAM,
+ '_reqid': random.randint(1111, 9999),
+ 'rt': 'c'
+ }
+ data = {
+ 'at': snlm0e,
+ 'f.req': json.dumps([None, json.dumps(cls.build_request(
+ prompt,
+ image_url=image_url,
+ image_name=image_name
+ ))])
+ }
+ async with client.post(
+ REQUEST_URL,
+ data=data,
+ params=params,
+ ) as response:
+ response = await response.text()
+ response_part = json.loads(json.loads(response.splitlines()[-5])[0][2])
+ if response_part[4] is None:
+ response_part = json.loads(json.loads(response.splitlines()[-7])[0][2])
+
+ content = response_part[4][0][1][0]
+ image_prompt = None
+ match = re.search(r'\[Imagen of (.*?)\]', content)
+ if match:
+ image_prompt = match.group(1)
+ content = content.replace(match.group(0), '')
+
+ yield content
+ if image_prompt:
+ images = [image[0][3][3] for image in response_part[4][0][12][7][0]]
+ resolved_images = []
+ preview = []
+ for image in images:
+ async with client.get(image, allow_redirects=False) as fetch:
+ image = fetch.headers["location"]
+ async with client.get(image, allow_redirects=False) as fetch:
+ image = fetch.headers["location"]
+ resolved_images.append(image)
+ preview.append(image.replace('=s512', '=s200'))
+ yield ImageResponse(resolved_images, image_prompt, {"orginal_links": images, "preview": preview})
def build_request(
prompt: str,
@@ -171,11 +179,12 @@ class Gemini(AsyncGeneratorProvider):
0,
]
- async def upload_image(image: bytes, image_name: str = None, proxy: str = None):
+ async def upload_image(connector: BaseConnector, image: bytes, image_name: str = None):
async with ClientSession(
- headers=UPLOAD_IMAGE_HEADERS
+ headers=UPLOAD_IMAGE_HEADERS,
+ connector=connector
) as session:
- async with session.options(UPLOAD_IMAGE_URL, proxy=proxy) as reponse:
+ async with session.options(UPLOAD_IMAGE_URL) as reponse:
reponse.raise_for_status()
headers = {
@@ -184,7 +193,7 @@ class Gemini(AsyncGeneratorProvider):
}
data = f"File name: {image_name}" if image_name else None
async with session.post(
- UPLOAD_IMAGE_URL, headers=headers, data=data, proxy=proxy
+ UPLOAD_IMAGE_URL, headers=headers, data=data
) as response:
response.raise_for_status()
upload_url = response.headers["X-Goog-Upload-Url"]
@@ -195,19 +204,15 @@ class Gemini(AsyncGeneratorProvider):
headers["x-goog-upload-command"] = "upload, finalize"
headers["X-Goog-Upload-Offset"] = "0"
async with session.post(
- upload_url, headers=headers, data=image, proxy=proxy
+ upload_url, headers=headers, data=image
) as response:
response.raise_for_status()
return await response.text()
@classmethod
- async def fetch_snlm0e(cls, cookies: Cookies, proxy: str = None):
- async with ClientSession(
- cookies=cookies,
- headers=REQUEST_HEADERS
- ) as session:
- async with session.get(cls.url, proxy=proxy) as response:
- text = await response.text()
- match = re.search(r'SNlM0e\":\"(.*?)\"', text)
- if match:
- return match.group(1) \ No newline at end of file
+ async def fetch_snlm0e(cls, session: ClientSession, cookies: Cookies):
+ async with session.get(cls.url, cookies=cookies) as response:
+ text = await response.text()
+ match = re.search(r'SNlM0e\":\"(.*?)\"', text)
+ if match:
+ return match.group(1) \ No newline at end of file