I am facing a strange issue here. I am reading Avro records from Kafka, trying to deserialize them and store them in a file. I am able to get the records from Kafka, but when I try to apply a function to the RDD records it refuses to do anything:
import java.util.UUID
import io.confluent.kafka.serializers.KafkaAvroDecoder
import com.my.project.avro.AvroDeserializer
import com.my.project.util.SparkJobLogging
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream}
object KafkaConsumer extends SparkJobLogging {
  var schemaRegistry: SchemaRegistryClient = null
  val url = "url:8181"
  schemaRegistry = new CachedSchemaRegistryClient(url, 1000)

  def createKafkaStream(ssc: StreamingContext): DStream[(String, Array[Byte])] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "zk.server:2181",
      "group.id" -> s"${UUID.randomUUID().toString}",
      "auto.offset.reset" -> "smallest",
      "bootstrap.servers" -> "bootstrap.server:9092",
      "zookeeper.connection.timeout.ms" -> "6000",
      "schema.registry.url" -> "registry.url:8181"
    )
    val topic = "my.topic"
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Set(topic))
  }

  def processRecord(avroStream: Array[Byte]) = {
    println(AvroDeserializer.toRecord(avroStream, schemaRegistry))
  }

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroDeserilizer")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val topicStream = createKafkaStream(ssc).map(_._2)
    topicStream.foreachRDD(
      rdd => if (!rdd.isEmpty()) {
        logger.info(rdd.count())
        rdd.foreach(avroRecords => processRecord(avroRecords))
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }
}
import java.nio.ByteBuffer
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

object AvroDeserializer extends SparkJobLogging {
  def toRecord(buffer: Array[Byte], registry: SchemaRegistryClient): GenericRecord = {
    val bb = ByteBuffer.wrap(buffer)
    bb.get() // consume MAGIC_BYTE
    val schemaId = bb.getInt // consume schemaId
    val schema = registry.getByID(schemaId) // consult the Schema Registry
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(buffer, bb.position(), bb.remaining(), null)
    reader.read(null, decoder) // null -> as we are not providing any datum
  }
}
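The first two reads in toRecord follow the Confluent wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro binary payload. A minimal, registry-free sketch of just that header parsing (the object and method names here are illustrative, not part of the code above):

```scala
import java.nio.ByteBuffer

// Splits a Confluent-framed Kafka message into (schemaId, avroPayload).
// Wire format: [magic byte 0][4-byte big-endian schema id][Avro binary data].
object WireFormat {
  def parse(msg: Array[Byte]): (Int, Array[Byte]) = {
    val bb = ByteBuffer.wrap(msg)
    require(bb.get() == 0, "unknown magic byte")   // consume MAGIC_BYTE
    val schemaId = bb.getInt                        // consume schema id
    val payload = new Array[Byte](bb.remaining())   // rest is the Avro payload
    bb.get(payload)
    (schemaId, payload)
  }
}
```

With the header stripped this way, the remaining bytes are what the GenericDatumReader actually decodes against the schema fetched from the registry.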
Up to the statement logger.info(rdd.count()) everything works fine and I see the exact record count in the log. After that, however, nothing happens. When I tried
val record= rdd.first()
processRecord(record)
it worked, but

rdd.foreach(avroRecords => processRecord(avroRecords))

and

rdd.map(avroRecords => processRecord(avroRecords))

do nothing. Each streaming batch prints only the following:
17/05/14 01:01:24 INFO scheduler.DAGScheduler: Job 2 finished: foreach at KafkaConsumer.scala:56, took 42.684999 s
17/05/14 01:01:24 INFO scheduler.JobScheduler: Finished job streaming job 1494738000000 ms.0 from job set of time 1494738000000 ms
17/05/14 01:01:24 INFO scheduler.JobScheduler: Total delay: 84.888 s for time 1494738000000 ms (execution: 84.719 s)
17/05/14 01:01:24 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
17/05/14 01:01:24 INFO scheduler.InputInfoTracker: remove old batch metadata:
17/05/14 01:01:26 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
17/05/14 01:01:26 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/05/14 01:01:29 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
17/05/14 01:01:29 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
It just keeps printing the last two lines in the log until the next streaming context invocation.
3 Answers

Answer 1 (j5fpnvbx1):
Although the approach above did not work for me, I found a different approach in the Confluent documentation. The KafkaAvroDecoder communicates with the Schema Registry, fetches the schema, and deserializes the data, so it eliminates the need for a custom deserializer. The dependency jar is kafka-avro-serializer-3.1.1.jar. This is working perfectly for me now, and I hope it helps someone in the future.

Answer 2 (kxe2p93d2):
Your println statements are running on the distributed workers, not in the current process, so you don't see them. You can try replacing println with log.info to verify that. Ideally, you should convert the DStream[Array[Byte]] to a DStream[GenericRecord] and write it out to files, using .saveAsTextFiles or similar. You will probably want a stream.take(), because the stream may be infinite. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
Answer 3 (sg24os4d3):
foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.

DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
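The lazy-execution behaviour described above can be illustrated with a plain-Scala Iterator, which, like an RDD transformation chain, runs nothing until something consumes it. This is only a minimal sketch of the principle, not Spark itself:

```scala
object LazyDemo {
  def main(args: Array[String]): Unit = {
    val sideEffects = scala.collection.mutable.ArrayBuffer.empty[Int]

    // Building the pipeline runs nothing, just like rdd.map without an action.
    val pipeline = Iterator(1, 2, 3).map { x => sideEffects += x; x * 2 }
    println(sideEffects.isEmpty) // true: no element has been processed yet

    // Consuming the iterator plays the role of an action such as collect/foreach.
    val result = pipeline.toList
    println(result)              // List(2, 4, 6)
    println(sideEffects.toList)  // List(1, 2, 3): side effects ran only when forced
  }
}
```

The same reasoning explains the symptom in the question: an output operation whose body never forces the data (or whose side effects happen on remote executors) can look like "nothing works" from the driver's log.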