summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/Bing.py
blob: 726faa2bd0f3cc0a2220cec8ef10b3b54e0abd32 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                  
 
             


             



                  
           
                   
                     
                                                       
                                                
                                                 
 



                         
 








                                  
                                   

                                 
                                   
                         


                               
                   

                           

                                   
                          
                
                     


                                           

                                            
                                                   
        
                                                 
                                     
                                                                            
 
                                       
                   
                                                                                                                                              

                               
 
                     
                                                                                                                     


                                                          
                                  
 

                                                                                                                       
                                                               
                                    
 




                                                                                               
                                                             































                                                                                                        
                                                                                     


                                                                                                                                                     
                                                           

                                                                  
















                                                                                                             


                                                                                






                                                             
                                                                                                             




                                                                    
                        
                                   
     
                                                                     

                                                       





                                                                                                   
                        
               
                  
                                
                   
                              


                               



                                                                                                                    
                            
                                


                









                        



















                                                                             






















                                                                                                                                                          
                   














                                     
     
 

                                                                   
 
















                                                                                                           











                                                                               

                         
                                        











                                                                                                            
                                                   



















                                                                                            
                                             






                                     
                                        
        
                                   









                                                                                                  
                                                                                                      





                                                                            
                                     
                                                     
                                     
                                                     
                                     





                                                    
                                                           


                                                                                   
                                                                         


                          
                                                                                                 








                                            
    
                                  


                      
                                
                                            



                                                                    
                                        



                                                


                                            
                  
                                   
                             

                                                              


                                               

             
                            


                         




                                                                                                                                











                                                               
                  
                          


                            

                             



                                             
                                                                             
            

                                                                                                                                                                                       
                                                                                      
                                              
                                                                                       

                                 

                                  





                                                                
 



                                                                                                  











                                                                                           








                                                                                          
                                  
                
                                                                   
from __future__ import annotations

import string
import random
import json
import os
import re
import io
import base64
import numpy as np
import uuid
import urllib.parse
from PIL import Image
from aiohttp        import ClientSession, ClientTimeout
from ..typing       import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider

class Tones():
    creative = "Creative"
    balanced = "Balanced"
    precise = "Precise"

default_cookies = {
    'SRCHD'         : 'AF=NOFORM',
    'PPLState'      : '1',
    'KievRPSSecAuth': '',
    'SUID'          : '',
    'SRCHUSR'       : '',
    'SRCHHPGUSR'    : '',
}

class Bing(AsyncGeneratorProvider):
    url = "https://bing.com/chat"
    working = True
    supports_message_history = True
    supports_gpt_4 = True
        
    @staticmethod
    def create_async_generator(
        model: str,
        messages: Messages,
        proxy: str = None,
        cookies: dict = None,
        tone: str = Tones.creative,
        image: str = None,
        **kwargs
    ) -> AsyncResult:
        if len(messages) < 2:
            prompt = messages[0]["content"]
            context = None
        else:
            prompt = messages[-1]["content"]
            context = create_context(messages[:-1])
        
        if not cookies or "SRCHD" not in cookies:
            cookies = default_cookies
        return stream_generate(prompt, tone, image, context, proxy, cookies)

def create_context(messages: Messages):
    return "".join(
        f"[{message['role']}]" + ("(#message)" if message['role']!="system" else "(#additional_instructions)") + f"\n{message['content']}\n\n"
        for message in messages
    )

class Conversation():
    def __init__(self, conversationId: str, clientId: str, conversationSignature: str, imageInfo: dict=None) -> None:
        self.conversationId = conversationId
        self.clientId = clientId
        self.conversationSignature = conversationSignature
        self.imageInfo = imageInfo

