# Extracted from a cgit web view (navigation: summary, refs, log, blame, commit, diff, stats).
# path: root/g4f/Provider/Copilot.py
# blob: f10202bff7a9d764e4578b8a72a77e28a71e1630 (plain) (tree)


                                  
              
                                    

                              




                                                      









                                            










                                                             
                     
 
                                                                                              

                                              
                                        






















                                                                          
                                                                                                              
 

                                         
                      
                                                                               














                                                                                            






                                                             
                                                                                          


























                                                                              





































                                                                                                  
from __future__ import annotations

import json
import asyncio
from http.cookiejar import CookieJar
from urllib.parse import quote

try:
    from curl_cffi.requests import Session, CurlWsFlag
    has_curl_cffi = True
except ImportError:
    has_curl_cffi = False
try:
    import nodriver
    has_nodriver = True
except ImportError:
    has_nodriver = False
try:
    from platformdirs import user_config_dir
    has_platformdirs = True
except ImportError:
    has_platformdirs = False

from .base_provider import AbstractProvider, BaseConversation
from .helper import format_prompt
from ..typing import CreateResult, Messages
from ..errors import MissingRequirementsError
from ..requests.raise_for_status import raise_for_status
from .. import debug

class Conversation(BaseConversation):
    """Handle for an already-created Copilot chat.

    Bundles the three values that ``Copilot.create_completion`` reads back
    when a conversation object is passed in: the conversation id, the cookie
    jar used for the HTTP session, and (optionally) the access token.
    """
    # Identifier of the chat, taken from the "id" field of the create response.
    conversation_id: str
    # Cookies to hand to the next HTTP session.
    cookie_jar: CookieJar
    # Bearer token; stays None when no token was supplied.
    access_token: str

    def __init__(self, conversation_id: str, cookie_jar: CookieJar, access_token: str = None):
        """Store the given id, cookie jar and optional token on the instance."""
        # Single tuple assignment; targets are bound left to right.
        self.conversation_id, self.cookie_jar, self.access_token = (
            conversation_id,
            cookie_jar,
            access_token,
        )

