Перейти к содержанию

Команды

command

command

command(
    name=None,
    *,
    aliases=None,
    prefixes=None,
    filter=None,
    description=None,
    help=None,
    brief=None,
    usage=None,
    enabled=True,
    hidden=False,
    routing_re_flags=IGNORECASE,
    exclude_from_autodoc=False,
    invalid_argument_config=None,
    **kwargs
)

Decorator, преобразующий корутину в Command.

Аргументы

name: Имя команды. Если не указано, используется имя функции aliases: Альтернативные имена команды prefixes: Префиксы команды (переопределяют префиксы пакета) filter: Фильтр для применения к команде description: Подробное описание команды help: Текст справки команды brief: Краткое описание команды usage: Примеры использования команды (например, " ") enabled: Включена ли команда hidden: Скрыть ли из справки routing_re_flags: Флаги regex для маршрутизации команды exclude_from_autodoc: Исключить ли из autodoc invalid_argument_config: Конфигурация для невалидных аргументов **kwargs: Дополнительные параметры

Возвращает

Функцию-decorator

Пример

@commands.command() async def hello(ctx: Context): '''Поздороваться''' await ctx.send("Hello!")

@commands.command(name="greet", aliases=["hi"], usage="") async def greet_command(ctx: Context, name: str): '''Поприветствовать кого-то''' await ctx.send(f"Hello, {name}!")

Source code in vkflow\commands\core.py
def command(
    name: str | None = None,
    *,
    aliases: list[str] | None = None,
    prefixes: list[str] | None = None,
    filter: BaseFilter | None = None,
    description: str | None = None,
    help: str | None = None,
    brief: str | None = None,
    usage: str | None = None,
    enabled: bool = True,
    hidden: bool = False,
    routing_re_flags: re.RegexFlag | int = re.IGNORECASE,
    exclude_from_autodoc: bool = False,
    invalid_argument_config: InvalidArgumentConfig | None = None,
    **kwargs,
) -> typing.Callable[[Handler], Command]:
    """
    Decorator, преобразующий корутину в Command.

    Аргументы:
        name: Имя команды. Если не указано, используется имя функции
        aliases: Альтернативные имена команды
        prefixes: Префиксы команды (переопределяют префиксы пакета)
        filter: Фильтр для применения к команде
        description: Подробное описание команды
        help: Текст справки команды
        brief: Краткое описание команды
        usage: Примеры использования команды (например, "<user> <reason>")
        enabled: Включена ли команда
        hidden: Скрыть ли из справки
        routing_re_flags: Флаги regex для маршрутизации команды
        exclude_from_autodoc: Исключить ли из autodoc
        invalid_argument_config: Конфигурация для невалидных аргументов
        **kwargs: Дополнительные параметры

    Возвращает:
        Функцию-decorator

    Пример:
        @commands.command()
        async def hello(ctx: Context):
            '''Поздороваться'''
            await ctx.send("Hello!")

        @commands.command(name="greet", aliases=["hi"], usage="<name>")
        async def greet_command(ctx: Context, name: str):
            '''Поприветствовать кого-то'''
            await ctx.send(f"Hello, {name}!")
    """

    def decorator(func: Handler) -> Command:
        cmd_kwargs = {
            "prefixes": prefixes or [],
            "filter": filter,
            "description": description,
            "help": help,
            "brief": brief,
            "usage": usage,
            "enabled": enabled,
            "hidden": hidden,
            "routing_re_flags": routing_re_flags,
            "exclude_from_autodoc": exclude_from_autodoc or hidden,
        }

        if invalid_argument_config is not None:
            cmd_kwargs["invalid_argument_config"] = invalid_argument_config

        cmd_kwargs.update(kwargs)

        if hasattr(func, "__vkflow_checks__"):
            checks = func.__vkflow_checks__
            if checks:
                combined_filter = cmd_kwargs["filter"]
                for check in checks:
                    combined_filter = check if combined_filter is None else combined_filter & check
                cmd_kwargs["filter"] = combined_filter

        return Command(func, name=name, aliases=aliases, **cmd_kwargs)

    return decorator

group

group

group(name=None, *, aliases=None, **kwargs)

Decorator, преобразующий корутину в Group.

Аргументы

name: Имя группы. Если не указано, используется имя функции aliases: Альтернативные имена группы **kwargs: Дополнительные параметры (те же, что у decorator command)

Возвращает

Функцию-decorator

Пример

@commands.group() async def config(ctx: Context): '''Команды конфигурации''' pass

@config.command() async def show(ctx: Context): '''Показать конфигурацию''' await ctx.send("Config: ...")

С псевдонимами

@commands.group(aliases=["cfg", "conf"]) async def config(ctx: Context): pass

Source code in vkflow\commands\core.py
def group(
    name: str | None = None, *, aliases: list[str] | None = None, **kwargs
) -> typing.Callable[[Handler], Group]:
    """
    Decorator, преобразующий корутину в Group.

    Аргументы:
        name: Имя группы. Если не указано, используется имя функции
        aliases: Альтернативные имена группы
        **kwargs: Дополнительные параметры (те же, что у decorator command)

    Возвращает:
        Функцию-decorator

    Пример:
        @commands.group()
        async def config(ctx: Context):
            '''Команды конфигурации'''
            pass

        @config.command()
        async def show(ctx: Context):
            '''Показать конфигурацию'''
            await ctx.send("Config: ...")

        # С псевдонимами
        @commands.group(aliases=["cfg", "conf"])
        async def config(ctx: Context):
            pass
    """

    def decorator(func: Handler) -> Group:
        if hasattr(func, "__vkflow_checks__"):
            checks = func.__vkflow_checks__
            if checks:
                combined_filter = kwargs.get("filter")
                for check in checks:
                    combined_filter = check if combined_filter is None else combined_filter & check
                kwargs["filter"] = combined_filter

        return Group(func, name=name, aliases=aliases, **kwargs)

    return decorator

Command

Command

Command(callback, name=None, aliases=None, **kwargs)

Bases: Command

Класс, представляющий команду в фреймворке ext.commands.

Атрибуты

callback: Корутина, выполняемая при вызове команды name: Имя команды aliases: Псевдонимы команды help: Текст справки команды brief: Краткое описание команды enabled: Включена ли команда hidden: Скрыта ли команда из справки usage: Примеры использования команды parent: Родительская Group, если команда является подкомандой

Source code in vkflow\commands\core.py
def __init__(
    self, callback: Handler, name: str | None = None, aliases: list[str] | None = None, **kwargs
):
    if name is None:
        name = callback.__name__

    names = [name]
    if aliases:
        names.extend(aliases)

    self.callback = callback
    self.help = kwargs.pop("help", None)
    self.brief = kwargs.pop("brief", None)
    self.enabled = kwargs.pop("enabled", True)
    self.hidden = kwargs.pop("hidden", False)
    self.usage: str | None = kwargs.pop("usage", None)

    self.parent: Group | None = None

    self.__doc__ = callback.__doc__ if callback.__doc__ else ""

    self._cooldown_handler: typing.Callable[..., typing.Awaitable] | None = None
    self._cooldown_mappings: list[typing.Any] = []

    self._max_concurrency_mapping: typing.Any = None
    self._max_concurrency_handler: typing.Callable[..., typing.Awaitable] | None = None

    self._before_invoke: typing.Callable[..., typing.Awaitable] | None = None
    self._after_invoke: typing.Callable[..., typing.Awaitable] | None = None

    if hasattr(callback, "__max_concurrency__"):
        self._max_concurrency_mapping = callback.__max_concurrency__

    if hasattr(callback, "__vkflow_checks__"):
        for check in callback.__vkflow_checks__:
            if hasattr(check, "_cooldown_mapping"):
                self._cooldown_mappings.append(check._cooldown_mapping)

    super().__init__(handler=callback, names=names, **kwargs)

name property

name

Имя команды

aliases property

aliases

Псевдонимы команды

parents property

parents

Список всех родительских групп от ближайшей к корневой.

Возвращает

list[Group]: Список групп от ближайшего родителя к корню

Пример

Для команды: config settings show

show.parents = [settings, config]

__hash__

__hash__()

Хеширование по идентификатору объекта

Source code in vkflow\commands\core.py
def __hash__(self) -> int:
    """Хеширование по идентификатору объекта"""
    return id(self)

__eq__

__eq__(other)

Сравнение команд по идентификатору

Source code in vkflow\commands\core.py
def __eq__(self, other: object) -> bool:
    """Сравнение команд по идентификатору"""
    return self is other

__get__

__get__(instance, owner)

Привязка обработчиков кулдауна и хуков к экземпляру

Source code in vkflow\commands\core.py
def __get__(self, instance, owner):
    """Привязка обработчиков кулдауна и хуков к экземпляру"""
    if instance is None:
        return self

    bound_command = super().__get__(instance, owner)

    bound_command._cooldown_handler = _bind_if_descriptor(self._cooldown_handler, instance, owner)
    bound_command._max_concurrency_handler = _bind_if_descriptor(
        self._max_concurrency_handler, instance, owner
    )
    bound_command._before_invoke = _bind_if_descriptor(self._before_invoke, instance, owner)
    bound_command._after_invoke = _bind_if_descriptor(self._after_invoke, instance, owner)

    bound_command.parent = self.parent
    bound_command.usage = self.usage
    bound_command._package = self._package

    return bound_command

on_cooldown

on_cooldown()

Decorator для регистрации обработчика cooldown для этой команды.

