Channel Layers – Django Channels:實現 WebSocket 異步通信,打造高效的長連接系統

A. 簡介

通道層允許您在應用程式的不同實例之間進行通信。如果您不想將所有消息或事件都透過資料庫進行傳遞,它們是在製作分散式實時應用程式時非常有用的一部分。此外,通道層還可以與工作者進程結合使用,以建立基本的任務佇列或卸載任務——更多資訊請參閱<Workers與背景任務 – Django Channels:為 Django 提供異步通信與長連接協議支持>

通道層是 Channels 的一個完全可選的部分。如果你不想使用它們,只需讓 CHANNEL_LAYERS 保持未設置狀態,或者將其設置為空字典 {}

通道層具有純粹的異步介面(無論是發送還是接收);如果你想從同步代碼調用它們,你需要將它們包裝在一個轉換器中(見下文)。

B. 設定

通道層是透過 Django 設定中的 CHANNEL_LAYERS 進行配置的。 

你可以使用 `channels.layers.get_channel_layer()` 從專案中獲取預設的通道層,但如果你正在使用消費者(consumers),則消費者將自動為你提供一個副本,可以透過 `self.channel_layer` 來存取。

1. Redis Channel Layer

channels_redis 是唯一由 Django 官方維護,且支援生產環境使用的 channel layer。此層使用 Redis 作為其後端存儲,並支持單伺服器和分片配置,以及群組支持。要使用此層,您需要安裝 channels_redis 套件。

使用範例如下:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

2. In-Memory Channel Layer

Channels 還隨附了一個內存中的 Channels 層。這個層在測試或本地開發環境中會很有幫助:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer"
    }
}
生產環境中不建議使用In-Memory Channel Layer

記憶體內的通道層運作時,每個進程都作為一個獨立的層級運行。這意味著無法進行跨進程的訊息傳遞。由於通道層的核心價值在於提供分散式的訊息傳遞,因此在記憶體中使用通道層會導致不理想的性能,並且最終會在多實例環境中造成資料遺失。

C. Synchronous Functions

預設情況下,send()group_send()group_add() 和其他函式是非同步函式,這表示你必須使用 await 來呼叫它們。如果你需要從同步程式碼中呼叫它們,你需要使用方便的 asgiref.sync.async_to_sync 包裝器。

from asgiref.sync import async_to_sync

async_to_sync(channel_layer.send)("channel_name", {...})

D. 通過通道層發送的內容

通道層是用於high-level的應用程序到應用程序通信。當您發送一條消息時,監聽這個Group或Channel的消費者將接收到該消息。這意味著您應該在通道層上發送高層次的事件,然後讓消費者處理這些事件,並對其連接的客戶端進行適當的低層次網絡操作。

例如,一個聊天應用程序可以通過通道層發送這樣的事件:

await self.channel_layer.group_send(
    room.group_name,
    {
        "type": "chat.message",
        "room_id": room_id,
        "username": self.scope["user"].username,
        "message": message,
    }
)

然後,消費者定義一個處理函數來接收這些事件並將它們轉換為 WebSocket frames:

async def chat_message(self, event):
    """
    Called when someone has messaged our chat.
    """
    # Send a message down to the client
    await self.send_json(
        {
            "msg_type": settings.MSG_TYPE_MESSAGE,
            "room": event["room_id"],
            "username": event["username"],
            "message": event["message"],
        },
    )

在 Django Channels 框架中,每一個基於 SyncConsumer 或 AsyncConsumer 的消費者都自帶 `self.channel_layer` 和 `self.channel_name` 屬性,這些屬性代表著通道層的實例和通道名稱,確保消息能準確地到達消費者。當有消息發送至該通道名稱或其所屬的群組時,這些消息會像事件一樣被消費者接收,並通過名稱對應的方法進行處理。這些方法名是基於事件類型的名稱轉化而成,其中的句點會被替換為下劃線。例如,若接收到 `chat.join` 事件,則會執行 `chat_join` 方法。

