如何使用faustpython包将kafka主题与web端点连接起来?

sg3maiej  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(430)

我有一个简单的应用程序,有两个功能,一个用于听主题,另一个用于web端点。我想创建服务器端事件流(sse),即文本/事件流,以便在客户端使用eventsource侦听它。
我现在有以下代码,其中每个函数都在执行特定的任务:

import faust

from faust.web import Response

app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")

@app.agent(test_topic)
async def test_topic_agent(stream):
    async for value in stream:
        print(f"test_topic_agent RECEIVED -- {value!r}")
        yield value

@app.page("/")
async def index(self, request):
    return self.text("yey")

现在,我想在索引中,像这样的代码,但是使用浮士德:

import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime

async def hello(request):
    loop = request.app.loop
    async with sse_response(request) as resp:
        while True:
            data = 'Server Time : {}'.format(datetime.now())
            print(data)
            await resp.send(data)
            await asyncio.sleep(1, loop=loop)
    return resp

async def index(request):
    d = """
        <html>
        <body>
            <script>
                var evtSource = new EventSource("/hello");
                evtSource.onmessage = function(e) {
                    document.getElementById('response').innerText = e.data
                }
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return Response(text=d, content_type='text/html')

app = web.Application()
app.router.add_route('GET', '/hello', hello)
app.router.add_route('GET', '/', index)
web.run_app(app, host='127.0.0.1', port=8080)

我试过这个:

import faust

from faust.web import Response

app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")

# @app.agent(test_topic)

# async def test_topic_agent(stream):

# async for value in stream:

# print(f"test_topic_agent RECEIVED -- {value!r}")

# yield value

@app.page("/", name="t1")
@app.agent(test_topic, name="t")
async def index(self, request):
    return self.text("yey")

但它给了我以下错误:

Traceback (most recent call last):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 299, in find_app
    val = symbol_by_name(app, imp=imp)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 262, in symbol_by_name
    module = imp(  # type: ignore
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
    return imp(module, package=package)
  File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
    async def index(self, request):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
    view = view_base.from_handler(cast(ViewHandlerFun, fun))
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
    return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/maverick/.pyenv/versions/faust_demo/bin/faust", line 8, in <module>
    sys.exit(cli())
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args,**kwargs)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 781, in main
    with self.make_context(prog_name, args,**extra) as ctx:
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 407, in make_context
    self._maybe_import_app()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 372, in _maybe_import_app
    find_app(appstr)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 303, in find_app
    val = imp(app)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
    return imp(module, package=package)
  File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
    async def index(self, request):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
    view = view_base.from_handler(cast(ViewHandlerFun, fun))
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
    return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'

我试过这个:

import faust

from faust.web import Response

app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")

# @app.agent(test_topic)

# async def test_topic_agent(stream):

# async for value in stream:

# print(f"test_topic_agent RECEIVED -- {value!r}")

# yield value

@app.agent(test_topic, name="t")
@app.page("/", name="t1")
async def index(self, request):
    return self.text("yey")

但我得到以下错误:

[2020-03-28 10:32:50,676] [29976] [INFO] [^--Producer]: Creating topic 'app1-__assignor-__leader'
[2020-03-28 10:32:50,695] [29976] [INFO] [^--ReplyConsumer]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^--AgentManager]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^---Agent: app1.index]: Starting...
[2020-03-28 10:32:50,696] [29976] [ERROR] [^Worker]: Error: TypeError("__init__() missing 1 required positional argument: 'web'")
Traceback (most recent call last):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
    return future.result()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/manager.py", line 58, in on_start
    await agent.maybe_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
    await self._on_start_supervisor()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
    res = await self._start_one(
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
    return await self._start_task(
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 617, in _start_task
    actor = self(
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 525, in __call__
    return self.actor_from_stream(stream,
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 552, in actor_from_stream
    res = self.fun(actual_stream)
TypeError: __init__() missing 1 required positional argument: 'web'
[2020-03-28 10:32:50,703] [29976] [INFO] [^Worker]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Flush producer buffer...
[2020-03-28 10:32:50,703] [29976] [INFO] [^--TableManager]: Stopping...

有办法吗?提前多谢了!

8ehkhllq

8ehkhllq1#

faust worker还将在每个示例上公开一个web服务器,默认情况下,该服务器在端口6066上运行。
服务器将使用aiohttphttpserver库,您可以利用这一点创建一个服务器端事件流(sse),如示例代码中所示。
您可以创建一个代理来读取Kafka主题 test 将更新一个变量 last_message_from_topic 对于主题的最后一条消息,此变量也将从您的网页中可见。
在索引页中( @app.page('/') )eventsource接口用于接收服务器发送的事件。它通过http连接到服务器,并从页面接收文本/事件流格式的事件 /hello 不关闭连接。
网页 /hello 每一秒都在发送一条带有Kafka主题最后一条信息的短信 test 以及服务器的当前时间。
这是我的档案 my_worker.py 代码:

import asyncio
from datetime import datetime

import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response

app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
test_topic = app.topic("test")

last_message_from_topic = ['No messages yet']

@app.agent(test_topic)
async def greet(greetings):
    async for greeting in greetings:
        last_message_from_topic[0] = greeting

@app.page('/hello')
async def hello(self, request):
    loop = request.app.loop
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'

            print(data)
            await resp.send(data)
            await asyncio.sleep(1, loop=loop)
    return resp

@app.page('/')
async def index(self, request):
    d = """
        <html>
        <body>
            <script>
                var evtSource = new EventSource("/hello");
                evtSource.onmessage = function(e) {
                    document.getElementById('response').innerText = e.data
                }
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return Response(text=d, content_type='text/html')

现在您必须使用以下命令启动faust worker:

faust -A my_worker worker -l info

在web浏览器上,您可以访问 http://localhost:6066/ :

以下是向Kafka发送有关该主题的消息的代码 test (来自另一个python文件):

import time
import json

from kafka import  KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: json.dumps(x).encode('utf-8'))

for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')
sulc1iza

sulc1iza2#

阅读浮士德网站文档,它似乎无法处理sse。 @app.agent 在使用kafka消息时回调 @app.page 处理http请求时。把它们结合起来可能是不可能的。
另一种方法是使用 faust.web 是从javascript轮询。
例如,使用:

import faust
from faust.web import Response

app = faust.App("myapp", broker="kafka://kafka:9092", value_serializer="raw")
test_topic = app.topic("test")
app.lastmsg = ""

@app.agent(test_topic)
async def test_topic_agent(stream):
    async for value in stream:
        app.lastmsg = str(value)
        yield value

@app.page("/msg")
async def msg(self, request):
    return self.text(app.lastmsg)

@app.page("/")
async def index(self, request):
    body = """
        <html>
        <body>
            <script>
                setInterval(()=>{
                    let xhr = new XMLHttpRequest();
                    xhr.open('GET', '/msg');
                    xhr.send();
                    xhr.onload = function() {
                        if (xhr.status == 200) {
                            document.getElementById('response').innerText = xhr.response
                        }
                    }
                },1000);
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return self.html(body)

这个天真的实现将kafka消息存储到 app.lastmsg 这可以通过api获得 /msg .
为了使用sse,您可以使用 asyncio 对于web部件和 faust 对于Kafka消费者来说。

相关问题