async def create_conversation(session: ClientSession, tone: str, image: str = None, proxy: str = None) -> Conversation:
    url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4'
    async with await session.get(url, proxy=proxy) as response:
        data = await response.json()

        conversationId = data.get('conversationId')
        clientId = data.get('clientId')
        conversationSignature = response.headers.get('X-Sydney-Encryptedconversationsignature')

        if not conversationId or not clientId or not conversationSignature:
            raise Exception('Failed to create conversation.')
        conversation = Conversation(conversationId, clientId, conversationSignature, None)
        if isinstance(image,str):
            try:
                config = {
                    "visualSearch": {
                        "maxImagePixels": 360000,
                        "imageCompressionRate": 0.7,
                        "enableFaceBlurDebug": 0,
                    }
                }
                is_data_uri_an_image(image)
                img_binary_data = extract_data_uri(image)
                is_accepted_format(img_binary_data)
                img = Image.open(io.BytesIO(img_binary_data))
                width, height = img.size
                max_image_pixels = config['visualSearch']['maxImagePixels']
                compression_rate = config['visualSearch']['imageCompressionRate']

                if max_image_pixels / (width * height) < 1:
                    new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
                    new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
                else:
                    new_width = width
                    new_height = height
                try:
                    orientation = get_orientation(img)
                except Exception:
                    orientation = None
                new_img = process_image(orientation, img, new_width, new_height)
                new_img_binary_data = compress_image_to_base64(new_img, compression_rate)
                data, boundary = build_image_upload_api_payload(new_img_binary_data, conversation, tone)
                headers = session.headers.copy()
                headers["content-type"] = f'multipart/form-data; boundary={boundary}'
                headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
                headers["origin"] = 'https://www.bing.com'
                async with await session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as image_upload_response:
                    if image_upload_response.status != 200:
                        raise Exception("Failed to upload image.")

                    image_info = await image_upload_response.json()
                    if not image_info.get('blobId'):
                        raise Exception("Failed to parse image info.")
                    result = {'bcid': image_info.get('blobId', "")}
                    result['blurredBcid'] = image_info.get('processedBlobId', "")
                    if result['blurredBcid'] != "":
                        result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
                    elif result['bcid'] != "":
                        result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
                    result['originalImageUrl'] = (
                        "https://www.bing.com/images/blob?bcid="
                        + result['blurredBcid']
                        if config['visualSearch']["enableFaceBlurDebug"]
                        else "https://www.bing.com/images/blob?bcid="
                        + result['bcid']
                    )
                    conversation.imageInfo = result
            except Exception as e:
                print(f"An error happened while trying to send image: {str(e)}")
        return conversation

async def list_conversations(session: ClientSession) -> list:
    url = "https://www.bing.com/turing/conversation/chats"
    async with session.get(url) as response:
        response = await response.json()
        return response["chats"]
        
async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> list:
    url = "https://sydney.bing.com/sydney/DeleteSingleConversation"
    json = {
        "conversationId": conversation.conversationId,
        "conversationSignature": conversation.conversationSignature,
        "participant": {"id": conversation.clientId},
        "source": "cib",
        "optionsSets": ["autosave"]
    }
    async with session.post(url, json=json, proxy=proxy) as response:
        response = await response.json()
        return response["result"]["value"] == "Success"

class Defaults:
    delimiter = "\x1e"
    ip_address = f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}"

    allowedMessageTypes = [
        "ActionRequest",
        "Chat",
        "Context",
        # "Disengaged", unwanted
        "Progress",
        # "AdsQuery", unwanted
        "SemanticSerp",
        "GenerateContentQuery",
        "SearchQuery",
        # The following message types should not be added so that it does not flood with
        # useless messages (such as "Analyzing images" or "Searching the web") while it's retrieving the AI response
        # "InternalSearchQuery",
        # "InternalSearchResult",
        "RenderCardRequest",
        # "RenderContentRequest"
    ]

    sliceIds = [
        'abv2',
        'srdicton',
        'convcssclick',
        'stylewv2',
        'contctxp2tf',
        '802fluxv1pc_a',
        '806log2sphs0',
        '727savemem',
        '277teditgnds0',
        '207hlthgrds0',
    ]

    location = {
        "locale": "en-US",
        "market": "en-US",
        "region": "US",
        "locationHints": [
            {
                "country": "United States",
                "state": "California",
                "city": "Los Angeles",
                "timezoneoffset": 8,
                "countryConfidence": 8,
                "Center": {"Latitude": 34.0536909, "Longitude": -118.242766},
                "RegionType": 2,
                "SourceType": 1,
            }
        ],
    }

    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'max-age=0',
        'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-full-version': '"110.0.1587.69"',
        'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-model': '""',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua-platform-version': '"15.0.0"',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'none',
        'sec-fetch-user': '?1',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
        'x-edge-shopping-flag': '1',
        'x-forwarded-for': ip_address,
    }

    optionsSets = [
        'nlu_direct_response_filter',
        'deepleo',
        'disable_emoji_spoken_text',
        'responsible_ai_policy_235',
        'enablemm',
        'iyxapbing',
        'iycapbing',
        'gencontentv3',
        'fluxsrtrunc',
        'fluxtrunc',
        'fluxv1',
        'rai278',
        'replaceurl',
        'eredirecturl',
        'nojbfedge'
    ]

