Consumers深入說明 – Django Channels:為 Django 提供異步通信與長連接協議支持

A. 簡介

Channels 是基於一個名為 ASGI(Asynchronous Server Gateway Interface)的基礎低階規範構建的。ASGI 更加注重可互操作性,而不是用于撰寫複雜的應用程式。因此,Channels 提供了 Consumers,一種豐富的抽象,讓您可以輕鬆地創建 ASGI 應用程式。

Consumers 特別做到以下幾件事:

1. 結構化代碼:將您的代碼組織成一系列函數,以便在事件發生時被調用,而不是讓您自己編寫事件迴圈。這使得代碼的組織和維護更加清晰和易於管理。

2. 支援同步與非同步代碼:允許您編寫同步或非同步代碼,並為您處理協作和執行緒問題。這意味著您可以專注於業務邏輯,而不必擔心底層的執行細節,尤其是在處理網路 I/O 或長連接時。

當然,您可以選擇不使用 Consumers,轉而使用 Channels 其他部分的功能,例如路由、會話管理和身份驗證,將其與任何 ASGI 應用混合使用。然而,使用 Consumers 通常是撰寫應用代碼的最佳方式,因為它簡化了開發過程,提高了代碼的可讀性和可維護性。

這樣的設計讓 Django Channels 成為讓 Django 應用支援異步通信和長連接協議(如 WebSockets)的理想選擇。Channels 使得將 Django 應用擴展到即時通信或其他需要持久連接的領域變得更加容易。

B. Basic Layout

在 Django Channels 中,消費者(Consumer)有兩種類型:AsyncConsumer 和 SyncConsumer。AsyncConsumer 適合撰寫異步程式碼,尤其是需要處理長時間運行或 I/O 操作的任務,如 WebSockets。SyncConsumer 則在線程池同步運行代碼,適合短暫且快速的操作。選擇哪一類型的消費者取決於應用的需求,異步消費者適於高併發場景,而同步的則適合簡單、快速的任務。

以下是一個SyncConsumer的範例:

from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):

    def websocket_connect(self, event):
        self.send({
            "type": "websocket.accept",
        })

    def websocket_receive(self, event):
        self.send({
            "type": "websocket.send",
            "text": event["text"],
        })

這是一個非常簡單的 WebSocket 回聲服務器 —— 它會接受所有傳入的 WebSocket 連結,並回覆所有傳入的 WebSocket 訊息文本框架,回覆的內容與接收到的相同。

消費者(Consumers)是基於一系列命名方法結構而成,這些方法對應他們將接收到的訊息類型值,其中任何”.”將被替換為”_”. 以上提到的兩個處理器分別處理 `websocket.connect` 和 `websocket.receive` 訊息。

我們是如何知道將會收到哪些事件類型以及事件中包含什麼(如 `websocket.receive` 包含一個 `text` 鍵)的呢?這是因為我們依據 ASGI WebSocket 規範來設計,它告訴我們 WebSockets 是如何展現的。

除此之外,唯一其他的基本 API 是 `self.send(event)`。這讓你可以依照協議將事件發送回客戶端或協議服務器——如果你查看 WebSocket 協議,就會發現我們發送的字典就是向客戶端發送文本框架的方式。

`AsyncConsumer` 的結構非常相似,但所有的處理方法必須是協程,而 `self.send` 也是一個協程:

from channels.consumer import AsyncConsumer

class EchoConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        await self.send({
            "type": "websocket.accept",
        })

    async def websocket_receive(self, event):
        await self.send({
            "type": "websocket.send",
            "text": event["text"],
        })

1. 如何選擇使用AsyncConsumer或SyncConsumer

在選擇使用 Django Channels 中的 `AsyncConsumer` 或 `SyncConsumer` 時,需考慮應用的性能需求和操作性質。Django Channels 提供兩種主要的消費者(Consumer)類型:`AsyncConsumer` 和 `SyncConsumer`。選擇使用哪一種主要取決於你的應用需要支持的異步特性,例如 WebSockets 和長輪詢(long-polling)。

1-1. 使用AsyncConsumer

`AsyncConsumer` 是專為與異步代碼進行交互而設計的。當你的應用需要支援高併發和非阻塞 I/O 操作時,`AsyncConsumer` 是理想的選擇。這在使用 WebSockets 或需要同時處理多個網頁請求(例如使用 HTTPX 擷取多個頁面的資料)時特別有用。通過利用 Python 的 `asyncio` 模塊,`AsyncConsumer` 提升了效率和延展性。然而,要注意的是,如果在 `AsyncConsumer` 中調用耗時的同步函數,這可能會導致整個事件循環(event loop)被阻塞或延遲。因此,只有在確保操作以異步方式進行且使用異步原生庫時,才選擇 `AsyncConsumer`。

