我目前正在尝试使用confluent_Kafka在Python中允许两个脚本通信。
脚本1向脚本2生成一条温度阅读消息,脚本2使用来自s1的消息,并向脚本1返回当前温度,然后将温度打印到控制台。
下面的代码对我来说非常挑剔,有时候可以工作,但大多数时候script2.py会运行,但不消耗任何消息。我想知道这是否可能,或者是否有更好的替代方案在互联网上两个不同的python脚本之间通信。
脚本1.py
from confluent_kafka import Consumer, Producer
#from confluent_kafka import Producer
from dotenv import load_dotenv
import os
load_dotenv(".env")
#Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")
#Set up the Kafka producer
p = Producer({
'bootstrap.servers': bootstrap_server,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': sasl_user_name,
'sasl.password': sasl_password,
})
c = Consumer({
'bootstrap.servers': bootstrap_server,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': sasl_user_name,
'sasl.password': sasl_password,
'group.id': 'script1-group',
'enable.auto.commit': False,
'auto.offset.reset': 'latest',
})
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
p.poll(0)
data = 'temperature'
p.produce('script2', data.encode('utf-8'), callback=delivery_report)
p.flush()
c.subscribe(['script1'])
x = True
while x == True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
temp = msg.value().decode('utf-8')
print("The temperature is " + temp)
x = False
脚本2.py
from confluent_kafka import Consumer, Producer
#from confluent_kafka import Producer
from dotenv import load_dotenv
import os
load_dotenv(".env")
#Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")
#Set up the Kafka producer
p = Producer({
'bootstrap.servers': bootstrap_server,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': sasl_user_name,
'sasl.password': sasl_password,
})
c = Consumer({
'bootstrap.servers': bootstrap_server,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': sasl_user_name,
'sasl.password': sasl_password,
'group.id': 'script2Group',
'enable.auto.commit': False,
'auto.offset.reset': 'latest',
})
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
c.subscribe(['script2'])
x = True
while x == True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
if msg.value().decode('utf-8') == 'temperature':
p.poll(0)
data = "20 C"
p.produce('script1', data.encode('utf-8'), callback=delivery_report)
print("Temperature sent to Script 1")
p.flush()
x = False
1条答案
按热度按时间pdkcd3nj1#
在互联网上两个不同的python脚本之间进行通信的更好的替代方案。
您可以使用gPRC或HTTP,而不需要Kafka或任何外部消息队列...
您所做的与Kafka Steams的设计非常相似,这是另一个基于python库构建的库Fluvii所能做的。
关于你的代码,你的脚本在第一个被使用的消息上停止。这不是使用Kafka的推荐方式。至少,使用者应该总是长时间运行的。
你也禁用了偏移提交,并且没有自己提交。这和
auto.offset.reset=latest
一起意味着你永远不会从任何一个主题读取任何现有的数据;因此,只有在启动script 1之前启动script 2时,它才起作用