From 1d3a139a53b09be5ab8111bab38d5ffd2dd31f1e Mon Sep 17 00:00:00 2001 From: hlohaus <983577+hlohaus@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:41:00 +0100 Subject: Add new media selection in UI Add HuggingFace provider provider Auto refresh Google Gemini cookies Add sources to search results --- g4f/tools/web_search.py | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) (limited to 'g4f/tools/web_search.py') diff --git a/g4f/tools/web_search.py b/g4f/tools/web_search.py index f8f9b53b..a2f0f905 100644 --- a/g4f/tools/web_search.py +++ b/g4f/tools/web_search.py @@ -24,7 +24,7 @@ except: from typing import Iterator from ..cookies import get_cookies_dir -from ..providers.response import format_link +from ..providers.response import format_link, JsonMixin, Sources from ..errors import MissingRequirementsError from .. import debug @@ -33,11 +33,18 @@ Using the provided web search results, to write a comprehensive reply to the use Make sure to add the sources of cites using [[Number]](Url) notation after the reference. Example: [[0]](http://google.com) """ -class SearchResults(): +class SearchResults(JsonMixin): def __init__(self, results: list, used_words: int): self.results = results self.used_words = used_words + @classmethod + def from_dict(cls, data: dict): + return cls( + [SearchResultEntry(**item) for item in data["results"]], + data["used_words"] + ) + def __iter__(self): yield from self.results @@ -57,7 +64,17 @@ class SearchResults(): def __len__(self) -> int: return len(self.results) -class SearchResultEntry(): + + def get_sources(self) -> Sources: + return Sources([{"url": result.url, "title": result.title} for result in self.results]) + + def get_dict(self): + return { + "results": [result.get_dict() for result in self.results], + "used_words": self.used_words + } + +class SearchResultEntry(JsonMixin): def __init__(self, title: str, url: str, snippet: str, text: str = None): self.title = title self.url = url @@ -191,11 +208,11 @@ async def search(query: str, max_results: int = 5, max_words: int = 2500, backen return SearchResults(formatted_results, used_words) -async def do_search(prompt: str, query: str = None, instructions: str = DEFAULT_INSTRUCTIONS, **kwargs) -> str: +async def do_search(prompt: str, query: str = None, instructions: str = DEFAULT_INSTRUCTIONS, **kwargs) -> tuple[str, Sources]: if instructions and instructions in prompt: - return prompt # We have already added search results + return prompt, None # We have already added search results if prompt.startswith("##") and query is None: - return prompt # We have no search query + return prompt, None # We have no search query if query is None: query = prompt.strip().splitlines()[0] # Use the first line as the search query json_bytes = json.dumps({"query": query, **kwargs}, sort_keys=True).encode(errors="ignore") @@ -203,14 +220,19 @@ async def do_search(prompt: str, query: str = None, instructions: str = DEFAULT_ bucket_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / f"web_search" / f"{datetime.date.today()}" bucket_dir.mkdir(parents=True, exist_ok=True) cache_file = bucket_dir / f"{quote_plus(query[:20])}.{md5_hash}.cache" + search_results = None if cache_file.exists(): with cache_file.open("r") as f: search_results = f.read() - else: + try: + search_results = SearchResults.from_dict(json.loads(search_results)) + except json.JSONDecodeError: + search_results = None + if search_results is None: search_results = await search(query, **kwargs) if search_results.results: - with cache_file.open("wb") as f: - f.write(str(search_results).encode(errors="replace")) + with cache_file.open("w") as f: + f.write(json.dumps(search_results.get_dict())) if instructions: new_prompt = f""" {search_results} @@ -227,13 +249,12 @@ User request: {prompt} """ debug.log(f"Web search: '{query.strip()[:50]}...'") - if isinstance(search_results, SearchResults): - debug.log(f"with {len(search_results.results)} Results {search_results.used_words} Words") - return new_prompt + debug.log(f"with {len(search_results.results)} Results {search_results.used_words} Words") + return new_prompt, search_results.get_sources() def get_search_message(prompt: str, raise_search_exceptions=False, **kwargs) -> str: try: - return asyncio.run(do_search(prompt, **kwargs)) + return asyncio.run(do_search(prompt, **kwargs))[0] except (DuckDuckGoSearchException, MissingRequirementsError) as e: if raise_search_exceptions: raise e -- cgit v1.2.3