kafka spark stream不适用于jupyter

bfnvny8b  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(275)

我经营几个集装箱,Kafka,Spark和Zookeeper。我将数据推送到一个kafka主题(kafka工作并且正在处理数据),并将spark与这个主题连接起来,但是如果我使用“kafkautils.createdirectstream”,jupiter就不会给出任何输出。你能帮帮我吗?docker容器来自lambda体系结构


# # Kafka/ pyspark streaming

# Kafka retrieve topics

import sys

from kafka import KafkaClient

client = KafkaClient(bootstrap_servers='localhost:9092')

future = client.cluster.request_update()
client.poll(future=future)

metadata = client.cluster
print(metadata.topics())

# Import libraries to load csv data on topic 'test'

from kafka import KafkaProducer

import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)

# load csv data into kafka topic 'test

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda 
   K:dumps(K).encode('utf-8'))

with open('/Users/karsten/Desktop/Datensets/divvy_data.csv', 'r') as file:
    reader = csv.reader(file)
    for messages in reader:
        producer.send('test', messages)
        producer.flush()

import findspark
findspark.init()
import pyspark

def spark_context_creator():
    conf = SparkConf()
    #set name for our app
    conf.setAppName('divvy_test')
    #the master url to connect
    conf.setMaster('spark://spark-master:7077')
    sc = None
    try:
        sc.stop()
        sc = SparkContext(conf=conf)
    except:
        sc = SparkContext(conf=conf)
    return sc

sc = spark_context_creator()

ssc = StreamingContext(sc,1)

kafkastream = KafkaUtils.createDirectStream(ssc, 'zookeeper:2181', 'my-created-consumer-group', {'test':1})

# extract json data from tupil

data = kafkastream.map(lambda x: json.loads(x[1])).pprint()
print(type(data))
rdd = scc.parallelize(data)
print(type(rdd))
ssc.start()
ssc.awaitTermination()

print(rdd)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题