From 0d59789eedf3784cf4c3aaf764785a4ad91723c4 Mon Sep 17 00:00:00 2001
From: Heiner Lohaus
Date: Wed, 1 Jan 2025 14:01:33 +0100
Subject: Add File API Documentation for Python and JS
 Format Bucket Placeholder in GUI

---
 g4f/tools/files.py      | 103 ++++++++++++++++++++++++++++--------------------
 g4f/tools/run_tools.py  |  18 +++++++--
 g4f/tools/web_search.py |  19 ++++++---
 3 files changed, 89 insertions(+), 51 deletions(-)

(limited to 'g4f/tools')

diff --git a/g4f/tools/files.py b/g4f/tools/files.py
index d0d1c23b..b06fe5ea 100644
--- a/g4f/tools/files.py
+++ b/g4f/tools/files.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 import os
 import json
 from pathlib import Path
-from typing import Iterator, Optional
+from typing import Iterator, Optional, AsyncIterator
 from aiohttp import ClientSession, ClientError, ClientResponse, ClientTimeout
 import urllib.parse
 import time
@@ -74,6 +74,7 @@ except ImportError:
 
 from .web_search import scrape_text
 from ..cookies import get_cookies_dir
 from ..requests.aiohttp import get_connector
+from ..providers.asyncio import to_sync_generator
 from ..errors import MissingRequirementsError
 from .. import debug
@@ -148,10 +149,12 @@ def spacy_refine_chunks(source_iterator):
 
 def get_filenames(bucket_dir: Path):
     files = bucket_dir / FILE_LIST
-    with files.open('r') as f:
-        return [filename.strip() for filename in f.readlines()]
+    if files.exists():
+        with files.open('r') as f:
+            return [filename.strip() for filename in f.readlines()]
+    return []
 
-def stream_read_files(bucket_dir: Path, filenames: list) -> Iterator[str]:
+def stream_read_files(bucket_dir: Path, filenames: list, delete_files: bool = False) -> Iterator[str]:
     for filename in filenames:
         file_path: Path = bucket_dir / filename
         if not file_path.exists() and 0 > file_path.lstat().st_size:
@@ -161,17 +164,18 @@ def stream_read_files(bucket_dir: Path, filenames: list) -> Iterator[str]:
             with zipfile.ZipFile(file_path, 'r') as zip_ref:
                 zip_ref.extractall(bucket_dir)
                 try:
-                    yield from stream_read_files(bucket_dir, [f for f in zip_ref.namelist() if supports_filename(f)])
+                    yield from stream_read_files(bucket_dir, [f for f in zip_ref.namelist() if supports_filename(f)], delete_files)
                 except zipfile.BadZipFile:
                     pass
                 finally:
-                    for unlink in zip_ref.namelist()[::-1]:
-                        filepath = os.path.join(bucket_dir, unlink)
-                        if os.path.exists(filepath):
-                            if os.path.isdir(filepath):
-                                os.rmdir(filepath)
-                            else:
-                                os.unlink(filepath)
+                    if delete_files:
+                        for unlink in zip_ref.namelist()[::-1]:
+                            filepath = os.path.join(bucket_dir, unlink)
+                            if os.path.exists(filepath):
+                                if os.path.isdir(filepath):
+                                    os.rmdir(filepath)
+                                else:
+                                    os.unlink(filepath)
             continue
         yield f"```{filename}\n"
         if has_pypdf2 and filename.endswith(".pdf"):
@@ -320,7 +324,7 @@ def split_file_by_size_and_newline(input_filename, output_dir, chunk_size_bytes=
         with open(output_filename, 'w', encoding='utf-8') as outfile:
             outfile.write(current_chunk)
 
-async def get_filename(response: ClientResponse):
+async def get_filename(response: ClientResponse) -> str:
     """
     Attempts to extract a filename from an aiohttp response. Prioritizes Content-Disposition, then URL.
 
@@ -347,8 +351,9 @@ async def get_filename(response: ClientResponse):
         if extension:
             parsed_url = urllib.parse.urlparse(url)
             sha256_hash = hashlib.sha256(url.encode()).digest()
-            base64_encoded = base64.b32encode(sha256_hash).decode().lower()
-            return f"{parsed_url.netloc} {parsed_url.path[1:].replace('/', '_')} {base64_encoded[:6]}{extension}"
+            base32_encoded = base64.b32encode(sha256_hash).decode()
+            url_hash = base32_encoded[:24].lower()
+            return f"{parsed_url.netloc} {parsed_url.path[1:].replace('/', '_')} {url_hash}{extension}"
 
     return None
 
@@ -404,21 +409,22 @@ def read_links(html: str, base: str) -> set[str]:
     for link in soup.select("a"):
         if "rel" not in link.attrs or "nofollow" not in link.attrs["rel"]:
             url = link.attrs.get("href")
-            if url and url.startswith("https://"):
+            if url and url.startswith("https://") or url.startswith("/"):
                 urls.append(url.split("#")[0])
     return set([urllib.parse.urljoin(base, link) for link in urls])
 
 async def download_urls(
     bucket_dir: Path,
     urls: list[str],
-    max_depth: int = 2,
-    loaded_urls: set[str] = set(),
+    max_depth: int = 1,
+    loading_urls: set[str] = set(),
     lock: asyncio.Lock = None,
     delay: int = 3,
+    new_urls: list[str] = list(),
     group_size: int = 5,
     timeout: int = 10,
     proxy: Optional[str] = None
-) -> list[str]:
+) -> AsyncIterator[str]:
     if lock is None:
         lock = asyncio.Lock()
     async with ClientSession(
@@ -433,30 +439,37 @@ async def download_urls(
                 if not filename:
                     print(f"Failed to get filename for {url}")
                     return None
-                newfiles = [filename]
+                if not supports_filename(filename) or filename == DOWNLOADS_FILE:
+                    return None
                 if filename.endswith(".html") and max_depth > 0:
-                    new_urls = read_links(await response.text(), str(response.url))
-                    async with lock:
-                        new_urls = [new_url for new_url in new_urls if new_url not in loaded_urls]
-                        [loaded_urls.add(url) for url in new_urls]
-                    if new_urls:
-                        for i in range(0, len(new_urls), group_size):
-                            newfiles += await download_urls(bucket_dir, new_urls[i:i + group_size], max_depth - 1, loaded_urls, lock, delay + 1)
-                            await asyncio.sleep(delay)
-                if supports_filename(filename) and filename != DOWNLOADS_FILE:
-                    target = bucket_dir / filename
-                    with target.open("wb") as f:
-                        async for chunk in response.content.iter_chunked(4096):
-                            f.write(chunk)
-                return newfiles
+                    add_urls = read_links(await response.text(), str(response.url))
+                    if add_urls:
+                        async with lock:
+                            add_urls = [add_url for add_url in add_urls if add_url not in loading_urls]
+                            [loading_urls.add(add_url) for add_url in add_urls]
+                            [new_urls.append(add_url) for add_url in add_urls if add_url not in new_urls]
+                target = bucket_dir / filename
+                with target.open("wb") as f:
+                    async for chunk in response.content.iter_chunked(4096):
+                        if b'<link rel="canonical"' not in chunk:
+                            f.write(chunk.replace(b'</head>', f'<link rel="canonical" href="{response.url}">\n</head>'.encode()))
+                return filename
             except (ClientError, asyncio.TimeoutError) as e:
                 debug.log(f"Download failed: {e.__class__.__name__}: {e}")
                 return None
-        files = set()
-        for results in await asyncio.gather(*[download_url(url) for url in urls]):
-            if results:
-                [files.add(url) for url in results]
-        return files
+        for filename in await asyncio.gather(*[download_url(url) for url in urls]):
+            if filename:
+                yield filename
+            else:
+                await asyncio.sleep(delay)
+        while new_urls:
+            next_urls = list()
+            for i in range(0, len(new_urls), group_size):
+                chunked_urls = new_urls[i:i + group_size]
+                async for filename in download_urls(bucket_dir, chunked_urls, max_depth - 1, loading_urls, lock, delay + 1, next_urls):
+                    yield filename
+                await asyncio.sleep(delay)
+            new_urls = next_urls
 
 def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spacy = False, event_stream: bool = False) -> Iterator[str]:
     bucket_dir = Path(bucket_dir)
@@ -473,9 +486,13 @@ def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spac
                 if "url" in item:
                     urls.append(item["url"])
         if urls:
-            filenames = asyncio.run(download_urls(bucket_dir, urls))
+            count = 0
             with open(os.path.join(bucket_dir, FILE_LIST), 'w') as f:
-                [f.write(f"{filename}\n") for filename in filenames if filename]
+                for filename in to_sync_generator(download_urls(bucket_dir, urls)):
+                    f.write(f"{filename}\n")
+                    if event_stream:
+                        count += 1
+                        yield f'data: {json.dumps({"action": "download", "count": count})}\n\n'
 
         if refine_chunks_with_spacy:
             size = 0
@@ -486,7 +503,7 @@ def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spac
             else:
                 yield chunk
         else:
-            streaming = stream_read_files(bucket_dir, get_filenames(bucket_dir))
+            streaming = stream_read_files(bucket_dir, get_filenames(bucket_dir), delete_files)
             streaming = cache_stream(streaming, bucket_dir)
             size = 0
             for chunk in streaming:
@@ -504,7 +521,7 @@ def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spac
             if event_stream:
                 yield f'data: {json.dumps({"action": "delete_files"})}\n\n'
         if event_stream:
-            yield f'data: {json.dumps({"action": "done"})}\n\n'
+            yield f'data: {json.dumps({"action": "done", "size": size})}\n\n'
     except Exception as e:
         if event_stream:
            yield f'data: {json.dumps({"error": {"message": str(e)}})}\n\n'
diff --git a/g4f/tools/run_tools.py b/g4f/tools/run_tools.py
index 21e9ec09..b3febfcd 100644
--- a/g4f/tools/run_tools.py
+++ b/g4f/tools/run_tools.py
@@ -12,6 +12,10 @@ from .web_search import do_search, get_search_message
 from .files import read_bucket, get_bucket_dir
 from .. import debug
 
+BUCKET_INSTRUCTIONS = """
+Instruction: Make sure to add the sources of cites using [[domain]](Url) notation after the reference. Example: [[a-z0-9.]](http://example.com)
+"""
+
 def validate_arguments(data: dict) -> dict:
     if "arguments" in data:
         if isinstance(data["arguments"], str):
@@ -36,7 +40,7 @@ async def async_iter_run_tools(async_iter_callback, model, messages, tool_calls:
             )
         elif tool.get("function", {}).get("name") == "continue":
             last_line = messages[-1]["content"].strip().splitlines()[-1]
-            content = f"Continue writing the story after this line start with a plus sign if you begin a new word.\n{last_line}"
+            content = f"Continue after this line.\n{last_line}"
             messages.append({"role": "user", "content": content})
     response = async_iter_callback(model=model, messages=messages, **kwargs)
     if not hasattr(response, "__aiter__"):
@@ -73,7 +77,7 @@ def iter_run_tools(
         elif tool.get("function", {}).get("name") == "continue_tool":
             if provider not in ("OpenaiAccount", "HuggingFace"):
                 last_line = messages[-1]["content"].strip().splitlines()[-1]
-                content = f"continue after this line:\n{last_line}"
+                content = f"Continue after this line:\n{last_line}"
                 messages.append({"role": "user", "content": content})
             else:
                 # Enable provider native continue
@@ -82,6 +86,14 @@ def iter_run_tools(
         elif tool.get("function", {}).get("name") == "bucket_tool":
             def on_bucket(match):
                 return "".join(read_bucket(get_bucket_dir(match.group(1))))
-            messages[-1]["content"] = re.sub(r'{"bucket_id":"([^"]*)"}', on_bucket, messages[-1]["content"])
+            has_bucket = False
+            for message in messages:
+                if "content" in message and isinstance(message["content"], str):
+                    new_message_content = re.sub(r'{"bucket_id":"([^"]*)"}', on_bucket, message["content"])
+                    if new_message_content != message["content"]:
+                        has_bucket = True
+                        message["content"] = new_message_content
+            if has_bucket and isinstance(messages[-1]["content"], str):
+                messages[-1]["content"] += BUCKET_INSTRUCTIONS
     print(messages[-1])
     return iter_callback(model=model, messages=messages, provider=provider, **kwargs)
\ No newline at end of file
diff --git a/g4f/tools/web_search.py b/g4f/tools/web_search.py
index 9033e0ad..780e45df 100644
--- a/g4f/tools/web_search.py
+++ b/g4f/tools/web_search.py
@@ -4,7 +4,10 @@ from aiohttp import ClientSession, ClientTimeout, ClientError
 import json
 import hashlib
 from pathlib import Path
-from collections import Counter
+from urllib.parse import urlparse
+import datetime
+import asyncio
+
 try:
     from duckduckgo_search import DDGS
     from duckduckgo_search.exceptions import DuckDuckGoSearchException
@@ -17,13 +20,12 @@ try:
     has_spacy = True
 except:
     has_spacy = False
+
 from typing import Iterator
 from ..cookies import get_cookies_dir
 from ..errors import MissingRequirementsError
 from .. import debug
 
-import asyncio
-
 DEFAULT_INSTRUCTIONS = """
 Using the provided web search results, to write a comprehensive reply to the user request.
 Make sure to add the sources of cites using [[Number]](Url) notation after the reference. Example: [[0]](http://google.com)
@@ -64,7 +66,8 @@ class SearchResultEntry():
         self.text = text
 
 def scrape_text(html: str, max_words: int = None) -> Iterator[str]:
-    soup = BeautifulSoup(html, "html.parser")
+    source = BeautifulSoup(html, "html.parser")
+    soup = source
     for selector in [
             "main",
             ".main-content-wrapper",
@@ -96,12 +99,18 @@ def scrape_text(html: str, max_words: int = None) -> Iterator[str]:
                     break
         yield " ".join(words) + "\n"
 
+    canonical_link = source.find("link", rel="canonical")
+    if canonical_link and "href" in canonical_link.attrs:
+        link = canonical_link["href"]
+        domain = urlparse(link).netloc
+        yield f"\nSource: [{domain}]({link})"
+
 async def fetch_and_scrape(session: ClientSession, url: str, max_words: int = None) -> str:
     try:
         bucket_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / "fetch_and_scrape"
         bucket_dir.mkdir(parents=True, exist_ok=True)
         md5_hash = hashlib.md5(url.encode()).hexdigest()
-        cache_file = bucket_dir / f"{url.split('/')[3]}.{md5_hash}.txt"
+        cache_file = bucket_dir / f"{url.split('/')[3]}.{datetime.date.today()}.{md5_hash}.txt"
        if cache_file.exists():
             return cache_file.read_text()
         async with session.get(url) as response:
--
cgit v1.2.3
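A minimal sketch of how the event stream reworked above might be consumed from Python. When event_stream=True, get_streaming() yields SSE-style "data: {...}" lines, including the new "download" progress events and the "size" field on the "done" event. The bucket id below is a placeholder and the setup around the call is assumed, not shown in the diff:

    import json

    from g4f.tools.files import get_bucket_dir, get_streaming

    bucket_dir = get_bucket_dir("example-bucket")  # placeholder bucket id, not from the commit

    for chunk in get_streaming(str(bucket_dir), event_stream=True):
        if not chunk.startswith("data: "):
            continue  # skip any raw text output; only handle event lines here
        event = json.loads(chunk[len("data: "):])
        if "error" in event:
            print("error:", event["error"]["message"])
        elif event.get("action") == "download":
            print("files downloaded so far:", event["count"])
        elif event.get("action") == "done":
            print("done, total size:", event["size"])

Framing progress as discrete events is presumably what lets the GUI format the bucket placeholder with a live download count and final size instead of waiting for the full text.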