Source code for quackamollie.core.bot.command.reset
# -*- coding: utf-8 -*-
__all__ = ["reset_router", "command_reset_handler"]
__author__ = "QuacktorAI"
__copyright__ = "Copyright 2024, Forge of Absurd Ducks"
__credits__ = ["QuacktorAI"]
from aiogram import Router
from aiogram.filters.command import Command
from aiogram.types import Message
from sqlalchemy import select
from typing import List, Optional
from quackamollie.core.bot.decorator.permissions import permission_authorized
from quackamollie.core.cli.settings import pass_quackamollie_settings, QuackamollieSettings
from quackamollie.core.bot.decorator.acknowledge_with_reactions import acknowledge_with_reactions
from quackamollie.core.database.model import ChatMessage
reset_router = Router()
[docs]
@reset_router.message(Command("reset"))
@permission_authorized
@acknowledge_with_reactions
@pass_quackamollie_settings
async def command_reset_handler(quackamollie_settings: QuackamollieSettings, message: Message):
""" Handler for the `/reset` command in Telegram chat, wipes context (messages history)
:param quackamollie_settings: The application settings initialized from click context
:type quackamollie_settings: QuackamollieSettings
:param message: The message as given by aiogram router
:type message: Message
"""
# Get database session from settings
async_session = quackamollie_settings.session
# Get chat info from message
chat_id: int = message.chat.id
async with async_session() as session:
chat_setting_result = await session.execute(select(ChatMessage).where(
ChatMessage.chat_id == chat_id
).where(
ChatMessage.active
))
chat_messages: Optional[List[ChatMessage]] = list(chat_setting_result.scalars().all())
if chat_messages:
for msg in chat_messages:
msg.active = False
await session.commit()
await message.answer(text="Chat has been reset")
else:
await message.answer(text="No history in current chat")