如何处理从kafka到cassandra的pyspark结构化流媒体

9njqaruj  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(514)

我用pyspark从Kafka那里获取数据,然后把它插入Cassandra。我快到了,我只需要最后一步。

def Spark_Kafka_Receiver():

# STEP 1 OK!

    dc = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "000.00.0.240:9092") \
        .option("subscribe", "MyTopic") \
    .load()
    dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")

# STEP 2 OK!

    dc.writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start() \
        .awaitTermination()

# STEP 3 NEED HELP

def foreach_batch_function(df, epoch_id):
    Value = df.select(df.value)

    ???????

    # WRITE DATA FRAME ON CASSANDRA
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace) \
        .save()

我的价值观是这样的:
Dataframe[值:二进制]
我需要插入一些打开我的值的东西,把二进制文件放在里面,用正确的格式创建一个好的Dataframe,用它来处理数据库并执行我代码的最后一部分。

nnvyjq4y

nnvyjq4y1#

你不需要使用 foreachBatch 不再。您只需升级到spark cassandra connector 2.5,它本机支持spark结构化流媒体,因此您只需编写:

dc.writeStream \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace)
        .start() \
        .awaitTermination()

关于问题的第二部分-如果要将值转换为多列,则需要使用 from_json 函数,将模式传递给它。下面是scala中的示例,但python代码应该非常类似:

val schemaStr = "id:int, value:string"
val schema = StructType.fromDDL(schemaStr)
val data = dc.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select("data.*").drop("data")

然后你可以通过 writeStream

相关问题