Проверки, cooldown и max_concurrency¶
Встроенные проверки¶
import vkflow as vf
from vkflow import commands
@app.command("бан")
@vf.is_admin()
async def ban(ctx: vf.Context, user: vf.User):
"""Только для администраторов чата (is_admin в ЛС всегда False)"""
...
@app.command("настройки")
@vf.is_owner()
async def settings(ctx: vf.Context):
"""Только для владельца/администратора группы-бота"""
...
@app.command("лс")
@vf.is_private_message()
async def dm_only(ctx: vf.Context):
"""Только в личных сообщениях"""
...
@app.command("чат")
@vf.is_group_chat()
async def chat_only(ctx: vf.Context):
"""Только в групповых чатах"""
...
Кэширование проверок¶
is_admin() кэширует результат на 5 минут, is_owner() -на 10 минут. Это означает, что изменения прав вступят в силу не сразу.
Параметр delete_after¶
Все встроенные проверки поддерживают delete_after -автоудаление сообщения об ошибке:
@app.command("секрет")
@vf.is_admin(delete_after=10)
async def secret(ctx: vf.Context):
"""Сообщение 'Вы должны быть администратором...' удалится через 10 секунд"""
...
Алиасы¶
| Проверка | Алиас |
|---|---|
is_private_message() |
dm_only() |
is_group_chat() |
guild_only() |
Cooldown¶
Ограничение частоты вызова команды:
from vkflow.commands import cooldown, BucketType
# 1 раз в 60 секунд на пользователя
@commands.command()
@commands.cooldown(rate=1, per=60, type=BucketType.USER)
async def bonus(ctx: commands.Context):
await ctx.send("Бонус получен!")
# 3 раза в 60 секунд на пользователя
@commands.command()
@commands.cooldown(rate=3, per=60, type=BucketType.USER)
async def spam(ctx: commands.Context):
await ctx.send("Спам!")
# 1 раз в 30 секунд на чат
@commands.command()
@commands.cooldown(rate=1, per=30, type=BucketType.CHAT)
async def global_cmd(ctx: commands.Context):
await ctx.send("Глобальная команда!")
# 1 раз в 10 секунд на пользователя в конкретном чате
@commands.command()
@commands.cooldown(rate=1, per=10, type=BucketType.MEMBER)
async def limited(ctx: commands.Context):
await ctx.send("Ограниченная!")
BucketType¶
| Тип | Описание |
|---|---|
BucketType.DEFAULT |
Глобальный cooldown (один на всех) |
BucketType.USER |
На пользователя (одинаковый во всех чатах) |
BucketType.CHAT |
На чат |
BucketType.MEMBER |
На пользователя в конкретном чате |
Обработка cooldown¶
@commands.command()
@commands.cooldown(rate=1, per=60, type=BucketType.USER)
async def daily(ctx: commands.Context):
await ctx.send("Готово!")
# Обработчик cooldown
@daily.on_cooldown()
async def on_daily_cooldown(ctx: commands.Context, remaining: float):
await ctx.send(f"Подождите {remaining:.0f} сек.")
# Или с полным объектом ошибки
@daily.on_cooldown()
async def on_daily_cooldown(ctx: commands.Context, error: commands.OnCooldownError):
await ctx.send(f"Подождите {error.retry_after:.1f}с (тип: {error.type})")
Сброс cooldown¶
# Сбросить все cooldown для всех
daily.reset_cooldown()
# Сбросить для конкретного пользователя (из контекста)
daily.reset_cooldown(ctx)
# Сбросить для конкретного ID
daily.reset_cooldown(user=123456)
# Сбросить для конкретного чата
daily.reset_cooldown(chat=2000000001)
# Сбросить только определённый тип
daily.reset_cooldown(type=BucketType.USER)
Max Concurrency¶
Ограничение количества одновременных выполнений команды:
from vkflow.commands import max_concurrency, BucketType
@commands.command()
@commands.max_concurrency(2, BucketType.CHAT)
async def heavy(ctx: commands.Context):
import asyncio
await asyncio.sleep(10)
await ctx.send("Готово!")
# Обработчик превышения лимита
@heavy.on_max_concurrency()
async def on_heavy_concurrency(ctx: commands.Context, limit: int, current: int):
await ctx.send(f"Слишком много одновременных выполнений: {current}/{limit}")
# Или с объектом ошибки
@heavy.on_max_concurrency()
async def on_heavy_concurrency(ctx: commands.Context, error: commands.MaxConcurrencyReachedError):
await ctx.send(f"Лимит: {error.current}/{error.number}")
Свои проверки¶
from vkflow import check
# Создание через функцию check()
VIP_LIST = [123456, 789012]
def is_vip():
async def predicate(ctx) -> bool:
return ctx.author in VIP_LIST
return check(predicate, error_message="Только для VIP!")
# С удалением сообщения об ошибке
def is_premium():
def predicate(ctx) -> bool:
return ctx.author in PREMIUM_IDS
return check(predicate, error_message="Только для премиум!", delete_after=10)
# Использование
@commands.command()
@is_vip()
async def vip_command(ctx: commands.Context):
await ctx.send("VIP-команда!")
# Синхронные проверки тоже работают
def is_not_bot():
return check(lambda ctx: ctx.author > 0, error_message="Боты не допускаются!")
Комбинирование проверок¶
Логическое ИЛИ (check_any)¶
from vkflow.commands import check_any
@commands.command()
@check_any(is_owner(), is_admin(), error_message="Нужны права владельца или админа!")
async def manage(ctx: commands.Context):
await ctx.send("Вы владелец или админ!")
Логическое И (несколько декораторов)¶
@commands.command()
@is_vip()
@vf.is_group_chat()
async def vip_chat_cmd(ctx: commands.Context):
"""Доступно только VIP-пользователям И только в групповых чатах"""
...
Обработка ошибок проверок¶
Через on_error на команде¶
@commands.command()
@vf.is_admin()
async def admin_cmd(ctx: commands.Context):
return "Доступ есть"
@admin_cmd.on_error(vf.CheckFailureError)
async def admin_error(ctx: commands.Context, error: vf.CheckFailureError):
await ctx.send("У вас нет прав администратора!")
Через события¶
from vkflow import commands
@commands.listener()
async def on_check_error(ctx, error, command):
"""Глобальный слушатель ошибок проверок"""
print(f"Check failed for {command.name}: {error}")
Пример: Полный бот с проверками¶
import vkflow as vf
from vkflow import commands
app = vf.App(prefixes=["!"])
OWNER_ID = 123456789
def is_bot_owner():
return vf.check(
lambda ctx: ctx.author == OWNER_ID,
error_message="Только владелец бота!"
)
@commands.command(name="перезагрузка")
@is_bot_owner()
async def reload_cmd(ctx: commands.Context, ext: str):
"""Перезагрузить расширение (только для владельца)"""
await ctx.app.reload_extension(ext)
await ctx.send(f"Расширение {ext} перезагружено!")
@commands.command(name="бонус")
@commands.cooldown(rate=1, per=3600, type=commands.BucketType.USER)
@vf.is_group_chat()
async def bonus(ctx: commands.Context):
"""Ежечасный бонус (только в чатах)"""
await ctx.send("Вы получили бонус!")
@bonus.on_cooldown()
async def bonus_cd(ctx: commands.Context, remaining: float):
mins = int(remaining // 60)
secs = int(remaining % 60)
await ctx.send(f"Бонус доступен через {mins}м {secs}с")
@bonus.on_error(vf.CheckFailureError)
async def bonus_check_error(ctx, error):
await ctx.send("Бонус доступен только в групповых чатах!")
app.commands.extend([reload_cmd, bonus])
app.run("TOKEN")