def format_message(msg: dict) -> str:
    return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter

def build_image_upload_api_payload(image_bin: str, conversation: Conversation, tone: str):
    payload = {
        'invokedSkills': ["ImageById"],
        'subscriptionId': "Bing.Chat.Multimodal",
        'invokedSkillsRequestData': {
            'enableFaceBlur': True
        },
        'convoData': {
            'convoid': "",
            'convotone': tone
        }
    }
    knowledge_request = {
        'imageInfo': {},
        'knowledgeRequest': payload
    }
    boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
    data = (
        f'--{boundary}'
        + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
        + json.dumps(knowledge_request, ensure_ascii=False)
        + "\r\n--"
        + boundary
        + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
        + image_bin
        + "\r\n--"
        + boundary
        + "--\r\n"
    )
    return data, boundary

def is_data_uri_an_image(data_uri: str):
    try:
        # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
        if not re.match(r'data:image/(\w+);base64,', data_uri):
            raise ValueError("Invalid data URI image.")
            # Extract the image format from the data URI
        image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
        # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
        if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
            raise ValueError("Invalid image format (from mime file type).")
    except Exception as e:
        raise e

def is_accepted_format(binary_data: bytes) -> bool:
        try:
            check = False
            if binary_data.startswith(b'\xFF\xD8\xFF'):
                check = True  # It's a JPEG image
            elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
                check = True  # It's a PNG image
            elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
                check = True  # It's a GIF image
            elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
                check = True  # It's a JPEG image
            elif binary_data.startswith(b'\xFF\xD8'):
                check = True  # It's a JPEG image
            elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
                check = True  # It's a WebP image
            # else we raise ValueError
            if not check:
                raise ValueError("Invalid image format (from magic code).")
        except Exception as e:
            raise e
    
def extract_data_uri(data_uri: str) -> bytes:
    try:
        data = data_uri.split(",")[1]
        data = base64.b64decode(data)
        return data
    except Exception as e:
        raise e

def get_orientation(data: bytes) -> int:
    try:
        if data[:2] != b'\xFF\xD8':
            raise Exception('NotJpeg')
        with Image.open(data) as img:
            exif_data = img._getexif()
            if exif_data is not None:
                orientation = exif_data.get(274)  # 274 corresponds to the orientation tag in EXIF
                if orientation is not None:
                    return orientation
    except Exception:
        pass

def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
    try:
        # Initialize the canvas
        new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
        if orientation:
            if orientation > 4:
                img = img.transpose(Image.FLIP_LEFT_RIGHT)
            if orientation in [3, 4]:
                img = img.transpose(Image.ROTATE_180)
            if orientation in [5, 6]:
                img = img.transpose(Image.ROTATE_270)
            if orientation in [7, 8]:
                img = img.transpose(Image.ROTATE_90)
        new_img.paste(img, (0, 0))
        return new_img
    except Exception as e:
        raise e
    
def compress_image_to_base64(img, compression_rate) -> str:
    try:
        output_buffer = io.BytesIO()
        img.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
        return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
    except Exception as e:
        raise e

