我收到Kafka发来的jsondatetime“:”2020-04-25 11:16:02.979,“请求头”:[{“x-v”:“1446”},{“ip地址”:“10.101.10.0”}]}
我正在阅读此数据并定义此数据的结构,如下所示:
schema = StructType([
StructField("datetime", TimestampType(), True),
StructField("req_head", ArrayType(MapType(StringType(), StringType())
), True)])
我在数据框中创建了searchstring列,以方便对传入数据的搜索。searchstring column是我要搜索的列的数组。value列是从kafka接收的纯文本列。
Stream_jsonDF = StringDF.select("value", from_json(col("value"), schema).alias("data")) \
.withColumn("searchstring", array(lit("x-v"),lit("ip-address"), lit("portno"))) \
.select("data.datetime",
"data.req_head",
"searchstring",
"value"
)
这是我真正关心的问题。我编写了以下方法来查找列的存在性。
def col_presence(df, col):
print("The type of parameters are :", type(df), type(col))
return df["value"].contains(col)
Stream_jsonDF.withColumn("ip", when(col_presence( Stream_jsonDF, Stream_jsonDF["searchstring"][1]),
getVal("req_head",Stream_jsonDF["searchstring"][1])
).otherwise(exit("column at Index 1 not found"))
) \
.writeStream.format("console").option("truncate", value=False).start().awaitTermination()
我的要求是,当col\u presence()返回true时,必须处理getval(),并且必须进一步处理代码。如果col_presence()返回false,那么程序应该退出并显示代码中提到的文本消息。
换句话说,搜索传入数据中是否存在该列。如果存在,则处理数据。否则退出。
如果我使用exit(),不管col\u presence()返回的是true还是false,代码都在退出。如果我没有使用exit()并在otherwise()中编写纯文本,那么代码工作正常。例如:。否则(“未找到索引1处的列”)
我以为col_presence()总是返回false。我试图通过调用col\u presence()来创建一个标志列,以检查函数返回的内容。
Stream_jsonDF.withColumn("flag", when(col_presence( Stream_jsonDF, Stream_jsonDF["searchstring"][1])))
如果列存在,则完全显示true,否则显示false。
使用exit()时,即使col\u presence()返回一个真正的布尔标志,我也无法找出代码失败的原因。只有在使用exit()时才会发生这种情况。
有人能看一下代码,让我知道为什么控件总是会出错吗?
暂无答案!
目前还没有任何答案,快来回答吧!