what is it?
Channels augments Django to bring WebSocket, long-poll HTTP, task offloading and other async support to your code, using familiar Django design patterns
terminology
-
Consumers are akin to regular Django views. However, regular views can only process incoming requests whereas a consumer can send and receive messages and react to a WebSocket connection being opened or closed
-
Channels are mailboxes that messages can be sent to. Each channel has a name. Anyone who has the name of a channel can send a message to that channel
-
Groups are collections of related channels. Each group has a name. Anyone who has the name of a group can add or remove a channel to the group by name and send a message to all channels in the group
implementation using redis
copy pasted it from here
requirements.txt
daphne==4.0.0
channels==4.0.0
channels-redis==4.1.0settings.py
INSTALLED_APPS = [
'daphne', # should put this on top
...
'channels',
]
# comment out WSGI_APPLICATION and add ASGI_APPLICATION
# WSGI_APPLICATION = 'django_celery_example.wsgi.application'
ASGI_APPLICATION = 'django_celery_example.asgi.application'
# add the channels config
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [(os.environ.get("CHANNELS_REDIS", "redis://127.0.0.1:6379/0"))],
},
},
}asgi.py
import os
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from polls import routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_example.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
'websocket': URLRouter(
routing.urlpatterns
)
})- we do not need to config the HTTP router since Channels handles this for us
- the
websocketrouter along withrouting.urlpatternswas added to point to thepollsapp
polls/routing.py config
from django.urls import path
from polls import consumers
urlpatterns = [
path('ws/task_status/<task_id>/', consumers.TaskStatusConsumer.as_asgi()),
]the ws://localhost:8010/ws/task_status/{task_id}/ URL points to the consumers.TaskStatusConsumer consumer
polls/consumers.py
import json
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from channels.generic.websocket import AsyncWebsocketConsumer
from celery.result import AsyncResult
def get_task_info(task_id):
"""
return task info according to the task_id
"""
task = AsyncResult(task_id)
state = task.state
if state == 'FAILURE':
error = str(task.result)
response = {
'state': state,
'error': error,
}
else:
response = {
'state': state,
}
return response
def notify_channel_layer(task_id):
"""
This function would be called in Celery task.
Since Celery now still not support `asyncio`, so we should use async_to_sync
to make it synchronous
https://channels.readthedocs.io/en/stable/topics/channel_layers.html#using-outside-of-consumers
"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
task_id,
{'type': 'update_task_status', 'data': get_task_info(task_id)}
)
class TaskStatusConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.task_id = self.scope['url_route']['kwargs']['task_id']
await self.channel_layer.group_add(
self.task_id,
self.channel_name
)
await self.accept()
await self.send(text_data=json.dumps(get_task_info(self.task_id)))
async def disconnect(self, close_code):
await self.channel_layer.group_discard(
self.task_id,
self.channel_name
)
async def update_task_status(self, event):
data = event['data']
await self.send(text_data=json.dumps(data))