Перейти к содержанию

События

GroupLongPoll

GroupLongPoll

GroupLongPoll(
    api,
    /,
    *,
    group_id=None,
    wait=25,
    new_event_callbacks=None,
    requests_session=None,
    json_parser=None,
)

Bases: BaseLongPoll

Source code in vkflow\longpoll.py
def __init__(
    self,
    api: API,
    /,
    *,
    group_id: int | None = None,
    wait: int = 25,
    new_event_callbacks: list[EventsCallback] | None = None,
    requests_session: aiohttp.ClientSession | None = None,
    json_parser: BaseJSONParser | None = None,
) -> None:
    """Create a long-poll client whose events are wrapped in ``GroupEvent``.

    Args:
        api: API object (positional-only); forwarded to the base
            initialiser.
        group_id: Stored on the instance as ``_group_id``.
            NOTE(review): presumably the community to poll, with ``None``
            resolved elsewhere -- confirm against the code that reads it.
        wait: Stored on the instance as ``_wait`` (default 25).
            NOTE(review): presumably the long-poll timeout in seconds --
            confirm against the code that reads it.
        new_event_callbacks: Forwarded unchanged to the base initialiser.
        requests_session: Forwarded unchanged to the base initialiser.
        json_parser: Forwarded unchanged to the base initialiser.
    """
    # The base class receives everything transport-agnostic; the event
    # wrapper is fixed to GroupEvent for this subclass.
    super().__init__(
        api=api,
        event_wrapper=GroupEvent,
        new_event_callbacks=new_event_callbacks,
        requests_session=requests_session,
        json_parser=json_parser,
    )

    # Subclass-specific settings are only stored here; nothing in this
    # method uses them.
    self._group_id = group_id
    self._wait = wait

UserLongPoll

UserLongPoll

UserLongPoll(
    api,
    /,
    *,
    version=3,
    wait=15,
    mode=234,
    new_event_callbacks=None,
    requests_session=None,
    json_parser=None,
)

Bases: BaseLongPoll

Source code in vkflow\longpoll.py
def __init__(
    self,
    api: API,
    /,
    *,
    version: int = 3,
    wait: int = 15,
    mode: int = 234,
    new_event_callbacks: list[EventsCallback] | None = None,
    requests_session: aiohttp.ClientSession | None = None,
    json_parser: BaseJSONParser | None = None,
) -> None:
    """Create a long-poll client whose events are wrapped in ``UserEvent``.

    Args:
        api: API object (positional-only); forwarded to the base
            initialiser.
        version: Stored on the instance as ``_version`` (default 3).
            NOTE(review): presumably the long-poll protocol version --
            confirm against the code that reads it.
        wait: Stored on the instance as ``_wait`` (default 15).
            NOTE(review): presumably the long-poll timeout in seconds --
            confirm against the code that reads it.
        mode: Stored on the instance as ``_mode`` (default 234).
            NOTE(review): looks like a bitmask of long-poll options --
            confirm the meaning of the individual bits.
        new_event_callbacks: Forwarded unchanged to the base initialiser.
        requests_session: Forwarded unchanged to the base initialiser.
        json_parser: Forwarded unchanged to the base initialiser.
    """
    # The base class receives everything transport-agnostic; the event
    # wrapper is fixed to UserEvent for this subclass.
    super().__init__(
        api=api,
        event_wrapper=UserEvent,
        new_event_callbacks=new_event_callbacks,
        requests_session=requests_session,
        json_parser=json_parser,
    )

    # Subclass-specific settings are only stored here; nothing in this
    # method uses them.
    self._version = version

    self._wait = wait
    self._mode = mode

GroupEvent

GroupEvent

GroupEvent(content)

Bases: BaseEvent[dict]

Обертка над событием в группе

Source code in vkflow\base\event.py
def __init__(self, content: ContentT) -> None:
    """Wrap a raw event payload.

    Args:
        content: The event data to wrap. It is stored on the instance as
            ``_content`` without being copied or validated.
    """
    self._content = content

UserEvent

UserEvent

UserEvent(content)

Bases: BaseEvent[list]