此外,通過 `channel_layer.group_send` 功能,我們能方便地將消息發送到指定的群組中。這個函數需要兩個參數:一是群組名稱,它是一個字串,用於指明消息的目標群組;二是待發送內容的字典,這個字典包含必須的 `type` 鍵,用以標識群組中的消費者接收消息時應使用的處理函數。例如,當 `type` 的值為 `’chat.message’` 時,對應的 `chat_message` 方法會在消費者中被調用。因此,開發者需要預先在消費者中定義這些方法,以確保消息能夠被正確解析並響應。這種設計結合了群組消息和具體業務邏輯的處理,使得應用程序能靈活地對不同的事件做出適宜的反應。

E. 單一Channels

每個應用實例——例如,每個長時間運行的 HTTP 請求或開啟的 WebSocket——都對應一個 Consumer 實例,如果啟用了 channel layers,Consumers 會為自己生成一個唯一的通道名稱,並開始在該通道上監聽事件。這意味著您可以從外部進程發送事件給這些消費者,可能是來自其他消費者,又或者是管理命令,它們將對這些事件做出反應,運行代碼,就像它們對客戶端連接的事件做出反應一樣。通道名稱可以通過 self.channel_name 在 consumer 上獲取。下面是一個在連接時將通道名稱寫入數據庫,然後指定其事件處理方法的範例:

class ChatConsumer(WebsocketConsumer):

    def connect(self):
        # Make a database row with our channel name
        Clients.objects.create(channel_name=self.channel_name)

    def disconnect(self, close_code):
        # Note that in some rare cases (power loss, etc) disconnect may fail
        # to run; this naive example would leave zombie channel names around.
        Clients.objects.filter(channel_name=self.channel_name).delete()

    def chat_message(self, event):
        # Handles the "chat.message" event when it's sent to us.
        self.send(text_data=event["text"])

請注意,由於您正在混合使用來自通道層和協議連接的事件處理,您需要確保類型名稱不發生衝突。建議您在類型名稱前添加前綴(例如,我們在此使用 chat.),以避免衝突。

要發送到單個頻道,只需找到其頻道名稱(對於上述例子,我們可以在數據庫中查找),然後使用 `channel_layer.send`:

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
await channel_layer.send("channel_name", {
    "type": "chat.message",
    "text": "Hello there!",
})

F. Groups

顯然,僅僅發送訊息到單一頻道並不是特別有用——在大多數情況下,您會希望一次性將訊息廣播到多個頻道或消費者。例如在聊天室中,您可能希望將接收到的訊息發送給房間中所有人,甚至希望將訊息發送給可能有多個瀏覽器標籤或設備連接的單一用戶。

您可以選擇使用現有的數據儲存方案自行建構這種解決方案,或者使用某些頻道層內建的群組系統。群組是一種廣播系統,具有以下特點:

– 允許您將頻道名稱新增到命名群組中,或從中移除,並向這些命名群組發送訊息。

– 提供群組到期機制,以清理那些未能執行斷線處理程序的連接(如遇上停電)。

這些群組系統不允許您列舉或列出群組中的頻道;這是一個純粹的廣播系統。如果您需要更精確的控制,或需要知道誰已連接,則應該建立自己的系統或使用合適的第三方系統。

在使用 WebSocket 通用消費者時,您可以在連接期間將一個頻道新增到群組中,並在斷線期間移除它,如下所示:

# This example uses WebSocket consumer, which is synchronous, and so
# needs the async channel layer functions to be converted.
from asgiref.sync import async_to_sync

class ChatConsumer(WebsocketConsumer):

    def connect(self):
        async_to_sync(self.channel_layer.group_add)("chat", self.channel_name)

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)("chat", self.channel_name)
群組名稱規範

群組名稱僅限於 ASCII 字母數字、連字號和句點,且在預設後端中其長度限制為最多 100 個字元。

