当行展平逻辑中存在错误/异常时,使用flatmap函数跳过spark数据集中的特定行

eiee3dmh  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(229)

如何跳过spark中的特定行 Dataset<Row> 当一行扁平化为多个记录时出错,并继续处理数据集中的下一行。
示例:行到字的Map使用带有flatmapfunction的dataset api引发作业,如果任何行有字“error”,则希望跳过该行并继续处理下一行。
第1行: I AM A JAVA PROGRAMMER -可以处理。
第2行: THERE IS AN ERROR IN THIS LINE -跳过这一行,包括其中的单词,因为它包含“错误”单词。
第3行: CODE LOOKS BETTER WORK BETTER -可以处理
输出应为: I, AM, A, JAVA, PROGRAMMER ,CODE, LOOKS, BETTER, WORK, BETTER https://docs.datafabric.hpe.com/61/spark/structuredstreamingwordcountapplication_2.html
示例代码:

Dataset<String> inputLines = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "http://127.0.0.1:9092")
                .option("subscribe", "sampleTopic")
                .option("startingOffsets", "earliest")
                .load()
                .selectExpr("CAST(value AS STRING)").as(Encoders.STRING());
        inputLines.printSchema();

       Dataset<String> inputWords = inputLines.flatMap(new TestMapper(), Encoders.STRING()).as(Encoders.STRING());

        inputWords.writeStream()
                .format("console")
                .outputMode(OutputMode.Append())
                .option("checkpointLocation", "D:\\tmp\\")
                .start()
                .awaitTermination();

public class TestMapper implements FlatMapFunction<String, String> {
@Override
public Iterator<String> call(String line) throws Exception {
    return flattenWord(line);

}

private Iterator<String> flattenWord(String line) {
    if(line.contains("ERROR"))
        throw new RuntimeException("Bad word****************");
    return Arrays.asList(line.split(" ") ).iterator();
}

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题