summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/Copilot.py
blob: c79f028d74196bd4b9617924115e9bb2420d5a4e (plain) (tree)
1
2
3
4
5
6
7


                                  
              
                                    

                              




                                                      









                                            


                                                             
                                                      

                                                        
                                                




                                     
                     
 
                                                                                              

                                              
                                        





                                         
                             











                                                                          
                                




                                          
                                                                                                              
 

                                         
                      
                                                                               
                                               













                                                                                            






                                                             
                                                                                          
                                                
                                                                              


                                                              
                                                                          
 







                                                                       
                                          

                                                                                  



                                                       
                                      




                                         


                              

                       

                                         




                                                                   
                         
                              
                                                          




                                                                                                  
                                                                                     
                                                                           




                                                                                

                                   











                                                                                         

                                    




                                                                              
from __future__ import annotations

import json
import asyncio
from http.cookiejar import CookieJar
from urllib.parse import quote

try:
    from curl_cffi.requests import Session, CurlWsFlag
    has_curl_cffi = True
except ImportError:
    has_curl_cffi = False
try:
    import nodriver
    has_nodriver = True
except ImportError:
    has_nodriver = False
try:
    from platformdirs import user_config_dir
    has_platformdirs = True
except ImportError:
    has_platformdirs = False

from .base_provider import AbstractProvider, BaseConversation
from .helper import format_prompt
from ..typing import CreateResult, Messages, ImageType
from ..errors import MissingRequirementsError
from ..requests.raise_for_status import raise_for_status
from ..image import to_bytes, is_accepted_format
from .. import debug

class Conversation(BaseConversation):
    conversation_id: str
    cookie_jar: CookieJar
    access_token: str

    def __init__(self, conversation_id: str, cookie_jar: CookieJar, access_token: str = None):
        self.conversation_id = conversation_id
        self.cookie_jar = cookie_jar
        self.access_token = access_token

class Copilot(AbstractProvider):
    label = "Microsoft Copilot"
    url = "https://copilot.microsoft.com"
    working = True
    supports_stream = True
    default_model = "Copilot"

    websocket_url = "wss://copilot.microsoft.com/c/api/chat?api-version=2"
    conversation_url = f"{url}/c/api/conversations"

    @classmethod
    def create_completion(
        cls,
        model: str,
        messages: Messages,
        stream: bool = False,
        proxy: str = None,
        timeout: int = 900,
        image: ImageType = None,
        conversation: Conversation = None,
        return_conversation: bool = False,
        **kwargs
    ) -> CreateResult:
        if not has_curl_cffi:
            raise MissingRequirementsError('Install or update "curl_cffi" package | pip install -U curl_cffi')

        websocket_url = cls.websocket_url
        access_token = None
        headers = None
        cookies = conversation.cookie_jar if conversation is not None else None
        if cls.needs_auth or image is not None:
            if conversation is None or conversation.access_token is None:
                access_token, cookies = asyncio.run(cls.get_access_token_and_cookies(proxy))
            else:
                access_token = conversation.access_token
            websocket_url = f"{websocket_url}&acessToken={quote(access_token)}"
            headers = {"Authorization": f"Bearer {access_token}"}
    
        with Session(
            timeout=timeout,
            proxy=proxy,
            impersonate="chrome",
            headers=headers,
            cookies=cookies
        ) as session:
            response = session.get(f"{cls.url}/")
            raise_for_status(response)
            if conversation is None:
                response = session.post(cls.conversation_url)
                raise_for_status(response)
                conversation_id = response.json().get("id")
                if return_conversation:
                    yield Conversation(conversation_id, session.cookies.jar, access_token)
                prompt = format_prompt(messages)
                debug.log(f"Copilot: Created conversation: {conversation_id}")
            else:
                conversation_id = conversation.conversation_id
                prompt = messages[-1]["content"]
                debug.log(f"Copilot: Use conversation: {conversation_id}")

            images = []
            if image is not None:
                data = to_bytes(image)
                response = session.post(
                    "https://copilot.microsoft.com/c/api/attachments",
                    headers={"content-type": is_accepted_format(data)},
                    data=data
                )
                raise_for_status(response)
                images.append({"type":"image", "url": response.json().get("url")})

            wss = session.ws_connect(cls.websocket_url)
            wss.send(json.dumps({
                "event": "send",
                "conversationId": conversation_id,
                "content": [*images, {
                    "type": "text",
                    "text": prompt,
                }],
                "mode": "chat"
            }).encode(), CurlWsFlag.TEXT)

            is_started = False
            msg = None
            while True:
                try:
                    msg = wss.recv()[0]
                    msg = json.loads(msg)
                except:
                    break
                if msg.get("event") == "appendText":
                    yield msg.get("text")
                elif msg.get("event") in ["done", "partCompleted"]:
                    break
            if not is_started:
                raise RuntimeError(f"Last message: {msg}")

    @classmethod
    async def get_access_token_and_cookies(cls, proxy: str = None):
        if not has_nodriver:
            raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
        user_data_dir = user_config_dir("g4f-nodriver") if has_platformdirs else None
        debug.log(f"Copilot: Open nodriver with user_dir: {user_data_dir}")
        browser = await nodriver.start(
            user_data_dir=user_data_dir,
            browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
        )
        page = await browser.get(cls.url)
        access_token = None
        while access_token is None:
            access_token = await page.evaluate("""
                (() => {
                    for (var i = 0; i < localStorage.length; i++) {
                        try {
                            item = JSON.parse(localStorage.getItem(localStorage.key(i)));
                            if (item.credentialType == "AccessToken") {
                                return item.secret;
                            }
                        } catch(e) {}
                    }
                })()
            """)
            if access_token is None:
                asyncio.sleep(1)
        cookies = {}
        for c in await page.send(nodriver.cdp.network.get_cookies([cls.url])):
            cookies[c.name] = c.value
        await page.close()
        return access_token, cookies