接著,要向一個群組發送消息,可以使用 `group_send`,就像在這個小示例中,結合上面的代碼來將聊天消息廣播給每個連接的 socket:

class ChatConsumer(WebsocketConsumer):

    ...

    def receive(self, text_data):
        async_to_sync(self.channel_layer.group_send)(
            "chat",
            {
                "type": "chat.message",
                "text": text_data,
            },
        )

    def chat_message(self, event):
        self.send(text_data=event["text"])

G. 在Consumers使用Channel Layers

您通常會希望從一個消費者的範圍之外發送訊息到通道層,因此您不會擁有 `self.channel_layer`。在這種情況下,您應該使用 `get_channel_layer` 函數來取得它:

from channels.layers import get_channel_layer
channel_layer = get_channel_layer()

然後,一旦您擁有它,您只需調用其方法。請記住,通道層僅支持異步方法,因此您可以從自己的異步上下文中調用它:

for chat_name in chats:
    await channel_layer.group_send(
        chat_name,
        {"type": "chat.system_message", "text": announcement_text},
    )

或者你將需要使用 `async_to_sync`:

from asgiref.sync import async_to_sync

async_to_sync(channel_layer.group_send)("chat", {"type": "chat.force_disconnect"})

H. 使用規範

通道層現在僅供 Channels 內部使用,而不是作為 ASGI 的一部分使用。此規範定義了使用 Channels 編寫的應用程式期望通道層提供的內容。

接下來會概述一組標準化定義的通道和通道層,這些提供了通過通道發送和接收消息的機制。它們允許在不同進程之間進行進程間通信,以幫助構建不同客戶端之間具有消息和事件的應用程式。

1. 總覽

1-1. Messages

信息必須是字典。由於這些信息有時會通過網絡發送,因此需要可序列化,所以只允許包含以下類型:

  • 位元組字串
  • Unicode 字串
  • 整數(在帶符號的64位範圍內)
  • 浮點數(在 IEEE 754 雙精度範圍內)
  • 清單(應將元組編碼為清單)
  • 字典(鍵必須是 Unicode 字串)
  • 布林值
  • None(空值)

2-2. Channels

頻道是由一個Unicode字符串名稱識別,該名稱僅包含ASCII字母、ASCII數字、句點 (.)、破折號 (-) 和下劃線 (_),加上一個可選的類型字符(見下文)。 

頻道是一個先進先出的隊列,具有「最多一次送達」的語義。它們可以由多個寫入者和多個讀取者使用;每個寫入的消息應只由一個讀取者獲得。實現中必須保證消息不會被多次交付或交付給多個讀取者,並且如果有必要達成這種限制,必須丟棄部分消息。 

為了協助擴展和網絡架構,頻道區分了多讀取者頻道和由單一已知進程讀取的進程特定頻道。 

普通頻道名稱不包含類型字符,後端可以根據需要任意路由它們;特別地,它們不一定要顯示為全局一致的,後端可能將它們的內容分散到不同的服務器,以便查詢的客戶端只能看到部分消息。在這些頻道上調用 `receive` 不保證您會按順序獲得消息,或即使頻道非空時也不保證會獲得任何內容。 

進程特定頻道名稱包含一個驚嘆號(!),將遠程部分和本地部分分開。這些頻道以不同的方式接收;只有到包括!字符的名稱將傳遞給 `receive()` 調用,對於任何具有該前綴的頻道都會接收到消息。這允許一個進程,例如HTTP終止器,在一個進程特定頻道上進行監聽,然後使用本地部分(即!之後的部分)將傳入的請求分發到相應的客戶端套接字。這些本地部分必須由消費它們的進程生成和管理。這些頻道,像單讀取者頻道一樣,保證如果從單一進程接收則以順序方式給予任何現存消息。 