Этот обработчик вызывается, когда команда находится на cooldown.

Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - error (OnCooldownError): Исключение OnCooldownError - remaining (float): Секунды до окончания cooldown (то же, что error.retry_after)

Возвращает

Функцию-decorator

Пример

@commands.command() @commands.cooldown(rate=3, per=60, type=BucketType.USER) async def test(ctx: Context): await ctx.send("Command executed!")

С параметром error

@test.on_cooldown() async def on_test_cooldown(ctx: Context, error: OnCooldownError): await ctx.send(f"Try again in {error.retry_after:.1f} seconds")

С параметром remaining (удобнее)

@test.on_cooldown() async def on_test_cooldown(ctx: Context, remaining: float): await ctx.send(f"Wait {remaining:.1f}s")

Минимальный вариант -только ctx

@test.on_cooldown() async def on_test_cooldown(ctx: Context): await ctx.send("On cooldown!")

Source code in vkflow\commands\core.py
def on_cooldown(
    self,
) -> typing.Callable[[typing.Callable[..., typing.Awaitable]], typing.Callable[..., typing.Awaitable]]:
    """
    Decorator для регистрации обработчика cooldown для этой команды.

    Этот обработчик вызывается, когда команда находится на cooldown.

    Обработчик может принимать следующие необязательные параметры:
    - ctx (Context): Контекст команды
    - error (OnCooldownError): Исключение OnCooldownError
    - remaining (float): Секунды до окончания cooldown (то же, что error.retry_after)

    Возвращает:
        Функцию-decorator

    Пример:
        @commands.command()
        @commands.cooldown(rate=3, per=60, type=BucketType.USER)
        async def test(ctx: Context):
            await ctx.send("Command executed!")

        # С параметром error
        @test.on_cooldown()
        async def on_test_cooldown(ctx: Context, error: OnCooldownError):
            await ctx.send(f"Try again in {error.retry_after:.1f} seconds")

        # С параметром remaining (удобнее)
        @test.on_cooldown()
        async def on_test_cooldown(ctx: Context, remaining: float):
            await ctx.send(f"Wait {remaining:.1f}s")

        # Минимальный вариант -только ctx
        @test.on_cooldown()
        async def on_test_cooldown(ctx: Context):
            await ctx.send("On cooldown!")
    """

    def decorator(
        func: typing.Callable[..., typing.Awaitable],
    ) -> typing.Callable[..., typing.Awaitable]:
        if self._cooldown_handler is not None:
            raise ValueError(
                f"Команда '{self.name}' уже имеет обработчик cooldown. "
                "Допускается только один decorator @.on_cooldown()."
            )
        self._cooldown_handler = func
        return func

    return decorator

on_max_concurrency

on_max_concurrency()

Decorator для регистрации обработчика max concurrency для этой команды.

Этот обработчик вызывается при достижении лимита одновременных выполнений.

Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - error (MaxConcurrencyReachedError): Исключение MaxConcurrencyReachedError - limit (int): Максимально допустимое количество одновременных выполнений - current (int): Текущее количество активных выполнений

Возвращает

Функцию-decorator

Пример

@commands.command() @commands.max_concurrency(2, BucketType.CHAT) async def heavy(ctx: Context): await asyncio.sleep(10) await ctx.send("Done!")

С параметром error

@heavy.on_max_concurrency() async def on_heavy_concurrency(ctx: Context, error: MaxConcurrencyReachedError): await ctx.send(f"Limit reached: {error.current}/{error.number}")

С параметрами limit и current (удобнее)

@heavy.on_max_concurrency() async def on_heavy_concurrency(ctx: Context, limit: int, current: int): await ctx.send(f"Too many uses: {current}/{limit}")

Минимальный вариант -только ctx

@heavy.on_max_concurrency() async def on_heavy_concurrency(ctx: Context): await ctx.send("Too many concurrent executions!")

Source code in vkflow\commands\core.py
def on_max_concurrency(
    self,
) -> typing.Callable[[typing.Callable[..., typing.Awaitable]], typing.Callable[..., typing.Awaitable]]:
    """
    Decorator для регистрации обработчика max concurrency для этой команды.

    Этот обработчик вызывается при достижении лимита одновременных выполнений.

    Обработчик может принимать следующие необязательные параметры:
    - ctx (Context): Контекст команды
    - error (MaxConcurrencyReachedError): Исключение MaxConcurrencyReachedError
    - limit (int): Максимально допустимое количество одновременных выполнений
    - current (int): Текущее количество активных выполнений

    Возвращает:
        Функцию-decorator

    Пример:
        @commands.command()
        @commands.max_concurrency(2, BucketType.CHAT)
        async def heavy(ctx: Context):
            await asyncio.sleep(10)
            await ctx.send("Done!")

        # С параметром error
        @heavy.on_max_concurrency()
        async def on_heavy_concurrency(ctx: Context, error: MaxConcurrencyReachedError):
            await ctx.send(f"Limit reached: {error.current}/{error.number}")

        # С параметрами limit и current (удобнее)
        @heavy.on_max_concurrency()
        async def on_heavy_concurrency(ctx: Context, limit: int, current: int):
            await ctx.send(f"Too many uses: {current}/{limit}")

        # Минимальный вариант -только ctx
        @heavy.on_max_concurrency()
        async def on_heavy_concurrency(ctx: Context):
            await ctx.send("Too many concurrent executions!")
    """

    def decorator(
        func: typing.Callable[..., typing.Awaitable],
    ) -> typing.Callable[..., typing.Awaitable]:
        if self._max_concurrency_handler is not None:
            raise ValueError(
                f"Команда '{self.name}' уже имеет обработчик max concurrency. "
                "Допускается только один decorator @.on_max_concurrency()."
            )
        self._max_concurrency_handler = func
        return func

    return decorator

before_invoke

before_invoke()

Decorator для регистрации хука before_invoke для этой команды.

Этот хук вызывается перед выполнением обработчика команды. Если хук возвращает False, выполнение команды будет отменено.

Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - А также любые распарсенные аргументы команды с теми же именами

Возвращает

Функцию-decorator

Пример

@commands.command() async def test(ctx: Context, user: User): await ctx.send(f"Hello, {user.name}!")

@test.before_invoke() async def before_test(ctx: Context): print(f"Command {ctx.command.name} is about to run")

С аргументами

@test.before_invoke() async def before_test(ctx: Context, user: User): print(f"About to greet {user.name}")

Верните False для отмены команды

@test.before_invoke() async def check_permissions(ctx: Context): if not await has_permission(ctx.author): await ctx.send("No permission!") return False

Source code in vkflow\commands\core.py
def before_invoke(
    self,
) -> typing.Callable[[typing.Callable[..., typing.Awaitable]], typing.Callable[..., typing.Awaitable]]:
    """
    Decorator для регистрации хука before_invoke для этой команды.

    Этот хук вызывается перед выполнением обработчика команды.
    Если хук возвращает False, выполнение команды будет отменено.

    Обработчик может принимать следующие необязательные параметры:
    - ctx (Context): Контекст команды
    - А также любые распарсенные аргументы команды с теми же именами

    Возвращает:
        Функцию-decorator

    Пример:
        @commands.command()
        async def test(ctx: Context, user: User):
            await ctx.send(f"Hello, {user.name}!")

        @test.before_invoke()
        async def before_test(ctx: Context):
            print(f"Command {ctx.command.name} is about to run")

        # С аргументами
        @test.before_invoke()
        async def before_test(ctx: Context, user: User):
            print(f"About to greet {user.name}")

        # Верните False для отмены команды
        @test.before_invoke()
        async def check_permissions(ctx: Context):
            if not await has_permission(ctx.author):
                await ctx.send("No permission!")
                return False
    """

    def decorator(
        func: typing.Callable[..., typing.Awaitable],
    ) -> typing.Callable[..., typing.Awaitable]:
        if self._before_invoke is not None:
            raise ValueError(
                f"Команда '{self.name}' уже имеет обработчик before_invoke. "
                "Допускается только один decorator @.before_invoke()."
            )
        self._before_invoke = func
        return func

    return decorator

after_invoke

after_invoke()

Decorator для регистрации хука after_invoke для этой команды.

Этот хук вызывается после завершения обработчика команды (успешно или с ошибкой).

Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - result: Возвращаемое значение обработчика команды - error (Exception | None): Исключение при ошибке команды, None при успехе - А также любые распарсенные аргументы команды с теми же именами

Возвращает

Функцию-decorator

Пример

@commands.command() async def test(ctx: Context): return "success"

@test.after_invoke() async def after_test(ctx: Context, result): print(f"Command returned: {result}")

@test.after_invoke() async def after_test(ctx: Context, error): if error: print(f"Command failed: {error}") else: print("Command succeeded")

Source code in vkflow\commands\core.py
def after_invoke(
    self,
) -> typing.Callable[[typing.Callable[..., typing.Awaitable]], typing.Callable[..., typing.Awaitable]]:
    """
    Decorator для регистрации хука after_invoke для этой команды.

    Этот хук вызывается после завершения обработчика команды (успешно или с ошибкой).

    Обработчик может принимать следующие необязательные параметры:
    - ctx (Context): Контекст команды
    - result: Возвращаемое значение обработчика команды
    - error (Exception | None): Исключение при ошибке команды, None при успехе
    - А также любые распарсенные аргументы команды с теми же именами

    Возвращает:
        Функцию-decorator

    Пример:
        @commands.command()
        async def test(ctx: Context):
            return "success"

        @test.after_invoke()
        async def after_test(ctx: Context, result):
            print(f"Command returned: {result}")

        @test.after_invoke()
        async def after_test(ctx: Context, error):
            if error:
                print(f"Command failed: {error}")
            else:
                print("Command succeeded")
    """

    def decorator(
        func: typing.Callable[..., typing.Awaitable],
    ) -> typing.Callable[..., typing.Awaitable]:
        if self._after_invoke is not None:
            raise ValueError(
                f"Команда '{self.name}' уже имеет обработчик after_invoke. "
                "Допускается только один decorator @.after_invoke()."
            )
        self._after_invoke = func
        return func

    return decorator