def create_message(conversation: Conversation, prompt: str, tone: str, context: str=None) -> str:
    options_sets = Defaults.optionsSets
    if tone == Tones.creative:
        options_sets.append("h3imaginative")
    elif tone == Tones.precise:
        options_sets.append("h3precise")
    elif tone == Tones.balanced:
        options_sets.append("galileo")
    else:
        options_sets.append("harmonyv3")
    
    request_id = str(uuid.uuid4())
    struct = {
        'arguments': [
            {
                'source': 'cib',
                'optionsSets': options_sets,
                'allowedMessageTypes': Defaults.allowedMessageTypes,
                'sliceIds': Defaults.sliceIds,
                'traceId': os.urandom(16).hex(),
                'isStartOfSession': True,
                'requestId': request_id,
                'message': Defaults.location | {
                    'author': 'user',
                    'inputMethod': 'Keyboard',
                    'text': prompt,
                    'messageType': 'Chat',
                    'requestId': request_id,
                    'messageId': request_id,
                },
                "scenario": "SERP",
                'tone': tone,
                'spokenTextMode': 'None',
                'conversationId': conversation.conversationId,
                'participant': {
                    'id': conversation.clientId
                },
            }
        ],
        'invocationId': '1',
        'target': 'chat',
        'type': 4
    }
    if conversation.imageInfo != None and "imageUrl" in conversation.imageInfo and "originalImageUrl" in conversation.imageInfo:
        struct['arguments'][0]['message']['originalImageUrl'] = conversation.imageInfo['originalImageUrl']
        struct['arguments'][0]['message']['imageUrl'] = conversation.imageInfo['imageUrl']
        struct['arguments'][0]['experienceType'] = None
        struct['arguments'][0]['attachedFileInfo'] = {"fileName": None, "fileType": None}
    if context:
        struct['arguments'][0]['previousMessages'] = [{
            "author": "user",
            "description": context,
            "contextType": "WebPage",
            "messageType": "Context",
            "messageId": "discover-web--page-ping-mriduna-----"
        }]
    return format_message(struct)

async def stream_generate(
        prompt: str,
        tone: str,
        image: str = None,
        context: str = None,
        proxy: str = None,
        cookies: dict = None
    ):
    async with ClientSession(
            timeout=ClientTimeout(total=900),
            cookies=cookies,
            headers=Defaults.headers,
        ) as session:
        conversation = await create_conversation(session, tone, image, proxy)
        try:
            async with session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', autoping=False, params={'sec_access_token': conversation.conversationSignature}, proxy=proxy) as wss:

                await wss.send_str(format_message({'protocol': 'json', 'version': 1}))
                await wss.receive(timeout=900)
                await wss.send_str(create_message(conversation, prompt, tone, context))

                response_txt = ''
                returned_text = ''
                final = False
                while not final:
                    msg = await wss.receive(timeout=900)
                    objects = msg.data.split(Defaults.delimiter)
                    for obj in objects:
                        if obj is None or not obj:
                            continue

                        response = json.loads(obj)
                        if response.get('type') == 1 and response['arguments'][0].get('messages'):
                            message = response['arguments'][0]['messages'][0]
                            if (message['contentOrigin'] != 'Apology'):
                                if 'adaptiveCards' in message:
                                    card = message['adaptiveCards'][0]['body'][0]
                                    if "text" in card:
                                        response_txt = card.get('text')
                                    if message.get('messageType'):
                                        inline_txt = card['inlines'][0].get('text')
                                        response_txt += inline_txt + '\n'
                                elif message.get('contentType') == "IMAGE":
                                    query = urllib.parse.quote(message.get('text'))
                                    url = f"\nhttps://www.bing.com/images/create?q={query}"
                                    response_txt += url
                                    final = True
                            if response_txt.startswith(returned_text):
                                new = response_txt[len(returned_text):]
                                if new != "\n":
                                    yield new
                                    returned_text = response_txt
                        elif response.get('type') == 2:
                            result = response['item']['result']
                            if result.get('error'):
                                raise Exception(f"{result['value']}: {result['message']}")
                            return
        finally:
            await delete_conversation(session, conversation, proxy)