From 46398e8ac643c90e68d04be77a63379aef5df6ec Mon Sep 17 00:00:00 2001 From: abc <98614666+xtekky@users.noreply.github.com> Date: Mon, 16 Oct 2023 00:46:58 +0100 Subject: ~ | new `test_providers.py` --- etc/testing/test_providers.py | 33 ++++++++++++++++++++ etc/testing/test_providers.v1.py | 66 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 etc/testing/test_providers.v1.py diff --git a/etc/testing/test_providers.py b/etc/testing/test_providers.py index ec0e0271..4ba3a5b4 100644 --- a/etc/testing/test_providers.py +++ b/etc/testing/test_providers.py @@ -1,3 +1,36 @@ +# from g4f.Provider import __all__, ProviderUtils +# from g4f import ChatCompletion +# import concurrent.futures + +# _ = [ +# 'BaseProvider', +# 'AsyncProvider', +# 'AsyncGeneratorProvider', +# 'RetryProvider' +# ] + +# def test_provider(provider): +# try: +# provider = (ProviderUtils.convert[provider]) +# if provider.working and not provider.needs_auth: +# print('testing', provider.__name__) +# completion = ChatCompletion.create(model='gpt-3.5-turbo', +# messages=[{"role": "user", "content": "hello"}], provider=provider) +# return completion, provider.__name__ +# except Exception as e: +# #print(f'Failed to test provider: {provider} | {e}') +# return None + +# with concurrent.futures.ThreadPoolExecutor() as executor: +# futures = [] +# for provider in __all__: +# if provider not in _: +# futures.append(executor.submit(test_provider, provider)) +# for future in concurrent.futures.as_completed(futures): +# result = future.result() +# if result: +# print(f'{result[1]} | {result[0]}') + import sys from pathlib import Path from colorama import Fore, Style diff --git a/etc/testing/test_providers.v1.py b/etc/testing/test_providers.v1.py new file mode 100644 index 00000000..ec0e0271 --- /dev/null +++ b/etc/testing/test_providers.v1.py @@ -0,0 +1,66 @@ +import sys +from pathlib import Path +from colorama import Fore, Style + +sys.path.append(str(Path(__file__).parent.parent)) + +from g4f import BaseProvider, models, Provider + +logging = False + + +def main(): + providers = get_providers() + failed_providers = [] + + for _provider in providers: + if _provider.needs_auth: + continue + print("Provider:", _provider.__name__) + result = test(_provider) + print("Result:", result) + if _provider.working and not result: + failed_providers.append(_provider) + + print() + + if failed_providers: + print(f"{Fore.RED + Style.BRIGHT}Failed providers:{Style.RESET_ALL}") + for _provider in failed_providers: + print(f"{Fore.RED}{_provider.__name__}") + else: + print(f"{Fore.GREEN + Style.BRIGHT}All providers are working") + + +def get_providers() -> list[type[BaseProvider]]: + providers = dir(Provider) + providers = [getattr(Provider, provider) for provider in providers if provider != "RetryProvider"] + providers = [provider for provider in providers if isinstance(provider, type)] + return [provider for provider in providers if issubclass(provider, BaseProvider)] + + +def create_response(_provider: type[BaseProvider]) -> str: + model = models.gpt_35_turbo.name if _provider.supports_gpt_35_turbo else models.default.name + response = _provider.create_completion( + model=model, + messages=[{"role": "user", "content": "Hello, who are you? Answer in detail much as possible."}], + stream=False, + ) + return "".join(response) + + +def test(_provider: type[BaseProvider]) -> bool: + try: + response = create_response(_provider) + assert type(response) is str + assert len(response) > 0 + return response + except Exception as e: + if logging: + print(e) + return False + + +if __name__ == "__main__": + main() + \ No newline at end of file -- cgit v1.2.3