reset_cooldown

reset_cooldown(
    ctx=None, *, type=None, user=None, chat=None
)

Сбросить cooldown(ы) для этой команды.

Аргументы

ctx: Необязательный Context, NewMessage или None - Если Context/NewMessage: сбросить cooldown для конкретного пользователя/чата по типу bucket - Если None и нет других аргументов: сбросить ВСЕ cooldown для этой команды type: Необязательный BucketType для фильтрации сбрасываемых cooldown - Если указан, сбрасывает только cooldown с этим типом - Если None, сбрасывает все cooldown user: ID пользователя для сброса cooldown (только для типов USER/MEMBER) chat: ID чата/peer для сброса cooldown (только для типов CHAT/MEMBER)

Примеры

Сбросить все cooldown для всех пользователей/чатов

command.reset_cooldown()

Сбросить cooldown для конкретного пользователя (из контекста)

command.reset_cooldown(ctx)

Сбросить только cooldown типа USER

command.reset_cooldown(type=BucketType.USER)

Сбросить cooldown для конкретного ID пользователя

command.reset_cooldown(user=123456)

Сбросить cooldown для конкретного чата

command.reset_cooldown(chat=2000000001)

Сбросить cooldown для конкретного участника (пользователь в чате)

command.reset_cooldown(user=123456, chat=2000000001)

Сбросить только CHAT cooldown для конкретного чата

command.reset_cooldown(type=BucketType.CHAT, chat=2000000001)

В другой команде

@commands.command() @is_admin() async def reset(self, ctx: commands.Context, cmd_name: str): cmd = getattr(self, cmd_name, None) if cmd: cmd.reset_cooldown() await ctx.send(f"Cooldown for {cmd_name} reset!")

Source code in vkflow\commands\core.py
def reset_cooldown(
    self,
    ctx: typing.Any = None,
    *,
    type: BucketType | None = None,
    user: int | None = None,
    chat: int | None = None,
):
    """
    Сбросить cooldown(ы) для этой команды.

    Аргументы:
        ctx: Необязательный Context, NewMessage или None
            - Если Context/NewMessage: сбросить cooldown для конкретного пользователя/чата по типу bucket
            - Если None и нет других аргументов: сбросить ВСЕ cooldown для этой команды
        type: Необязательный BucketType для фильтрации сбрасываемых cooldown
            - Если указан, сбрасывает только cooldown с этим типом
            - Если None, сбрасывает все cooldown
        user: ID пользователя для сброса cooldown (только для типов USER/MEMBER)
        chat: ID чата/peer для сброса cooldown (только для типов CHAT/MEMBER)

    Примеры:
        # Сбросить все cooldown для всех пользователей/чатов
        command.reset_cooldown()

        # Сбросить cooldown для конкретного пользователя (из контекста)
        command.reset_cooldown(ctx)

        # Сбросить только cooldown типа USER
        command.reset_cooldown(type=BucketType.USER)

        # Сбросить cooldown для конкретного ID пользователя
        command.reset_cooldown(user=123456)

        # Сбросить cooldown для конкретного чата
        command.reset_cooldown(chat=2000000001)

        # Сбросить cooldown для конкретного участника (пользователь в чате)
        command.reset_cooldown(user=123456, chat=2000000001)

        # Сбросить только CHAT cooldown для конкретного чата
        command.reset_cooldown(type=BucketType.CHAT, chat=2000000001)

        # В другой команде
        @commands.command()
        @is_admin()
        async def reset(self, ctx: commands.Context, cmd_name: str):
            cmd = getattr(self, cmd_name, None)
            if cmd:
                cmd.reset_cooldown()
                await ctx.send(f"Cooldown for {cmd_name} reset!")
    """
    for mapping in self._cooldown_mappings:
        if type is not None and mapping.type != type:
            continue

        mapping.reset(ctx, user=user, chat=chat)

handle_message async

handle_message(ctx)

Обработка сообщения с поддержкой max_concurrency

Source code in vkflow\commands\core.py
async def handle_message(self, ctx: NewMessage) -> None:
    """Обработка сообщения с поддержкой max_concurrency"""
    from .cooldowns import MaxConcurrencyReachedError
    from vkflow.exceptions import ArgumentParsingError

    routing = await self._match_routing(ctx)
    if routing is not None:
        is_routing_matched, _, prefix, invoked_with = routing

        try:
            arguments = await self._make_arguments(
                ctx,
                ctx.msg.text[is_routing_matched.end() :],
                prefix=prefix,
                invoked_with=invoked_with,
            )
        except ArgumentParsingError as error:
            await self._handle_argument_parsing_error(ctx, error)
            return

        if arguments is not None:
            passed_filter = await self._run_through_filters(ctx)
            if passed_filter:
                if self._max_concurrency_mapping is not None:
                    check_ctx = await self._create_context(
                        ctx, command=self, prefix=prefix, invoked_with=invoked_with
                    )

                    try:
                        async with self._max_concurrency_mapping(check_ctx):
                            await self._call_handler(ctx, arguments)
                    except MaxConcurrencyReachedError as e:
                        if self._max_concurrency_handler is not None:
                            await inject_and_call(
                                self._max_concurrency_handler,
                                {
                                    "ctx": check_ctx,
                                    "error": e,
                                    "limit": e.number,
                                    "current": e.current,
                                },
                            )
                else:
                    await self._call_handler(ctx, arguments)

Group

Group

Group(callback, name=None, aliases=None, **kwargs)

Bases: GroupMixin, Command

Класс, представляющий группу команд.

Наследуется от GroupMixin (управление подкомандами) и Command.

Атрибуты

invoke_without_command: Вызывать ли обработчик группы, если подкоманда не найдена

Пример

@commands.group() async def config(ctx: Context): '''Команды конфигурации''' if ctx.invoked_subcommand is None: await ctx.send("Неверная подкоманда")

@config.command() async def show(ctx: Context): '''Показать конфигурацию''' await ctx.send("Config: ...")

@commands.group(aliases=["cfg", "conf"]) async def config(ctx: Context): pass

Source code in vkflow\commands\core.py
def __init__(
    self, callback: Handler, name: str | None = None, aliases: list[str] | None = None, **kwargs
):
    self.invoke_without_command = kwargs.pop("invoke_without_command", False)
    super().__init__(callback, name=name, aliases=aliases, **kwargs)

__get__

__get__(instance, owner)

Привязка подкоманд к экземпляру

Source code in vkflow\commands\core.py
def __get__(self, instance, owner):
    """Привязка подкоманд к экземпляру"""
    if instance is None:
        return self

    bound_group = super().__get__(instance, owner)

    bound_group.all_commands = {}
    for cmd_name, cmd in self.all_commands.items():
        if hasattr(cmd.handler, "__get__"):
            bound_subcmd = copy.copy(cmd)
            bound_subcmd.handler = cmd.handler.__get__(instance, owner)

            bound_subcmd._error_handlers = [
                (_bind_if_descriptor(eh, instance, owner), et) for eh, et in cmd._error_handlers
            ]

            bound_subcmd._cooldown_handler = _bind_if_descriptor(cmd._cooldown_handler, instance, owner)
            bound_subcmd._max_concurrency_handler = _bind_if_descriptor(
                cmd._max_concurrency_handler, instance, owner
            )
            bound_subcmd._before_invoke = _bind_if_descriptor(cmd._before_invoke, instance, owner)
            bound_subcmd._after_invoke = _bind_if_descriptor(cmd._after_invoke, instance, owner)

            bound_group.all_commands[cmd_name] = bound_subcmd
        else:
            bound_group.all_commands[cmd_name] = cmd

    return bound_group

update_prefix

update_prefix(*prefixes)

Обновление префиксов включая подкоманды

Source code in vkflow\commands\core.py
def update_prefix(self, *prefixes: str) -> None:
    """Обновление префиксов включая подкоманды"""
    super().update_prefix(*prefixes)
    for cmd in self.commands:
        cmd.update_prefix(*prefixes)

handle_message async

handle_message(ctx)

Обработка сообщения с поддержкой подкоманд

