通道

通道(Channels) 是一组相关功能,旨在促进事件流的路由,例如可用于向 WebSocket 客户端广播消息。

通道提供:

  1. 独立的 broker 后端,可选地处理进程间通信和按需数据持久化

  2. 基于"通道"的 subscription 管理

  3. 订阅者对象作为个性化 event stream 的抽象,提供后台工作器和托管订阅

  4. 同步和异步数据发布

  5. channel 基础的可选 history 管理

  6. WebSocket 集成,为应用程序生成 WebSocket 路由处理器,以处理 subscription 和将传入事件发布到已连接的客户端

基本概念

使用通道涉及几个移动部件。为了更好地熟悉概念、术语和数据流,提供了以下术语表和流程图

术语表

点击切换术语表
event

发布到或从绑定到最初发布它的 channelbackend 接收的单个数据片段

event stream

事件 流,由 Subscriber 之前订阅的所有通道的事件组成

subscriber

Subscriber:包装 event stream 并通过各种方法提供访问的对象

backend

ChannelsBackend。此对象管理插件和 broker 之间的通信,向其发布消息并从中接收消息。每个插件实例与恰好一个后端关联。

broker

负责接收和发布消息到所有已连接的 backends;共享相同 broker 的所有后端将访问相同的消息,允许进程间通信。这通常由单独的实体处理,例如 Redis

plugin

ChannelsPlugin,管理 subscribers 的中心实例,从 backend 读取消息,将它们放入适当的 event stream,并将数据发布到后端

channel

命名的订阅者组,可以向其发布数据。订阅者可以订阅多个通道,通道可以有多个订阅者

subscription

subscriberchannel 之间的连接,允许订阅者从通道接收事件

backpressure

防止 subscriber 的积压无限增长的机制,通过丢弃新消息或驱逐旧消息

history

backend 存储并可推送到 subscriber 的一组先前发布的 events

fanout

将消息发送到通道的所有订阅者的过程

eviction

backpressure 策略,当积压已满时添加新消息时丢弃积压中最旧的消息

backoff

backpressure 策略,只要积压已满就丢弃新传入的消息

流程图

点击切换流程图
        flowchart LR
    Backend(Backend) --> Broker[(Broker)]

    Plugin{{Plugin}} --> Backend

    Application[[Application]] --> Plugin
    

从应用程序到 broker 的发布流

        flowchart TD
    Broker[(Broker)]

    Broker --> Backend_1(Backend)
    Broker --> Backend_2(Backend)

    Backend_1 --> Plugin_1{{Plugin}}
    Backend_2 --> Plugin_2{{Plugin}}

    Plugin_1 --> Subscriber_1[Subscriber]
    Plugin_1 --> Subscriber_2[Subscriber]
    Plugin_1 --> Subscriber_3[Subscriber]

    Plugin_2 --> Subscriber_4[Subscriber]
    Plugin_2 --> Subscriber_5[Subscriber]
    Plugin_2 --> Subscriber_6[Subscriber]
    

broker 到套接字的数据扇出流,显示多个插件实例

ChannelsPlugin

ChannelsPlugin 充当管理通道和订阅者的中心实体。它用于发布消息、控制数据存储方式以及管理 subscribers、路由处理器和配置。

小技巧

插件在 channels 键下作为依赖项提供自己,这意味着不需要导入它,而是可以直接从依赖树中的路由处理器或其他可调用对象中使用它

配置 channels

插件管理的 channels 可以预先定义,将它们传递给 channels 参数,或通过将 arbitrary_channels_allowed 设置为 True "即时"创建(即在第一次 subscription 到通道时)。

显式传递通道
from litestar.channels import ChannelsPlugin

channels_plugin = ChannelsPlugin(..., channels=["foo", "bar"])
允许任意通道
from litestar.channels import ChannelsPlugin

channels_plugin = ChannelsPlugin(..., arbitrary_channels_allowed=True)

如果 arbitrary_channels_allowed 不是 True,尝试发布或订阅未传递给 channelschannel 将引发 ChannelsException

发布数据

插件的核心方面之一是发布数据,这通过其 publish() 方法完成:

使用 publish() 将数据发布到通道
channels.publish({"message": "Hello"}, "general")

上面的示例将数据发布到通道 general,随后将其放入所有订阅者的 event stream 中以供消费。

此方法是非阻塞的,即使通道和关联的 backends 从根本上是异步的。

调用 publish() 有效地将消息排队以发送到后端,因此无法保证事件在此调用后立即在后端中可用。

或者,可以使用异步 wait_published() 方法,它跳过内部消息队列,直接将数据发布到后端。

备注

虽然调用 publish() 不保证消息立即发送到后端,但它将 最终 发送到那里;在关闭时,插件将等待所有队列清空

管理 subscriptions

插件的另一个核心功能是管理 subscriptions,为此存在两种不同的方法:

  1. 通过 subscribe()unsubscribe() 方法手动管理

  2. 通过使用 start_subscription() 上下文管理器

subscribe()start_subscription() 都产生一个 Subscriber,可用于与订阅的事件流交互。

应该首选上下文管理器,因为它确保通道被取消订阅。仅在无法使用 context manager 时才应使用 subscribe()unsubscribe() 方法,例如当 subscription 跨越不同上下文时。

手动调用 subscription 方法
subscriber = await channels.subscribe(["foo", "bar"])
try:
    ...  # 在这里做一些事情
finally:
    await channels.unsubscribe(subscriber)
async with channels.start_subscription(["foo", "bar"]) as subscriber:
    ...  # 在这里做一些事情

也可以取消订阅单个 channels,如果需要动态管理 subscriptions,这可能是理想的。

