I am running several Docker containers: Kafka, Spark, and Zookeeper (the containers come from a Lambda-architecture setup). I push data to a Kafka topic (Kafka works and is processing the data) and connect Spark to that topic, but as soon as I use `KafkaUtils.createDirectStream`, Jupyter gives no output at all. Can you help me?
# Kafka / PySpark streaming
# Retrieve the list of topics from Kafka
import sys
from kafka import KafkaClient
client = KafkaClient(bootstrap_servers='localhost:9092')
# force a metadata refresh so the topic list is up to date
future = client.cluster.request_update()
client.poll(future=future)
metadata = client.cluster
print(metadata.topics())
# Imports for loading the CSV data onto topic 'test'
from kafka import KafkaProducer
import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)
# load csv data into kafka topic 'test'
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda k: dumps(k).encode('utf-8'))
with open('/Users/karsten/Desktop/Datensets/divvy_data.csv', 'r') as file:
    reader = csv.reader(file)
    for message in reader:          # each row is sent as one JSON-encoded record
        producer.send('test', message)
producer.flush()                    # block until all buffered records are sent
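Before wiring Spark to the topic, it may help to confirm that the rows actually reached Kafka. A minimal sanity check with kafka-python's KafkaConsumer, assuming the same broker at localhost:9092 as above:

# Read a few records back from 'test' to confirm delivery
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'test',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',   # start from the beginning of the topic
    consumer_timeout_ms=5000,       # stop iterating after 5s without messages
    value_deserializer=lambda v: loads(v.decode('utf-8')))
for i, record in enumerate(consumer):
    print(record.value)
    if i >= 4:                      # five records are enough for a sanity check
        break
consumer.close()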
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def spark_context_creator():
    conf = SparkConf()
    # set a name for our app
    conf.setAppName('divvy_test')
    # the master URL to connect to
    conf.setMaster('spark://spark-master:7077')
    # reuse a running SparkContext if there is one, otherwise create a new one
    return SparkContext.getOrCreate(conf=conf)
sc = spark_context_creator()
ssc = StreamingContext(sc, 1)       # 1-second batch interval
# createDirectStream talks to the Kafka brokers directly, so it takes a list
# of topics plus the broker list -- not the Zookeeper quorum. 'kafka:9092' is
# an assumption; use your broker's address as seen from the Spark workers.
kafkastream = KafkaUtils.createDirectStream(
    ssc, ['test'], {'metadata.broker.list': 'kafka:9092'})
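Note that the original argument list (Zookeeper quorum, consumer group, `{'test': 1}`) belongs to the older receiver-based API, not to createDirectStream. If the receiver-based approach is what was intended, it would look roughly like this, assuming the Zookeeper container is reachable as zookeeper:2181 from the Spark workers:

# Receiver-based alternative: goes through Zookeeper instead of the brokers
kafkastream = KafkaUtils.createStream(
    ssc,
    'zookeeper:2181',               # Zookeeper quorum
    'my-created-consumer-group',    # consumer group id
    {'test': 1})                    # topic -> number of receiver threads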
# extract the JSON payload from each (key, value) tuple
data = kafkastream.map(lambda x: loads(x[1]))
data.pprint()   # print the first records of every batch once the context starts
print(type(data))   # a DStream, not an RDD
# A DStream cannot be passed to sc.parallelize(); to work with the data of
# each micro-batch, use foreachRDD on the stream instead
data.foreachRDD(lambda rdd: print(type(rdd)))
ssc.start()
ssc.awaitTermination()
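Since the symptom is "no output in Jupyter", one more thing worth checking: the spark-streaming-kafka jar must be on the classpath before the SparkContext is created, otherwise KafkaUtils cannot run from a notebook. A common way to do this from Jupyter, where the package coordinates are an assumption that must match the Spark/Scala build in the containers:

import os
# Must run before the SparkContext is created.
# The coordinates below are an assumed version -- adjust to your build.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 '
    'pyspark-shell')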