Команды¶
command¶
command
¶
command(
name=None,
*,
aliases=None,
prefixes=None,
filter=None,
description=None,
help=None,
brief=None,
usage=None,
enabled=True,
hidden=False,
routing_re_flags=IGNORECASE,
exclude_from_autodoc=False,
invalid_argument_config=None,
**kwargs
)
Decorator, преобразующий корутину в Command.
Аргументы
name: Имя команды. Если не указано, используется имя функции
aliases: Альтернативные имена команды
prefixes: Префиксы команды (переопределяют префиксы пакета)
filter: Фильтр для применения к команде
description: Подробное описание команды
help: Текст справки команды
brief: Краткое описание команды
usage: Примеры использования команды (например, "
Возвращает
Функцию-decorator
Пример
@commands.command() async def hello(ctx: Context): '''Поздороваться''' await ctx.send("Hello!")
@commands.command(name="greet", aliases=["hi"], usage="
Source code in vkflow\commands\core.py
1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 | |
group¶
group
¶
Decorator, преобразующий корутину в Group.
Аргументы
name: Имя группы. Если не указано, используется имя функции aliases: Альтернативные имена группы **kwargs: Дополнительные параметры (те же, что у decorator command)
Возвращает
Функцию-decorator
Пример
@commands.group() async def config(ctx: Context): '''Команды конфигурации''' pass
@config.command() async def show(ctx: Context): '''Показать конфигурацию''' await ctx.send("Config: ...")
С псевдонимами¶
@commands.group(aliases=["cfg", "conf"]) async def config(ctx: Context): pass
Source code in vkflow\commands\core.py
Command¶
Command
¶
Bases: Command
Класс, представляющий команду в фреймворке ext.commands.
Атрибуты
callback: Корутина, выполняемая при вызове команды name: Имя команды aliases: Псевдонимы команды help: Текст справки команды brief: Краткое описание команды enabled: Включена ли команда hidden: Скрыта ли команда из справки usage: Примеры использования команды parent: Родительская Group, если команда является подкомандой
Source code in vkflow\commands\core.py
parents
property
¶
__hash__
¶
__eq__
¶
__get__
¶
Привязка обработчиков кулдауна и хуков к экземпляру
Source code in vkflow\commands\core.py
on_cooldown
¶
Decorator для регистрации обработчика cooldown для этой команды.
Этот обработчик вызывается, когда команда находится на cooldown.
Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - error (OnCooldownError): Исключение OnCooldownError - remaining (float): Секунды до окончания cooldown (то же, что error.retry_after)
Возвращает
Функцию-decorator
Пример
@commands.command() @commands.cooldown(rate=3, per=60, type=BucketType.USER) async def test(ctx: Context): await ctx.send("Command executed!")
С параметром error¶
@test.on_cooldown() async def on_test_cooldown(ctx: Context, error: OnCooldownError): await ctx.send(f"Try again in {error.retry_after:.1f} seconds")
С параметром remaining (удобнее)¶
@test.on_cooldown() async def on_test_cooldown(ctx: Context, remaining: float): await ctx.send(f"Wait {remaining:.1f}s")
Минимальный вариант -только ctx¶
@test.on_cooldown() async def on_test_cooldown(ctx: Context): await ctx.send("On cooldown!")
Source code in vkflow\commands\core.py
on_max_concurrency
¶
Decorator для регистрации обработчика max concurrency для этой команды.
Этот обработчик вызывается при достижении лимита одновременных выполнений.
Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - error (MaxConcurrencyReachedError): Исключение MaxConcurrencyReachedError - limit (int): Максимально допустимое количество одновременных выполнений - current (int): Текущее количество активных выполнений
Возвращает
Функцию-decorator
Пример
@commands.command() @commands.max_concurrency(2, BucketType.CHAT) async def heavy(ctx: Context): await asyncio.sleep(10) await ctx.send("Done!")
С параметром error¶
@heavy.on_max_concurrency() async def on_heavy_concurrency(ctx: Context, error: MaxConcurrencyReachedError): await ctx.send(f"Limit reached: {error.current}/{error.number}")
С параметрами limit и current (удобнее)¶
@heavy.on_max_concurrency() async def on_heavy_concurrency(ctx: Context, limit: int, current: int): await ctx.send(f"Too many uses: {current}/{limit}")
Минимальный вариант -только ctx¶
@heavy.on_max_concurrency() async def on_heavy_concurrency(ctx: Context): await ctx.send("Too many concurrent executions!")
Source code in vkflow\commands\core.py
before_invoke
¶
Decorator для регистрации хука before_invoke для этой команды.
Этот хук вызывается перед выполнением обработчика команды. Если хук возвращает False, выполнение команды будет отменено.
Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - А также любые распарсенные аргументы команды с теми же именами
Возвращает
Функцию-decorator
Пример
@commands.command() async def test(ctx: Context, user: User): await ctx.send(f"Hello, {user.name}!")
@test.before_invoke() async def before_test(ctx: Context): print(f"Command {ctx.command.name} is about to run")
С аргументами¶
@test.before_invoke() async def before_test(ctx: Context, user: User): print(f"About to greet {user.name}")
Верните False для отмены команды¶
@test.before_invoke() async def check_permissions(ctx: Context): if not await has_permission(ctx.author): await ctx.send("No permission!") return False
Source code in vkflow\commands\core.py
after_invoke
¶
Decorator для регистрации хука after_invoke для этой команды.
Этот хук вызывается после завершения обработчика команды (успешно или с ошибкой).
Обработчик может принимать следующие необязательные параметры: - ctx (Context): Контекст команды - result: Возвращаемое значение обработчика команды - error (Exception | None): Исключение при ошибке команды, None при успехе - А также любые распарсенные аргументы команды с теми же именами
Возвращает
Функцию-decorator
Пример
@commands.command() async def test(ctx: Context): return "success"
@test.after_invoke() async def after_test(ctx: Context, result): print(f"Command returned: {result}")
@test.after_invoke() async def after_test(ctx: Context, error): if error: print(f"Command failed: {error}") else: print("Command succeeded")
Source code in vkflow\commands\core.py
reset_cooldown
¶
Сбросить cooldown(ы) для этой команды.
Аргументы
ctx: Необязательный Context, NewMessage или None - Если Context/NewMessage: сбросить cooldown для конкретного пользователя/чата по типу bucket - Если None и нет других аргументов: сбросить ВСЕ cooldown для этой команды type: Необязательный BucketType для фильтрации сбрасываемых cooldown - Если указан, сбрасывает только cooldown с этим типом - Если None, сбрасывает все cooldown user: ID пользователя для сброса cooldown (только для типов USER/MEMBER) chat: ID чата/peer для сброса cooldown (только для типов CHAT/MEMBER)
Примеры
Сбросить все cooldown для всех пользователей/чатов¶
command.reset_cooldown()
Сбросить cooldown для конкретного пользователя (из контекста)¶
command.reset_cooldown(ctx)
Сбросить только cooldown типа USER¶
command.reset_cooldown(type=BucketType.USER)
Сбросить cooldown для конкретного ID пользователя¶
command.reset_cooldown(user=123456)
Сбросить cooldown для конкретного чата¶
command.reset_cooldown(chat=2000000001)
Сбросить cooldown для конкретного участника (пользователь в чате)¶
command.reset_cooldown(user=123456, chat=2000000001)
Сбросить только CHAT cooldown для конкретного чата¶
command.reset_cooldown(type=BucketType.CHAT, chat=2000000001)
В другой команде¶
@commands.command() @is_admin() async def reset(self, ctx: commands.Context, cmd_name: str): cmd = getattr(self, cmd_name, None) if cmd: cmd.reset_cooldown() await ctx.send(f"Cooldown for {cmd_name} reset!")
Source code in vkflow\commands\core.py
handle_message
async
¶
Обработка сообщения с поддержкой max_concurrency
Source code in vkflow\commands\core.py
Group¶
Group
¶
Bases: GroupMixin, Command
Класс, представляющий группу команд.
Наследуется от GroupMixin (управление подкомандами) и Command.
Атрибуты
invoke_without_command: Вызывать ли обработчик группы, если подкоманда не найдена
Пример
@commands.group() async def config(ctx: Context): '''Команды конфигурации''' if ctx.invoked_subcommand is None: await ctx.send("Неверная подкоманда")
@config.command() async def show(ctx: Context): '''Показать конфигурацию''' await ctx.send("Config: ...")
@commands.group(aliases=["cfg", "conf"]) async def config(ctx: Context): pass
Source code in vkflow\commands\core.py
__get__
¶
Привязка подкоманд к экземпляру
Source code in vkflow\commands\core.py
update_prefix
¶
handle_message
async
¶
Обработка сообщения с поддержкой подкоманд
Source code in vkflow\commands\core.py
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 | |
Cog¶
Cog
¶
Базовый класс для создания когов -коллекций команд и обработчиков.
Система когов позволяет организовать функциональность бота в отдельные модули.
Атрибуты
qualified_name: Имя кога (по умолчанию -имя класса) description: Описание кога (по умолчанию -docstring класса) app: Экземпляр App (инжектируется при добавлении кога) bot: Экземпляр Bot (инжектируется при добавлении кога)
Примеры
app.add_cog(SimpleCog()) app.add_cog(CounterCog())
Инициализация Cog.
Этот метод может быть переопределён в подклассах БЕЗ вызова super().init(). Базовая инициализация выполняется автоматически в new.
Для обратной совместимости вызов super().init() по-прежнему работает, но не является обязательным.
Source code in vkflow\commands\cog.py
__new__
¶
Создать новый экземпляр Cog и выполнить базовую инициализацию.
Этот метод вызывается ДО init и гарантирует, что базовая инициализация всегда происходит, даже если пользователь не вызывает super().init().
Это ключ к тому, чтобы super().init() был необязательным!
Source code in vkflow\commands\cog.py
listener
classmethod
¶
Декоратор для пометки метода как обработчика событий.
Аргументы
name: Имя события для прослушивания. Если не указано, используется имя метода
Пример
@commands.Cog.listener() async def on_message_new(self, event): print(f"Новое сообщение: {event}")
@commands.Cog.listener("message_new") async def handle_message(self, event): print(f"Обработка: {event}")
Source code in vkflow\commands\cog.py
get_commands
¶
walk_commands
¶
Итерация по всем командам в этом коге, включая подкоманды групп.
Yields:
| Type | Description |
|---|---|
Command
|
Объекты Command |
Source code in vkflow\commands\cog.py
cog_load
async
¶
Вызывается при загрузке Cog в App.
Это асинхронный хук, вызываемый после регистрации Cog и добавления всех команд/обработчиков в App.
Переопределите этот метод для пользовательской логики инициализации: - Загрузка данных из базы данных - Установка соединений - Запуск фоновых задач
Пример
class MyCog(commands.Cog): async def cog_load(self): print(f"Ког {self.qualified_name} загружен!") self.db = await connect_to_database()
Source code in vkflow\commands\cog.py
cog_unload
async
¶
Вызывается при выгрузке Cog из App.
Это асинхронный хук, вызываемый перед удалением Cog и снятием регистрации его команд/обработчиков из App.
Переопределите этот метод для пользовательской логики очистки: - Закрытие соединений с базой данных - Сохранение состояния - Отмена фоновых задач
Пример
class MyCog(commands.Cog): async def cog_unload(self): print(f"Ког {self.qualified_name} выгружается...") await self.db.close()
Source code in vkflow\commands\cog.py
cog_check
async
¶
Глобальная проверка, применяемая ко всем командам в этом коге.
Эта проверка вызывается для каждой команды в коге до оценки собственных фильтров/проверок команды.
Аргументы
ctx: Контекст команды (Context)
Возвращает
True, если команда должна быть выполнена, False иначе
Пример
class AdminCog(commands.Cog): async def cog_check(self, ctx): return ctx.author in self.admin_ids
Source code in vkflow\commands\cog.py
cog_before_invoke
async
¶
Вызывается перед выполнением любой команды в этом коге.
Этот хук вызывается после прохождения всех проверок, но до выполнения handler команды. Если метод вернёт False, команда будет отменена.
Аргументы
ctx: Контекст команды (Context)
Возвращает
True для продолжения выполнения, False для отмены
Пример
class MyCog(commands.Cog): async def cog_before_invoke(self, ctx): print(f"Запускаю {ctx.command.name}")
Source code in vkflow\commands\cog.py
cog_after_invoke
async
¶
Вызывается после завершения любой команды в этом коге (успех или ошибка).
Этот хук вызывается всегда, независимо от того, завершилась команда успешно или вызвала исключение.
Аргументы
ctx: Контекст команды (Context) result: Возвращаемое значение команды (None, если команда упала) error: Исключение, если команда упала, None при успехе
Пример
class MyCog(commands.Cog): async def cog_after_invoke(self, ctx, result, error): if error: print(f"Команда {ctx.command.name} упала: {error}") else: print(f"Команда {ctx.command.name} выполнена")
Source code in vkflow\commands\cog.py
cog_command_error
async
¶
Вызывается при возникновении ошибки в любой команде этого кога.
Этот обработчик выполняется параллельно с обычным потоком обработки ошибок и НЕ влияет на то, будет ли ошибка проброшена или обработана. Полезен для логирования, метрик или уведомлений.
Аргументы
ctx: Контекст команды (Context) error: Возникшее исключение
Пример
class MyCog(commands.Cog): async def cog_command_error(self, ctx, error): print(f"Ошибка в {ctx.command.name}: {error}") await self.send_error_to_admin(error)
Source code in vkflow\commands\cog.py
cog_command_fallback
async
¶
Вызывается, когда ошибка команды не обработана ни одним обработчиком ошибок.
Это fallback, который выполняется ПОСЛЕ проверки всех обработчиков ошибок конкретной команды. Если этот метод вызван, значит ни один обработчик @command.on_error() не подошёл к ошибке.
В отличие от cog_command_error, может «обработать» ошибку и предотвратить её повторный проброс.
Аргументы
ctx: Контекст команды (Context) error: Возникшее исключение
Пример
class MyCog(commands.Cog): async def cog_command_fallback(self, ctx, error): await ctx.send(f"Произошла ошибка: {type(error).name}")
Source code in vkflow\commands\cog.py
get_fsm
¶
Получить FSMContext для данного контекста.
Аргументы
ctx: Context, NewMessage или CallbackButtonPressed strategy: Опциональное переопределение стратегии ключа
Возвращает
Экземпляр FSMContext
Исключения
ValueError: Если fsm_storage не настроен
Пример
@commands.command() async def start_order(self, ctx): fsm = self.get_fsm(ctx) await fsm.set_state(OrderStates.waiting_name) await ctx.send("Введите ваше имя:")
Source code in vkflow\commands\cog.py
process_fsm
async
¶
Обработать сообщение через FSM-обработчики в этом коге.
Аргументы
message: NewMessage для обработки
Возвращает
True, если обработчик был вызван, False иначе
Пример
В App.route_message или пользовательском обработчике:¶
for cog in self.cogs.values(): if await cog.process_fsm(message): return # FSM обработал сообщение
Source code in vkflow\commands\cog.py
get_fsm_states
¶
Получить все FSM-состояния, обрабатываемые этим когом.
Возвращает
Список строк с именами состояний
Listener¶
Listener
¶
A class that represents an event listener.
This allows you to listen to VK events, custom library events, and chat action events.
Attributes:
| Name | Type | Description |
|---|---|---|
callback |
The coroutine that is executed when the event is received |
|
event_name |
The name of the event to listen to |
|
is_raw |
Whether this is a raw event listener |
|
is_chat_action |
Whether this listens to chat action events |
Examples:
Standard VK events¶
@commands.listener() async def on_message_new(payload): print(f"New message: {payload}")
Chat action events (with wrapper class)¶
@commands.listener() async def on_member_join(event: MemberJoinEvent): print(f"User {event.member_id} joined")
Raw chat action events (with raw dict)¶
@commands.listener() async def on_raw_member_join(payload, member_id): print(f"User {member_id} joined")
Specific action type¶
@commands.listener() async def on_chat_invite_user(event: MemberJoinEvent, inviter_id): print(f"User invited by {inviter_id}")
Pin/unpin without chat_ prefix¶
@commands.listener() async def on_pin_message(event: PinMessageEvent, conversation_message_id): print(f"Message {conversation_message_id} pinned")
Source code in vkflow\commands\listener.py
__set_name__
¶
__get__
¶
Протокол дескриптора -привязка callback при доступе через экземпляр
Source code in vkflow\commands\listener.py
invoke
async
¶
Invoke the listener with the event data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_storage
|
NewEvent
|
The NewEvent storage containing event data |
required |
Source code in vkflow\commands\listener.py
invoke_chat_action
async
¶
Invoke the listener with chat action event data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action_event
|
ChatActionEvent
|
The ChatActionEvent wrapper instance |
required |
raw_action
|
dict
|
The raw action dict from VK API |
required |
Source code in vkflow\commands\listener.py
matches_action_type
¶
Check if this listener should handle the given action type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action_type
|
str
|
The VK action type (e.g., "chat_invite_user") |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if this listener handles this action type |
Source code in vkflow\commands\listener.py
Loop¶
Loop
¶
Loop(
coro,
*,
seconds=0,
minutes=0,
hours=0,
time=MISSING,
cron=MISSING,
count=None,
reconnect=True
)
Bases: Generic[LF]
Source code in vkflow\commands\tasks.py
Middleware¶
Middleware
dataclass
¶
Middleware, перехватывающий и обрабатывающий события/команды.
Middleware может использоваться для: - Логирования команд/событий - Аутентификации/авторизации - Модификации контекста перед выполнением команды - Глобальной обработки ошибок - Сбора аналитики
Пример
@app.middleware() async def logging_middleware(ctx: Context, call_next): print(f"Команда {ctx.command.name} запущена") result = await call_next() print(f"Команда {ctx.command.name} завершена") return result
MiddlewareManager
¶
Управляет регистрацией и выполнением middleware.
Source code in vkflow\commands\middleware.py
add_middleware
¶
Добавить middleware в менеджер.
Source code in vkflow\commands\middleware.py
remove_middleware
¶
Удалить middleware из менеджера.
Source code in vkflow\commands\middleware.py
add_before_command_hook
¶
add_after_command_hook
¶
get_middlewares_for_event
¶
Получить все middleware, применимые к типу события.
Source code in vkflow\commands\middleware.py
run_before_command_hooks
async
¶
Выполнить все before_command хуки.
Возвращает
True если все хуки прошли, False если команда отменена
Source code in vkflow\commands\middleware.py
run_after_command_hooks
async
¶
Выполнить все after_command хуки.
Source code in vkflow\commands\middleware.py
MiddlewarePriority
¶
Bases: Enum
Уровни приоритета для порядка выполнения middleware.
Checks¶
checks
¶
Check
dataclass
¶
Bases: BaseFilter
Проверка, которую можно применить к командам через декораторы
Пример
def is_admin(): async def predicate(ctx): admin_ids = [123456, 789012] return ctx.author in admin_ids return check(predicate)
@commands.command() @is_admin() async def admin_command(ctx: commands.Context): await ctx.send("You are an admin!")
make_decision
async
¶
Выполнить предикат проверки
Source code in vkflow\commands\checks.py
__call__
¶
Позволяет использовать Check как декоратор
Source code in vkflow\commands\checks.py
CheckFailureError
dataclass
¶
Bases: CommandError
Исключение при провале проверки команды
check
¶
Создать проверку из функции-предиката
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
predicate
|
CheckFunction
|
Функция (синхронная или асинхронная), принимающая Context и возвращающая bool |
required |
error_message
|
str | None
|
Необязательное сообщение об ошибке при провале проверки (отправляется пользователю) |
None
|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. Если None -сообщение не удаляется автоматически. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
def is_owner(): async def predicate(ctx): return ctx.author == OWNER_ID return check(predicate, error_message="Только владелец может это использовать!")
def is_premium(): def predicate(ctx): return ctx.author in PREMIUM_IDS return check(predicate, error_message="Только для премиум-пользователей!", delete_after=10)
is_not_bot = lambda: check( lambda ctx: ctx.author > 0, error_message="Боты не могут это использовать!" )
@commands.command() @is_owner() async def secret(ctx: commands.Context): await ctx.send("Секретная команда!")
Source code in vkflow\commands\checks.py
check_any
¶
Объединить несколько проверок логикой ИЛИ
Хотя бы одна проверка должна пройти для выполнения команды
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*checks
|
Check
|
Объекты Check для объединения |
()
|
error_message
|
str | None
|
Сообщение при провале всех проверок. По умолчанию "Ни одна из проверок не пройдена". |
'Ни одна из проверок не пройдена'
|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
@commands.command() @check_any(is_owner(), is_admin(), error_message="Нужны права владельца или админа!") async def manage(ctx: commands.Context): await ctx.send("Вы владелец или админ!")
Source code in vkflow\commands\checks.py
is_admin
¶
Проверка, является ли автор сообщения администратором чата.
Проверяет через messages.getConversationMembers (is_admin / is_owner). Результат кэшируется на 5 минут.
В личных сообщениях -всегда False.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
@commands.command() @is_admin() async def admin_cmd(ctx: commands.Context): await ctx.send("Команда для админов!")
@commands.command() @is_admin(delete_after=10) async def admin_cmd(ctx: commands.Context): await ctx.send("Команда для админов!")
Source code in vkflow\commands\checks.py
is_owner
¶
Проверка, является ли автор сообщения владельцем или администратором группы.
Определяет group_id автоматически из токена бота и проверяет роль автора через groups.getMembers (role: creator, administrator). Результат кэшируется на 10 минут.
Работает только с групповым токеном. С пользовательским -всегда False.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
@commands.command() @is_owner() async def owner_cmd(ctx: commands.Context): await ctx.send("Команда владельца!")
@commands.command() @is_owner(delete_after=5) async def owner_cmd(ctx: commands.Context): await ctx.send("Команда владельца!")
Source code in vkflow\commands\checks.py
is_private_message
¶
Проверка, что сообщение из личного чата
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
@commands.command() @is_private_message() async def private_cmd(ctx: commands.Context): await ctx.send("Это личное сообщение!")
Source code in vkflow\commands\checks.py
is_group_chat
¶
Проверка, что сообщение из группового чата
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
@commands.command() @is_group_chat() async def group_cmd(ctx: commands.Context): await ctx.send("Это групповой чат!")
Source code in vkflow\commands\checks.py
cooldown
¶
Применить cooldown к команде.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
rate
|
int
|
Количество раз, сколько команда может быть использована |
required |
per
|
float | int
|
Временной период в секундах (int/float/timedelta) |
required |
type
|
BucketType | None
|
BucketType (DEFAULT, USER, CHAT, MEMBER) |
None
|
delete_after
|
int | float | None
|
Удалить сообщение об ошибке через указанное количество секунд. |
None
|
Returns:
| Type | Description |
|---|---|
Check
|
Экземпляр Check |
Пример
from vkflow.commands import BucketType
@commands.command() @commands.cooldown(rate=3, per=60.0, type=BucketType.USER) async def spam_cmd(ctx: commands.Context): await ctx.send("Спам!")
@commands.command() @commands.cooldown(rate=1, per=30, type=BucketType.MEMBER) async def limited(ctx: commands.Context): await ctx.send("Ограниченная команда!")
Source code in vkflow\commands\checks.py
max_concurrency
¶
Ограничить количество одновременных выполнений команды.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
number
|
int
|
Максимальное количество одновременных выполнений |
required |
per
|
BucketType | None
|
BucketType (DEFAULT, USER, CHAT, MEMBER) |
None
|
Returns:
| Type | Description |
|---|---|
Callable
|
Функция-декоратор |
Пример
from vkflow.commands import BucketType
@commands.command() @commands.max_concurrency(2, BucketType.CHAT) async def heavy(ctx: commands.Context): await asyncio.sleep(10) await ctx.send("Готово!")
@commands.command() @commands.max_concurrency(1, BucketType.USER) async def unique(ctx: commands.Context): await ctx.send("Выполняется!")
Source code in vkflow\commands\checks.py
BucketType¶
BucketType
¶
Bases: Enum
Перечисление типов бакетов для cooldown.
Каждый тип определяет, как отслеживаются cooldown: - DEFAULT: Глобальный cooldown для всех пользователей и чатов - USER: Cooldown на пользователя (один пользователь во всех чатах) - CHAT: Cooldown на чат (один peer_id) - MEMBER: Cooldown на участника (комбинация пользователя и чата)
Пример
@commands.cooldown(rate=3, per=60, type=commands.BucketType.USER) async def spam_cmd(ctx: commands.Context): await ctx.send("Команда выполнена!")