手动取消订阅通道
subscriber = await channels.subscribe(["foo", "bar"])
try:
    ...  # 在这里做一些事情
finally:
    await channels.unsubscribe(subscriber, ["foo"])

或者,使用上下文管理器

使用 async context manager 取消订阅 channel
async with channels.start_subscription(["foo", "bar"]) as subscriber:
    ...  # 在这里做一些事情
    await channels.unsubscribe(subscriber, ["foo"])

管理 history

一些后端支持按 channelhistory,在存储中保留一定数量的 events。然后可以将此 history 推送到 subscriber

插件的 put_subscriber_history 可用于获取此 history 并将其放入订阅者的 event stream 中。

备注

history 的发布是按顺序进行的,一次一个 channel 和一个 event。这样做是为了确保事件的正确排序并避免填满 subscriber 的积压,这将导致丢弃 history 条目。如果条目数量超过最大积压大小,执行将等待直到先前的事件已被处理。

阅读更多:`管理反压`_

Subscriber

Subscriber 管理由插件提供给它的单个 event stream,表示订阅者已订阅的所有 channels 的事件总和。

它可以被视为所有 events 的端点,而后端充当源,插件充当路由器,负责将从后端收集的事件提供到适当的订阅者流中。

除了是 event stream 的抽象之外,Subscriber 提供了两种处理此流的方法:

iter_events

asynchronous generator,一次从流中产生一个事件,等待直到下一个可用

run_in_background

context manager,包装 asyncio.Task,使用 iter_events 产生的事件,为每个事件调用提供的 callback。退出时,它将尝试正常关闭正在运行的任务,等待流中当前排队的所有事件被处理。如果上下文以错误退出,任务将被取消。

可以通过在 run_in_background 中将 join 设置为 False 来强制任务立即停止,这将导致任务被取消。默认情况下,这仅在上下文以异常离开时发生。

重要

event streams 中的 events 始终是字节;调用 ChannelsPlugin.publish() 时,数据将在发送到后端之前被序列化。

消费 event stream

消费 event stream 有两种通用方法:

  1. 使用 iter_events 直接迭代它

  2. 使用 run_in_background 上下文管理器,它启动后台任务,迭代流,为每个接收的 event 调用提供的回调

直接迭代 stream 主要在处理事件是唯一关注点时有用,因为 iter_events 实际上是一个无限循环。对于所有其他应用程序,使用上下文管理器更可取,因为它允许轻松并发运行其他代码。

在上面的示例中,流用于将数据发送到 WebSocket

通过将 Websocket.send_text() 作为回调传递给 run_in_background(),可以实现相同的效果。这将导致每次新 eventstream 中可用时调用 WebSocket 的方法,但将控制权交还给应用程序,提供执行其他任务的机会,例如从套接字接收传入数据。

重要

与 WebSocket 一起使用时,应谨慎使用迭代 iter_events()

由于 WebSocketDisconnect 仅在相应的 ASGI 事件被 接收 后引发,它可能导致无限期挂起的协程。例如,如果客户端断开连接,但没有收到进一步的事件,就会发生这种情况。生成器将等待新事件,但由于它永远不会收到任何事件,因此不会在 WebSocket 上进行 send 调用,这反过来意味着不会引发异常来打破循环。

管理 backpressure

每个订阅者管理自己的积压:未处理的 events 队列。默认情况下,此积压的大小是无限的,允许它无限增长。对于大多数应用程序,这应该不是问题,但当接收者一致地无法比消息进入更快地处理消息时,应用程序可能选择处理这种情况。

通道插件为管理此 backpressure 提供了两种不同的策略:

  1. backoff 策略,只要积压已满就丢弃新传入的消息

  2. eviction 策略,当积压已满时添加新消息时丢弃积压中最旧的消息

Backoff 策略
from litestar.channels import ChannelsPlugin
from litestar.channels.memory import MemoryChannelsBackend

channels = ChannelsPlugin(
    backend=MemoryChannelsBackend(),
    max_backlog=1000,
    backlog_strategy="backoff",
)
Eviction 策略
from litestar.channels import ChannelsPlugin
from litestar.channels.memory import MemoryChannelsBackend

channels = ChannelsPlugin(
    backend=MemoryChannelsBackend(),
    max_backlog=1000,
    backlog_strategy="dropleft",
)

后端

消息的存储和 fanoutChannelsBackend 处理。

当前实现了以下后端:

MemoryChannelsBacked

基本的内存后端,主要用于测试和本地开发,但仍然完全有能力。由于它在进程内存储所有数据,因此可以实现所有后端中最高的性能,但同时不适合在多个进程上运行的应用程序。

RedisChannelsPubSubBackend

基于 Redis 的后端,使用 Pub/Sub 传递消息。此 Redis 后端具有低延迟和开销,如果不需要 history,通常推荐使用

RedisChannelsStreamBackend

基于 redis 的后端,使用 streams 传递消息。与 Pub/Sub 后端相比,发布时的延迟略高,但在消息 fanout 中实现相同的吞吐量。当需要 history 时推荐使用

AsyncPgChannelsBackend

使用 asyncpg 驱动程序的 postgres 后端

PsycoPgChannelsBackend

使用 psycopg3 异步驱动程序的 postgres 后端

与 websocket 处理器集成

生成路由处理器

一个常见的模式是为每个 channel 创建一个路由处理器,从该通道向已连接的客户端发送数据。这可以完全自动化,使用插件创建这些路由处理器。

生成的路由处理器可以选择配置为在客户端连接后发送 channelhistory

小技巧

ChannelsPlugin 上使用 arbitrary_channels_allowed 参数时,将生成单个路由处理器,使用 路径参数 指定通道名称