我想基于kafka传入数据中的列存在退出spark程序

t3psigkw  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(233)

我收到Kafka发来的jsondatetime“:”2020-04-25 11:16:02.979,“请求头”:[{“x-v”:“1446”},{“ip地址”:“10.101.10.0”}]}
我正在阅读此数据并定义此数据的结构,如下所示:

schema = StructType([
    StructField("datetime", TimestampType(), True),
    StructField("req_head", ArrayType(MapType(StringType(), StringType())
    ), True)])

我在数据框中创建了searchstring列,以方便对传入数据的搜索。searchstring column是我要搜索的列的数组。value列是从kafka接收的纯文本列。

Stream_jsonDF = StringDF.select("value", from_json(col("value"), schema).alias("data")) \
    .withColumn("searchstring", array(lit("x-v"),lit("ip-address"), lit("portno"))) \
    .select("data.datetime",
            "data.req_head",
            "searchstring",
            "value"
            )

这是我真正关心的问题。我编写了以下方法来查找列的存在性。

def col_presence(df, col):
    print("The type of parameters are :", type(df), type(col))
    return df["value"].contains(col)

Stream_jsonDF.withColumn("ip", when(col_presence( Stream_jsonDF, Stream_jsonDF["searchstring"][1]),
                               getVal("req_head",Stream_jsonDF["searchstring"][1])
                               ).otherwise(exit("column at Index 1 not found"))
                ) \
    .writeStream.format("console").option("truncate", value=False).start().awaitTermination()

我的要求是,当col\u presence()返回true时,必须处理getval(),并且必须进一步处理代码。如果col_presence()返回false,那么程序应该退出并显示代码中提到的文本消息。
换句话说,搜索传入数据中是否存在该列。如果存在,则处理数据。否则退出。
如果我使用exit(),不管col\u presence()返回的是true还是false,代码都在退出。如果我没有使用exit()并在otherwise()中编写纯文本,那么代码工作正常。例如:。否则(“未找到索引1处的列”)
我以为col_presence()总是返回false。我试图通过调用col\u presence()来创建一个标志列,以检查函数返回的内容。

Stream_jsonDF.withColumn("flag", when(col_presence( Stream_jsonDF, Stream_jsonDF["searchstring"][1])))

如果列存在,则完全显示true,否则显示false。
使用exit()时,即使col\u presence()返回一个真正的布尔标志,我也无法找出代码失败的原因。只有在使用exit()时才会发生这种情况。
有人能看一下代码,让我知道为什么控件总是会出错吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题