Обертка над событием у пользователя

Source code in vkflow\base\event.py
def __init__(self, content: ContentT) -> None:
    """Wrap a raw event payload.

    Args:
        content: The event data to wrap. It is stored on the instance as
            ``_content`` without being copied or validated.
    """
    self._content = content

WebhookApp

WebhookApp dataclass

WebhookApp(
    prefixes=list(),
    name="VK Flow App",
    filter=None,
    commands=list(),
    event_handlers=(lambda: defaultdict(list))(),
    message_handlers=list(),
    startup_handlers=list(),
    shutdown_handlers=list(),
    button_onclick_handlers=dict(),
    button_callback_handlers=dict(),
    inviting_handlers=list(),
    fsm_storage=None,
    fsm_strategy="user_chat",
    packages=list(),
    debug=False,
    strict_mode=False,
    description="Чат-бот для ВКонтакте, написанный на Python с использованием VK Flow",
    addons=list(),
    payload_factory=None,
    experimental=dict(),
    secret_key=None,
    confirmation_key=None,
    path="/webhook",
)

Bases: App

App subclass that uses VK Callback API (webhooks) instead of LongPoll.

All features of :class:App are fully supported — commands, cogs, FSM, middleware, event/message handlers, callback buttons, ViewStore, startup/shutdown hooks, on_ready, wait_for, conquer_new_message, addons, and extensions.

Single bot::

app = WebhookApp(
    prefixes=["/"],
    secret_key="abc",
    confirmation_key="xyz",
)

@app.command("ping")
async def ping(ctx):
    await ctx.reply("pong")

app.run("token", host="0.0.0.0", port=8080)

Multi bot::

app = WebhookApp(prefixes=["/"])
app.run(
    WebhookBotEntry("token1", secret_key="s1", confirmation_key="c1"),
    WebhookBotEntry("token2", secret_key="s2", confirmation_key="c2"),
    host="0.0.0.0", port=8080,
)

Mixed (plain tokens inherit app-level keys)::

app = WebhookApp(
    prefixes=["/"],
    secret_key="shared_secret",
    confirmation_key="shared_conf",
)
app.run("token1", "token2", host="0.0.0.0", port=8080)

Manual integration with an existing aiohttp app::

async def main():
    await app.prepare("token")
    aiohttp_app = app.create_aiohttp_app()
    aiohttp.web.run_app(aiohttp_app)

run

run(
    *tokens,
    host="0.0.0.0",
    port=8080,
    bot_payload_factory=None
)

Synchronous entry-point (mirrors App.run).

Source code in vkflow\webhook.py
def run(  # type: ignore[override]
    self,
    *tokens: str | API | WebhookBotEntry,
    host: str = "0.0.0.0",
    port: int = 8080,
    bot_payload_factory: type | None = None,
) -> asyncio.Task | None:
    """Synchronous entry-point (mirrors ``App.run``).

    Builds the :meth:`start` coroutine and then either runs it to completion
    or schedules it, depending on whether an event loop is already running
    in the current thread.

    Args:
        *tokens: Bot credentials, forwarded unchanged to :meth:`start`.
        host: Interface to bind, forwarded to :meth:`start`.
        port: TCP port to bind, forwarded to :meth:`start`.
        bot_payload_factory: Forwarded unchanged to :meth:`start`.

    Returns:
        ``None`` when no loop was running (the coroutine was handed to
        ``self._run_sync`` and this call returned after it did), otherwise
        the task named ``"vkflow_webhook_app"`` created on the running loop.
    """
    coro = self.start(
        *tokens,
        host=host,
        port=port,
        bot_payload_factory=bot_payload_factory,
    )
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # "No running event loop" is the normal case for a plain script.
        loop = None

    if loop is None:
        # Run the app outside the ``except`` block: otherwise every error
        # raised by the application would be implicitly chained to the
        # irrelevant RuntimeError above and pollute its traceback.
        self._run_sync(coro)
        return None

    return loop.create_task(coro, name="vkflow_webhook_app")

start async

start(
    *tokens,
    host="0.0.0.0",
    port=8080,
    bot_payload_factory=None
)