class Copilot(AbstractProvider):
    """Provider for Microsoft Copilot using its HTTP and WebSocket chat API.

    A conversation is created (or reused) over HTTPS, the prompt is sent over
    a WebSocket, and ``appendText`` events are streamed back to the caller.
    When ``cls.needs_auth`` is truthy (attribute not defined in this class —
    presumably inherited from ``AbstractProvider``), an access token is
    obtained from a real browser session via ``nodriver``.
    """
    label = "Microsoft Copilot"
    url = "https://copilot.microsoft.com"
    working = True
    supports_stream = True

    websocket_url = "wss://copilot.microsoft.com/c/api/chat?api-version=2"
    conversation_url = f"{url}/c/api/conversations"

    @classmethod
    def create_completion(
        cls,
        model: str,
        messages: Messages,
        stream: bool = False,
        proxy: str = None,
        timeout: int = 900,
        conversation: Conversation = None,
        return_conversation: bool = False,
        **kwargs
    ) -> CreateResult:
        """Send a prompt to Copilot and yield the answer text as it arrives.

        Args:
            model: Accepted for interface compatibility; not used here.
            messages: Chat history. For a new conversation the whole history
                is passed through ``format_prompt``; for an existing one only
                the content of the last message is sent.
            stream: Accepted for interface compatibility; not used here.
            proxy: Optional proxy passed to the HTTP session and, when a token
                has to be fetched, to the browser.
            timeout: Timeout passed to the HTTP session.
            conversation: Existing conversation to continue, or None to
                create a new one.
            return_conversation: When True and a new conversation is created,
                a ``Conversation`` object is yielded before any text.
            **kwargs: Ignored.

        Yields:
            Optionally one ``Conversation``, then the ``text`` value of every
            ``appendText`` event until a ``done``/``partCompleted`` event
            arrives or receiving/decoding a frame fails.

        Raises:
            MissingRequirementsError: If ``curl_cffi`` (or ``nodriver``, when
                a token must be fetched) is not installed.
            Whatever ``raise_for_status`` raises for a failed HTTP response.
        """
        if not has_curl_cffi:
            raise MissingRequirementsError('Install or update "curl_cffi" package | pip install -U curl_cffi')

        websocket_url = cls.websocket_url
        access_token = None
        headers = None
        cookies = conversation.cookie_jar if conversation is not None else None
        if cls.needs_auth:
            if conversation is None or conversation.access_token is None:
                # NOTE: asyncio.run() cannot be called from a running event loop.
                access_token, cookies = asyncio.run(cls.get_access_token_and_cookies(proxy))
            else:
                access_token = conversation.access_token
            # Fixed the misspelled "acessToken" query parameter.
            websocket_url = f"{websocket_url}&accessToken={quote(access_token)}"
            headers = {"Authorization": f"Bearer {access_token}"}

        with Session(
            timeout=timeout,
            proxy=proxy,
            impersonate="chrome",
            headers=headers,
            cookies=cookies
        ) as session:
            # Load the landing page first so the session picks up its cookies.
            response = session.get(f"{cls.url}/")
            raise_for_status(response)
            if conversation is None:
                response = session.post(cls.conversation_url)
                raise_for_status(response)
                conversation_id = response.json().get("id")
                if return_conversation:
                    yield Conversation(conversation_id, session.cookies.jar, access_token)
                prompt = format_prompt(messages)
                if debug.logging:
                    print(f"Copilot: Created conversation: {conversation_id}")
            else:
                conversation_id = conversation.conversation_id
                # The server side already holds the history; send only the newest message.
                prompt = messages[-1]["content"]
                if debug.logging:
                    print(f"Copilot: Use conversation: {conversation_id}")

            # Use the locally built URL so the access token (if any) is actually sent;
            # previously the class-level URL was used and the token was dropped.
            wss = session.ws_connect(websocket_url)
            wss.send(json.dumps({
                "event": "send",
                "conversationId": conversation_id,
                "content": [{
                    "type": "text",
                    "text": prompt,
                }],
                "mode": "chat"
            }).encode(), CurlWsFlag.TEXT)
            while True:
                try:
                    msg = json.loads(wss.recv()[0])
                except Exception:
                    # Best effort: a closed socket or an undecodable frame ends the
                    # stream. KeyboardInterrupt/SystemExit are no longer swallowed.
                    break
                event = msg.get("event")
                if event == "appendText":
                    yield msg.get("text")
                elif event in ("done", "partCompleted"):
                    break

    @classmethod
    async def get_access_token_and_cookies(cls, proxy: str = None):
        """Open Copilot in a browser and read the access token and cookies.

        Starts ``nodriver``, loads ``cls.url`` and polls the page's
        ``localStorage`` once per second until an entry whose
        ``credentialType`` is ``"AccessToken"`` is found. There is no upper
        bound on the wait.

        Args:
            proxy: Optional proxy, passed to the browser as ``--proxy-server``.

        Returns:
            A ``(access_token, cookies)`` tuple where ``cookies`` maps cookie
            names to values for ``cls.url``.

        Raises:
            MissingRequirementsError: If ``nodriver`` is not installed.
        """
        if not has_nodriver:
            raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
        if has_platformdirs:
            user_data_dir = user_config_dir("g4f-nodriver")
        else:
            user_data_dir = None
        if debug.logging:
            print(f"Copilot: Open nodriver with user_dir: {user_data_dir}")
        browser = await nodriver.start(
            user_data_dir=user_data_dir,
            browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
        )
        page = await browser.get(cls.url)
        try:
            while True:
                access_token = await page.evaluate("""
                (() => {
                    for (var i = 0; i < localStorage.length; i++) {
                        try {
                            item = JSON.parse(localStorage.getItem(localStorage.key(i)));
                            if (item.credentialType == "AccessToken") {
                                return item.secret;
                            }
                        } catch(e) {}
                    }
                })()
            """)
                if access_token:
                    break
                # Must be awaited; the bare call created a coroutine that never ran,
                # so the loop polled without any delay.
                await asyncio.sleep(1)
            cookies = {
                c.name: c.value
                for c in await page.send(nodriver.cdp.network.get_cookies([cls.url]))
            }
        finally:
            # Close the page even if polling or the cookie fetch fails or is cancelled.
            await page.close()
        return access_token, cookies