what is it?

Channels augments Django to bring WebSocket, long-poll HTTP, task offloading and other async support to your code, using familiar Django design patterns

terminology

  • Consumers are akin to regular Django views. However, regular views can only process incoming requests whereas a consumer can send and receive messages and react to a WebSocket connection being opened or closed

  • Channels are mailboxes that messages can be sent to. Each channel has a name. Anyone who has the name of a channel can send a message to that channel

  • Groups are collections of related channels. Each group has a name. Anyone who has the name of a group can add a channel to it or remove a channel from it by name, and can send a message to all channels in the group
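
The mailbox and group idea can be made concrete with a toy in-memory model. This is only an illustration, not the Channels API: `ToyLayer` and its methods are hypothetical names, and a real channel layer is asynchronous and backed by a store such as Redis.

```python
from collections import defaultdict, deque


class ToyLayer:
    """Toy stand-in for a channel layer (hypothetical, not the Channels API)."""

    def __init__(self):
        self.mailboxes = defaultdict(deque)  # channel name -> queued messages
        self.groups = defaultdict(set)       # group name -> channel names

    def send(self, channel, message):
        # Anyone who knows the channel name can drop a message in its mailbox.
        self.mailboxes[channel].append(message)

    def group_add(self, group, channel):
        self.groups[group].add(channel)

    def group_discard(self, group, channel):
        self.groups[group].discard(channel)

    def group_send(self, group, message):
        # Fan the message out to every channel currently in the group.
        for channel in self.groups[group]:
            self.send(channel, message)

    def receive(self, channel):
        # Pop the oldest message, or return None if the mailbox is empty.
        box = self.mailboxes[channel]
        return box.popleft() if box else None


layer = ToyLayer()
layer.group_add("task-123", "browser-a")
layer.group_add("task-123", "browser-b")
layer.group_send("task-123", {"state": "STARTED"})
layer.group_discard("task-123", "browser-b")
layer.group_send("task-123", {"state": "SUCCESS"})
```

After the second `group_send`, only `browser-a` holds both messages, because `browser-b` left the group before it was sent.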

implementation using redis

copy pasted it from here

requirements.txt

daphne==4.0.0
channels==4.0.0
channels-redis==4.1.0

settings.py

INSTALLED_APPS = [
    'daphne',  # keep this first so Daphne's ASGI runserver is the one that gets used
 
    ...
 
    'channels',
]
 
# comment out WSGI_APPLICATION and add ASGI_APPLICATION
# WSGI_APPLICATION = 'django_celery_example.wsgi.application'
ASGI_APPLICATION = 'django_celery_example.asgi.application'
 
# add the channels config
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            # needs `import os` at the top of settings.py
            "hosts": [os.environ.get("CHANNELS_REDIS", "redis://127.0.0.1:6379/0")],
        },
    },
}
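
The `hosts` entry falls back to a local Redis when `CHANNELS_REDIS` is not set. A minimal sketch of that fallback, pulled out into a function so it can be checked in isolation; `channel_layers_from_env` is a hypothetical helper (not part of Channels) and the `redis://redis:6379/1` URL is a made-up example value.

```python
import os


def channel_layers_from_env(environ=os.environ):
    """Build the CHANNEL_LAYERS dict (hypothetical helper, not part of Channels)."""
    redis_url = environ.get("CHANNELS_REDIS", "redis://127.0.0.1:6379/0")
    return {
        "default": {
            "BACKEND": "channels_redis.core.RedisChannelLayer",
            "CONFIG": {"hosts": [redis_url]},
        },
    }


# With the variable unset, the local Redis URL is used.
local = channel_layers_from_env({})
# With the variable set (made-up host name), the configured URL is used instead.
deployed = channel_layers_from_env({"CHANNELS_REDIS": "redis://redis:6379/1"})
```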

asgi.py

import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_example.settings')

# initialise Django before importing code that may touch models or settings
django_asgi_app = get_asgi_application()

from polls import routing  # noqa: E402

application = ProtocolTypeRouter({
    'http': django_asgi_app,
    'websocket': URLRouter(
        routing.urlpatterns
    ),
})
  • the http entry hands ordinary HTTP requests to Django's ASGI application, so no extra HTTP routing is needed
  • the websocket entry uses a URLRouter with routing.urlpatterns, which points to the polls app
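
The routing above boils down to picking an application by connection type. A toy sketch of that idea, assuming only that each ASGI connection carries a `scope["type"]` of `"http"` or `"websocket"`; `ToyProtocolRouter` and the two stub apps are hypothetical and are not the Channels implementation.

```python
import asyncio


class ToyProtocolRouter:
    """Toy dispatcher keyed on scope["type"] (hypothetical, not Channels code)."""

    def __init__(self, application_mapping):
        self.application_mapping = application_mapping

    async def __call__(self, scope, receive, send):
        app = self.application_mapping.get(scope["type"])
        if app is None:
            raise ValueError(f"No application configured for scope type {scope['type']!r}")
        return await app(scope, receive, send)


async def http_app(scope, receive, send):
    # Stands in for get_asgi_application().
    return "handled by Django"


async def websocket_app(scope, receive, send):
    # Stands in for URLRouter(routing.urlpatterns).
    return "handled by the URLRouter"


router = ToyProtocolRouter({"http": http_app, "websocket": websocket_app})
http_result = asyncio.run(router({"type": "http"}, None, None))
ws_result = asyncio.run(router({"type": "websocket"}, None, None))
```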

polls/routing.py config

from django.urls import path
 
from polls import consumers
 
 
urlpatterns = [
    path('ws/task_status/<task_id>/', consumers.TaskStatusConsumer.as_asgi()),
]

a WebSocket request to ws://localhost:8010/ws/task_status/{task_id}/ is routed to consumers.TaskStatusConsumer, and the task_id part of the path is passed to the consumer as a URL keyword argument
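
The consumer reads the ID back from `scope['url_route']['kwargs']`. A rough sketch of that extraction with a plain regular expression; `match_task_status` is a hypothetical helper, the task ID in the example is made up, and Django's real `path()` matching does more than this.

```python
import re

# Rough regex equivalent of 'ws/task_status/<task_id>/' (illustration only).
TASK_STATUS_RE = re.compile(r"^ws/task_status/(?P<task_id>[^/]+)/$")


def match_task_status(path):
    """Return a scope-like dict fragment for a matching path, else None."""
    match = TASK_STATUS_RE.match(path.lstrip("/"))
    if match is None:
        return None
    return {"url_route": {"args": (), "kwargs": match.groupdict()}}


scope = match_task_status("/ws/task_status/3f2a9c1e/")
task_id = scope["url_route"]["kwargs"]["task_id"]
```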

polls/consumers.py

import json
 
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from channels.generic.websocket import AsyncWebsocketConsumer
from celery.result import AsyncResult
 
 
def get_task_info(task_id):
    """
    Return the state of the Celery task with the given task_id,
    plus the error message if the task failed.
    """
    task = AsyncResult(task_id)
    state = task.state
 
    if state == 'FAILURE':
        error = str(task.result)
        response = {
            'state': state,
            'error': error,
        }
    else:
        response = {
            'state': state,
        }
    return response
 
 
def notify_channel_layer(task_id):
    """
    This function is meant to be called from a Celery task.

    Celery does not support `asyncio` yet, so async_to_sync is used to call
    the async group_send method from synchronous code.
 
    https://channels.readthedocs.io/en/stable/topics/channel_layers.html#using-outside-of-consumers
    """
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        task_id,
        {'type': 'update_task_status', 'data': get_task_info(task_id)}
    )
 
 
class TaskStatusConsumer(AsyncWebsocketConsumer):
    async def connect(self):
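        # the task ID from the URL doubles as the group name
        self.task_id = self.scope['url_route']['kwargs']['task_id']

        # join the group so notify_channel_layer() can reach this connection
        await self.channel_layer.group_add(
            self.task_id,
            self.channel_name
        )

        await self.accept()

        # send the current state right away in case the task already finished
        await self.send(text_data=json.dumps(get_task_info(self.task_id)))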
 
    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.task_id,
            self.channel_name
        )
 
    async def update_task_status(self, event):
        # handles group messages sent with 'type': 'update_task_status'
        data = event['data']

        await self.send(text_data=json.dumps(data))
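
The `'type': 'update_task_status'` key in notify_channel_layer is what selects the update_task_status method on the consumer: Channels looks up a method named after the type, with any `.` replaced by `_`. A toy sketch of that name-based dispatch; `ToyConsumer` is hypothetical and leaves out the WebSocket and the channel layer entirely.

```python
import asyncio
import json


class ToyConsumer:
    """Toy consumer showing type-to-method dispatch (not the Channels class)."""

    def __init__(self):
        self.sent = []  # stands in for frames written to the WebSocket

    async def dispatch(self, event):
        # Mirror the naming rule: dots in the type become underscores.
        handler = getattr(self, event["type"].replace(".", "_"), None)
        if handler is None:
            raise ValueError(f"No handler for message type {event['type']!r}")
        await handler(event)

    async def update_task_status(self, event):
        self.sent.append(json.dumps(event["data"]))


consumer = ToyConsumer()
asyncio.run(consumer.dispatch(
    {"type": "update_task_status", "data": {"state": "SUCCESS"}}
))
```

A message whose type has no matching method raises an error in this sketch, which is why the type string and the method name have to stay in sync.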