import re import urllib.parse import json from curl_cffi import requests from ..typing import Any, CreateResult from .base_provider import BaseProvider class You(BaseProvider): url = "https://you.com" working = True supports_gpt_35_turbo = True @staticmethod def create_completion( model: str, messages: list[dict[str, str]], stream: bool, **kwargs: Any, ) -> CreateResult: url_param = _create_url_param(messages, kwargs.get("history", [])) headers = _create_header() url = f"https://you.com/api/streamingSearch?{url_param}" response = requests.get( url, headers=headers, impersonate="chrome107", ) response.raise_for_status() start = 'data: {"youChatToken": ' for line in response.content.splitlines(): line = line.decode('utf-8') if line.startswith(start): yield json.loads(line[len(start): -1]) def _create_url_param(messages: list[dict[str, str]], history: list[dict[str, str]]): prompt = "" for message in messages: prompt += "%s: %s\n" % (message["role"], message["content"]) prompt += "assistant:" chat = _convert_chat(history) param = {"q": prompt, "domain": "youchat", "chat": chat} return urllib.parse.urlencode(param) def _convert_chat(messages: list[dict[str, str]]): message_iter = iter(messages) return [ {"question": user["content"], "answer": assistant["content"]} for user, assistant in zip(message_iter, message_iter) ] def _create_header(): return { "accept": "text/event-stream", "referer": "https://you.com/search?fromSearchBar=true&tbm=youchat", }