Producer/consumer with Kafka and Spark Streaming

Asked by 7gs2gvoe on 2021-06-08 · Kafka

I'm trying to write producer and consumer code with Kafka, Spark Streaming, and Python. The scenario is the following: there is a producer of random messages related to odometry in JSON format, which uses a timer thread to publish a message on a topic every 3 seconds:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import threading
from random import randint
import random
import json
import math
import logging

logging.basicConfig()
log = logging.getLogger(__name__)

def sendMessage():

    # the function re-schedules itself, so a message is sent every 3 seconds
    threading.Timer(3.0, sendMessage).start()

    # connection with the message broker
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('ascii'))

    # the id is initially fixed to 1, but there could be more robots
    robotId = 1
    # generation of random odometry values
    deltaSpace = randint(1, 9)
    thetaTwist = random.uniform(0, math.pi * 2)

    future = producer.send(
        'odometry',
        key=b'message',
        value={'robotId': robotId, 'deltaSpace': deltaSpace, 'thetaTwist': thetaTwist}
    ).add_callback(on_send_success).add_errback(on_send_error)

    # block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # decide what to do if the produce request failed...
        log.exception("produce request failed")

    producer.flush()

def on_send_success(record_metadata):
    print("topic name: " + record_metadata.topic)
    print("partition: " + str(record_metadata.partition))
    print("offset: " + str(record_metadata.offset))

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle the exception here

sendMessage()
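
(Side note: the snippet above opens a new KafkaProducer connection on every timer tick. A minimal sketch of a variant that creates the client once and reuses it across ticks, with the same broker address and topic, would be:)

# sketch: create the producer once, outside the timer callback, and reuse it
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

def sendMessage():
    threading.Timer(3.0, sendMessage).start()
    producer.send('odometry', key=b'message',
                  value={'robotId': 1,
                         'deltaSpace': randint(1, 9),
                         'thetaTwist': random.uniform(0, math.pi * 2)})
    producer.flush()

sendMessage()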

Then there is a consumer that consumes the messages on the same topic every 3 seconds and processes them with Spark Streaming; the code is as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json 

# Create a local StreamingContext with two working threads and a batch interval of 3 seconds

sc = SparkContext("local[2]", "OdometryConsumer")
ssc = StreamingContext(sc, 3)

kafkaStream = KafkaUtils.createDirectStream(ssc, ['odometry'], {'metadata.broker.list': 'localhost:9092'})

# each record from createDirectStream is a (key, value) tuple; the JSON payload is the value
parsed = kafkaStream.map(lambda kv: json.loads(kv[1]))

# foreachRDD hands the function a whole RDD per batch, not a single record
def f(rdd):
    print(rdd.collect())

parsed.foreachRDD(f)

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
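
(Once parsed, each element of the stream is a plain Python dict, so ordinary DStream transformations apply. As a sketch, using the field names from the producer above, a per-robot sum of deltaSpace within each 3-second batch would be:)

# sketch: aggregate the odometry values per robot within each batch
spaces = parsed.map(lambda msg: (msg['robotId'], msg['deltaSpace']))
totals = spaces.reduceByKey(lambda a, b: a + b)
totals.pprint()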

To run the application, I first start the ZooKeeper server on port 2181:

sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

Then I start the Kafka server/broker on port 9092:

sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
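
(As a quick check that the broker is up and the odometry topic exists, and assuming the same /opt/kafka layout and an older, ZooKeeper-based Kafka release to match the 0-8 connector used below, the bundled topic script can list or create topics:)

sudo /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic odometry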

Then I start the producer and the consumer:

python3 Producer.py

./spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/erca/Scrivania/proveTesi/SparkConsumer.py
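
(The assembly jar must match the Spark and Scala versions in use. With internet access, an equivalent way to pull the 0-8 connector, assuming Spark 2.3.1 on Scala 2.11 as the jar name suggests, is the --packages flag:)

./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 /home/erca/Scrivania/proveTesi/SparkConsumer.py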

The application runs without errors, but I'm not sure the messages are actually being consumed. What can I do to verify this? Thanks to everyone who helps me!

Answer #1, by q1qsirdb:

Use parsed.pprint() before ssc.start(); it will print the records to the console.
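
For example, placed just before starting the context:

parsed.pprint()         # prints the first elements of each batch to the console

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

Independently of Spark, you can also confirm that messages reach the broker at all with the console consumer that ships with Kafka (depending on the Kafka version, the connection flag is --bootstrap-server localhost:9092 or --zookeeper localhost:2181):

sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic odometry --from-beginning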
