事件

Litestar 支持事件发射器/监听器模式的简单实现:

from dataclasses import dataclass

from litestar import Request, post
from litestar.events import listener
from litestar import Litestar

from db import user_repository
from utils.email import send_welcome_mail


@listener("user_created")
async def send_welcome_email_handler(email: str) -> None:
    # 在这里做一些事情来发送电子邮件
    await send_welcome_mail(email)


@dataclass
class CreateUserDTO:
    first_name: str
    last_name: str
    email: str


@post("/users")
async def create_user_handler(data: UserDTO, request: Request) -> None:
    # 在这里做一些事情来创建新用户
    # 例如,将用户插入数据库
    await user_repository.insert(data)

    # 假设我们现在已经插入了一个用户,我们想发送一封欢迎邮件。
    # 为了以非阻塞的方式做到这一点,我们将向监听器发出一个事件,该监听器将发送电子邮件,
    # 使用与我们返回响应的不同的异步块。
    request.app.emit("user_created", email=data.email)


app = Litestar(
    route_handlers=[create_user_handler], listeners=[send_welcome_email_handler]
)

上面的示例说明了这种模式的强大之处 - 它允许我们在不阻塞的情况下执行异步操作,并且不会减慢响应周期。

监听多个事件

事件监听器可以监听多个事件:

from litestar.events import listener


@listener("user_created", "password_changed")
async def send_email_handler(email: str, message: str) -> None:
    # 在这里做一些事情来发送电子邮件

    await send_email(email, message)

使用多个监听器

您还可以使用多个监听器监听相同的事件:

from uuid import UUID
from dataclasses import dataclass

from litestar import Request, post
from litestar.events import listener

from db import user_repository
from utils.client import client
from utils.email import send_farewell_email


@listener("user_deleted")
async def send_farewell_email_handler(email: str, **kwargs) -> None:
    # 在这里做一些事情来发送电子邮件
    await send_farewell_email(email)


@listener("user_deleted")
async def notify_customer_support(reason: str, **kwargs) -> None:
    # 在这里做一些事情来发送电子邮件
    await client.post("some-url", reason)


@dataclass
class DeleteUserDTO:
    email: str
    reason: str


@post("/users")
async def delete_user_handler(data: UserDTO, request: Request) -> None:
    await user_repository.delete({"email": email})
    request.app.emit("user_deleted", email=data.email, reason="deleted")

在上面的示例中,我们为同一事件执行了两个副作用,一个向用户发送电子邮件,另一个向服务管理系统发送 HTTP 请求以创建问题。

向监听器传递参数

方法 emit 具有以下签名:

def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None: ...

这意味着它期望一个 event_id 字符串,后跟任意数量的位置参数和关键字参数。虽然这非常灵活,但也意味着您需要确保给定事件的监听器可以处理所有预期的 args 和 kwargs。

例如,以下代码在 Python 中会引发异常:

@listener("user_deleted")
async def send_farewell_email_handler(email: str) -> None:
    await send_farewell_email(email)


@listener("user_deleted")
async def notify_customer_support(reason: str) -> None:
    # 在这里做一些事情来发送电子邮件
    await client.post("some-url", reason)


@dataclass
class DeleteUserDTO:
    email: str
    reason: str


@post("/users")
async def delete_user_handler(data: UserDTO, request: Request) -> None:
    await user_repository.delete({"email": email})
    request.app.emit("user_deleted", email=data.email, reason="deleted")

这是因为两个监听器都将接收两个 kwargs - emailreason。为了避免这种情况,前面的示例在两者中都有 **kwargs

@listener("user_deleted")
async def send_farewell_email_handler(email: str, **kwargs) -> None:
    await send_farewell_email(email)


@listener("user_deleted")
async def notify_customer_support(reason: str, **kwargs) -> None:
    await client.post("some-url", reason)

创建事件发射器

"事件发射器"是一个继承自 BaseEventEmitterBackend 的类,它本身继承自 contextlib.AbstractAsyncContextManager

  • emit:这是执行实际发射逻辑的方法。

此外,必须实现 contextlib.AbstractAsyncContextManager 的抽象 __aenter____aexit__ 方法,允许发射器用作异步上下文管理器。

默认情况下,Litestar 使用 SimpleEventEmitter,它提供内存中的异步队列。

如果系统不需要依赖复杂的行为,例如重试机制、持久性或调度/cron,则此解决方案效果很好。对于这些更复杂的用例,用户应该使用支持事件的数据库/键值存储(Redis、Postgres 等)或消息代理、作业队列或任务队列技术来实现自己的后端。