如果你在 `AsyncConsumer` 中確實需要調用同步函數,一個解決方案是使用 `asgiref.sync.sync_to_async` 工具。這個工具能將同步可調用物件轉換為異步協程,使得可以在異步環境下調用同步代碼而不阻塞事件循環。這對於擴展應用程序的處理能力非常重要,特別是在處理 WebSockets 和其他需要持久連接的協議時。

1-2. 使用SyncConsumer

相反地,當應用需要調用傳統的同步操作,如 Django ORM 或其他同步代碼,應考慮使用 `SyncConsumer`。`SyncConsumer` 在獨立執行緒中運行,這讓它能夠有效地避免同步操作(如 ORM 查詢)造成的阻塞影響。在預設情況下,尤其是大多數處理過程中,選擇 `SyncConsumer` 可確保系統的穩定性和效率。這是因為大部分同步代碼需要簡單地運行在其設計的線程內而不是事件循環中,特別是當你的應用並未經常需要可平行處理的長時間運行任務時。

總結來說,選擇 `AsyncConsumer` 或 `SyncConsumer` 應基於具體的應用需求。對於 WebSockets 和異步任務,`AsyncConsumer` 提供了強大的支持;而對於處理同步邏輯和避免複雜性,`SyncConsumer` 是更好的替代方案。

2. 關閉Consumers

當附加到您的消費者(consumer)的套接字或連接關閉時——無論是由您或是客戶端關閉——您很可能會收到一個事件通知(例如,`http.disconnect`或`websocket.disconnect`),並且您的應用實例會在短時間內執行相應操作。

完成斷開連接後的清理工作後,您需要引發`channels.exceptions.StopConsumer`例外,以清楚地停止ASGI應用並讓伺服器進行清理。如果您不引發此例外並讓系統持續執行的話,伺服器會在超出應用關閉超時時間後(Daphne預設為10秒)強制終止您的應用並發出警告。

下文將介紹通用消費者如何為您自動完成這項工作,因此這部分只有在您基於`AsyncConsumer`或`SyncConsumer`編寫自己的消費者類時才需要關注。然而,如果您重寫了它們的`__call__`方法,或阻止其調用的方法返回,您可能仍然會遇到此問題;如果您想獲取更多信息,可以查看其源代碼。

另外,如果您啟動了自己的背景協程,確保在連接結束時也將它們關閉,否則這些協程會滲漏到伺服器中。這樣會造成不必要的資源消耗與管理負擔。

這裡對於任何想要更好處理連接斷開及清理過程的開發者提供了一個基礎指引。正確處理這些過程不僅有助於系統資源的有效利用,也能夠提升應用的穩定性和響應速度。記住要時刻關注伺服器的超時設定以及異步資源的管理,這樣才能確保Django Channels應用的良好運行。

3. Channel Layers

在 Django Channels 中,消費者(Consumers)是處理異步通信的核心組件之一。它們除了負責管理 WebSocket 連接或其他協議的邏輯外,還可以與 Channel 層進行交互,以支持消息的傳遞。

Channel 層允許不同的消費者之間進行點對點(one-to-one)的消息傳遞,或者通過群組(groups)這種廣播系統來分發消息。這意味著你可以設計你的應用,使之能夠將信息發送給特定的消費者,或者廣播給訂閱了某個群組的所有消費者。例如,在一個多人聊天室應用中,當某位用戶發送消息時,這條消息可以被廣播給該聊天室中的所有用戶。

預設情況下,消費者將使用默認的 channel layer,這是在設置中配置的。但如果在子類化(subclassing)任意提供的 Consumer 類時設置了 `channel_layer_alias` 屬性,你可以改變這個行為。`channel_layer_alias` 屬性允許你指定要使用的不同的 channel layer,這對於多環境設置或需要使用不同通訊策略的特殊情境中特別有用。

這種設計使 Django Channels 成為一個強大且靈活的工具,可以輕鬆實現複雜的實時通信功能。需要注意的是,設置和操作 Channel 層需要考慮並發與延遲等因素,以確保應用的性能和可靠性。

C. Scope

在 Django Channels 中,消費者(Consumers)在被調用時會接收到一個「作用域」(scope),這個作用域包含許多你在 Django 視圖中的 request 對象上可以找到的信息。在消費者的方法中,你可以透過 `self.scope` 來訪問這些信息。

作用域是 ASGI 規範的一部分,但以下是一些你可能會常用的項目:

1. `scope[“path”]`:這表示請求的路徑(用於 HTTP 和 WebSocket)。

2. `scope[“headers”]`:包含請求的原始名稱/值標頭對(用於 HTTP 和 WebSocket)。

3. `scope[“method”]`:表示請求所使用的方法名稱(僅適用於 HTTP)。