Source code in vkflow\commands\core.py
async def handle_message(self, ctx: NewMessage) -> None:
    """Обработка сообщения с поддержкой подкоманд"""
    from vkflow.exceptions import ArgumentParsingError

    routing = await self._match_routing(ctx)
    if routing is not None:
        is_routing_matched, _, prefix, invoked_with = routing

        remaining_text = ctx.msg.text[is_routing_matched.end() :].lstrip()

        invoked_subcommand = None
        subcommand_obj = None

        if remaining_text:
            for cmd in self.all_commands.values():
                for cmd_name_or_alias in cmd.names:
                    pattern = re.escape(cmd_name_or_alias) + r"(?:\s|$)"
                    match = re.match(pattern, remaining_text, flags=self.routing_re_flags)

                    if match:
                        invoked_subcommand = cmd_name_or_alias
                        subcommand_obj = cmd
                        remaining_text = remaining_text[match.end() :].lstrip()
                        break

                if subcommand_obj:
                    break

        passed_group_filter = await self._run_through_filters(ctx)
        if not passed_group_filter:
            return

        if subcommand_obj is not None:
            try:
                subcommand_arguments = await subcommand_obj._make_arguments(
                    ctx,
                    remaining_text,
                    prefix=prefix,
                    invoked_with=invoked_subcommand,
                )
            except ArgumentParsingError as error:
                await subcommand_obj._handle_argument_parsing_error(ctx, error)
                return

            if subcommand_arguments is not None:
                passed_subcommand_filter = await subcommand_obj._run_through_filters(ctx)
                if not passed_subcommand_filter:
                    return

                try:
                    group_arguments = await self._make_arguments(
                        ctx,
                        "",
                        prefix=prefix,
                        invoked_with=invoked_with,
                    )
                except ArgumentParsingError as error:
                    await self._handle_argument_parsing_error(ctx, error)
                    return

                if group_arguments is not None:
                    if self._ctx_argument_name:
                        group_ctx_arg = group_arguments.get(self._ctx_argument_name)
                        if isinstance(group_ctx_arg, Context):
                            group_ctx_arg.invoked_subcommand = subcommand_obj

                    await self._call_handler(ctx, group_arguments)

                if subcommand_obj._ctx_argument_name:
                    subcmd_ctx_arg = subcommand_arguments.get(subcommand_obj._ctx_argument_name)
                    if isinstance(subcmd_ctx_arg, Context):
                        subcmd_ctx_arg.invoked_subcommand = None

                await subcommand_obj._call_handler(ctx, subcommand_arguments)
        else:
            try:
                arguments = await self._make_arguments(
                    ctx,
                    remaining_text,
                    prefix=prefix,
                    invoked_with=invoked_with,
                )
            except ArgumentParsingError as error:
                await self._handle_argument_parsing_error(ctx, error)
                return

            if arguments is not None:
                if self._ctx_argument_name:
                    ctx_arg = arguments.get(self._ctx_argument_name)
                    if isinstance(ctx_arg, Context):
                        ctx_arg.invoked_subcommand = None

                await self._call_handler(ctx, arguments)

Cog

Cog

Cog()

Базовый класс для создания когов -коллекций команд и обработчиков.

Система когов позволяет организовать функциональность бота в отдельные модули.

Атрибуты

qualified_name: Имя кога (по умолчанию -имя класса) description: Описание кога (по умолчанию -docstring класса) app: Экземпляр App (инжектируется при добавлении кога) bot: Экземпляр Bot (инжектируется при добавлении кога)

Примеры

app.add_cog(SimpleCog()) app.add_cog(CounterCog())

Инициализация Cog.

Этот метод может быть переопределён в подклассах БЕЗ вызова super().init(). Базовая инициализация выполняется автоматически в new.

Для обратной совместимости вызов super().init() по-прежнему работает, но не является обязательным.

Source code in vkflow\commands\cog.py
def __init__(self):
    """
    Инициализация Cog.

    Этот метод может быть переопределён в подклассах БЕЗ вызова super().__init__().
    Базовая инициализация выполняется автоматически в __new__.

    Для обратной совместимости вызов super().__init__() по-прежнему работает,
    но не является обязательным.
    """
    self._cog_base_init()

qualified_name property

qualified_name

Квалифицированное имя кога

description property

description

Описание кога

__new__

__new__(*args, **kwargs)

Создать новый экземпляр Cog и выполнить базовую инициализацию.

Этот метод вызывается ДО init и гарантирует, что базовая инициализация всегда происходит, даже если пользователь не вызывает super().init().

Это ключ к тому, чтобы super().init() был необязательным!

Source code in vkflow\commands\cog.py
def __new__(cls, *args, **kwargs):
    """
    Создать новый экземпляр Cog и выполнить базовую инициализацию.

    Этот метод вызывается ДО __init__ и гарантирует, что базовая
    инициализация всегда происходит, даже если пользователь
    не вызывает super().__init__().

    Это ключ к тому, чтобы super().__init__() был необязательным!
    """
    instance = super().__new__(cls)
    instance._cog_base_init()

    return instance

listener classmethod

listener(name=None)

Декоратор для пометки метода как обработчика событий.

Аргументы

name: Имя события для прослушивания. Если не указано, используется имя метода

Пример

@commands.Cog.listener() async def on_message_new(self, event): print(f"Новое сообщение: {event}")

@commands.Cog.listener("message_new") async def handle_message(self, event): print(f"Обработка: {event}")

Source code in vkflow\commands\cog.py
@classmethod
def listener(cls, name: str | None = None):
    """
    Декоратор для пометки метода как обработчика событий.

    Аргументы:
        name: Имя события для прослушивания. Если не указано, используется имя метода

    Пример:
        @commands.Cog.listener()
        async def on_message_new(self, event):
            print(f"Новое сообщение: {event}")

        @commands.Cog.listener("message_new")
        async def handle_message(self, event):
            print(f"Обработка: {event}")
    """

    def decorator(func):
        func.__cog_listener__ = True
        func.__cog_listener_name__ = name or func.__name__

        return func

    return decorator

get_commands

get_commands()

Получить все команды в этом коге.

Возвращает

Список объектов Command

Source code in vkflow\commands\cog.py
def get_commands(self) -> list[Command]:
    """
    Получить все команды в этом коге.

    Возвращает:
        Список объектов Command
    """
    return self._cog_commands.copy()

walk_commands

walk_commands()

Итерация по всем командам в этом коге, включая подкоманды групп.

Yields:

Type Description
Command

Объекты Command

Source code in vkflow\commands\cog.py
def walk_commands(self) -> typing.Generator[Command, None, None]:
    """
    Итерация по всем командам в этом коге, включая подкоманды групп.

    Yields:
        Объекты Command
    """
    for command in self._cog_commands:
        yield command

        if hasattr(command, "all_commands"):
            yield from set(command.all_commands.values())

cog_load async

cog_load()

Вызывается при загрузке Cog в App.

Это асинхронный хук, вызываемый после регистрации Cog и добавления всех команд/обработчиков в App.

Переопределите этот метод для пользовательской логики инициализации: - Загрузка данных из базы данных - Установка соединений - Запуск фоновых задач

Пример

class MyCog(commands.Cog): async def cog_load(self): print(f"Ког {self.qualified_name} загружен!") self.db = await connect_to_database()

Source code in vkflow\commands\cog.py
async def cog_load(self):
    """
    Вызывается при загрузке Cog в App.

    Это асинхронный хук, вызываемый после регистрации Cog
    и добавления всех команд/обработчиков в App.

    Переопределите этот метод для пользовательской логики инициализации:
    - Загрузка данных из базы данных
    - Установка соединений
    - Запуск фоновых задач

    Пример:
        class MyCog(commands.Cog):
            async def cog_load(self):
                print(f"Ког {self.qualified_name} загружен!")
                self.db = await connect_to_database()
    """

cog_unload async

cog_unload()

Вызывается при выгрузке Cog из App.

Это асинхронный хук, вызываемый перед удалением Cog и снятием регистрации его команд/обработчиков из App.

Переопределите этот метод для пользовательской логики очистки: - Закрытие соединений с базой данных - Сохранение состояния - Отмена фоновых задач

Пример

class MyCog(commands.Cog): async def cog_unload(self): print(f"Ког {self.qualified_name} выгружается...") await self.db.close()

Source code in vkflow\commands\cog.py
async def cog_unload(self):
    """
    Вызывается при выгрузке Cog из App.

    Это асинхронный хук, вызываемый перед удалением Cog
    и снятием регистрации его команд/обработчиков из App.

    Переопределите этот метод для пользовательской логики очистки:
    - Закрытие соединений с базой данных
    - Сохранение состояния
    - Отмена фоновых задач

    Пример:
        class MyCog(commands.Cog):
            async def cog_unload(self):
                print(f"Ког {self.qualified_name} выгружается...")
                await self.db.close()
    """

cog_check async

cog_check(ctx)

Глобальная проверка, применяемая ко всем командам в этом коге.

Эта проверка вызывается для каждой команды в коге до оценки собственных фильтров/проверок команды.

Аргументы

ctx: Контекст команды (Context)

Возвращает

True, если команда должна быть выполнена, False иначе

Пример

class AdminCog(commands.Cog): async def cog_check(self, ctx): return ctx.author in self.admin_ids

Source code in vkflow\commands\cog.py
async def cog_check(self, ctx) -> bool:
    """
    Глобальная проверка, применяемая ко всем командам в этом коге.

    Эта проверка вызывается для каждой команды в коге до
    оценки собственных фильтров/проверок команды.

    Аргументы:
        ctx: Контекст команды (Context)

    Возвращает:
        True, если команда должна быть выполнена, False иначе

    Пример:
        class AdminCog(commands.Cog):
            async def cog_check(self, ctx):
                return ctx.author in self.admin_ids
    """
    return True

cog_before_invoke async

cog_before_invoke(ctx)

Вызывается перед выполнением любой команды в этом коге.

Этот хук вызывается после прохождения всех проверок, но до выполнения handler команды. Если метод вернёт False, команда будет отменена.

Аргументы

ctx: Контекст команды (Context)

Возвращает

True для продолжения выполнения, False для отмены

Пример

class MyCog(commands.Cog): async def cog_before_invoke(self, ctx): print(f"Запускаю {ctx.command.name}")

