I am new to Spark and am attempting a code spike that uses Scala and Spark Streaming to read data from a Kinesis stream, convert it to a DataFrame, and display the results with the code below. I initially ran into a number of version-mismatch problems, which I was able to resolve. I created a sample Kinesis data stream and generated some data with the Kinesis Data Generator. I got the code working in my local environment, where it displays the messages read from my Kinesis data stream. I then deployed the same code to EMR with 1 master and 2 core nodes (release label: emr-5.31.0) and executed it with a spark-submit command (its rough shape is shown after the build file below). The job just keeps looping through the batches, but no data is ever pulled from Kinesis. Please help.
Code snippet:
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

def parseStreamDataSample() = {
  val sparkConf = new SparkConf().setAppName("SBSStreamingReader")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val ssc = new StreamingContext(spark.sparkContext, Duration(5000))

  // Schema of the JSON messages pushed by the Kinesis Data Generator
  val messageSchema: StructType = new StructType()
    .add("time", "string")
    .add("id", "string")
    .add("messageId", "int")
    .add("data", "string")

  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .endpointUrl("kinesis.ap-south-1.amazonaws.com")
    .regionName(RegionUtils.getRegionMetadata.getRegionByEndpoint("kinesis.ap-south-1.amazonaws.com").getName())
    .streamName(kinesisStreamName) // stream name is defined elsewhere
    .initialPositionInStream(InitialPositionInStream.LATEST)
    .checkpointAppName("kinesisStreamingSample")
    .checkpointInterval(Duration(5000))
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .build()

  // Each Kinesis record arrives as Array[Byte]; decode it to a string
  val lines = kinesisStream.map(x => new String(x))

  import spark.implicits._
  lines.foreachRDD { rdd => // Works, Yeah !!
    if (!rdd.isEmpty) {
      val batchDF = rdd.toDF()
        .selectExpr("CAST(value AS STRING)")
        .select(from_json($"value", messageSchema, Map.empty[String, String]) as "jsonData")
        .select("jsonData.*")
      batchDF.show()
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
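For reference, the Kinesis Data Generator template produces JSON records with the four fields declared in messageSchema above. A sample record looks like this (the values here are made up for illustration):

{"time": "2020-10-05 10:15:00", "id": "device-001", "messageId": 42, "data": "sample payload"}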
build.sbt:
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.6",
  "org.apache.spark" %% "spark-streaming" % "2.4.6",
  "org.apache.spark" %% "spark-sql" % "2.4.6",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.11" % "2.4.6",
  "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"
)
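The job was launched on EMR with a spark-submit command of roughly the following shape (the main class and assembly jar names below are placeholders, not the actual ones):

# Placeholder class and jar names; substitute your own.
spark-submit \
  --class com.example.SBSStreamingReader \
  --master yarn \
  --deploy-mode cluster \
  sbs-streaming-reader-assembly-0.1.jar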