在頻道中未讀的消息應該在設置的時間後過期;建議的時間是一分鐘,儘管最佳值取決於頻道層及其部署方式,建議允許用戶配置過期時間。 

如果消息以JSON編碼,則最大消息大小為1MB;如果需要傳輸比這更大的數據,必須將其分段成較小的消息。所有頻道層必須支持不超過這一大小的消息,但建議頻道層的用戶保持遠低於此限制。

1-3. Extensions

擴展功能是指不是基本應用程式代碼所需的功能,也幾乎是所有協議伺服器代碼所不必需的,因此為了支持那些不需要這裡定義的完整功能集的應用程式而將其設為可選。這裡定義的擴展功能包括:

1. 群組(groups):允許將多個頻道組成一組,以便進行廣播;詳情請見下文。

2. 清空(flush):允許通過頻道層進行更輕鬆的測試和開發。

還有增加其他擴展的潛力;這些擴展可能由單獨的規範定義,或是這個規範的新版本定義。若應用程序代碼需要某個擴展,應盡快檢查是否存在,並在未提供時發出硬性錯誤。框架應鼓勵選擇性使用擴展,同時嘗試將任何未找到擴展的錯誤移至過程啟動階段,而不是消息處理階段。

1-4. Asynchronous Support

所有的通道層必須為其主要端點提供異步(協程)方法。最終使用者可以使用 asgiref.sync.async_to_sync 包裝器來實現同步版本。

1-5. Groups

雖然基本的頻道模型足以應對基本的應用需求,但許多更高級的非同步消息使用情境需要在事件發生時同時通知多個使用者——例如想像一個實況部落格,每當有新條目發布時,每位查看者應該接收到一個長輪詢回應或 WebSocket 封包。因此,有一個可選的群組擴展,讓對群組頻道的廣播消息更容易。當然,最終使用者可以選擇只使用頻道名稱和直接發送,並構建自己的持久性/廣播系統。

1-6.Capacity

為了提供反壓(backpressure),每個頻道層中的頻道可以設置一個容量,這個容量可以根據需要由層定義(建議透過頻道層構造函數的關鍵字參數由使用者配置,並且可以根據頻道名稱或名稱前綴進行配置)。當一個頻道達到或超過容量時,嘗試向該頻道發送(send())消息可能會引發 ChannelFull 異常,這表明對於發送者來說,頻道已經超出容量。發送者如何處理這個問題將取決於具體情境;例如,一個網頁應用試圖發送回應正文時,很可能會等待頻道重新變得可用,而一個 HTTP 介面伺服器試圖發送一個請求時,則可能會放棄該請求並返回503錯誤。

對於進程本地的頻道,必須在其非本地部分應用其容量(也就是包括到!字符的部分),因此容量在其中的所有「虛擬」頻道中共享。

對於發送消息到群組時,永遠不會引發 ChannelFull 異常;相反,如果超出容量,則必須默默地丟棄消息,這與 ASGI 的最多一次(at-most-once)傳遞策略一致。

2. 規範細節

一個通道層必須提供具備以下屬性的對象(所有函數參數都是位置參數):

  • coroutine send(channel, message),接受兩個參數:要發送的通道,以 unicode 字符串表示,以及要發送的消息,作為可序列化的字典。
  • coroutine receive(channel),接受一個通道名稱並返回該通道上接收到的下一條消息。
  • coroutine new_channel(),返回一個新的進程專用通道,可以用於傳遞給本地協程或接收者。
  • 當編碼的消息超過層的大小限制時引發的異常 `MessageTooLarge`。
  • 當發送操作因目標通道已達到容量上限而失敗時引發的異常 `ChannelFull`。
  • `extensions`,一個 unicode 字符串名稱的列表,指示此層提供的擴展功能,如果不支持任何擴展,則為空列表。可能的擴展可參考 Extensions