Source code in vkflow\commands\cog.py
async def cog_before_invoke(self, ctx):
    """
    Вызывается перед выполнением любой команды в этом коге.

    Этот хук вызывается после прохождения всех проверок, но до
    выполнения handler команды. Если метод вернёт False,
    команда будет отменена.

    Аргументы:
        ctx: Контекст команды (Context)

    Возвращает:
        True для продолжения выполнения, False для отмены

    Пример:
        class MyCog(commands.Cog):
            async def cog_before_invoke(self, ctx):
                print(f"Запускаю {ctx.command.name}")
    """

cog_after_invoke async

cog_after_invoke(ctx, result=None, error=None)

Вызывается после завершения любой команды в этом коге (успех или ошибка).

Этот хук вызывается всегда, независимо от того, завершилась команда успешно или вызвала исключение.

Аргументы

ctx: Контекст команды (Context) result: Возвращаемое значение команды (None, если команда упала) error: Исключение, если команда упала, None при успехе

Пример

class MyCog(commands.Cog): async def cog_after_invoke(self, ctx, result, error): if error: print(f"Команда {ctx.command.name} упала: {error}") else: print(f"Команда {ctx.command.name} выполнена")

Source code in vkflow\commands\cog.py
async def cog_after_invoke(self, ctx, result=None, error=None):
    """
    Вызывается после завершения любой команды в этом коге (успех или ошибка).

    Этот хук вызывается всегда, независимо от того, завершилась
    команда успешно или вызвала исключение.

    Аргументы:
        ctx: Контекст команды (Context)
        result: Возвращаемое значение команды (None, если команда упала)
        error: Исключение, если команда упала, None при успехе

    Пример:
        class MyCog(commands.Cog):
            async def cog_after_invoke(self, ctx, result, error):
                if error:
                    print(f"Команда {ctx.command.name} упала: {error}")
                else:
                    print(f"Команда {ctx.command.name} выполнена")
    """

cog_command_error async

cog_command_error(ctx, error)

Вызывается при возникновении ошибки в любой команде этого кога.

Этот обработчик выполняется параллельно с обычным потоком обработки ошибок и НЕ влияет на то, будет ли ошибка проброшена или обработана. Полезен для логирования, метрик или уведомлений.

Аргументы

ctx: Контекст команды (Context) error: Возникшее исключение

Пример

class MyCog(commands.Cog): async def cog_command_error(self, ctx, error): print(f"Ошибка в {ctx.command.name}: {error}") await self.send_error_to_admin(error)

Source code in vkflow\commands\cog.py
async def cog_command_error(self, ctx, error):
    """
    Вызывается при возникновении ошибки в любой команде этого кога.

    Этот обработчик выполняется параллельно с обычным потоком обработки
    ошибок и НЕ влияет на то, будет ли ошибка проброшена или обработана.
    Полезен для логирования, метрик или уведомлений.

    Аргументы:
        ctx: Контекст команды (Context)
        error: Возникшее исключение

    Пример:
        class MyCog(commands.Cog):
            async def cog_command_error(self, ctx, error):
                print(f"Ошибка в {ctx.command.name}: {error}")
                await self.send_error_to_admin(error)
    """

cog_command_fallback async

cog_command_fallback(ctx, error)

Вызывается, когда ошибка команды не обработана ни одним обработчиком ошибок.

Это fallback, который выполняется ПОСЛЕ проверки всех обработчиков ошибок конкретной команды. Если этот метод вызван, значит ни один обработчик @command.on_error() не подошёл к ошибке.

В отличие от cog_command_error, может «обработать» ошибку и предотвратить её повторный проброс.

Аргументы

ctx: Контекст команды (Context) error: Возникшее исключение

Пример

class MyCog(commands.Cog): async def cog_command_fallback(self, ctx, error): await ctx.send(f"Произошла ошибка: {type(error).name}")

Source code in vkflow\commands\cog.py
async def cog_command_fallback(self, ctx, error):
    """
    Вызывается, когда ошибка команды не обработана ни одним обработчиком ошибок.

    Это fallback, который выполняется ПОСЛЕ проверки всех обработчиков
    ошибок конкретной команды. Если этот метод вызван, значит ни один
    обработчик @command.on_error() не подошёл к ошибке.

    В отличие от cog_command_error, может «обработать» ошибку и
    предотвратить её повторный проброс.

    Аргументы:
        ctx: Контекст команды (Context)
        error: Возникшее исключение

    Пример:
        class MyCog(commands.Cog):
            async def cog_command_fallback(self, ctx, error):
                await ctx.send(f"Произошла ошибка: {type(error).__name__}")
    """

get_fsm

get_fsm(ctx, *, strategy=None)

Получить FSMContext для данного контекста.

Аргументы

ctx: Context, NewMessage или CallbackButtonPressed strategy: Опциональное переопределение стратегии ключа

Возвращает

Экземпляр FSMContext

Исключения

ValueError: Если fsm_storage не настроен

Пример

@commands.command() async def start_order(self, ctx): fsm = self.get_fsm(ctx) await fsm.set_state(OrderStates.waiting_name) await ctx.send("Введите ваше имя:")

Source code in vkflow\commands\cog.py
def get_fsm(self, ctx, *, strategy: str | None = None):
    """
    Получить FSMContext для данного контекста.

    Аргументы:
        ctx: Context, NewMessage или CallbackButtonPressed
        strategy: Опциональное переопределение стратегии ключа

    Возвращает:
        Экземпляр FSMContext

    Исключения:
        ValueError: Если fsm_storage не настроен

    Пример:
        @commands.command()
        async def start_order(self, ctx):
            fsm = self.get_fsm(ctx)
            await fsm.set_state(OrderStates.waiting_name)
            await ctx.send("Введите ваше имя:")
    """
    from vkflow.app.fsm import Context as FSMContext

    if self.fsm_storage is None:
        raise ValueError(
            f"FSM-хранилище не настроено для {self.qualified_name}. "
            "Установите self.fsm_storage в __init__ или как атрибут класса."
        )

    message = ctx._message if hasattr(ctx, "_message") else ctx

    return FSMContext.from_message(
        self.fsm_storage,
        message,
        strategy=strategy or self.fsm_strategy,
    )

process_fsm async

process_fsm(message)

Обработать сообщение через FSM-обработчики в этом коге.

Аргументы

message: NewMessage для обработки

Возвращает

True, если обработчик был вызван, False иначе

Пример

В App.route_message или пользовательском обработчике:

for cog in self.cogs.values(): if await cog.process_fsm(message): return # FSM обработал сообщение

Source code in vkflow\commands\cog.py
async def process_fsm(self, message) -> bool:
    """
    Обработать сообщение через FSM-обработчики в этом коге.

    Аргументы:
        message: NewMessage для обработки

    Возвращает:
        True, если обработчик был вызван, False иначе

    Пример:
        # В App.route_message или пользовательском обработчике:
        for cog in self.cogs.values():
            if await cog.process_fsm(message):
                return  # FSM обработал сообщение
    """
    if self.fsm_storage is None or not self._cog_fsm_handlers:
        return False

    from vkflow.app.fsm import Context as FSMContext

    fsm_ctx = FSMContext.from_message(
        self.fsm_storage,
        message,
        strategy=self.fsm_strategy,
    )

    current_state = await fsm_ctx.get_state()
    if current_state is None:
        return False

    handler = self._cog_fsm_handlers.get(current_state)
    if handler is None:
        return False

    sig = inspect.signature(handler)
    kwargs = {}

    for param_name in sig.parameters:
        if param_name == "self":
            continue
        if param_name in ("ctx", "fsm"):
            kwargs[param_name] = fsm_ctx
        elif param_name in ("msg", "message"):
            kwargs[param_name] = message
        elif param_name == "data":
            kwargs[param_name] = await fsm_ctx.get_data()
        elif param_name == "state":
            kwargs[param_name] = current_state

    await handler(**kwargs)
    return True

get_fsm_states

get_fsm_states()

Получить все FSM-состояния, обрабатываемые этим когом.

Возвращает

Список строк с именами состояний

Source code in vkflow\commands\cog.py
def get_fsm_states(self) -> list[str]:
    """
    Получить все FSM-состояния, обрабатываемые этим когом.

    Возвращает:
        Список строк с именами состояний
    """
    return list(self._cog_fsm_handlers.keys())

Listener

Listener

Listener(callback, event_name=None)

A class that represents an event listener.

This allows you to listen to VK events, custom library events, and chat action events.

Attributes:

Name Type Description
callback

The coroutine that is executed when the event is received

event_name

The name of the event to listen to

is_raw

Whether this is a raw event listener

is_chat_action

Whether this listens to chat action events

Examples:

Standard VK events

@commands.listener() async def on_message_new(payload): print(f"New message: {payload}")

Chat action events (with wrapper class)

@commands.listener() async def on_member_join(event: MemberJoinEvent): print(f"User {event.member_id} joined")

Raw chat action events (with raw dict)

@commands.listener() async def on_raw_member_join(payload, member_id): print(f"User {member_id} joined")

Specific action type

@commands.listener() async def on_chat_invite_user(event: MemberJoinEvent, inviter_id): print(f"User invited by {inviter_id}")

Pin/unpin without chat_ prefix

@commands.listener() async def on_pin_message(event: PinMessageEvent, conversation_message_id): print(f"Message {conversation_message_id} pinned")

Source code in vkflow\commands\listener.py
def __init__(
    self,
    callback: Handler,
    event_name: str | None = None,
):
    self.callback = callback
    self.event_name, self.is_raw = self._determine_event_name(callback, event_name)

    self.is_chat_action = is_chat_action_event(self.event_name)
    self.action_types = get_action_types_for_event(self.event_name) if self.is_chat_action else []

    self.__cog_listener__ = True
    self.__cog_listener_name__ = self.event_name

    self._parse_signature()

