kinesis spark流读取器在电子病历中不工作

3zwtqj6y  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(281)

我是新的Spark,我试图做一些代码尖峰使用scala和Spark流读取数据从kinesis流,转换为dataframe和显示结果与下面的代码。我最初面临许多版本不匹配的问题,可以解决这些问题。我创建了一个示例kinesis数据流,并使用kinesis数据生成器生成了一些数据。我能够让代码在本地环境中工作,并显示从我的kinesis数据流读取的消息。我试着在emr中部署相同的代码,使用1个主代码和2个内核(发行版)label:emr-5.31.0)并尝试使用spark submit命令执行我的代码。它只是循环通过,但不从动觉中提取数据。这个信息从来都不是从动觉中提取出来的。请帮忙。
代码段:

def parseStreamDataSample() = {
    val sparkConf = new SparkConf().setAppName("SBSStreamingReader")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Duration(5000))
    val messageSchema: StructType = new StructType().add("time", "string").add("id", "string").add("messageId", "int").add("data", "string")
    val kinesisStream = KinesisInputDStream.builder
      .streamingContext(ssc).endpointUrl("kinesis.ap-south-1.amazonaws.com")
      .regionName(RegionUtils.getRegionMetadata.getRegionByEndpoint("kinesis.ap-south-1.amazonaws.com").getName())
      .streamName(kinesisStreamName).initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointAppName("kinesisStreamingSample").checkpointInterval(Duration(5000))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

    val lines = kinesisStream.map(x => new String(x))
    import spark.implicits._
    lines.foreachRDD((rdd) => { // Works, Yeah !!
      if (!rdd.isEmpty) {
        val batchDF = rdd.toDF()
          .selectExpr("CAST(value AS STRING)")
          .select(from_json($"value", messageSchema, Map.empty[String, String]) as "jsonData")
          .select("jsonData.*")
        batchDF.show()
      }
     }
    )
    ssc.start()
    ssc.awaitTermination;
  }

内部版本.sbt:

scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.6" ,
  "org.apache.spark" %% "spark-streaming" % "2.4.6" ,
  "org.apache.spark" %% "spark-sql" % "2.4.6" ,
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.11" % "2.4.6" ,
  "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"
)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题