summaryrefslogblamecommitdiffstats
path: root/g4f/requests.py
blob: 1a0c612c6172b3697a4713355288e6d656838c03 (plain) (tree)













































































                                                                                 
from __future__ import annotations

import json, sys
from aiohttp import StreamReader
from aiohttp.base_protocol import BaseProtocol

from curl_cffi.requests import AsyncSession
from curl_cffi.requests.cookies import Request
from curl_cffi.requests.cookies import Response


class StreamResponse:
    """Response wrapper whose body is consumed from a stream reader.

    ``status_code``, ``reason`` and ``ok`` are copied from the wrapped
    response at construction time so callers can inspect the outcome
    without going through ``inner``.
    """

    def __init__(self, inner: Response, content: StreamReader, request: Request):
        """Keep the wrapped response, its body reader and the originating request."""
        self.inner = inner
        self.content = content
        self.request = request
        # Mirror the status fields of the wrapped response onto this object.
        for field in ("status_code", "reason", "ok"):
            setattr(self, field, getattr(inner, field))

    async def text(self) -> str:
        """Read from the body reader and decode the bytes with the default codec."""
        return (await self.content.read()).decode()

    def raise_for_status(self):
        """Raise ``RuntimeError`` carrying status code and reason unless ``ok`` is truthy."""
        if self.ok:
            return
        raise RuntimeError(f"HTTP Error {self.status_code}: {self.reason}")

    async def json(self, **kwargs):
        """Read from the body reader and parse it as JSON.

        ``kwargs`` are forwarded unchanged to ``json.loads``.
        """
        raw_body = await self.content.read()
        return json.loads(raw_body, **kwargs)


class StreamRequest:
    """Async context manager that runs one request and streams its body.

    Entering the context takes a curl handle from the session, starts the
    transfer and returns a ``StreamResponse`` once the first body chunk has
    arrived or the transfer has ended. Leaving the context waits for the
    transfer to finish and hands the curl handle back to the session.
    """

    def __init__(self, session: AsyncSession, method: str, url: str, **kwargs):
        """Store the request parameters and create the body buffer.

        Args:
            session: Session that supplies the event loop and curl handles.
            method: HTTP method forwarded to ``session._set_curl_options``.
            url: Target URL forwarded to ``session._set_curl_options``.
            **kwargs: Extra options forwarded unchanged to
                ``session._set_curl_options`` when the request starts.
        """
        self.session = session
        self.loop = session.loop
        # sys.maxsize is passed as the reader limit, i.e. no practical cap.
        self.content = StreamReader(
            BaseProtocol(session.loop),
            sys.maxsize,
            loop=session.loop
        )
        self.method = method
        self.url = url
        self.options = kwargs

    def on_content(self, data):
        """Buffer one body chunk; registered as curl's ``content_callback``.

        The first chunk also unblocks ``__aenter__``.
        """
        if not self.enter.done():
            self.enter.set_result(None)
        self.content.feed_data(data)

    def on_done(self, task):
        """Done-callback of the transfer task: settle ``enter`` and close the body.

        When the transfer ends before any body chunk was delivered (empty
        body, or failure before data), ``enter`` is still pending and
        ``__aenter__`` would otherwise wait forever. The outcome of ``task``
        is forwarded to it: cancellation, the raised exception, or success.
        """
        if not self.enter.done():
            # cancelled() must be checked first: exception() raises
            # CancelledError on a cancelled task.
            if task.cancelled():
                self.enter.cancel()
            else:
                error = task.exception()
                if error is not None:
                    self.enter.set_exception(error)
                else:
                    self.enter.set_result(None)
        self.content.feed_eof()

    def _release_curl(self):
        """Clean up the curl handle and return it to the session."""
        self.curl.clean_after_perform()
        self.curl.reset()
        self.session.push_curl(self.curl)

    async def __aenter__(self) -> StreamResponse:
        """Start the transfer and return the response once it is underway.

        Raises:
            Exception: Whatever the transfer task raised, if it failed before
                delivering any body data.
        """
        self.curl = await self.session.pop_curl()
        self.enter = self.session.loop.create_future()
        request, _, header_buffer = self.session._set_curl_options(
            self.curl,
            self.method,
            self.url,
            content_callback=self.on_content,
            **self.options
        )
        handle = self.session.acurl.add_handle(self.curl)
        self.handle = self.session.loop.create_task(handle)
        self.handle.add_done_callback(self.on_done)
        try:
            await self.enter
        except Exception:
            # Only on_done puts an exception on `enter`, so the transfer task
            # has already finished here. __aexit__ does not run when
            # __aenter__ raises, so the handle is returned to the session now.
            self._release_curl()
            raise
        return StreamResponse(
            self.session._parse_response(self.curl, request, _, header_buffer),
            self.content,
            request
        )

    async def __aexit__(self, exc_type, exc, tb):
        """Wait for the transfer to finish and return the curl handle.

        A failure of the transfer task is re-raised after the handle has been
        released, so a failed request does not leak it. Cancellation is left
        untouched, as before.
        """
        try:
            await self.handle
        except Exception:
            self._release_curl()
            raise
        self._release_curl()