Django 3.1:StreamingHttpResponse与异步生成器

u0sqgete  于 2023-04-22  发布在  Go
关注(0)|答案(4)|浏览(521)

Django 3.1的文档是这样描述异步视图的:
主要的好处是能够在不使用Python线程的情况下为数百个连接提供服务。这允许您使用慢速流,长轮询和其他令人兴奋的响应类型。
我相信“慢流”意味着我们可以实现SSE视图,而不会为每个客户端独占一个线程,所以我尝试绘制一个简单的视图,如下所示:

async def stream(request):

    async def event_stream():
        while True:
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
            await asyncio.sleep(1)

    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')

(note:我改编了this response的代码)
不幸的是,当调用此视图时,它会引发以下异常:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/asgiref/sync.py", line 330, in thread_handler
    raise exc_info[1]
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/exception.py", line 38, in inner
    response = await get_response(request)
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/base.py", line 231, in _get_response_async
    response = await wrapped_callback(request, *callback_args, **callback_kwargs)
  File "./chat/views.py", line 144, in watch
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 367, in __init__
    self.streaming_content = streaming_content
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 382, in streaming_content
    self._set_streaming_content(value)
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 386, in _set_streaming_content
    self._iterator = iter(value)
TypeError: 'async_generator' object is not iterable

对我来说,这表明StreamingHttpResponse目前不支持异步生成器。
我尝试修改StreamingHttpResponse以使用async for,但我做不了太多。
你知道我该怎么做吗

ffscu2ro

ffscu2ro1#

老实说,Django本身并不支持它,但我有一个使用Daphne的解决方案(它也在Django通道中使用)。
创建了自己的StreamingHttpResponse类,能够从异步方法中检索数据流,并将其提供给Django的同步部分。

import asyncio

# By design asyncio does not allow its event loop to be nested.
# Trying to do so will give the error "RuntimeError: This event loop is already running".
# This library solves that problem.
import nest_asyncio

from django.http.response import StreamingHttpResponse

class AsyncStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        sync_streaming_content = self.get_sync_iterator(streaming_content)
        super().__init__(streaming_content=sync_streaming_content, *args, **kwargs)

    @staticmethod
    async def convert_async_iterable(stream):
        """Accepts async_generator and async_iterator"""
        return iter([chunk async for chunk in stream])

    def get_sync_iterator(self, async_iterable):
        nest_asyncio.apply()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(self.convert_async_iterable(async_iterable))
        return result

此外,您需要使用Daphne运行Django Web服务器以正确支持服务器发送事件(SSE)。它由“Django Software Foundation”正式支持,并且具有与gunicorn类似的语法,但使用asgi.py而不是wsgi.py
要使用它-您可以使用以下命令安装:pip install daphne
并将命令更改为:python manage.py runserver
比如说daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application
不确定它是否能与gunicorn一起工作。
如果你还有问题就告诉我。

w6lpcovy

w6lpcovy2#

另一种执行SSE的方法是使用特殊的库django-eventstream
将以下内容添加到将使用数据的HTML页面:

<script src="{% static 'django_eventstream/eventsource.min.js' %}"></script>
<script src="{% static 'django_eventstream/reconnecting-eventsource.js' %}"></script>

var es = new ReconnectingEventSource('/events/');

es.addEventListener('message', function (e) {
    console.log(e.data);
}, false);

es.addEventListener('stream-reset', function (e) {
    // ... client fell behind, reinitialize ...
}, false);

对于后端,你需要正确地设置Django,然后你就可以在任何需要做服务器端事件(SSE)的视图/任务/信号/方法中调用以下方法:
添加以下将生成数据(事件)的视图:

# from django_eventstream import send_event

send_event('test', 'message', {'text': 'hello world'})
hvvq6cgz

hvvq6cgz3#

我创建了一个名为stream的装饰器,它可以与一个协程函数一起使用,使其与Django的StreamingHttpResponse兼容。下面是一个例子:

import asyncio
import functools

from django.http import StreamingHttpResponse

def stream(coroutine_function):
    @functools.wraps(coroutine_function)
    def wrapper(*args, **kwargs):
        coroutine = coroutine_function(*args, **kwargs)
        try:
            while True:
                yield asyncio.run(coroutine.__anext__())
        except StopAsyncIteration:
            pass
    return wrapper

@stream
async def chunks():
    for char in 'Hello, world!':
        yield char
        await asyncio.sleep(1)

async def index(request):
    return StreamingHttpResponse(chunks())

我还需要添加nest_asyncio并在settings.py文件的顶部调用apply(),如下所示:

import nest_asyncio
nest_asyncio.apply()

nest_asyncio依赖项支持从stream装饰器创建的wrapper函数调用asyncio.run
最后,Django的asgi可以使用uvicorngunicorn运行,如下所示:

$ gunicorn -k uvicorn.workers.UvicornWorker www.asgi:application
zaqlnxep

zaqlnxep4#

你可以使用django-channel
Channels增强了Django,为您的代码带来了WebSocket,long-poll HTTP,任务卸载和其他异步支持,使用熟悉的Django设计模式和灵活的底层框架,不仅可以自定义行为,还可以为您自己的协议和需求编写支持。

相关问题