summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/Bing.py
blob: 40e112a9be52955117ecdcb5211ea23e684aeb0f (plain) (tree)









































































































































































































































































































































































                                                                                                                                                              
import asyncio
import json
import os
import random
import ssl
import uuid

import aiohttp
import certifi
import requests

from ..typing import Any, AsyncGenerator, CreateResult, Tuple
from .base_provider import BaseProvider


class Bing(BaseProvider):
    url = "https://bing.com/chat"
    supports_gpt_4 = True

    @staticmethod
    def create_completion(
        model: str,
        messages: list[dict[str, str]],
        stream: bool,
        **kwargs: Any,
    ) -> CreateResult:
        if len(messages) < 2:
            prompt = messages[0]["content"]
            context = False

        else:
            prompt = messages[-1]["content"]
            context = convert(messages[:-1])

        response = run(stream_generate(prompt, jailbreak, context))
        for token in response:
            yield token


def convert(messages: list[dict[str, str]]):
    context = ""

    for message in messages:
        context += "[%s](#message)\n%s\n\n" % (message["role"], message["content"])

    return context


jailbreak = {
    "optionsSets": [
        "saharasugg",
        "enablenewsfc",
        "clgalileo",
        "gencontentv3",
        "nlu_direct_response_filter",
        "deepleo",
        "disable_emoji_spoken_text",
        "responsible_ai_policy_235",
        "enablemm",
        "h3precise"
        # "harmonyv3",
        "dtappid",
        "cricinfo",
        "cricinfov2",
        "dv3sugg",
        "nojbfedge",
    ]
}


ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())


def _format(msg: dict[str, Any]) -> str:
    return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter


async def stream_generate(
    prompt: str,
    mode: dict[str, list[str]] = jailbreak,
    context: bool | str = False,
):
    timeout = aiohttp.ClientTimeout(total=900)
    session = aiohttp.ClientSession(timeout=timeout)

    conversationId, clientId, conversationSignature = await create_conversation()

    wss = await session.ws_connect(
        "wss://sydney.bing.com/sydney/ChatHub",
        ssl=ssl_context,
        autoping=False,
        headers={
            "accept": "application/json",
            "accept-language": "en-US,en;q=0.9",
            "content-type": "application/json",
            "sec-ch-ua": '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
            "sec-ch-ua-arch": '"x86"',
            "sec-ch-ua-bitness": '"64"',
            "sec-ch-ua-full-version": '"109.0.1518.78"',
            "sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-model": "",
            "sec-ch-ua-platform": '"Windows"',
            "sec-ch-ua-platform-version": '"15.0.0"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "x-ms-client-request-id": str(uuid.uuid4()),
            "x-ms-useragent": "azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32",
            "Referer": "https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx",
            "Referrer-Policy": "origin-when-cross-origin",
            "x-forwarded-for": Defaults.ip_address,
        },
    )

    await wss.send_str(_format({"protocol": "json", "version": 1}))
    await wss.receive(timeout=900)

    argument: dict[str, Any] = {
        **mode,
        "source": "cib",
        "allowedMessageTypes": Defaults.allowedMessageTypes,
        "sliceIds": Defaults.sliceIds,
        "traceId": os.urandom(16).hex(),
        "isStartOfSession": True,
        "message": Defaults.location
        | {
            "author": "user",
            "inputMethod": "Keyboard",
            "text": prompt,
            "messageType": "Chat",
        },
        "conversationSignature": conversationSignature,
        "participant": {"id": clientId},
        "conversationId": conversationId,
    }

    if context:
        argument["previousMessages"] = [
            {
                "author": "user",
                "description": context,
                "contextType": "WebPage",
                "messageType": "Context",
                "messageId": "discover-web--page-ping-mriduna-----",
            }
        ]

    struct: dict[str, list[dict[str, Any]] | str | int] = {
        "arguments": [argument],
        "invocationId": "0",
        "target": "chat",
        "type": 4,
    }

    await wss.send_str(_format(struct))

    final = False
    draw = False
    resp_txt = ""
    result_text = ""
    resp_txt_no_link = ""
    cache_text = ""

    while not final:
        msg = await wss.receive(timeout=900)
        objects = msg.data.split(Defaults.delimiter)  # type: ignore

        for obj in objects:  # type: ignore
            if obj is None or not obj:
                continue

            response = json.loads(obj)  # type: ignore
            if response.get("type") == 1 and response["arguments"][0].get(
                "messages",
            ):
                if not draw:
                    if (
                        response["arguments"][0]["messages"][0]["contentOrigin"]
                        != "Apology"
                    ) and not draw:
                        resp_txt = result_text + response["arguments"][0]["messages"][
                            0
                        ]["adaptiveCards"][0]["body"][0].get("text", "")
                        resp_txt_no_link = result_text + response["arguments"][0][
                            "messages"
                        ][0].get("text", "")

                        if response["arguments"][0]["messages"][0].get(
                            "messageType",
                        ):
                            resp_txt = (
                                resp_txt
                                + response["arguments"][0]["messages"][0][
                                    "adaptiveCards"
                                ][0]["body"][0]["inlines"][0].get("text")
                                + "\n"
                            )
                            result_text = (
                                result_text
                                + response["arguments"][0]["messages"][0][
                                    "adaptiveCards"
                                ][0]["body"][0]["inlines"][0].get("text")
                                + "\n"
                            )

                    if cache_text.endswith("   "):
                        final = True
                        if wss and not wss.closed:
                            await wss.close()
                        if session and not session.closed:
                            await session.close()

                    yield (resp_txt.replace(cache_text, ""))
                    cache_text = resp_txt

            elif response.get("type") == 2:
                if response["item"]["result"].get("error"):
                    if wss and not wss.closed:
                        await wss.close()
                    if session and not session.closed:
                        await session.close()

                    raise Exception(
                        f"{response['item']['result']['value']}: {response['item']['result']['message']}"
                    )

                if draw:
                    cache = response["item"]["messages"][1]["adaptiveCards"][0]["body"][
                        0
                    ]["text"]
                    response["item"]["messages"][1]["adaptiveCards"][0]["body"][0][
                        "text"
                    ] = (cache + resp_txt)

                if (
                    response["item"]["messages"][-1]["contentOrigin"] == "Apology"
                    and resp_txt
                ):
                    response["item"]["messages"][-1]["text"] = resp_txt_no_link
                    response["item"]["messages"][-1]["adaptiveCards"][0]["body"][0][
                        "text"
                    ] = resp_txt

                    # print('Preserved the message from being deleted', file=sys.stderr)

                final = True
                if wss and not wss.closed:
                    await wss.close()
                if session and not session.closed:
                    await session.close()