__set_name__

__set_name__(owner, name)

Вызывается при назначении Listener атрибуту класса

Source code in vkflow\commands\listener.py
def __set_name__(self, owner, name):
    """Вызывается при назначении Listener атрибуту класса"""
    self._cog_class = owner
    self._method_name = name

__get__

__get__(instance, owner)

Протокол дескриптора -привязка callback при доступе через экземпляр

Source code in vkflow\commands\listener.py
def __get__(self, instance, owner):
    """Протокол дескриптора -привязка callback при доступе через экземпляр"""
    if instance is None:
        return self

    bound_listener = copy.copy(self)

    if hasattr(self.callback, "__get__") and not hasattr(self.callback, "__self__"):
        with contextlib.suppress(TypeError, AttributeError):
            bound_listener.callback = self.callback.__get__(instance, owner)

    return bound_listener

invoke async

invoke(event_storage)

Invoke the listener with the event data.

Parameters:

Name Type Description Default
event_storage NewEvent

The NewEvent storage containing event data

required
Source code in vkflow\commands\listener.py
async def invoke(self, event_storage: NewEvent):
    """
    Invoke the listener with the event data.

    Args:
        event_storage: The NewEvent storage containing event data
    """
    kwargs = {}

    payload = event_storage.event.object

    if self.is_raw:
        for param_name in self._params:
            if param_name == "payload":
                kwargs["payload"] = payload
            elif param_name == "event":
                kwargs["event"] = event_storage
            elif param_name == "bot":
                kwargs["bot"] = event_storage.bot
            elif isinstance(payload, dict) and param_name in payload:
                kwargs[param_name] = payload[param_name]
    else:
        for param_name, param_info in self._params.items():
            if param_name == "payload":
                kwargs["payload"] = payload
            elif param_name == "event":
                kwargs["event"] = event_storage
            elif param_name == "bot":
                kwargs["bot"] = event_storage.bot
            elif isinstance(payload, dict) and param_name in payload:
                kwargs[param_name] = payload[param_name]
            elif not param_info["has_default"]:
                pass

    result = self.callback(**kwargs)

    if inspect.iscoroutine(result):
        await result

invoke_chat_action async

invoke_chat_action(action_event, raw_action)

Invoke the listener with chat action event data.

Parameters:

Name Type Description Default
action_event ChatActionEvent

The ChatActionEvent wrapper instance

required
raw_action dict

The raw action dict from VK API

required
Source code in vkflow\commands\listener.py
async def invoke_chat_action(
    self,
    action_event: ChatActionEvent,
    raw_action: dict,
):
    """
    Invoke the listener with chat action event data.

    Args:
        action_event: The ChatActionEvent wrapper instance
        raw_action: The raw action dict from VK API
    """
    kwargs = {}

    for param_name in self._params:
        if self.is_raw:
            if param_name == "payload":
                kwargs["payload"] = raw_action
            elif param_name == "raw":
                kwargs["raw"] = raw_action
            elif param_name == "event":
                kwargs["event"] = action_event
            elif param_name == "ctx":
                kwargs["ctx"] = action_event.ctx
            elif param_name == "bot":
                kwargs["bot"] = action_event.bot
            elif param_name == "api":
                kwargs["api"] = action_event.api
            elif param_name in raw_action:
                kwargs[param_name] = raw_action[param_name]
            elif hasattr(action_event, param_name):
                kwargs[param_name] = getattr(action_event, param_name)
        else:
            if param_name == "event":
                kwargs["event"] = action_event
            elif param_name == "payload":
                kwargs["payload"] = raw_action
            elif param_name == "raw":
                kwargs["raw"] = raw_action
            elif param_name == "ctx":
                kwargs["ctx"] = action_event.ctx
            elif param_name == "bot":
                kwargs["bot"] = action_event.bot
            elif param_name == "api":
                kwargs["api"] = action_event.api
            elif hasattr(action_event, param_name):
                kwargs[param_name] = getattr(action_event, param_name)
            elif param_name in raw_action:
                kwargs[param_name] = raw_action[param_name]

    result = self.callback(**kwargs)

    if inspect.iscoroutine(result):
        await result

matches_action_type

matches_action_type(action_type)

Check if this listener should handle the given action type.

Parameters:

Name Type Description Default
action_type str

The VK action type (e.g., "chat_invite_user")

required

Returns:

Type Description
bool

True if this listener handles this action type

Source code in vkflow\commands\listener.py
def matches_action_type(self, action_type: str) -> bool:
    """
    Check if this listener should handle the given action type.

    Args:
        action_type: The VK action type (e.g., "chat_invite_user")

    Returns:
        True if this listener handles this action type
    """
    if not self.is_chat_action:
        return False
    return action_type in self.action_types

Loop

Loop

Loop(
    coro,
    *,
    seconds=0,
    minutes=0,
    hours=0,
    time=MISSING,
    cron=MISSING,
    count=None,
    reconnect=True
)

Bases: Generic[LF]

Source code in vkflow\commands\tasks.py
def __init__(
    self,
    coro: LF,
    *,
    seconds: float = 0,
    minutes: float = 0,
    hours: float = 0,
    time: datetime.time | Sequence[datetime.time] = MISSING,
    cron: str | MissingSentinel = MISSING,
    count: int | None = None,
    reconnect: bool = True,
) -> None:
    if not inspect.iscoroutinefunction(coro):
        raise TypeError(f"Expected coroutine function, not {type(coro).__name__!r}.")

    if count is not None and count <= 0:
        raise ValueError("count must be greater than 0 or None.")

    self.coro: LF = coro
    self.reconnect: bool = reconnect
    self.count: int | None = count

    self._current_loop = 0

    self._handle: SleepHandle | None = None
    self._task: asyncio.Task[None] | None = None

    self._injected: Any = None

    self._valid_exception = (
        OSError,
        aiohttp.ClientError,
        asyncio.TimeoutError,
    )

    self._before_loop = None
    self._after_loop = None

    self._error_handler = None
    self._is_being_cancelled = False
    self._has_failed = False

    self._stop_next_iteration = False
    self._last_iteration_failed = False

    self._last_iteration: datetime.datetime | None = None
    self._next_iteration: datetime.datetime | None = None

    self._time_index: int = 0

    self._seconds: float | MissingSentinel = MISSING
    self._minutes: float | MissingSentinel = MISSING
    self._hours: float | MissingSentinel = MISSING
    self._time: list[datetime.time] | MissingSentinel = MISSING
    self._interval_seconds: float | MissingSentinel = MISSING
    self._cron: str | MissingSentinel = MISSING

    self.change_interval(seconds=seconds, minutes=minutes, hours=hours, time=time, cron=cron)

cron property

cron

Cron-выражение, если задано.

Middleware

Middleware dataclass

Middleware(callback, priority=NORMAL, event_types=None)

Middleware, перехватывающий и обрабатывающий события/команды.

Middleware может использоваться для: - Логирования команд/событий - Аутентификации/авторизации - Модификации контекста перед выполнением команды - Глобальной обработки ошибок - Сбора аналитики

Пример

@app.middleware() async def logging_middleware(ctx: Context, call_next): print(f"Команда {ctx.command.name} запущена") result = await call_next() print(f"Команда {ctx.command.name} завершена") return result

MiddlewareManager

MiddlewareManager()

Управляет регистрацией и выполнением middleware.

Source code in vkflow\commands\middleware.py
def __init__(self):
    self._middlewares: list[Middleware] = []
    self._before_command_hooks: list[Handler] = []
    self._after_command_hooks: list[Handler] = []
    self._event_middlewares: dict[str, list[Middleware]] = {}

add_middleware

add_middleware(middleware)

Добавить middleware в менеджер.

Source code in vkflow\commands\middleware.py
def add_middleware(
    self,
    middleware: Middleware,
) -> None:
    """Добавить middleware в менеджер."""
    self._middlewares.append(middleware)
    self._middlewares.sort(key=lambda m: m.priority.value)

    if middleware.event_types:
        for event_type in middleware.event_types:
            if event_type not in self._event_middlewares:
                self._event_middlewares[event_type] = []
            self._event_middlewares[event_type].append(middleware)
            self._event_middlewares[event_type].sort(key=lambda m: m.priority.value)

remove_middleware

remove_middleware(middleware)

Удалить middleware из менеджера.

Source code in vkflow\commands\middleware.py
def remove_middleware(self, middleware: Middleware) -> bool:
    """Удалить middleware из менеджера."""
    if middleware in self._middlewares:
        self._middlewares.remove(middleware)

        if middleware.event_types:
            for event_type in middleware.event_types:
                if (
                    event_type in self._event_middlewares
                    and middleware in self._event_middlewares[event_type]
                ):
                    self._event_middlewares[event_type].remove(middleware)
        return True
    return False

add_before_command_hook

add_before_command_hook(hook)

Добавить хук, вызываемый перед выполнением команды.

Source code in vkflow\commands\middleware.py
def add_before_command_hook(self, hook: Handler) -> None:
    """Добавить хук, вызываемый перед выполнением команды."""
    self._before_command_hooks.append(hook)

add_after_command_hook

add_after_command_hook(hook)

Добавить хук, вызываемый после выполнения команды.

Source code in vkflow\commands\middleware.py
def add_after_command_hook(self, hook: Handler) -> None:
    """Добавить хук, вызываемый после выполнения команды."""
    self._after_command_hooks.append(hook)

