kafka和pyspark集成

lymgl2op  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(375)

我对大数据很幼稚,我试图把Kafka和spark联系起来。这是我的制作人代码

import os
import sys
import pykafka
def get_text():
    ## This block generates my required text. 
      text_as_bytes=text.encode(text)
      producer.produce(text_as_bytes)

if __name__ == "__main__":
    client = pykafka.KafkaClient("localhost:9092")
    print ("topics",client.topics)
    producer = client.topics[b'imagetext'].get_producer()

    get_text()

当我执行bin/kafka-console-consumer.sh--zookeeper时,这是在console-consumer上打印生成的文本localhost:2181 --topic imagetext--从头开始
现在我想用spark使用这个文本,这是我的jupyter代码

import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'

conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)

ssc = StreamingContext(sc,5)
print('ssc =================== {} {}')

kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'], 
     kafkaParams = {"metadata.broker.list": 'localhost:9092'})

print('contexts =================== {} {}')
lines = kstream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)

但这是在我的jupyter上产生的输出

Time: 2018-02-21 15:03:25
-------------------------------------------

-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------

不是我控制台上的文本。。请帮忙,找不出错误。

qjp7pelc

qjp7pelc1#

只需将您在consumer中的端口从9092更改为2181,因为它是zookeeper。从生产商方面,它必须连接到Kafka,端口号为9092。从拖缆侧,它必须连接到端口号为2181的zookeeper。

t98cgbkg

t98cgbkg2#

我找到了另一个解决办法。而解决 get_text() 在循环工作时,它不是正确的解决方案。您的数据在Kafka中发送时不是连续的。因此,spark streaming不应该以这种方式获得它。
kafka python库提供了 get(timeout) 使Kafka等待请求的功能。

producer.send(topic,data).get(timeout=10)

既然你用的是 pykafka ,我不确定它是否有效。尽管如此,你仍然可以尝试一次,不放 get_text() 循环中。

相关问题