Маршрутизатор

Повикмлення

from litegram import Router
from litegram.types import Message

my_router = Router(name=__name__)

@my_router.message()
async def message_handler(message: Message) -> Any:
    await message.answer('Hello from my router!')
class litegram.dispatcher.router.Router(*, name: str | None = None)[исходный код]

Базовые классы: object

Маршрутизатор может маршрутизировать события, а также вложенные типы обновлений, такие как сообщения, запросы обратного вызова, опросы и все другие типы событий.

Обработчики событий могут быть зарегистрированы в обсервере двумя способами:

  • С помощью метода обсервера - router.<event_type>.register(handler, <filters, ...>)

  • С помощью декоратора - @router.<event_type>(<filters, ...>)

__init__(*, name: str | None = None) None[исходный код]
Параметры:

name – Дополнительное имя маршрутизатора, может быть полезно для отладки

include_router(router: Router) Router[исходный код]

Подключение маршрутизатора.

Параметры:

router

Результат:

include_routers(*routers: Router) None[исходный код]

Подключение маршрутизаторов.

Параметры:

routers

Результат:

resolve_used_update_types(skip_events: set[str] | None = None) list[str][исходный код]

Resolve registered event names

Is useful for getting updates only for registered event types.

Параметры:

skip_events – skip specified event names

Результат:

set of registered names

Обсерверы событий

Предупреждение

Все обработчики всегда должны быть асинхронными. Имя функции обработки не имеет значения. Название аргумента события также не важно, но рекомендуется не перекрывать название данными контекста, так как функция не может принимать два аргумента с одинаковым названием.

Вот список доступных обсерверов и примеры того, как зарегистрировать обработчики

В этих примерах используются только обработчики регистрации в стиле декоратора, но если вам не нравятся @декораторы, просто используйте метод <event type>.register(...).

Сообщение

Внимание

Будьте внимательны при фильтрации этого события

Вам следует ожидать, что это событие может иметь разные наборы атрибутов в разных случаях

(Например, текст, стикер и документ всегда имеют разные типы содержимого)

Рекомендуемый способ проверить наличие полей перед использованием, например, с помощью magic filter: F.text для обработки текста, F.sticker для обработки только стикеров и т. д.

@router.message()
async def message_handler(message: types.Message) -> Any: pass

Отредактированное сообщение

@router.edited_message()
async def edited_message_handler(edited_message: types.Message) -> Any: pass

Пост в канале

@router.channel_post()
async def channel_post_handler(channel_post: types.Message) -> Any: pass

Отредактированный пост в канале

@router.edited_channel_post()
async def edited_channel_post_handler(edited_channel_post: types.Message) -> Any: pass

Inline запрос

@router.inline_query()
async def inline_query_handler(inline_query: types.InlineQuery) -> Any: pass

Выбранный результат inline запроса

@router.chosen_inline_result()
async def chosen_inline_result_handler(chosen_inline_result: types.ChosenInlineResult) -> Any: pass

Запрос обратного вызова (callback query)

@router.callback_query()
async def callback_query_handler(callback_query: types.CallbackQuery) -> Any: pass

Запрос доставки (shipping query)

@router.shipping_query()
async def shipping_query_handler(shipping_query: types.ShippingQuery) -> Any: pass

Запрос перед оформлением заказа (pre checkout query)

@router.pre_checkout_query()
async def pre_checkout_query_handler(pre_checkout_query: types.PreCheckoutQuery) -> Any: pass

Опрос

@router.poll()
async def poll_handler(poll: types.Poll) -> Any: pass

Ответ в опросе

@router.poll_answer()
async def poll_answer_handler(poll_answer: types.PollAnswer) -> Any: pass

My chat member

@router.my_chat_member()
async def my_chat_member_handler(my_chat_member: types.ChatMemberUpdated) -> Any: pass

Chat member

@router.chat_member()
async def chat_member_handler(chat_member: types.ChatMemberUpdated) -> Any: pass

Chat join request

@router.chat_join_request()
async def chat_join_request_handler(chat_join_request: types.ChatJoinRequest) -> Any: pass

Повикмлення

@router.message_reaction()
async def message_reaction_handler(message_reaction: types.MessageReactionUpdated) -> Any: pass

Message reaction count

@router.message_reaction_count()
async def message_reaction_count_handler(message_reaction_count: types.MessageReactionCountUpdated) -> Any: pass

Пост на канали

@router.chat_boost()
async def chat_boost_handler(chat_boost: types.ChatBoostUpdated) -> Any: pass

Remove chat boost

@router.removed_chat_boost()
async def removed_chat_boost_handler(removed_chat_boost: types.ChatBoostRemoved) -> Any: pass

Ошибки

@router.errors()
async def error_handler(exception: types.ErrorEvent) -> Any: pass

Полезно для обработки ошибок других обработчиков

Вложенные маршрутизаторы

Предупреждение

Кстати, маршрутизаторы могут быть вложены в другие маршрутизаторы с некоторыми ограничениями:

1. Router CAN NOT include itself 1. Routers CAN NOT be used for circular including (router 1 include router 2, router 2 include router 3, router 3 include router 1)

Пример:

module_2.py
router2 = Router()

@router2.message()
...
module_2.py
from module_2 import router2


router1 = Router()
router1.include_router(router2)

Обновление

@dispatcher.update()
async def message_handler(update: types.Update) -> Any: pass

Предупреждение

The only root Router (Dispatcher) can handle this type of event.

Примечание

Dispatcher already has default handler for this event type, so you can use it for handling all updates that are not handled by any other handlers.

Как это работает?

Например, у диспетчера есть 2 маршрутизатора, при этом у последнего маршрутизатора также есть один вложенный маршрутизатор:

Пример вложенных маршрутизаторов

В этом случае поток распространения обновления будет иметь вид:

Пример вложенных маршрутизаторов