import ast import logging from fastapi import FastAPI, Response, Request from fastapi.responses import StreamingResponse from typing import List, Union, Any, Dict, AnyStr from ._tokenizer import tokenize from .. import BaseProvider import time import json import random import string import uvicorn import nest_asyncio import g4f class Api: def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False, list_ignored_providers: List[Union[str, BaseProvider]] = None) -> None: self.engine = engine self.debug = debug self.sentry = sentry self.list_ignored_providers = list_ignored_providers self.app = FastAPI() nest_asyncio.apply() JSONObject = Dict[AnyStr, Any] JSONArray = List[Any] JSONStructure = Union[JSONArray, JSONObject] @self.app.get("/") async def read_root(): return Response(content=json.dumps({"info": "g4f API"}, indent=4), media_type="application/json") @self.app.get("/v1") async def read_root_v1(): return Response(content=json.dumps({"info": "Go to /v1/chat/completions or /v1/models."}, indent=4), media_type="application/json") @self.app.get("/v1/models") async def models(): model_list = [{ 'id': model, 'object': 'model', 'created': 0, 'owned_by': 'g4f'} for model in g4f.Model.__all__()] return Response(content=json.dumps({ 'object': 'list', 'data': model_list}, indent=4), media_type="application/json") @self.app.get("/v1/models/{model_name}") async def model_info(model_name: str): try: model_info = (g4f.ModelUtils.convert[model_name]) return Response(content=json.dumps({ 'id': model_name, 'object': 'model', 'created': 0, 'owned_by': model_info.base_provider }, indent=4), media_type="application/json") except: return Response(content=json.dumps({"error": "The model does not exist."}, indent=4), media_type="application/json") @self.app.post("/v1/chat/completions") async def chat_completions(request: Request, item: JSONStructure = None): item_data = { 'model': 'gpt-3.5-turbo', 'stream': False, } # item contains byte keys, and dict.get suppresses error item_data.update({key.decode('utf-8') if isinstance(key, bytes) else key: str(value) for key, value in (item or {}).items()}) # messages is str, need dict if isinstance(item_data.get('messages'), str): item_data['messages'] = ast.literal_eval(item_data.get('messages')) model = item_data.get('model') stream = item_data.get('stream') messages = item_data.get('messages') try: response = g4f.ChatCompletion.create( model=model, stream=stream, messages=messages, list_ignored_providers=self.list_ignored_providers) except Exception as e: logging.exception(e) return Response(content=json.dumps({"error": "An error occurred while generating the response."}, indent=4), media_type="application/json") completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28)) completion_timestamp = int(time.time()) if not stream: prompt_tokens, _ = tokenize(''.join([message['content'] for message in messages])) completion_tokens, _ = tokenize(response) json_data = { 'id': f'chatcmpl-{completion_id}', 'object': 'chat.completion', 'created': completion_timestamp, 'model': model, 'choices': [ { 'index': 0, 'message': { 'role': 'assistant', 'content': response, }, 'finish_reason': 'stop', } ], 'usage': { 'prompt_tokens': prompt_tokens, 'completion_tokens': completion_tokens, 'total_tokens': prompt_tokens + completion_tokens, }, } return Response(content=json.dumps(json_data, indent=4), media_type="application/json") def streaming(): try: for chunk in response: completion_data = { 'id': f'chatcmpl-{completion_id}', 'object': 'chat.completion.chunk', 'created': completion_timestamp, 'model': model, 'choices': [ { 'index': 0, 'delta': { 'content': chunk, }, 'finish_reason': None, } ], } content = json.dumps(completion_data, separators=(',', ':')) yield f'data: {content}\n\n' time.sleep(0.03) end_completion_data = { 'id': f'chatcmpl-{completion_id}', 'object': 'chat.completion.chunk', 'created': completion_timestamp, 'model': model, 'choices': [ { 'index': 0, 'delta': {}, 'finish_reason': 'stop', } ], } content = json.dumps(end_completion_data, separators=(',', ':')) yield f'data: {content}\n\n' except GeneratorExit: pass return StreamingResponse(streaming(), media_type="text/event-stream") @self.app.post("/v1/completions") async def completions(): return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json") def run(self, ip): split_ip = ip.split(":") uvicorn.run(app=self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False)