我是浮士德的新用户,当我同时运行3个浮士德应用程序时,不知道如何解决这个问题。具体而言:
我有3个python文件,在每个文件中,我运行1个服务来监听Kafka服务器。每个文件包含如下代码,每个文件中的不同之处只是TOPIC_INPUT名称。
app = faust.App(
'UserInfoReceive',
broker= 'kafka://' + SERVER_INPUT + f':{DVWAP_KAFKA_PORT}',
value_serializer='raw',
)
kafka_topic = app.topic(TOPIC_INPUT)
@app.agent(kafka_topic)
async def userSettingInput(streamInput):
async for msg in streamInput:
userResgister(msg)
预期行为
Expect 3 python文件可以正常运行并监听coming Kafka事件
实际行为
它生成OSError如下img
大家好,
我是浮士德的新用户,当我同时运行3个浮士德应用程序时,不知道如何解决这个问题。具体而言:
我有3个python文件,在每个文件中,我运行1个服务来监听Kafka服务器。每个文件包含如下代码,每个文件中唯一的区别是TOPIC_INPUT名称。app = faust.App('UserInfoReceive',broker= 'Kafka://' + SERVER_INPUT + f':{DVWAP_KAFKA_PORT}',value_serializer='raw',)
Kafka_topic = app.topic(TOPIC_INPUT)
@app.agent(Kafka_topic)async def userSettingInput(streamInput):async for msg in streamInput:userResgister(msg)预期行为Expect 3 python文件可以正常运行并监听coming Kafka事件
实际行为它生成OSError如下img
版本号
Python版本:3.9浮士德版本1.10.4操作系统WSL Linux子系统在Windows上Kafka版本kafka-python==1.4.7
1条答案
按热度按时间68de4m5k1#
这是因为默认情况下每个Faust worker都使用相同的web_port(6066)。您可以在配置应用示例时为每个其他应用示例更改此配置。