为什么不能在sparkDataframe列上使用条件表达式?

cwtwac6a  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(268)

我正在使用spark结构化流媒体与Kafka。下面的语句无效,正在抛出错误。
Dataframedf中的value列按原样从kafka主题提要获取。

def col_presence(df,searchstr ):
    if df["value"].contains(searchstr):
        return True
    else: return False

Stream_jsonDF.withColumn("x-v",getVal(Stream_jsonDF["req_header"], Stream_jsonDF["searchstring"][0])) \
    .withColumn("ip",
                when(lit(col_exists(Stream_jsonDF , "ip-address")),
                     getVal("req_header",Stream_jsonDF["searchstring"][1])
                     ).otherwise("not found")
                ) \
    .writeStream.format("console").option("truncate", value=False).start().awaitTermination()

无法对Dataframe列应用条件表达式。
它在下面也试过了,但运气不好。

l = df.withColumn("flg",df["value"].contains(col)).select("flg").collect()

It throws following exception pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;

我认为,与其传递dataframe,不如通过将def注册为udf来传递值本身,这样就可以将其捕获为字符串值。例:如下所示

@udf
def col_presence(df,searchstr ):
        return True

但它在when()中失败了。

Stream_jsonDF.withColumn("ip",
                when(lit(col_exists(Stream_jsonDF["value"], Stream_jsonDF["searchstring"][1])), getVal("req_header",Stream_jsonDF["searchstring"][1])).otherwise("not found")
                ) \
    .writeStream.format("console").option("truncate", value=False).start().awaitTermination()

虽然我在udf中通过硬编码返回布尔值,但当()表示数据类型不匹配时,返回值失败。下面是错误

"cannot resolve 'CASE WHEN col_exists(value, searchstring[1]) THEN 'Found' ELSE 'not found' END' due to data type mismatch: WHEN expressions in CaseWhen should all be boolean type, but the 1th when expression's type is col_exists(value#21, searchstring#26[1])

casewhen中的when表达式都应该是布尔类型-->我在when()中编写了一个udf,该udf返回true。不支持在when()中使用自定义项?
总而言之,你能回答以下问题吗?为什么我不能在列数据类型上使用if语句?

if df["value"].contains(searchstr): return True

when()不支持udf调用吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题