如何跳过spark中的特定行 Dataset<Row>
当一行扁平化为多个记录时出错,并继续处理数据集中的下一行。
示例:行到字的Map使用带有flatmapfunction的dataset api引发作业,如果任何行有字“error”,则希望跳过该行并继续处理下一行。
第1行: I AM A JAVA PROGRAMMER
-可以处理。
第2行: THERE IS AN ERROR IN THIS LINE
-跳过这一行,包括其中的单词,因为它包含“错误”单词。
第3行: CODE LOOKS BETTER WORK BETTER
-可以处理
输出应为: I, AM, A, JAVA, PROGRAMMER ,CODE, LOOKS, BETTER, WORK, BETTER
https://docs.datafabric.hpe.com/61/spark/structuredstreamingwordcountapplication_2.html
示例代码:
Dataset<String> inputLines = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "http://127.0.0.1:9092")
.option("subscribe", "sampleTopic")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());
inputLines.printSchema();
Dataset<String> inputWords = inputLines.flatMap(new TestMapper(), Encoders.STRING()).as(Encoders.STRING());
inputWords.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.option("checkpointLocation", "D:\\tmp\\")
.start()
.awaitTermination();
public class TestMapper implements FlatMapFunction<String, String> {
@Override
public Iterator<String> call(String line) throws Exception {
return flattenWord(line);
}
private Iterator<String> flattenWord(String line) {
if(line.contains("ERROR"))
throw new RuntimeException("Bad word****************");
return Arrays.asList(line.split(" ") ).iterator();
}
}
暂无答案!
目前还没有任何答案,快来回答吧!