From 0d59789eedf3784cf4c3aaf764785a4ad91723c4 Mon Sep 17 00:00:00 2001
From: Heiner Lohaus
Date: Wed, 1 Jan 2025 14:01:33 +0100
Subject: Add File API Documentation for Python and JS
 Format Bucket Placeholder in GUI

---
 g4f/tools/files.py | 103 +++++++++++++++++++++++++++++++----------------------
 1 file changed, 60 insertions(+), 43 deletions(-)

(limited to 'g4f/tools/files.py')

diff --git a/g4f/tools/files.py b/g4f/tools/files.py
index d0d1c23b..b06fe5ea 100644
--- a/g4f/tools/files.py
+++ b/g4f/tools/files.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 import os
 import json
 from pathlib import Path
-from typing import Iterator, Optional
+from typing import Iterator, Optional, AsyncIterator
 from aiohttp import ClientSession, ClientError, ClientResponse, ClientTimeout
 import urllib.parse
 import time
@@ -74,6 +74,7 @@ except ImportError:
 from .web_search import scrape_text
 from ..cookies import get_cookies_dir
 from ..requests.aiohttp import get_connector
+from ..providers.asyncio import to_sync_generator
 from ..errors import MissingRequirementsError
 from .. import debug
 
@@ -148,10 +149,12 @@ def spacy_refine_chunks(source_iterator):
 
 def get_filenames(bucket_dir: Path):
     files = bucket_dir / FILE_LIST
-    with files.open('r') as f:
-        return [filename.strip() for filename in f.readlines()]
+    if files.exists():
+        with files.open('r') as f:
+            return [filename.strip() for filename in f.readlines()]
+    return []
 
-def stream_read_files(bucket_dir: Path, filenames: list) -> Iterator[str]:
+def stream_read_files(bucket_dir: Path, filenames: list, delete_files: bool = False) -> Iterator[str]:
     for filename in filenames:
         file_path: Path = bucket_dir / filename
         if not file_path.exists() and 0 > file_path.lstat().st_size:
@@ -161,17 +164,18 @@ def stream_read_files(bucket_dir: Path, filenames: list) -> Iterator[str]:
             with zipfile.ZipFile(file_path, 'r') as zip_ref:
                 zip_ref.extractall(bucket_dir)
                 try:
-                    yield from stream_read_files(bucket_dir, [f for f in zip_ref.namelist() if supports_filename(f)])
+                    yield from stream_read_files(bucket_dir, [f for f in zip_ref.namelist() if supports_filename(f)], delete_files)
                 except zipfile.BadZipFile:
                     pass
                 finally:
-                    for unlink in zip_ref.namelist()[::-1]:
-                        filepath = os.path.join(bucket_dir, unlink)
-                        if os.path.exists(filepath):
-                            if os.path.isdir(filepath):
-                                os.rmdir(filepath)
-                            else:
-                                os.unlink(filepath)
+                    if delete_files:
+                        for unlink in zip_ref.namelist()[::-1]:
+                            filepath = os.path.join(bucket_dir, unlink)
+                            if os.path.exists(filepath):
+                                if os.path.isdir(filepath):
+                                    os.rmdir(filepath)
+                                else:
+                                    os.unlink(filepath)
             continue
         yield f"```{filename}\n"
         if has_pypdf2 and filename.endswith(".pdf"):
@@ -320,7 +324,7 @@ def split_file_by_size_and_newline(input_filename, output_dir, chunk_size_bytes=
         with open(output_filename, 'w', encoding='utf-8') as outfile:
             outfile.write(current_chunk)
 
-async def get_filename(response: ClientResponse):
+async def get_filename(response: ClientResponse) -> str:
     """
     Attempts to extract a filename from an aiohttp response. Prioritizes Content-Disposition, then URL.
@@ -347,8 +351,9 @@ async def get_filename(response: ClientResponse):
         if extension:
             parsed_url = urllib.parse.urlparse(url)
             sha256_hash = hashlib.sha256(url.encode()).digest()
-            base64_encoded = base64.b32encode(sha256_hash).decode().lower()
-            return f"{parsed_url.netloc} {parsed_url.path[1:].replace('/', '_')} {base64_encoded[:6]}{extension}"
+            base32_encoded = base64.b32encode(sha256_hash).decode()
+            url_hash = base32_encoded[:24].lower()
+            return f"{parsed_url.netloc} {parsed_url.path[1:].replace('/', '_')} {url_hash}{extension}"
 
     return None
 
@@ -404,21 +409,22 @@ def read_links(html: str, base: str) -> set[str]:
     for link in soup.select("a"):
         if "rel" not in link.attrs or "nofollow" not in link.attrs["rel"]:
             url = link.attrs.get("href")
-            if url and url.startswith("https://"):
+            if url and (url.startswith("https://") or url.startswith("/")):
                 urls.append(url.split("#")[0])
     return set([urllib.parse.urljoin(base, link) for link in urls])
 
 async def download_urls(
     bucket_dir: Path,
     urls: list[str],
-    max_depth: int = 2,
-    loaded_urls: set[str] = set(),
+    max_depth: int = 1,
+    loading_urls: set[str] = set(),
     lock: asyncio.Lock = None,
     delay: int = 3,
+    new_urls: list[str] = list(),
     group_size: int = 5,
     timeout: int = 10,
     proxy: Optional[str] = None
-) -> list[str]:
+) -> AsyncIterator[str]:
     if lock is None:
         lock = asyncio.Lock()
     async with ClientSession(
@@ -433,30 +439,37 @@ async def download_urls(
                     if not filename:
                         print(f"Failed to get filename for {url}")
                         return None
-                    newfiles = [filename]
+                    if not supports_filename(filename) or filename == DOWNLOADS_FILE:
+                        return None
                     if filename.endswith(".html") and max_depth > 0:
-                        new_urls = read_links(await response.text(), str(response.url))
-                        async with lock:
-                            new_urls = [new_url for new_url in new_urls if new_url not in loaded_urls]
-                            [loaded_urls.add(url) for url in new_urls]
-                        if new_urls:
-                            for i in range(0, len(new_urls), group_size):
-                                newfiles += await download_urls(bucket_dir, new_urls[i:i + group_size], max_depth - 1, loaded_urls, lock, delay + 1)
-                                await asyncio.sleep(delay)
-                    if supports_filename(filename) and filename != DOWNLOADS_FILE:
-                        target = bucket_dir / filename
-                        with target.open("wb") as f:
-                            async for chunk in response.content.iter_chunked(4096):
-                                f.write(chunk)
-                    return newfiles
+                        add_urls = read_links(await response.text(), str(response.url))
+                        if add_urls:
+                            async with lock:
+                                add_urls = [add_url for add_url in add_urls if add_url not in loading_urls]
+                                [loading_urls.add(add_url) for add_url in add_urls]
+                                [new_urls.append(add_url) for add_url in add_urls if add_url not in new_urls]
+                    target = bucket_dir / filename
+                    with target.open("wb") as f:
+                        async for chunk in response.content.iter_chunked(4096):
+                            if b'<link rel="canonical"' not in chunk:
+                                f.write(chunk.replace(b'</head>', f'<link rel="canonical" href="{response.url}">\n</head>'.encode()))
+                    return filename
             except (ClientError, asyncio.TimeoutError) as e:
                 debug.log(f"Download failed: {e.__class__.__name__}: {e}")
                 return None
-        files = set()
-        for results in await asyncio.gather(*[download_url(url) for url in urls]):
-            if results:
-                [files.add(url) for url in results]
-        return files
+        for filename in await asyncio.gather(*[download_url(url) for url in urls]):
+            if filename:
+                yield filename
+            else:
+                await asyncio.sleep(delay)
+        while new_urls:
+            next_urls = list()
+            for i in range(0, len(new_urls), group_size):
+                chunked_urls = new_urls[i:i + group_size]
+                async for filename in download_urls(bucket_dir, chunked_urls, max_depth - 1, loading_urls, lock, delay + 1, next_urls):
+                    yield filename
                await asyncio.sleep(delay)
+            new_urls = next_urls
 
 def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spacy = False, event_stream: bool = False) -> Iterator[str]:
     bucket_dir = Path(bucket_dir)
@@ -473,9 +486,13 @@ def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spac
             if "url" in item:
                 urls.append(item["url"])
         if urls:
-            filenames = asyncio.run(download_urls(bucket_dir, urls))
+            count = 0
             with open(os.path.join(bucket_dir, FILE_LIST), 'w') as f:
-                [f.write(f"{filename}\n") for filename in filenames if filename]
+                for filename in to_sync_generator(download_urls(bucket_dir, urls)):
+                    f.write(f"{filename}\n")
+                    if event_stream:
+                        count += 1
+                        yield f'data: {json.dumps({"action": "download", "count": count})}\n\n'
 
     if refine_chunks_with_spacy:
         size = 0
@@ -486,7 +503,7 @@ def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spac
             else:
                 yield chunk
     else:
-        streaming = stream_read_files(bucket_dir, get_filenames(bucket_dir))
+        streaming = stream_read_files(bucket_dir, get_filenames(bucket_dir), delete_files)
         streaming = cache_stream(streaming, bucket_dir)
         size = 0
         for chunk in streaming:
@@ -504,7 +521,7 @@ def get_streaming(bucket_dir: str, delete_files = False, refine_chunks_with_spac
             if event_stream:
                 yield f'data: {json.dumps({"action": "delete_files"})}\n\n'
         if event_stream:
-            yield f'data: {json.dumps({"action": "done"})}\n\n'
+            yield f'data: {json.dumps({"action": "done", "size": size})}\n\n'
     except Exception as e:
         if event_stream:
            yield f'data: {json.dumps({"error": {"message": str(e)}})}\n\n'
--
cgit v1.2.3
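
Usage note (illustration only, not part of the patch): with event_stream=True, get_streaming now yields
server-sent-event lines such as {"action": "download", "count": n}, {"action": "done", "size": size}
and {"error": {"message": ...}}, as introduced in the hunks above. A minimal consumer sketch follows;
the bucket path is a hypothetical placeholder, and any non-event chunks are simply skipped.

    import json

    from g4f.tools.files import get_streaming

    def report_progress(bucket_dir: str) -> None:
        # Drive the generator and decode each 'data: {...}' event it yields.
        for chunk in get_streaming(bucket_dir, delete_files=False, event_stream=True):
            if not chunk.startswith("data: "):
                continue  # ignore anything that is not an SSE event line
            event = json.loads(chunk[len("data: "):])
            if "error" in event:
                print("error:", event["error"]["message"])
            elif event.get("action") == "download":
                print("files downloaded so far:", event["count"])
            elif event.get("action") == "done":
                print("finished, streamed size:", event.get("size"))

    if __name__ == "__main__":
        report_progress("./bucket")  # hypothetical bucket directory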