Spark Structured Streaming cannot read data from a Kafka topic

Asked by z9ju0rcb on 2021-05-16 · Spark

I'm new to Kafka, and I'm trying to consume from a Kafka topic using Spark with Scala.
My main function is:

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {

  val spark = SparkSession
    .builder()
    .appName("testKafka")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._

  // Kafka source: subscribe to sample_topic on the local broker
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sample_topic")
    .load()

  df.printSchema()

  // CSV sink; streaming file sinks require a checkpoint location
  df.writeStream
    .outputMode("append")
    .format("csv")
    .option("checkpointLocation", "/home/wintersoldier/Desktop/checkpoint")
    .option("path", "/home/wintersoldier/Documents/tookitaki/sparkTest/src/main/scala/kafka_out/outCSV")
    .start()
    .awaitTermination()

  spark.stop()
}
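
Note that the Kafka source exposes key and value as binary columns (visible in the schema printed in the logs below), so the CSV sink receives raw bytes. A minimal sketch of casting the payload to strings first, assuming the default Kafka source schema:

// Sketch: project the binary Kafka columns to readable strings
// before handing the stream to a file sink. Column names come
// from the standard Kafka source schema.
val messages = df.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value"
)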

I then send messages through the Kafka console producer:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample_topic

The application runs without any errors, but no CSV file containing the topic's messages is ever created. I also tried printing to the console instead of writing to CSV, via df.writeStream.format("console"), but I still get no output.
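
In full, the console-sink variant would look roughly like this (a sketch; the truncate option is only there for readability):

// Console sink: format("console") must be set on the writeStream
// builder, not on the DataFrame itself.
df.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()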
My Kafka version is kafka_2.11-0.9.0.0, and my build.sbt contains:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.5",
  "org.apache.spark" %% "spark-mllib" % "2.4.5",
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "org.apache.spark" %% "spark-hive" % "2.4.5",
  "org.apache.spark" %% "spark-streaming" % "2.4.5",
  "org.apache.spark" %% "spark-graphx" % "2.4.5",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.7"
)
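
As an aside, the Spark artifacts above mix three versions (2.4.5, 1.6.3, and 2.4.7). A build.sbt with every Spark module pinned to one version would look like the following sketch (the version value is an assumption; only the modules the code actually uses are shown):

// Sketch: keep all Spark artifacts on a single version to avoid
// binary incompatibilities between modules.
val sparkVersion = "2.4.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % sparkVersion,
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)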

Update: terminal logs:

[info] Running com.test.kafka.testKafka 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/29 16:35:52 INFO SparkContext: Running Spark version 2.4.5
20/11/29 16:35:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/11/29 16:35:52 INFO SparkContext: Submitted application: testKafka
20/11/29 16:35:52 INFO SecurityManager: Changing view acls to: wintersoldier
20/11/29 16:35:52 INFO SecurityManager: Changing modify acls to: wintersoldier
20/11/29 16:35:52 INFO SecurityManager: Changing view acls groups to: 
20/11/29 16:35:52 INFO SecurityManager: Changing modify acls groups to: 
20/11/29 16:35:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(wintersoldier); groups with view permissions: Set(); users  with modify permissions: Set(wintersoldier); groups with modify permissions: Set()
20/11/29 16:35:53 INFO Utils: Successfully started service 'sparkDriver' on port 46047.
20/11/29 16:35:53 INFO SparkEnv: Registering MapOutputTracker
20/11/29 16:35:53 INFO SparkEnv: Registering BlockManagerMaster
20/11/29 16:35:53 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/11/29 16:35:53 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/11/29 16:35:53 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8579f1c4-621e-428c-ba2f-aaa457a9b1d4
20/11/29 16:35:53 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/11/29 16:35:53 INFO SparkEnv: Registering OutputCommitCoordinator
20/11/29 16:35:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/11/29 16:35:53 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://anonymouspirates:4040
20/11/29 16:35:53 INFO Executor: Starting executor ID driver on host localhost
20/11/29 16:35:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35529.
20/11/29 16:35:53 INFO NettyBlockTransferService: Server created on anonymouspirates:35529
20/11/29 16:35:53 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/29 16:35:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:53 INFO BlockManagerMasterEndpoint: Registering block manager anonymouspirates:35529 with 366.3 MB RAM, BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:53 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:54 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/wintersoldier/Documents/tookitaki/sparkTest/spark-warehouse').
20/11/29 16:35:54 INFO SharedState: Warehouse path is 'file:/home/wintersoldier/Documents/tookitaki/sparkTest/spark-warehouse'.
20/11/29 16:35:54 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

20/11/29 16:35:56 INFO MicroBatchExecution: Starting [id = 3918834a-7b1d-41d6-9069-60bfb807019f, runId = 9a8756a8-de0e-4bc1-8e37-91fc02bfaeb0]. Use file:///home/wintersoldier/Desktop/checkpoint to store the query checkpoint.
20/11/29 16:35:56 INFO MicroBatchExecution: Using MicroBatchReader [KafkaV2[Subscribe[sample_topic]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@ebf74a2]
20/11/29 16:35:56 INFO MicroBatchExecution: Starting new streaming query.
20/11/29 16:35:56 INFO MicroBatchExecution: Stream started from {}
20/11/29 16:35:56 INFO ConsumerConfig: ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = spark-kafka-source-dba9525c-4dfe-4cde-bcc7-0d54fde3e897-810093942-driver-0
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 1
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

20/11/29 16:35:56 INFO AppInfoParser: Kafka version : 2.0.0
20/11/29 16:35:56 INFO AppInfoParser: Kafka commitId : 3402a8361b734732
