我想过滤一个来自kafka服务器的传入json,根据某个版本类型,我需要对它们进行分离和解析,并分别对它们进行某些计算。但在这里,当iam使用if语句时,iam得到错误“no output operations registered”,因为iam使用if语句中的print语句。运行代码的替代方法是什么?提前谢谢。 enter code here
```
if(packetType.equals("P300")) {
val three300s=dstream.map(parser300)
three300s.print()
}
else if(packetType.equals("P30")) {
val thirty30s=dstream.map(parser30)
thirty30s.print()
}
else if(packetType.equals("P6")) {
val six6s=dstream.map(parser6)
six6s.print()
}
ssc.start()
ssc.awaitTermination()
}
}
我正在获取“exception in thread”main“org.apache.spark.sparkeexception:task not serializable”。我观察到序列化有一些问题,但无法确切地发现。
enter code here
pType.map(rdd => {
val pkt= rdd.toString()
if(pkt.equals("P300")) {
val t300=dstream.map(par300)
t300.print()
}else if(pkt.equals("P30")) {
val t30=dstream.map(par30)
t30.print()
}else if(pkt.equals("P6")) {
val t6=dstream.map(par6)
t6.print()
}
})
1条答案
按热度按时间8aqjt8rx1#
看来
packetType
与您提供的任何案例都不匹配。你能补充一下吗else
声明如下。它一定会消除你所得到的错误。否则你得确定
packetType
和任何一个案子都匹配。希望这有帮助!