我正在使用spark结构化流媒体与Kafka。下面的语句无效,正在抛出错误。
Dataframedf中的value列按原样从kafka主题提要获取。
def col_presence(df,searchstr ):
if df["value"].contains(searchstr):
return True
else: return False
Stream_jsonDF.withColumn("x-v",getVal(Stream_jsonDF["req_header"], Stream_jsonDF["searchstring"][0])) \
.withColumn("ip",
when(lit(col_exists(Stream_jsonDF , "ip-address")),
getVal("req_header",Stream_jsonDF["searchstring"][1])
).otherwise("not found")
) \
.writeStream.format("console").option("truncate", value=False).start().awaitTermination()
无法对Dataframe列应用条件表达式。
它在下面也试过了,但运气不好。
l = df.withColumn("flg",df["value"].contains(col)).select("flg").collect()
It throws following exception pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;
我认为,与其传递dataframe,不如通过将def注册为udf来传递值本身,这样就可以将其捕获为字符串值。例:如下所示
@udf
def col_presence(df,searchstr ):
return True
但它在when()中失败了。
Stream_jsonDF.withColumn("ip",
when(lit(col_exists(Stream_jsonDF["value"], Stream_jsonDF["searchstring"][1])), getVal("req_header",Stream_jsonDF["searchstring"][1])).otherwise("not found")
) \
.writeStream.format("console").option("truncate", value=False).start().awaitTermination()
虽然我在udf中通过硬编码返回布尔值,但当()表示数据类型不匹配时,返回值失败。下面是错误
"cannot resolve 'CASE WHEN col_exists(value, searchstring[1]) THEN 'Found' ELSE 'not found' END' due to data type mismatch: WHEN expressions in CaseWhen should all be boolean type, but the 1th when expression's type is col_exists(value#21, searchstring#26[1])
casewhen中的when表达式都应该是布尔类型-->我在when()中编写了一个udf,该udf返回true。不支持在when()中使用自定义项?
总而言之,你能回答以下问题吗?为什么我不能在列数据类型上使用if语句?
if df["value"].contains(searchstr): return True
when()不支持udf调用吗?
暂无答案!
目前还没有任何答案,快来回答吧!