如果你啟用了像身份驗證這類功能,也能透過 `scope[“user”]` 來訪問 user 對象。例如,URLRouter 會將來自 URL 的捕獲群組放入 `scope[“url_route”]`。

總體來說,作用域是獲取連接信息的地方,並且中間件會將其想讓你訪問的屬性放在這裡(就像 Django 的中間件會新增一些屬性到 request 上一樣)。

D. Generic Consumers

以上所看到的是一個適用於任何協議的消費者(Consumer)的基本佈局。就像 Django 的generic view一樣,Channels 提供了通用消費者,它們將常見的功能封裝起來,讓你不需要重寫它,特別是針對 HTTP 和 WebSocket 處理。

1. WebsocketConsumer

在 Django Channels 中,可以使用 `channels.generic.websocket.WebsocketConsumer` 來處理 WebSocket 連接。這個消費者(Consumer)將原本冗長的 ASGI 訊息發送和接收過程包裝起來,使開發者只需處理文字或二進制框架即可。

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
    groups = ["broadcast"]

    def connect(self):
        # Called on connection.
        # To accept the connection call:
        self.accept()
        # Or accept the connection and specify a chosen subprotocol.
        # A list of subprotocols specified by the connecting client
        # will be available in self.scope['subprotocols']
        self.accept("subprotocol")
        # To reject the connection, call:
        self.close()

    def receive(self, text_data=None, bytes_data=None):
        # Called with either text_data or bytes_data for each frame
        # You can call:
        self.send(text_data="Hello world!")
        # Or, to send a binary frame:
        self.send(bytes_data="Hello world!")
        # Want to force-close the connection? Call:
        self.close()
        # Or add a custom WebSocket error code!
        self.close(code=4123)

    def disconnect(self, close_code):
        # Called when the socket closes

您還可以在 `connect` 方法內的任何地方拋出 `channels.exceptions.AcceptConnection` 或 `channels.exceptions.DenyConnection`,以接受或拒絕連接,這樣您就可以使用不需要混入類(mixins)即可重複使用的身份驗證或限速代碼。

一個 `WebsocketConsumer` 的通道會自動在連接時添加到其 `groups` 類屬性中提到的任何群組中,並在斷開連接時從這些群組中移除。`groups` 必須是可迭代的,並且作為通道後端設置的通道層必須支持群組功能(`channels.layers.InMemoryChannelLayer` 和 `channels_redis.core.RedisChannelLayer` 都支持群組)。如果未配置通道層或通道層不支持群組,則連接到具有非空 `groups` 屬性的 `WebsocketConsumer` 將會引發 `channels.exceptions.InvalidChannelLayerError`。

2. AsyncWebsocketConsumer

可作為 `channels.generic.websocket.AsyncWebsocketConsumer` 使用,這個類別的方法和介面與 `WebsocketConsumer` 完全相同,但所有內容都是異步的,並且您需要實作的函數也必須是異步的:

from channels.generic.websocket import AsyncWebsocketConsumer

class MyConsumer(AsyncWebsocketConsumer):
    groups = ["broadcast"]

    async def connect(self):
        # Called on connection.
        # To accept the connection call:
        await self.accept()
        # Or accept the connection and specify a chosen subprotocol.
        # A list of subprotocols specified by the connecting client
        # will be available in self.scope['subprotocols']
        await self.accept("subprotocol")
        # To reject the connection, call:
        await self.close()

    async def receive(self, text_data=None, bytes_data=None):
        # Called with either text_data or bytes_data for each frame
        # You can call:
        await self.send(text_data="Hello world!")
        # Or, to send a binary frame:
        await self.send(bytes_data="Hello world!")
        # Want to force-close the connection? Call:
        await self.close()
        # Or add a custom WebSocket error code!
        await self.close(code=4123)

    async def disconnect(self, close_code):
        # Called when the socket closes

3. JsonWebsocketConsumer

JsonWebsocketConsumer 類別可以作為 channels.generic.websocket.JsonWebsocketConsumer 使用。這個類別的運作方式類似於 WebsocketConsumer,不過它會自動將發送和接收的 WebSocket 文本框架編碼和解碼為 JSON。

唯一的 API 差異有:

1. 你的 `receive_json` 方法必須接受一個參數 `content`,這個參數就是解碼後的 JSON 物件。

2. `self.send_json` 方法僅接受一個參數 `content`,系統會自動為你將此內容編碼為 JSON。

如果您想要自訂 JSON 的編碼和解碼,可以覆寫 `encode_json` 和 `decode_json` 類別方法。

4. AsyncJsonWebsocketConsumer

JsonWebsocketConsumer 的異步版本,可作為 channels.generic.websocket.AsyncJsonWebsocketConsumer 使用。請注意,甚至 encode_json 和 decode_json 也是異步函數。

