Extracting the timestamp from Kafka messages in Spark Streaming?

Posted by ztmd8pv5 on 2021-06-07 in Kafka

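I'm trying to read data from Kafka. I want to extract the timestamp from the received messages for use in Spark Structured Streaming. Versions: Kafka 0.10.0.0, Spark Streaming 2.0.1.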

Answer #1 (lf5gs5x2)

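I'd suggest two things.
Assuming you create the stream through the latest Kafka streaming API (Kafka 0.10),
e.g. with the dependency "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1", create the stream as described in the documentation: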

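import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "spark-streaming-test",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

// placeholder topic name: replace with your own topic(s)
val topics = Array("your-topic")

val sparkConf = new SparkConf().setAppName("kafka-timestamps")
// 60-second batch interval (the micro-batch size, not a window)
val ssc = new StreamingContext(sparkConf, Seconds(60))
ssc.checkpoint("checkpoint")

val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))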

Your stream will be a DStream of ConsumerRecord[String, Array[Byte]], and you can get the timestamp, key and value like this:

// record.timestamp() returns the Kafka timestamp as epoch milliseconds (Long)
stream.map { record => (record.timestamp(), record.key(), record.value()) }
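record.timestamp() gives you a plain Long of epoch milliseconds. If you want the same java.sql.Timestamp type that the Structured Streaming Kafka source uses for its "timestamp" column, here is a minimal sketch using only the JDK (no Spark needed). The object and method names are hypothetical, not part of any library, and it assumes that a record without a timestamp (e.g. one written in a pre-0.10 message format) reports Kafka's NO_TIMESTAMP value of -1.

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical helpers for the Long returned by ConsumerRecord.timestamp()
// (epoch milliseconds).
object KafkaTimestamps {
  // Assumption: a negative value (Kafka's NO_TIMESTAMP is -1) means the record
  // carries no timestamp.
  def isMissing(epochMillis: Long): Boolean = epochMillis < 0

  // Same JVM type as the "timestamp" column of Spark's Kafka source.
  def toSqlTimestamp(epochMillis: Long): Timestamp = new Timestamp(epochMillis)

  // java.time alternative, handy for formatting and date arithmetic.
  def toInstant(epochMillis: Long): Instant = Instant.ofEpochMilli(epochMillis)

  // None when the record has no timestamp, otherwise the converted value.
  def toSqlTimestampOpt(epochMillis: Long): Option[Timestamp] =
    if (isMissing(epochMillis)) None else Some(toSqlTimestamp(epochMillis))
}

object KafkaTimestampsExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical example value, not taken from a real record: 2021-06-07 00:00 UTC.
    val millis = 1623024000000L
    println(KafkaTimestamps.toInstant(millis))      // prints 2021-06-07T00:00:00Z
    println(KafkaTimestamps.toSqlTimestampOpt(-1L)) // prints None
  }
}
```

Inside the DStream you could then write, for example, stream.map(r => (KafkaTimestamps.toSqlTimestampOpt(r.timestamp()), r.key(), r.value())). The conversion only touches the extracted Long, so nothing non-serializable from the ConsumerRecord is carried into the result.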

Hope this helps.

Answer #2 (yzckvree)

// needed for the $"column" syntax
import spark.implicits._

spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "your.server.com:9092")
  .option("subscribe", "your-topic")
  .load()
  .select($"timestamp", $"value")

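The "timestamp" field is what you are looking for. Its type is java.sql.Timestamp. Make sure you are connecting to a 0.10 Kafka server; earlier versions do not have message timestamps. The full list of fields is described here: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries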