Async: initialise bots, start aiohttp server, wait for signal.

Source code in vkflow\webhook.py
async def start(  # type: ignore[override]
    self,
    *tokens: str | API | WebhookBotEntry,
    host: str = "0.0.0.0",
    port: int = 8080,
    bot_payload_factory: type | None = None,
) -> None:
    """Initialise bots, start the aiohttp server and wait for a stop signal.

    The coroutine returns only after SIGINT/SIGTERM has been received (or
    the surrounding task has been cancelled) and :meth:`close` has finished.
    A cancellation received while waiting is logged and swallowed, not
    re-raised.

    Args:
        *tokens: Bot credentials, forwarded unchanged to :meth:`prepare`.
        host: Interface the TCP site binds to.
        port: TCP port the TCP site binds to.
        bot_payload_factory: Forwarded unchanged to :meth:`prepare`.

    Raises:
        Whatever :meth:`prepare` or the server start-up raises. When the
        HTTP server fails to start, :meth:`close` is awaited before the
        error propagates.
    """
    await self.prepare(*tokens, bot_payload_factory=bot_payload_factory)

    try:
        app = self.create_aiohttp_app()
        self._runner = aiohttp.web.AppRunner(app)
        await self._runner.setup()

        site = aiohttp.web.TCPSite(self._runner, host, port)
        await site.start()
    except BaseException:
        # prepare() has already run the startup handlers, so a failed
        # server start (e.g. the port is in use) must still go through
        # close() to run shutdown handlers and release runner/sessions.
        await self.close()
        raise

    logger.opt(colors=True).success(
        "Webhook server started on <b>{host}:{port}{path}</b> (<b>{count}</b> bot{postfix})",
        host=host,
        port=port,
        path=self.path,
        count=len(self._bots),
        # Plural for every count except exactly one ("0 bots", "2 bots").
        postfix="" if len(self._bots) == 1 else "s",
    )

    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def _signal_handler(sig: int) -> None:
        logger.opt(colors=True).warning(
            "Received signal <r>{sig_name}</r>, shutting down...",
            sig_name=signal.Signals(sig).name,
        )
        # Set the event through the loop so this is safe even when the
        # handler is not invoked from within a loop callback.
        loop.call_soon_threadsafe(shutdown_event.set)

    if sys.platform == "win32":
        # Windows branch: plain ``signal.signal`` handlers are used instead
        # of ``loop.add_signal_handler``.
        signal.signal(signal.SIGINT, lambda s, f: _signal_handler(s))
        signal.signal(signal.SIGTERM, lambda s, f: _signal_handler(s))
    else:
        for sig in (signal.SIGINT, signal.SIGTERM):
            # ``s=sig`` binds the current value; a bare closure would see
            # only the last value of the loop variable.
            loop.add_signal_handler(sig, lambda s=sig: _signal_handler(s))

    try:
        await shutdown_event.wait()
    except asyncio.CancelledError:
        logger.opt(colors=True).warning("Tasks cancelled, shutting down...")
    finally:
        await self.close()

        # Undo the handlers installed above. Note that the Windows branch
        # resets to the interpreter defaults, not to whatever handlers were
        # installed before start() was called.
        if sys.platform == "win32":
            signal.signal(signal.SIGINT, signal.default_int_handler)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
        else:
            for sig in (signal.SIGINT, signal.SIGTERM):
                loop.remove_signal_handler(sig)

        # Reaches into a private loop attribute: shut the default executor
        # down without waiting for its workers and drop queued futures.
        executor = getattr(loop, "_default_executor", None)
        if executor is not None:
            executor.shutdown(wait=False, cancel_futures=True)

        # NOTE(review): opaque module-level helper; judging by its name it
        # guards against interpreter exit hanging -- confirm.
        _prevent_shutdown_hang()

prepare async

prepare(*tokens, bot_payload_factory=None)

Initialise bots without starting the HTTP server.

Useful for manual integration with an existing aiohttp application (call :meth:create_aiohttp_app afterwards).

