I am trying to put together a small example application on Apache Flink, mainly to demonstrate how to use broadcast variables. The application reads a CSV file and prepares a DataSet[BuildingInformation]:
case class BuildingInformation(
  buildingID: Int, buildingManager: String, buildingAge: Int,
  productID: String, country: String
)
This is how I currently create the DataSet of BuildingInformation:
val buildingsBroadcastSet =
  envDefault
    .fromElements(
      readBuildingInfo(
        envDefault,
        "./SensorFiles/building.csv")
    )
Later on, I kick off the transformation like this:
val hvacStream = readHVACReadings(envDefault, "./SensorFiles/HVAC.csv")

hvacStream
  .map(new HVACToBuildingMapper)
  .withBroadcastSet(buildingsBroadcastSet, "buildingData")
  .writeAsCsv("./hvacTemp.csv")
A Map of (buildingID -> BuildingInformation) is the reference data I want to make available as a broadcast variable. To prepare it, I implemented a RichMapFunction:
class HVACToBuildingMapper
  extends RichMapFunction[HVACData, EnhancedHVACTempReading] {

  var allBuildingDetails: Map[Int, BuildingInformation] = _

  override def open(configuration: Configuration): Unit = {
    allBuildingDetails =
      getRuntimeContext
        .getBroadcastVariableWithInitializer(
          "buildingData",
          new BroadcastVariableInitializer[BuildingInformation, Map[Int, BuildingInformation]] {
            def initializeBroadcastVariable(valuesPushed: java.lang.Iterable[BuildingInformation]): Map[Int, BuildingInformation] = {
              valuesPushed
                .asScala
                .toList
                .map(nextBuilding => (nextBuilding.buildingID, nextBuilding))(breakOut)
            }
          }
        )
  }

  override def map(nextReading: HVACData): EnhancedHVACTempReading = {
    val buildingDetails =
      allBuildingDetails.getOrElse(nextReading.buildingID, UndefinedBuildingInformation)
    // ... more intermediate data creation logic here
    EnhancedHVACTempReading(
      nextReading.buildingID,
      rangeOfTempRecorded,
      isExtremeTempRecorded,
      buildingDetails.country,
      buildingDetails.productID,
      buildingDetails.buildingAge,
      buildingDetails.buildingManager
    )
  }
}
In the function signature

def initializeBroadcastVariable(valuesPushed: java.lang.Iterable[BuildingInformation]): Map[Int, BuildingInformation]

the qualification java.lang.Iterable is my addition. Without it, the compiler (in IntelliJ) complains.
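As an aside, the explicit java.lang.Iterable is required because Flink's BroadcastVariableInitializer is a Java interface, so the Scala side has to convert the values explicitly. The conversion pattern itself can be illustrated without Flink; this is a minimal sketch in which the Building case class is a hypothetical stand-in for BuildingInformation:

```scala
import scala.collection.JavaConverters._

object IterableConversionSketch {
  // Hypothetical stand-in for BuildingInformation, for illustration only
  case class Building(id: Int, manager: String)

  // Flink hands broadcast values over as a java.lang.Iterable; asScala
  // converts it so that map/toMap from the Scala collections API apply
  def toLookup(values: java.lang.Iterable[Building]): Map[Int, Building] =
    values.asScala.map(b => (b.id, b)).toMap

  def main(args: Array[String]): Unit = {
    val fromJava: java.lang.Iterable[Building] =
      java.util.Arrays.asList(Building(1, "alice"), Building(2, "bob"))
    println(toLookup(fromJava)(2).manager) // prints "bob"
  }
}
```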
At runtime, the application fails at the point where I create a Map out of the Iterable[BuildingInformation] that the framework passes to the open() function:
java.lang.Exception: The user defined 'open()' method caused an exception: scala.collection.immutable.$colon$colon cannot be cast to org.nirmalya.hortonworks.tutorial.BuildingInformation
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:475)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.immutable.$colon$colon cannot be cast to org.nirmalya.hortonworks.tutorial.BuildingInformation
at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper$$anon$7$$anonfun$initializeBroadcastVariable$1.apply(HVACReadingsAnalysis.scala:139)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper$$anon$7.initializeBroadcastVariable(HVACReadingsAnalysis.scala:139)
at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper$$anon$7.initializeBroadcastVariable(HVACReadingsAnalysis.scala:133)
at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.getVariable(BroadcastVariableMaterialization.java:234)
at org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext.getBroadcastVariableWithInitializer(DistributedRuntimeUDFContext.java:84)
at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper.open(HVACReadingsAnalysis.scala:131)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
... 3 more
09:28:54,389 INFO org.apache.flink.runtime.client.JobClientActor - 04/29/2016 09:28:54 Job execution switched to status FAILED.
Guessing that this might be some special case of a failure to cast a case class coming out of a (Java) Iterable (even though I was not convinced of that myself), I replaced BuildingInformation with a Tuple5 of all its member fields. The behaviour did not change.
I could have tried providing a CanBuildFrom, but I did not get that far. My mind refuses to accept that a simple case class cannot be mapped into another data structure. Something is wrong, and I cannot see it.
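For what it is worth, the breakOut used in open() already supplies a CanBuildFrom, and in isolation (away from Flink) the pattern works fine, which suggests the cast failure happens before this mapping code ever runs. A minimal sketch, again with a hypothetical Building case class (note that breakOut exists in Scala 2.12 and earlier, matching the 2.10/2.11 builds used here, but was removed in 2.13):

```scala
import scala.collection.breakOut

object BreakOutSketch {
  // Hypothetical stand-in for BuildingInformation
  case class Building(id: Int, manager: String)

  def main(args: Array[String]): Unit = {
    val buildings = List(Building(1, "alice"), Building(2, "bob"))
    // breakOut supplies the CanBuildFrom, so map builds the Map directly
    // instead of producing an intermediate List[(Int, Building)]
    val byId: Map[Int, Building] =
      buildings.map(b => (b.id, b))(breakOut)
    println(byId(1).manager) // prints "alice"
  }
}
```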
For completeness, I tried the Flink builds corresponding to both Scala 2.11.x and Scala 2.10.x: the behaviour is the same.
Also, here is EnhancedHVACTempReading (for a better understanding of the code):
case class EnhancedHVACTempReading(
  buildingID: Int, rangeOfTemp: String, extremeIndicator: Boolean,
  country: String, productID: String, buildingAge: Int, buildingManager: String
)
I have a hunch that the JVM's unhappiness has to do with a Java Iterable being treated as a Scala List, but of course I am not sure.
Can anyone help me spot the mistake?
1 Answer
The problem is that you have to return something from the map function in readBuildingInfo. Furthermore, you should not use fromElements if you provide a List[BuildingInformation]; use fromCollection instead if you want to flatten the list. The following snippets show the necessary changes.
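The answer's code is cut off here, but based on its advice the fix can be sketched roughly as follows. This is not the answer's verbatim code: the CSV parsing details are placeholders, and readBuildingInfo's signature is kept only to match the question:

```scala
import org.apache.flink.api.scala._
import scala.io.Source

// readBuildingInfo must *return* the parsed list; the original presumably
// ended its map block without yielding a value. The env parameter is kept
// to match the question's call sites but is unused in this sketch.
def readBuildingInfo(env: ExecutionEnvironment, inputPath: String): List[BuildingInformation] =
  Source.fromFile(inputPath).getLines.drop(1).map { line =>
    val fields = line.split(",")
    BuildingInformation(fields(0).toInt, fields(1), fields(2).toInt, fields(3), fields(4))
  }.toList

// fromCollection flattens the List into a DataSet[BuildingInformation].
// fromElements(list) would instead create a DataSet[List[BuildingInformation]],
// whose single element is a List — which is exactly why the initializer's cast
// to BuildingInformation blew up with the $colon$colon ClassCastException.
val buildingsBroadcastSet =
  envDefault.fromCollection(readBuildingInfo(envDefault, "./SensorFiles/building.csv"))
```

With the broadcast set now containing individual BuildingInformation elements rather than one List, the BroadcastVariableInitializer in open() receives the element type it expects.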