async def create_conversation() -> Tuple[str, str, str]:
    create = requests.get(
        "https://www.bing.com/turing/conversation/create",
        headers={
            "authority": "edgeservices.bing.com",
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "max-age=0",
            "sec-ch-ua": '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
            "sec-ch-ua-arch": '"x86"',
            "sec-ch-ua-bitness": '"64"',
            "sec-ch-ua-full-version": '"110.0.1587.69"',
            "sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-model": '""',
            "sec-ch-ua-platform": '"Windows"',
            "sec-ch-ua-platform-version": '"15.0.0"',
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69",
            "x-edge-shopping-flag": "1",
            "x-forwarded-for": Defaults.ip_address,
        },
    )

    conversationId = create.json().get("conversationId")
    clientId = create.json().get("clientId")
    conversationSignature = create.json().get("conversationSignature")

    if not conversationId or not clientId or not conversationSignature:
        raise Exception("Failed to create conversation.")

    return conversationId, clientId, conversationSignature


class Defaults:
    delimiter = "\x1e"
    ip_address = f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}"

    allowedMessageTypes = [
        "Chat",
        "Disengaged",
        "AdsQuery",
        "SemanticSerp",
        "GenerateContentQuery",
        "SearchQuery",
        "ActionRequest",
        "Context",
        "Progress",
        "AdsQuery",
        "SemanticSerp",
    ]

    sliceIds = [
        # "222dtappid",
        # "225cricinfo",
        # "224locals0"
        "winmuid3tf",
        "osbsdusgreccf",
        "ttstmout",
        "crchatrev",
        "winlongmsgtf",
        "ctrlworkpay",
        "norespwtf",
        "tempcacheread",
        "temptacache",
        "505scss0",
        "508jbcars0",
        "515enbotdets0",
        "5082tsports",
        "515vaoprvs",
        "424dagslnv1s0",
        "kcimgattcf",
        "427startpms0",
    ]

    location = {
        "locale": "en-US",
        "market": "en-US",
        "region": "US",
        "locationHints": [
            {
                "country": "United States",
                "state": "California",
                "city": "Los Angeles",
                "timezoneoffset": 8,
                "countryConfidence": 8,
                "Center": {"Latitude": 34.0536909, "Longitude": -118.242766},
                "RegionType": 2,
                "SourceType": 1,
            }
        ],
    }


def run(generator: AsyncGenerator[Any | str, Any]):
    loop = asyncio.get_event_loop()
    gen = generator.__aiter__()

    while True:
        try:
            yield loop.run_until_complete(gen.__anext__())

        except StopAsyncIteration:
            break