資料庫存取 – Django Channels:實現 WebSocket 異步通信,打造高效的長連接系統

A. Overview

Django ORM是同步的程式碼模組,這意味著它以同步方式執行操作。因此,如果您想從異步程式碼中訪問Django ORM,就需要進行一些特殊處理,確保資料庫連接能夠正確關閉。

當您使用 `SyncConsumer` 或基於它的任何消費者(例如 `JsonWebsocketConsumer`)時,不需要進行任何特殊處理。這是因為在這種情況下,所有的程式碼都以同步模式運行,Django Channels 將會負責進行必要的清理工作,這是 `SyncConsumer` 模型的一部分。

然而,當您編寫異步程式碼時,則需要在一個安全的同步上下文中調用資料庫方法。這可以通過以下兩種方式進行實現:

1. 使用 `database_sync_to_async`:這是一個函數,可將同步資料庫方法封裝使其能在異步環境中安全執行。例如:


from channels.db import database_sync_to_async

async def my_async_function():

result = await database_sync_to_async(MyModel.objects.get)(id=1)

2. 使用異步方法:許多模型方法有相應的異步版本,這些方法的名稱以 `a` 作為前綴,例如 `Model.objects.aget()`。這意味著在異步環境中,您可以直接使用這些方法來獲取資料而不需再次封裝。例如:


async def my_async_function():

result = await MyModel.objects.aget(id=1)

透過這些方法,您可以確保在異步程式設計中安全而有效地進行資料庫操作,而不會意外地持有不必要的資料庫連接。這一點在高流量和高併發的異步應用中尤為重要,能夠有效避免潛在的資源泄漏問題。

B. 資料庫連線

Django Channels 引入了非同步功能,使得處理 WebSockets 和其他長連接協議變得更加便捷。然而,這也可能會帶來一些挑戰,尤其是在管理資料庫連線方面。

當您使用同步消費者(即利用多執行緒來處理請求)時,可能每個執行緒都會開啟一個資料庫連線。這意味著,如果您的應用程式需要處理大量並發連線,資料庫連線數可能會迅速增加。為了控制這一情況,您可以通過設定 `ASGI_THREADS` 環境變數來限制最大執行緒數量。目前,對於 Python 3.7 及以下版本,一般預設設定為「CPU 數量 x 5」,而對於 Python 3.8 及以上版本,則設定為`min(32, os.cpu_count() + 4)`。

如要更有效地管理資料庫連線而不會耗費過多資源,可以考慮使用非同步消費者。這將允許您的應用程式更靈活地在需要時才使用執行緒,尤其是在需要訪問 Django 的 ORM 時透過 `database_sync_to_async` 來完成同步切換。

當使用非同步消費者時,Channels 會自動調用 Django 的 `close_old_connections` 方法來控制連線,這樣可以在建立新連線、結束連線,或從客戶端接收數據時關閉不必要的舊連線。然而,當消費者發送數據時,並不會自動關閉連線。因為系統無法預測這次發送是否僅此一次,或是連續的多次發送(連接頻繁開啟關閉會影響效能)。

因此,若您的應用程式涉及長時間運行的非同步消費者,您應定期手動調用 `close_old_connections` 以確保系統效能不因過多的閒置連線而降低。這樣可以讓您在享受非同步處理優勢的同時,保持資料庫連接的穩定性與高效性。

C. database_sync_to_async

`channels.db.database_sync_to_async` 是 `asgiref.sync.sync_to_async` 的一個版本,它在結束時還會清理資料庫連線。

使用方式如下:

from channels.db import database_sync_to_async

async def connect(self):
    self.username = await database_sync_to_async(get_name)()

def get_name(self):
    return User.objects.all()[0].name
from channels.db import database_sync_to_async

async def connect(self):
    self.username = await get_name()

@database_sync_to_async
def get_name(self):
    return User.objects.all()[0].name

D. aclose_old_connections

`django.db.aclose_old_connections` 是 Django 的 `close_old_connections` 的非同步封裝版本。在使用長時間持續運行的 `AsyncConsumer` 並且呼叫 Django ORM 時,定期呼叫此函數是很重要的。最好是在一段時間內進行第一次查詢之前呼叫此函數。例如,如果 Consumer 被一個 channels layer 事件喚醒並需要進行一些 ORM 查詢來判斷要發送給客戶端的內容,這函數應在進行這些查詢之前被呼叫。雖然多次呼叫此函數並非一定是不好的,但這需要切換到同步代碼,從而產生小的性能負擔。

E. 總結

在 Django Channels 中,處理資料庫存取需要考慮同步與異步編程之間的差異。 Django 的 ORM 是同步的,因此在異步環境中需要進行額外的處理。

1. 同步與異步操作

  • 同步消費者(如 `SyncConsumer` 和 `JsonWebsocketConsumer`)不需要特殊處理來訪問 ORM,Channels 會自動進行必要的資源清理。
  • 在異步消費者中,則需要使用工具如 `database_sync_to_async` 或內建的異步模型方法(以 `a` 開頭的方法,如 `aget()`)來安全地訪問資料庫。

2. 資料庫連接管理

  • 當應用程序需要高併發處理時,資料庫連接數可能迅速增加。使用同步消費者時,可以透過設定 `ASGI_THREADS` 來限制最大執行緒數。
  • 使用非同步消費者將更有效地使用資源,可透過 `database_sync_to_async` 切換至同步執行。

3. 連接清理

  • `database_sync_to_async` 可以提供資料庫連接清理的功能。
  • 定期調用 `aclose_old_connections` 用於在異步消費者中保持連接穩定性和系統效能。

這些措施能夠有效管理 Django Channels 應用中的資料庫連接,特別是在處理高並發和長連接情況下,避免資源泄漏和維持系統性能。

F. 系列文章