Source code in vkflow\webhook.py
async def prepare(
    self,
    *tokens: str | API | WebhookBotEntry,
    bot_payload_factory: type | None = None,
) -> None:
    """Bring the bots up without starting any HTTP server.

    Intended for embedding into an aiohttp application you manage yourself:
    await this first, then call :meth:`create_aiohttp_app`.

    Steps, in order: initialise the bots, make sure the readiness event
    exists, mark every registered event factory as ready, run the startup
    handlers, then dispatch a ``"ready"`` event once per bot.

    Args:
        *tokens: Bot credentials, forwarded unchanged to ``_init_bots``.
        bot_payload_factory: Forwarded unchanged to ``_init_bots``.
    """
    await self._init_bots(*tokens, bot_payload_factory=bot_payload_factory)

    # Create the readiness event only if nobody has created it yet.
    if self._ready_event is None:
        self._ready_event = asyncio.Event()

    for entry in self._group_map.values():
        factory = entry.bot.events_factory
        factory._mark_ready()

    await self._call_startup(*self._bots)

    # One "ready" dispatch per bot, awaited one after another.
    for ready_bot in self._bots:
        await self.dispatch_event("ready", bot=ready_bot)

create_aiohttp_app

create_aiohttp_app()

Return an aiohttp.web.Application with the webhook route registered.

Source code in vkflow\webhook.py
def create_aiohttp_app(self) -> aiohttp.web.Application:
    """Build a new ``aiohttp.web.Application`` serving the webhook endpoint.

    A POST route for ``self.path`` handled by ``self._handle_webhook`` is
    registered on the new application, which is also remembered on the
    instance as ``_aiohttp_app``.

    Returns:
        The newly created application.
    """
    application = aiohttp.web.Application()
    router = application.router
    router.add_post(self.path, self._handle_webhook)

    self._aiohttp_app = application
    return application

close async

close()

Graceful shutdown: stop factories, cleanup runner, call shutdown handlers, close sessions.

Source code in vkflow\webhook.py
async def close(self) -> None:
    """Graceful shutdown: stop factories, cleanup runner, call shutdown handlers, close sessions.

    Steps, in order:

    1. ``stop()`` the event factory of every bot in ``_group_map``.
    2. Clean up the aiohttp runner, if one was created, and forget it.
    3. Run the shutdown handlers (only when there is at least one bot).
    4. Close every bot's sessions.

    Step 4 runs even when step 2 or 3 raises, so a failing runner cleanup
    or shutdown handler cannot leak the bots' sessions; the original error
    still propagates afterwards.
    """
    for info in self._group_map.values():
        info.bot.events_factory.stop()

    try:
        if self._runner is not None:
            await self._runner.cleanup()
            self._runner = None

        if self._bots:
            await self._call_shutdown(*self._bots)
    finally:
        # Always release sessions, whatever happened above.
        for bot in self._bots:
            await bot.close_sessions()

WebhookEventFactory

WebhookEventFactory

WebhookEventFactory(*, api, **kwargs)

Bases: BaseEventFactory

Event factory for webhook (Callback API) transport.

Does not poll a long-poll server. Events are pushed externally via :meth:push_event, which feeds any active :meth:listen generators (used by conquer_new_message, run_state_handling, etc.).

Source code in vkflow\webhook.py
def __init__(self, *, api: API, **kwargs) -> None:
    """Initialise the factory by delegating to the base initialiser.

    Args:
        api: API object, forwarded to the base initialiser.
        **kwargs: Any further keyword arguments, forwarded unchanged to the
            base initialiser.
    """
    super().__init__(api=api, **kwargs)
    # No stop event exists yet; it starts out as None.
    # NOTE(review): presumably created lazily by the code that listens for
    # events -- confirm where it is assigned.
    self._stop_event: asyncio.Event | None = None

push_event async

push_event(event)

Push an externally-received event through the callback chain.

Source code in vkflow\webhook.py
async def push_event(self, event: BaseEvent) -> None:
    """Push an externally-received event through the callback chain.

    Args:
        event: The event to deliver. It is handed as-is to
            ``_run_through_callbacks``, which is awaited before this
            coroutine returns.
    """
    await self._run_through_callbacks(event)