5. AsyncHttpConsumer

作為 `channels.generic.http.AsyncHttpConsumer` 提供,這提供了用於實現 HTTP 端點的基本原語:

from channels.generic.http import AsyncHttpConsumer

class BasicHttpConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await asyncio.sleep(10)
        await self.send_response(200, b"Your response bytes", headers=[
            (b"Content-Type", b"text/plain"),
        ])

您需要實作您自己的 `handle` 方法。該方法接收整個請求體作為一個字節串(bytestring)。標頭可以作為元組列表或字典傳遞。回應體的內容預期為一個字節串。

如果您希望在斷開連接時執行一些代碼,您也可以實作一個 `disconnect` 方法。例如,用於關閉您啟動的任何協程。即使是在非正常斷開連接的情況下,這段代碼仍會運行,所以不要期望 handle 能夠乾淨地完成運行。

如果您需要對回應進行更多的控制,例如為了實作長輪詢,請使用更底層的 `self.send_headers` 和 `self.send_body` 方法。這個例子已經提到通道層(channel layers),這個部分將會在後面詳細說明:

import json
from channels.generic.http import AsyncHttpConsumer

class LongPollConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.send_headers(headers=[
            (b"Content-Type", b"application/json"),
        ])
        # Headers are only sent after the first body event.
        # Set "more_body" to tell the interface server to not
        # finish the response yet:
        await self.send_body(b"", more_body=True)

    async def chat_message(self, event):
        # Send JSON and finish the response:
        await self.send_body(json.dumps(event).encode("utf-8"))

5-1. Server-sent events

在 Djanog Channels 中,您可以利用異步通信的特性,透過長連接協議來實現服務端發送事件。SSE 是一種允許伺服器在建立的 HTTP 連接上推送更新給客戶端的技術,這在需要實時通知或更新的應用中非常有用。

以下是如何實現 SSE HTTP 端點的一些基本步驟和解釋:

a. 設置 Django Channels

確保您已經在專案中設置好 Django Channels。這通常包括在 settings.py 中配置 ASGI 應用及相關配置。

b. 創建消費者 (Consumer)

消費者是處理連接的地方。您可以創建一個 HTTP 消費者來處理 SSE 連接。這個消費者需要能夠保持連接並持續發送數據更新。

c. 編寫 SSE 消費者邏輯

在您的消費者中,利用 `async` 和 `await` 關鍵字來編寫邏輯。如果事件發生,如數據庫的更新,您可以使用 consumer 的 `send` 方法向客戶端發送消息。例如:

import asyncio
from channels.generic.http import AsyncHttpConsumer

class SSEConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        self.scope['headers'].append((b'Cache-Control', b'no-cache'))
        self.scope['headers'].append((b'Content-Type', b'text/event-stream'))
        await self.send_headers()

        while True:
            # This loop can push data every so often, simulate it with asyncio.sleep
            await self.send_body(b"data: The current time is 1234567890\n\n", more_body=True)
            await asyncio.sleep(1)

d. 配置路由

配置 URL 路由以便不同的請求能夠正確地被分派到您的 SSE 消費者。您需要在 Django 的路由中添加相應的配置來處理這些 URL 請求。

e. 客戶端處理

在客戶端,使用 JavaScript 的 `EventSource` API 來接收 SSE 更新。像這樣:

const eventSource = new EventSource('/your-sse-endpoint-url/');

eventSource.onmessage = function(event) {
    console.log(event.data);
};

eventSource.onerror = function(event) {
    console.error("An error occurred: ", event);
};

SSE 有一些優點,比如使用單一的連接通道來持續發送消息到客戶端,並且相對於 WebSocket,更加簡單和容易實現;但缺點是 SSE 隻支持單向通信,無法從客戶端發送消息給伺服器。不過,這對於某些應用場景,如即時資料推送和更新,是非常理想的選擇。

使用 Django Channels 實現 SSE 可以充分利用 Django 的異步處理能力,使您的應用程式在需要實時通信的情況下運行得更流暢和高效。

E. 總結

文章中根據官方文件對Consumer的說明,簡單的翻譯成了繁體中文,同時也增加了些許補充。文章探討了 Django Channels 中 Consumers 的使用,強調其在支持異步通信和長連接協議上的重要性。Channels 基於 ASGI(Asynchronous Server Gateway Interface)構建,允許開發者使用 Consumers 來簡化開發過程,提升代碼的可讀性和維護性。文章詳細描述了 Consumers 如何支持同步與非同步代碼,並提供了多種 Consumer 類型的使用範例,如 WebsocketConsumer 和 AsyncWebsocketConsumer,展示了它們在處理 WebSocket 連接的過程中提供的靈活性和便利性。

F. 系列文章