如何以更有效的方式使用spark流将数据从kafka插入hbase?

0ejtzxu1  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(322)

我尝试通过pyspark将kafka数据摄取到hbase中。目前,我尝试使用happybase将kafka的数据插入hbase,但是速度非常慢。我认为批量加载也使用happybase不会显著提高性能。下面是当前的源代码。我需要达到最高可能的表现。你有什么想法吗?也许 吧 saveAsNewAPIHadoopDataset 或者其他解决方案?

def SaveToHBase(rdd):
    print("=====Pull from Stream=====")
    if not rdd.isEmpty():

        print(len(rdd.collect()))
        print(datetime.now())
        for line in rdd.collect():
            ctable.put((line.log_id), { \
            b'log:content': (line.log)})

kds = KafkaUtils.createDirectStream(ssc, topic, k_params, fromOffsets=None)

parsed = kds.filter(lambda x: x != None and len(x) > 0 )
parsed = parsed.map(lambda x: x[1])
parsed = parsed.map(lambda rec: rec.split(","))
parsed = parsed.filter(lambda x: x != None and len(x) == 2 )
parsed = parsed.map(lambda data:Row(log_id=getValue(str,data[0]), \
        log=getValue(str,data[1])))

parsed.foreachRDD(SaveToHBase)
pkwftd7m

pkwftd7m1#

Kafka连接(kafka connect)通常是一个很好的工具,用于获取Kafka与外部源和目标之间的数据。
kafka connect是apachekafka的一部分,提供可伸缩的流式集成,只需要一个配置文件即可实现。有很多预建的连接器,你也可以写你自己的如果你想。您可以在一台机器上运行kafka connect,也可以在集群上运行kafka connect,以提高弹性和吞吐量。它和你的Kafka经纪人分开经营。
如果要在数据到达目标之前对其进行处理,可以使用流处理技术(spark streaming、kafka streams、ksql等)并将结果写入kafka主题。然后,该kafka主题作为kafka connect写入目标数据存储(在您的示例中是hbase)的源。
您可以在以下位置找到用于hbase的kafka connect连接器:https://www.confluent.io/connector/kafka-connect-hbase-sink/

相关问题