from __future__ import annotations
import json
import aiohttp
from aiohttp import ClientSession
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt
class DDG(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://duckduckgo.com"
api_endpoint = "https://duckduckgo.com/duckchat/v1/chat"
working = True
supports_gpt_4 = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = "gpt-4o-mini"
models = [
"gpt-4o-mini",
"claude-3-haiku-20240307",
"meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
"mistralai/Mixtral-8x7B-Instruct-v0.1"
]
model_aliases = {
"claude-3-haiku": "claude-3-haiku-20240307",
"llama-3.1-70b": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
"mixtral-8x7b": "mistralai/Mixtral-8x7B-Instruct-v0.1"
}
@classmethod
def get_model(cls, model: str) -> str:
return cls.model_aliases.get(model, model) if model in cls.model_aliases else cls.default_model
@classmethod
async def get_vqd(cls):
status_url = "https://duckduckgo.com/duckchat/v1/status"
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'Accept': 'text/event-stream',
'x-vqd-accept': '1'
}
async with aiohttp.ClientSession() as session:
try:
async with session.get(status_url, headers=headers) as response:
if response.status == 200:
return response.headers.get("x-vqd-4")
else:
print(f"Error: Status code {response.status}")
return None
except Exception as e:
print(f"Error getting VQD: {e}")
return None
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
conversation: dict = None,
proxy: str = None,
**kwargs
) -> AsyncResult:
model = cls.get_model(model)
headers = {
'accept': 'text/event-stream',
'content-type': 'application/json',
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
}
vqd = conversation.get('vqd') if conversation else await cls.get_vqd()
if not vqd:
raise Exception("Failed to obtain VQD token")
headers['x-vqd-4'] = vqd
if conversation:
message_history = conversation.get('messages', [])
message_history.append({"role": "user", "content": format_prompt(messages)})
else:
message_history = [{"role": "user", "content": format_prompt(messages)}]
async with ClientSession(headers=headers) as session:
data = {
"model": model,
"messages": message_history
}
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
response.raise_for_status()
async for line in response.content:
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
json_str = decoded_line[6:]
if json_str == '[DONE]':
break
try:
json_data = json.loads(json_str)
if 'message' in json_data:
yield json_data['message']
except json.JSONDecodeError:
pass