是否可以使用confluent_kafka在同一Python文件中使用Kafka消费者和生产者

flseospp  于 2023-03-07  发布在  Apache
关注(0)|答案(1)|浏览(137)

我目前正在尝试使用confluent_Kafka在Python中允许两个脚本通信。
脚本1向脚本2生成一条温度阅读消息,脚本2使用来自s1的消息,并向脚本1返回当前温度,然后将温度打印到控制台。
下面的代码对我来说非常挑剔,有时候可以工作,但大多数时候script2.py会运行,但不消耗任何消息。我想知道这是否可能,或者是否有更好的替代方案在互联网上两个不同的python脚本之间通信。

脚本1.py

from confluent_kafka import Consumer, Producer
#from confluent_kafka import Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

#Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")

#Set up the Kafka producer
p = Producer({
      'bootstrap.servers': bootstrap_server,
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'PLAIN',
      'sasl.username': sasl_user_name,
      'sasl.password': sasl_password,
})

c = Consumer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
    'group.id': 'script1-group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'latest',
    
})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


p.poll(0)
data = 'temperature'
p.produce('script2', data.encode('utf-8'), callback=delivery_report)
p.flush()

c.subscribe(['script1'])

x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    temp = msg.value().decode('utf-8')
    print("The temperature is " + temp)
    x = False

脚本2.py

from confluent_kafka import Consumer, Producer
#from confluent_kafka import Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

#Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")

#Set up the Kafka producer
p = Producer({
      'bootstrap.servers': bootstrap_server,
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'PLAIN',
      'sasl.username': sasl_user_name,
      'sasl.password': sasl_password,
})

c = Consumer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
    'group.id': 'script2Group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'latest',
    
})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

c.subscribe(['script2'])

x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    if msg.value().decode('utf-8') == 'temperature':
        p.poll(0)
        data = "20 C"
        p.produce('script1', data.encode('utf-8'), callback=delivery_report)
        print("Temperature sent to Script 1")
        p.flush()
        x = False
pdkcd3nj

pdkcd3nj1#

在互联网上两个不同的python脚本之间进行通信的更好的替代方案。
您可以使用gPRC或HTTP,而不需要Kafka或任何外部消息队列...
您所做的与Kafka Steams的设计非常相似,这是另一个基于python库构建的库Fluvii所能做的。
关于你的代码,你的脚本在第一个被使用的消息上停止。这不是使用Kafka的推荐方式。至少,使用者应该总是长时间运行的。
你也禁用了偏移提交,并且没有自己提交。这和auto.offset.reset=latest一起意味着你永远不会从任何一个主题读取任何现有的数据;因此,只有在启动script 1之前启动script 2时,它才起作用

相关问题