我用pyspark从Kafka那里获取数据,然后把它插入Cassandra。我快到了,我只需要最后一步。
def Spark_Kafka_Receiver():
# STEP 1 OK!
dc = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "000.00.0.240:9092") \
.option("subscribe", "MyTopic") \
.load()
dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")
# STEP 2 OK!
dc.writeStream \
.outputMode("append") \
.foreachBatch(foreach_batch_function) \
.start() \
.awaitTermination()
# STEP 3 NEED HELP
def foreach_batch_function(df, epoch_id):
Value = df.select(df.value)
???????
# WRITE DATA FRAME ON CASSANDRA
df.write \
.format("org.apache.spark.sql.cassandra") \
.mode('append') \
.options(table=table_name, keyspace=keyspace) \
.save()
我的价值观是这样的:
Dataframe[值:二进制]
我需要插入一些打开我的值的东西,把二进制文件放在里面,用正确的格式创建一个好的Dataframe,用它来处理数据库并执行我代码的最后一部分。
1条答案
按热度按时间nnvyjq4y1#
你不需要使用
foreachBatch
不再。您只需升级到spark cassandra connector 2.5,它本机支持spark结构化流媒体,因此您只需编写:关于问题的第二部分-如果要将值转换为多列,则需要使用
from_json
函数,将模式传递给它。下面是scala中的示例,但python代码应该非常类似:然后你可以通过
writeStream