我是新来Kafka的。我使用python设置了一个twitter侦听器,它正在localhost:9092 kafka 服务器。我可以使用kafka客户机工具(conduktor)和命令“bin/kafka-console-consumer.sh——bootstrap server”来使用侦听器生成的流localhost:9092 --topic twitter——从一开始,“但是当我尝试使用spark结构化流媒体来使用同一个流时,它没有捕获并抛出错误-找不到数据源:kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;找到下面的截图
命令输出-消耗数据
spark consumer的jupyter输出-不使用数据
我的生产者或听众代码:
auth = tweepy.OAuthHandler("**********", "*************")
auth.set_access_token("*************", "***********************")
# session.set('request_token', auth.request_token)
api = tweepy.API(auth)
class KafkaPushListener(StreamListener):
def __init__(self):
#localhost:9092 = Default Zookeeper Producer Host and Port Adresses
self.client = pykafka.KafkaClient("0.0.0.0:9092")
#Get Producer that has topic name is Twitter
self.producer = self.client.topics[bytes("twitter", "ascii")].get_producer()
def on_data(self, data):
#Producer produces data for consumer
#Data comes from Twitter
self.producer.produce(bytes(data, "ascii"))
return True
def on_error(self, status):
print(status)
return True
twitter_stream = Stream(auth, KafkaPushListener())
twitter_stream.filter(track=['#fashion'])
spark结构化流媒体的用户访问
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "twitter") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
2条答案
按热度按时间xv8emn3q1#
当我提交spark作业时,发现缺少了什么,我必须包含正确的依赖包版本。我有spark 3.0.0,因此,我包括了-org.apache。spark:spark-sql-kafka-0-10_2.12:3.0.0包
vdzxcuhz2#
添加
sink
它将启动Kafka的消费数据。检查以下代码。