get_middlewares_for_event

get_middlewares_for_event(event_type)

Получить все middleware, применимые к типу события.

Source code in vkflow\commands\middleware.py
def get_middlewares_for_event(self, event_type: str) -> list[Middleware]:
    """Получить все middleware, применимые к типу события."""
    global_middlewares = [m for m in self._middlewares if m.event_types is None]
    event_specific = self._event_middlewares.get(event_type, [])

    all_middlewares = global_middlewares + event_specific
    all_middlewares.sort(key=lambda m: m.priority.value)
    return all_middlewares

run_before_command_hooks async

run_before_command_hooks(ctx, **kwargs)

Выполнить все before_command хуки.

Возвращает

True если все хуки прошли, False если команда отменена

Source code in vkflow\commands\middleware.py
async def run_before_command_hooks(self, ctx: Context, **kwargs) -> bool:
    """
    Выполнить все before_command хуки.

    Возвращает:
        True если все хуки прошли, False если команда отменена
    """
    from vkflow.utils.inject import inject_and_call

    available = {"ctx": ctx, **kwargs}

    for hook in self._before_command_hooks:
        try:
            result = await inject_and_call(hook, available)

            if result is False:
                return False

        except Exception:
            traceback.print_exc()

    return True

run_after_command_hooks async

run_after_command_hooks(
    ctx, result=None, error=None, **kwargs
)

Выполнить все after_command хуки.

Source code in vkflow\commands\middleware.py
async def run_after_command_hooks(
    self, ctx: Context, result: typing.Any = None, error: Exception | None = None, **kwargs
) -> None:
    """Выполнить все after_command хуки."""
    from vkflow.utils.inject import inject_and_call

    available = {"ctx": ctx, "result": result, "error": error, **kwargs}

    for hook in self._after_command_hooks:
        try:
            await inject_and_call(hook, available)
        except Exception:
            traceback.print_exc()

MiddlewarePriority

Bases: Enum

Уровни приоритета для порядка выполнения middleware.

Checks

checks

Check dataclass

Check(predicate, error_message=None, delete_after=None)

Bases: BaseFilter

Проверка, которую можно применить к командам через декораторы

Пример

def is_admin(): async def predicate(ctx): admin_ids = [123456, 789012] return ctx.author in admin_ids return check(predicate)

@commands.command() @is_admin() async def admin_command(ctx: commands.Context): await ctx.send("You are an admin!")

make_decision async

make_decision(ctx, **kwargs)

Выполнить предикат проверки

Source code in vkflow\commands\checks.py
async def make_decision(self, ctx: NewMessage, **kwargs):
    """Выполнить предикат проверки"""
    from .context import Context
    from .cooldowns import OnCooldownError

    check_ctx = Context.from_message(ctx) if not isinstance(ctx, Context) else ctx

    try:
        if inspect.iscoroutinefunction(self.predicate):
            result = await self.predicate(check_ctx)
        else:
            result = self.predicate(check_ctx)

        if not result:
            if self.error_message:
                await check_ctx.send(self.error_message, delete_after=self.delete_after)
            raise StopCurrentHandlingError()

    except StopCurrentHandlingError:
        raise

    except OnCooldownError as e:
        e._ctx = check_ctx
        raise StopCurrentHandlingError() from e

    except CheckFailureError as e:
        if e.message:
            da = e.delete_after if e.delete_after is not None else self.delete_after
            await check_ctx.send(e.message, delete_after=da)

        raise StopCurrentHandlingError() from e

    except Exception as e:
        if self.error_message:
            await check_ctx.send(self.error_message, delete_after=self.delete_after)
        raise StopCurrentHandlingError() from e

__call__

__call__(func_or_command)

Позволяет использовать Check как декоратор

Source code in vkflow\commands\checks.py
def __call__(self, func_or_command):
    """Позволяет использовать Check как декоратор"""
    from .core import Command

    if isinstance(func_or_command, Command):
        if func_or_command.filter is None:
            func_or_command.filter = self
        else:
            func_or_command.filter = func_or_command.filter & self

        return func_or_command

    if not hasattr(func_or_command, "__vkflow_checks__"):
        func_or_command.__vkflow_checks__ = []

    func_or_command.__vkflow_checks__.append(self)
    return func_or_command

CheckFailureError dataclass

CheckFailureError(check, message=None, delete_after=None)

Bases: CommandError

Исключение при провале проверки команды

check

check(predicate, *, error_message=None, delete_after=None)

Создать проверку из функции-предиката

Parameters:

Name Type Description Default
predicate CheckFunction

Функция (синхронная или асинхронная), принимающая Context и возвращающая bool

required
error_message str | None

Необязательное сообщение об ошибке при провале проверки (отправляется пользователю)

None
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд. Если None -сообщение не удаляется автоматически.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

def is_owner(): async def predicate(ctx): return ctx.author == OWNER_ID return check(predicate, error_message="Только владелец может это использовать!")

def is_premium(): def predicate(ctx): return ctx.author in PREMIUM_IDS return check(predicate, error_message="Только для премиум-пользователей!", delete_after=10)

is_not_bot = lambda: check( lambda ctx: ctx.author > 0, error_message="Боты не могут это использовать!" )

@commands.command() @is_owner() async def secret(ctx: commands.Context): await ctx.send("Секретная команда!")

Source code in vkflow\commands\checks.py
def check(
    predicate: CheckFunction,
    *,
    error_message: str | None = None,
    delete_after: int | float | None = None,
) -> Check:
    """
    Создать проверку из функции-предиката

    Args:
        predicate: Функция (синхронная или асинхронная), принимающая Context и возвращающая bool
        error_message: Необязательное сообщение об ошибке при провале проверки (отправляется пользователю)
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.
            Если None -сообщение не удаляется автоматически.

    Returns:
        Экземпляр Check

    Пример:
        def is_owner():
            async def predicate(ctx):
                return ctx.author == OWNER_ID
            return check(predicate, error_message="Только владелец может это использовать!")

        def is_premium():
            def predicate(ctx):
                return ctx.author in PREMIUM_IDS
            return check(predicate, error_message="Только для премиум-пользователей!", delete_after=10)

        is_not_bot = lambda: check(
            lambda ctx: ctx.author > 0,
            error_message="Боты не могут это использовать!"
        )

        @commands.command()
        @is_owner()
        async def secret(ctx: commands.Context):
            await ctx.send("Секретная команда!")
    """
    return Check(predicate=predicate, error_message=error_message, delete_after=delete_after)

check_any

check_any(
    *checks,
    error_message="Ни одна из проверок не пройдена",
    delete_after=None
)

Объединить несколько проверок логикой ИЛИ

Хотя бы одна проверка должна пройти для выполнения команды

Parameters:

Name Type Description Default
*checks Check

Объекты Check для объединения

()
error_message str | None

Сообщение при провале всех проверок. По умолчанию "Ни одна из проверок не пройдена".

'Ни одна из проверок не пройдена'
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

@commands.command() @check_any(is_owner(), is_admin(), error_message="Нужны права владельца или админа!") async def manage(ctx: commands.Context): await ctx.send("Вы владелец или админ!")

Source code in vkflow\commands\checks.py
def check_any(
    *checks: Check,
    error_message: str | None = "Ни одна из проверок не пройдена",
    delete_after: int | float | None = None,
) -> Check:
    """
    Объединить несколько проверок логикой ИЛИ

    Хотя бы одна проверка должна пройти для выполнения команды

    Args:
        *checks: Объекты Check для объединения
        error_message: Сообщение при провале всех проверок.
            По умолчанию "Ни одна из проверок не пройдена".
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.

    Returns:
        Экземпляр Check

    Пример:
        @commands.command()
        @check_any(is_owner(), is_admin(), error_message="Нужны права владельца или админа!")
        async def manage(ctx: commands.Context):
            await ctx.send("Вы владелец или админ!")
    """

    async def predicate(ctx: Context) -> bool:
        for check_obj in checks:
            try:
                if inspect.iscoroutinefunction(check_obj.predicate):
                    result = await check_obj.predicate(ctx)
                else:
                    result = check_obj.predicate(ctx)
                if result:
                    return True
            except Exception:
                continue
        return False

    return check(predicate, error_message=error_message, delete_after=delete_after)

is_admin

is_admin(*, delete_after=None)

Проверка, является ли автор сообщения администратором чата.

Проверяет через messages.getConversationMembers (is_admin / is_owner). Результат кэшируется на 5 минут.

В личных сообщениях -всегда False.

Parameters:

Name Type Description Default
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

@commands.command() @is_admin() async def admin_cmd(ctx: commands.Context): await ctx.send("Команда для админов!")

@commands.command() @is_admin(delete_after=10) async def admin_cmd(ctx: commands.Context): await ctx.send("Команда для админов!")