實現群組擴展的通道層還必須提供:

  • coroutine group_add(group, channel),將一個通道添加到由羣組指定的群組中。兩者都是 unicode 字符串。如果通道已在該群組中,則函數應正常返回。
  • coroutine group_discard(group, channel),如果通道在群組中,則從群組中移除該通道,否則什麼也不做。
  • coroutine group_send(group, message),接受兩個位置參數:要發送到的群組,作為 unicode 字符串,以及要發送的消息,作為可序列化的字典。它可能會引發 `MessageTooLarge`,但不能引發 `ChannelFull`。
  • group_expiry,一個以秒為單位的整數,指定群組成員資格自最近一次 `group_add` 調用後有效的時間(請參見下面的 Persistence)。

實現清空擴展的通道層還必須提供:

  • coroutineflush(),將通道層重置為空白狀態,不包含任何消息和任何群組(如果實現了群組擴展)。此調用必須阻塞直到系統清空,並且對任何客戶端始終呈現為空,如果通道層是分佈式的。

2-1. Channel Semantics

通道必須:

  • 在只有單個讀取器和寫入器的情況下,如果是單讀取器或進程專用通道,必須完美地保持消息的順序。
  • 絕不會重複傳遞消息。
  • 不會在消息發送時阻塞(儘管它們可能會引發 ChannelFull 或 MessageTooLarge 錯誤)。
  • 能夠處理至少 1MB 大小的消息,當以 JSON 編碼時(實現可以使用更好的編碼或壓縮,只要它滿足等效的大小)。
  • 最大名稱長度至少應為 100 字節。
  • 應儘可能在所有情況下努力保持順序,但在分佈式情況下,完美的全局排序顯然是不可能的。
  • 並不期望能夠傳遞所有消息,但在正常情況下,至少 99.99% 的成功率是被期望的。實現可能需要一個“韌性測試”模式,在此模式下,它們故意比平常丟棄更多的消息,以便開發人員可以測試他們的代碼在這些情況下的處理能力。

2-2. Persistence

通道層不需要長期保存數據;群組成員資格只需要在連接存在的期間存在,而消息只需要存在到消息過期時間,通常為幾分鐘。

如果一個通道層實現了群組擴展功能,它必須在成員通道因未消費而消息過期時,至少保存該成員的群組資格,然後可以隨時移除該資格。如果一個通道之後成功投遞了消息,通道層必須在該通道上的另一條消息過期之前,不得刪除群組資格。

通道層還必須在該成員資格最晚的group_add調用之後的可配置長時間超時後刪除群組資格,預設值為86,400秒(一天)。該超時的值作為group_expiry屬性在通道層上公開。

2-3. Approximate Global Ordering

雖然要求許多實作能夠維持訊息的真正全域(跨頻道)排序是不合理的期望,但它們應該努力防止繁忙的頻道壓倒安靜的頻道。

例如,想像有兩個頻道:繁忙(busy),其流量激增至每秒 1000 個訊息,以及安靜(quiet),每秒接收一個訊息。有一個消費者正在執行 `receive([‘busy’, ‘quiet’])`,它可以大約每秒處理 200 個訊息。 在一個簡單的 for-loop 實作中,頻道層可能總是先檢查繁忙的頻道;由於它總是有可用的訊息,因此消費者甚至從未看到安靜頻道的訊息,即使該訊息是與第一批繁忙頻道訊息一起發送的。

解決這個問題的一個簡單方法是隨機排列在頻道層內檢查訊息時的頻道列表順序;還有其他更好的方法,但無論選擇哪一種,它都應該努力避免僅僅因為另一個頻道繁忙而導致訊息未被接收的情況。

2-4. Strings and Unicode

在這份文件以及所有子規範中,位元串(byte string)在 Python 2 中指的是 `str`,而在 Python 3 中則是 `bytes`。如果這種類型因為底層實作的關係仍支持 Unicode 編碼點,則所有的值應該維持在 0 至 255 範圍內。 

