diff options
Diffstat (limited to 'g4f/Provider/helper.py')
-rw-r--r-- | g4f/Provider/helper.py | 68 |
1 files changed, 27 insertions, 41 deletions
diff --git a/g4f/Provider/helper.py b/g4f/Provider/helper.py index cf204e39..0af61d8d 100644 --- a/g4f/Provider/helper.py +++ b/g4f/Provider/helper.py @@ -1,57 +1,37 @@ from __future__ import annotations -import asyncio import os import random import secrets import string -from asyncio import AbstractEventLoop, BaseEventLoop from aiohttp import BaseConnector -from platformdirs import user_config_dir -from browser_cookie3 import ( - chrome, chromium, opera, opera_gx, - brave, edge, vivaldi, firefox, - _LinuxPasswordManager, BrowserCookieError -) + +try: + from platformdirs import user_config_dir + has_platformdirs = True +except ImportError: + has_platformdirs = False +try: + from browser_cookie3 import ( + chrome, chromium, opera, opera_gx, + brave, edge, vivaldi, firefox, + _LinuxPasswordManager, BrowserCookieError + ) + has_browser_cookie3 = True +except ImportError: + has_browser_cookie3 = False + from ..typing import Dict, Messages, Optional -from ..errors import AiohttpSocksError +from ..errors import AiohttpSocksError, MissingRequirementsError from .. import debug # Global variable to store cookies _cookies: Dict[str, Dict[str, str]] = {} -def get_event_loop() -> AbstractEventLoop: - """ - Get the current asyncio event loop. If the loop is closed or not set, create a new event loop. - If a loop is running, handle nested event loops. Patch the loop if 'nest_asyncio' is installed. - - Returns: - AbstractEventLoop: The current or new event loop. - """ - try: - loop = asyncio.get_event_loop() - if isinstance(loop, BaseEventLoop): - loop._check_closed() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - asyncio.get_running_loop() - if not hasattr(loop.__class__, "_nest_patched"): - import nest_asyncio - nest_asyncio.apply(loop) - except RuntimeError: - pass - except ImportError: - raise RuntimeError( - 'Use "create_async" instead of "create" function in a running event loop. Or install "nest_asyncio" package.' - ) - return loop - -if os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null": +if has_browser_cookie3 and os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null": _LinuxPasswordManager.get_password = lambda a, b: b"secret" -def get_cookies(domain_name: str = '') -> Dict[str, str]: +def get_cookies(domain_name: str = '', raise_requirements_error: bool = True) -> Dict[str, str]: """ Load cookies for a given domain from all supported browsers and cache the results. @@ -64,11 +44,11 @@ def get_cookies(domain_name: str = '') -> Dict[str, str]: if domain_name in _cookies: return _cookies[domain_name] - cookies = _load_cookies_from_browsers(domain_name) + cookies = load_cookies_from_browsers(domain_name, raise_requirements_error) _cookies[domain_name] = cookies return cookies -def _load_cookies_from_browsers(domain_name: str) -> Dict[str, str]: +def load_cookies_from_browsers(domain_name: str, raise_requirements_error: bool = True) -> Dict[str, str]: """ Helper function to load cookies from various browsers. @@ -78,6 +58,10 @@ def _load_cookies_from_browsers(domain_name: str) -> Dict[str, str]: Returns: Dict[str, str]: A dictionary of cookie names and values. """ + if not has_browser_cookie3: + if raise_requirements_error: + raise MissingRequirementsError('Install "browser_cookie3" package') + return {} cookies = {} for cookie_fn in [_g4f, chrome, chromium, opera, opera_gx, brave, edge, vivaldi, firefox]: try: @@ -104,6 +88,8 @@ def _g4f(domain_name: str) -> list: Returns: list: List of cookies. """ + if not has_platformdirs: + return [] user_data_dir = user_config_dir("g4f") cookie_file = os.path.join(user_data_dir, "Default", "Cookies") return [] if not os.path.exists(cookie_file) else chrome(cookie_file, domain_name) |