Source code in vkflow\commands\checks.py
def is_admin(*, delete_after: int | float | None = None) -> Check:
    """
    Проверка, является ли автор сообщения администратором чата.

    Проверяет через messages.getConversationMembers (is_admin / is_owner).
    Результат кэшируется на 5 минут.

    В личных сообщениях -всегда False.

    Args:
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.

    Returns:
        Экземпляр Check

    Пример:
        @commands.command()
        @is_admin()
        async def admin_cmd(ctx: commands.Context):
            await ctx.send("Команда для админов!")

        @commands.command()
        @is_admin(delete_after=10)
        async def admin_cmd(ctx: commands.Context):
            await ctx.send("Команда для админов!")
    """

    async def predicate(ctx: Context) -> bool:
        if ctx.peer_id <= 2000000000:
            return False

        cache_key = str(ctx.peer_id)
        cached = await _admin_cache.get(cache_key)

        if cached is not None:
            return ctx.author in cached

        response = await ctx.api.messages.get_conversation_members(
            peer_id=ctx.peer_id,
            extended=False,
        )

        admin_ids: set[int] = set()
        for item in response.get("items", []):
            if item.get("is_admin") or item.get("is_owner"):
                admin_ids.add(item["member_id"])

        await _admin_cache.set(cache_key, admin_ids)
        return ctx.author in admin_ids

    return check(
        predicate,
        error_message="Вы должны быть администратором, чтобы использовать эту команду!",
        delete_after=delete_after,
    )

is_owner

is_owner(*, delete_after=None)

Проверка, является ли автор сообщения владельцем или администратором группы.

Определяет group_id автоматически из токена бота и проверяет роль автора через groups.getMembers (role: creator, administrator). Результат кэшируется на 10 минут.

Работает только с групповым токеном. С пользовательским -всегда False.

Parameters:

Name Type Description Default
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

@commands.command() @is_owner() async def owner_cmd(ctx: commands.Context): await ctx.send("Команда владельца!")

@commands.command() @is_owner(delete_after=5) async def owner_cmd(ctx: commands.Context): await ctx.send("Команда владельца!")

Source code in vkflow\commands\checks.py
def is_owner(*, delete_after: int | float | None = None) -> Check:
    """
    Проверка, является ли автор сообщения владельцем или администратором группы.

    Определяет group_id автоматически из токена бота и проверяет
    роль автора через groups.getMembers (role: creator, administrator).
    Результат кэшируется на 10 минут.

    Работает только с групповым токеном. С пользовательским -всегда False.

    Args:
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.

    Returns:
        Экземпляр Check

    Пример:
        @commands.command()
        @is_owner()
        async def owner_cmd(ctx: commands.Context):
            await ctx.send("Команда владельца!")

        @commands.command()
        @is_owner(delete_after=5)
        async def owner_cmd(ctx: commands.Context):
            await ctx.send("Команда владельца!")
    """

    async def predicate(ctx: Context) -> bool:
        from vkflow.api import TokenOwner

        token_owner, owner = await ctx.api.define_token_owner()
        if token_owner != TokenOwner.GROUP:
            return False

        group_id = owner.id
        cache_key = str(group_id)
        cached = await _owner_cache.get(cache_key)

        if cached is not None:
            return ctx.author in cached

        response = await ctx.api.method(
            "groups.get_members",
            group_id=group_id,
            filter="managers",
        )

        owner_ids: set[int] = set()
        for member in response.get("items", []):
            if member.get("role") in ("creator", "administrator"):
                owner_ids.add(member["id"])

        await _owner_cache.set(cache_key, owner_ids)
        return ctx.author in owner_ids

    return check(
        predicate,
        error_message="Только владелец может использовать эту команду!",
        delete_after=delete_after,
    )

is_private_message

is_private_message(*, delete_after=None)

Проверка, что сообщение из личного чата

Parameters:

Name Type Description Default
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

@commands.command() @is_private_message() async def private_cmd(ctx: commands.Context): await ctx.send("Это личное сообщение!")

Source code in vkflow\commands\checks.py
def is_private_message(*, delete_after: int | float | None = None) -> Check:
    """
    Проверка, что сообщение из личного чата

    Args:
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.

    Returns:
        Экземпляр Check

    Пример:
        @commands.command()
        @is_private_message()
        async def private_cmd(ctx: commands.Context):
            await ctx.send("Это личное сообщение!")
    """

    async def predicate(ctx: Context) -> bool:
        return ctx.peer_id == ctx.author

    return check(
        predicate,
        error_message="Эта команда доступна только в личных сообщениях!",
        delete_after=delete_after,
    )

is_group_chat

is_group_chat(*, delete_after=None)

Проверка, что сообщение из группового чата

Parameters:

Name Type Description Default
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

@commands.command() @is_group_chat() async def group_cmd(ctx: commands.Context): await ctx.send("Это групповой чат!")

Source code in vkflow\commands\checks.py
def is_group_chat(*, delete_after: int | float | None = None) -> Check:
    """
    Проверка, что сообщение из группового чата

    Args:
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.

    Returns:
        Экземпляр Check

    Пример:
        @commands.command()
        @is_group_chat()
        async def group_cmd(ctx: commands.Context):
            await ctx.send("Это групповой чат!")
    """

    async def predicate(ctx: Context) -> bool:
        return ctx.peer_id != ctx.author and ctx.peer_id > 2000000000

    return check(
        predicate, error_message="Эта команда доступна только в групповых чатах!", delete_after=delete_after
    )

cooldown

cooldown(rate, per, type=None, *, delete_after=None)

Применить cooldown к команде.

Parameters:

Name Type Description Default
rate int

Количество раз, сколько команда может быть использована

required
per float | int

Временной период в секундах (int/float/timedelta)

required
type BucketType | None

BucketType (DEFAULT, USER, CHAT, MEMBER)

None
delete_after int | float | None

Удалить сообщение об ошибке через указанное количество секунд.

None

Returns:

Type Description
Check

Экземпляр Check

Пример

from vkflow.commands import BucketType

@commands.command() @commands.cooldown(rate=3, per=60.0, type=BucketType.USER) async def spam_cmd(ctx: commands.Context): await ctx.send("Спам!")

@commands.command() @commands.cooldown(rate=1, per=30, type=BucketType.MEMBER) async def limited(ctx: commands.Context): await ctx.send("Ограниченная команда!")

Source code in vkflow\commands\checks.py
def cooldown(
    rate: int, per: float | int, type: BucketType | None = None, *, delete_after: int | float | None = None
) -> Check:
    """
    Применить cooldown к команде.

    Args:
        rate: Количество раз, сколько команда может быть использована
        per: Временной период в секундах (int/float/timedelta)
        type: BucketType (DEFAULT, USER, CHAT, MEMBER)
        delete_after: Удалить сообщение об ошибке через указанное количество секунд.

    Returns:
        Экземпляр Check

    Пример:
        from vkflow.commands import BucketType

        @commands.command()
        @commands.cooldown(rate=3, per=60.0, type=BucketType.USER)
        async def spam_cmd(ctx: commands.Context):
            await ctx.send("Спам!")

        @commands.command()
        @commands.cooldown(rate=1, per=30, type=BucketType.MEMBER)
        async def limited(ctx: commands.Context):
            await ctx.send("Ограниченная команда!")
    """
    from .cooldowns import BucketType, Cooldown, CooldownMapping, OnCooldownError

    if type is None:
        type = BucketType.USER

    mapping = CooldownMapping(cooldown=Cooldown(rate=rate, per=per), type=type)

    async def predicate(ctx: Context) -> bool:
        retry_after = await mapping.acquire(ctx)

        if retry_after is not None:
            raise OnCooldownError(mapping._cooldown, retry_after, mapping.type)

        return True

    check_obj = check(predicate, error_message=None, delete_after=delete_after)
    check_obj._cooldown_mapping = mapping

    return check_obj

max_concurrency

max_concurrency(number, per=None)

Ограничить количество одновременных выполнений команды.

Parameters:

Name Type Description Default
number int

Максимальное количество одновременных выполнений

required
per BucketType | None

BucketType (DEFAULT, USER, CHAT, MEMBER)

None

Returns:

Type Description
Callable

Функция-декоратор

Пример

from vkflow.commands import BucketType

@commands.command() @commands.max_concurrency(2, BucketType.CHAT) async def heavy(ctx: commands.Context): await asyncio.sleep(10) await ctx.send("Готово!")

@commands.command() @commands.max_concurrency(1, BucketType.USER) async def unique(ctx: commands.Context): await ctx.send("Выполняется!")

Source code in vkflow\commands\checks.py
def max_concurrency(number: int, per: BucketType | None = None) -> typing.Callable:
    """
    Ограничить количество одновременных выполнений команды.

    Args:
        number: Максимальное количество одновременных выполнений
        per: BucketType (DEFAULT, USER, CHAT, MEMBER)

    Returns:
        Функция-декоратор

    Пример:
        from vkflow.commands import BucketType

        @commands.command()
        @commands.max_concurrency(2, BucketType.CHAT)
        async def heavy(ctx: commands.Context):
            await asyncio.sleep(10)
            await ctx.send("Готово!")

        @commands.command()
        @commands.max_concurrency(1, BucketType.USER)
        async def unique(ctx: commands.Context):
            await ctx.send("Выполняется!")
    """
    from .cooldowns import BucketType, MaxConcurrency, MaxConcurrencyMapping

    if per is None:
        per = BucketType.DEFAULT

    mapping = MaxConcurrencyMapping(max_concurrency=MaxConcurrency(number=number, per=per))

    def decorator(func):
        func.__max_concurrency__ = mapping
        return func

    return decorator

BucketType

BucketType

Bases: Enum

Перечисление типов бакетов для cooldown.

Каждый тип определяет, как отслеживаются cooldown: - DEFAULT: Глобальный cooldown для всех пользователей и чатов - USER: Cooldown на пользователя (один пользователь во всех чатах) - CHAT: Cooldown на чат (один peer_id) - MEMBER: Cooldown на участника (комбинация пользователя и чата)

Пример

@commands.cooldown(rate=3, per=60, type=commands.BucketType.USER) async def spam_cmd(ctx: commands.Context): await ctx.send("Команда выполнена!")