Unicode 字串(Unicode string)在 Python 2 中指的是 `unicode`,而在 Python 3 中則是 `str`。本文檔從不會單獨指定字串(string)——所有字串皆為上述兩種確切類型之一。 

有些序列化工具,如 JSON,無法區分位元串和 Unicode 字串;這些工具應該包含將一種類型封裝為另一種的邏輯(例如,將位元串编码成帶有特殊字元(例如 U+FFFF)的 base64 Unicode 字串)。 

頻道和群組名稱始終是 Unicode 字串,並且只能使用以下字符: 

  • ASCII 字母 
  • 數字 09 
  • 連字符 - 
  • 底線 _ 
  • 句點 . 
  • 問號 ? (僅用於標識單讀取器頻道名稱,每個名稱僅可使用一次) 
  • 驚嘆號 ! (僅用於標識特定於進程的頻道名稱,每個名稱僅可使用一次)

4. 小結

通道層(Channel Layers)是 Django Channels 中的一個關鍵組件,負責在不同進程之間進行消息傳遞。它現在僅作為 Channels 的內部使用規範,而不是整個 ASGI 的一部分。通道層的設計旨在支持異步消息傳遞,並為不同的客戶端提供穩定的消息和事件通信機制。

關鍵要點包括:

1. 信息格式:消息必須為可序列化的字典,支持的類型包括位元串、Unicode 字串、整數、浮點數、清單、字典(鍵必須是 Unicode 字串)、布林值以及 None。

2. 頻道(Channels):頻道透過 Unicode 字符串名稱識別,並使用先進先出(FIFO)語義,保證消息最多只會送達一次。頻道區分為多讀取者頻道和特定於進程的頻道(使用驚嘆號標識)。

3. 擴展功能:可選的擴展功能包括群組管理與清空頻道層,允許廣播消息和方便測試。

4. 異步支持:通道層必須提供異步方法,支持協程操作。

5. 群組(Groups):群組擴展允許將頻道組合成群組,以便針對多用戶的事件管理(如即時更新)。

6. 容量與反壓:設置頻道容量以實現反壓機制。當頻道滿載時,可能引發 `ChannelFull` 異常。

這個系統需要在消息順序、容量管理以及分散式部署策略上做到合理平衡,以確保高效可靠的消息傳遞。同時,對於處理不常見的系統故障情況,實現韌性測試是非常重要的。總體而言,通道層提供了強大的工具來支持 Django 應用程序的非同步通信和管理需求。

I. 總結

在這篇文章中,我們深入探討了 Django Channels 中的通道層(Channel Layers),了解其在實現分散式實時應用程式中的重要性及設定方法。通道層是一個可選的元件,允許應用程序實例之間進行非同步通信,而不依賴於資料庫,適合用於構建需要實時事件處理的應用。

通道層配置透過 Django 的 `CHANNEL_LAYERS` 完成,支持 Redis 及內存兩種主要後端,前者適合生產環境,後者則適合開發及測試。當需要從同步代碼中使用非同步方法時,需要使用 `asgiref.sync.async_to_sync` 進行包裝。

通道層的主要作用是將高層次事件轉發給消費者進行處理,消費者可以針對特定事件運行相應代碼,並應對客戶端的操作。關鍵的功能還包括將事件發送給群組或單一消費者,支援群組廣播等複雜的消息傳遞需求。

實現上,消費者可以透過 `self.channel_layer` 直接使用通道層,而不在消費者範圍內的代碼則可以使用 `get_channel_layer()` 獲取通道層實例。這種架構設計支持了靈活的應用場景,如即時聊天室、通知廣播等。

最後,文章指出,通道層不僅是 Django Channels 的核心組成部分,還對高效可靠的消息傳遞、容量管理、事件處理提供了穩定的支援,讓開發者能夠更方便地管理日益複雜的非同步應用程序需求。這種設計幫助開發者在分散式系統中實現高效的通信、任務卸載和韌性處理方案。

J. 系列文章