Source code for quackamollie.core.meta.model.meta_quackamollie_model

# -*- coding: utf-8 -*-
__all__ = ["MetaQuackamollieModel"]
__author__ = "QuacktorAI"
__copyright__ = "Copyright 2024, Forge of Absurd Ducks"
__credits__ = ["QuacktorAI"]

from abc import ABCMeta, abstractmethod
from typing import AsyncIterable, List, Optional, Tuple


[docs] class MetaQuackamollieModel(metaclass=ABCMeta): """ Metaclass for models managed by model managers """
[docs] @classmethod @abstractmethod async def astream_answer(cls, content: str, chat_history: List, model_config: Optional[str] = None, **kwargs) -> AsyncIterable[Tuple[str, bool]]: """ Asynchronous iterator to stream the answer from a LLM model :param content: The new message content :type content: str :param chat_history: A list of past messages formatted accordingly by model manager :type chat_history: List :param model_config: Additional configuration given as a string through CLI or Telegram `App Settings` and retrieved from the database :type model_config: Optional[str] :param kwargs: Additional streaming arguments :type kwargs: Dict :return: An asynchronous iterator giving a tuple containing the new chunk and a boolean indicating if the model is done or not :rtype: AsyncIterable[Tuple[str, bool]] """ yield NotImplementedError("Abstract method 'astream_answer' not implemented.")