I am a beginner with Kafka, and I am trying to read from a Kafka topic using Spark in Scala.
My main function is:
def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .appName("testKafka")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sample_topic")
    .load()
  df.printSchema()
  df
    .writeStream
    .outputMode("append")
    .format("com.databricks.spark.csv")
    .option("checkpointLocation", "/home/wintersoldier/Desktop/checkpoint")
    .option("path", "/home/wintersoldier/Documents/tookitaki/sparkTest/src/main/scala/kafka_out/outCSV")
    .start()
    .awaitTermination()
  spark.stop()
  spark.close()
}
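Then I send messages from the Kafka console producer: ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample_topic
The application runs without any errors, but no CSV file containing the topic's messages is created. Instead of writing CSV, I also tried printing to the terminal with: df.format("console")
But I still do not get any output.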
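For reference, here is a minimal sketch of what the console variant might look like when written out in full. It rests on a few assumptions that are not in the original code: the object name KafkaToConsole and the helper decodeKafkaColumns are hypothetical, the key and value columns are cast from binary to strings so that a text sink can render them, and startingOffsets is set to "earliest" so that messages produced before the query starts are also read (the streaming default is "latest"). It is not a confirmed fix for the missing output.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical object name, used only for this sketch.
object KafkaToConsole {

  // The Kafka source delivers key and value as binary columns.
  // Casting them to strings lets text-based sinks display them.
  def decodeKafkaColumns(df: DataFrame): DataFrame =
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("testKafka")
      .master("local[*]")
      .getOrCreate()

    val raw = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sample_topic")
      // Assumption: also pick up messages sent before the query started.
      .option("startingOffsets", "earliest")
      .load()

    // Print each micro-batch to the terminal without truncating long values.
    val query = decodeKafkaColumns(raw)
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .start()

    // Blocks until the query is stopped or fails.
    query.awaitTermination()
  }
}
```

Note that the sink is set on df.writeStream, not on df itself, and that awaitTermination() blocks, so any cleanup code placed after it only runs once the query ends.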
My Kafka version is: kafka_2.11-0.9.0.0
My build.sbt contains:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.5",
  "org.apache.spark" %% "spark-mllib" % "2.4.5",
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "org.apache.spark" %% "spark-hive" % "2.4.5",
  "org.apache.spark" %% "spark-streaming" % "2.4.5",
  "org.apache.spark" %% "spark-graphx" % "2.4.5",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.7",
)
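One thing that stands out in this dependency list is the mix of versions: spark-sql-kafka-0-10 is at 2.4.7 while the other Spark modules are at 2.4.5, and spark-streaming-kafka 1.6.3 is the older DStream connector, which the Structured Streaming code above does not use. Spark's Kafka integration guide for Structured Streaming also states that it targets Kafka brokers 0.10.0 or higher, while the broker here is 0.9.0.0. A possible cleaned-up build.sbt, as a sketch only, could look like the following. The sparkVersion value name is my own, and only the modules the posted code needs are listed; whether this resolves the missing output is an assumption, not something confirmed.

```scala
// build.sbt (sketch): keep every Spark artifact on a single version.
val sparkVersion = "2.4.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // Structured Streaming Kafka source; documented for brokers 0.10.0 or higher.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```

Other modules such as spark-mllib or spark-hive can be added back with the same sparkVersion if the project needs them.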
Update: terminal log:
[info] Running com.test.kafka.testKafka
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/29 16:35:52 INFO SparkContext: Running Spark version 2.4.5
20/11/29 16:35:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/11/29 16:35:52 INFO SparkContext: Submitted application: testKafka
20/11/29 16:35:52 INFO SecurityManager: Changing view acls to: wintersoldier
20/11/29 16:35:52 INFO SecurityManager: Changing modify acls to: wintersoldier
20/11/29 16:35:52 INFO SecurityManager: Changing view acls groups to:
20/11/29 16:35:52 INFO SecurityManager: Changing modify acls groups to:
20/11/29 16:35:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(wintersoldier); groups with view permissions: Set(); users with modify permissions: Set(wintersoldier); groups with modify permissions: Set()
20/11/29 16:35:53 INFO Utils: Successfully started service 'sparkDriver' on port 46047.
20/11/29 16:35:53 INFO SparkEnv: Registering MapOutputTracker
20/11/29 16:35:53 INFO SparkEnv: Registering BlockManagerMaster
20/11/29 16:35:53 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/11/29 16:35:53 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/11/29 16:35:53 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8579f1c4-621e-428c-ba2f-aaa457a9b1d4
20/11/29 16:35:53 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/11/29 16:35:53 INFO SparkEnv: Registering OutputCommitCoordinator
20/11/29 16:35:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/11/29 16:35:53 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://anonymouspirates:4040
20/11/29 16:35:53 INFO Executor: Starting executor ID driver on host localhost
20/11/29 16:35:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35529.
20/11/29 16:35:53 INFO NettyBlockTransferService: Server created on anonymouspirates:35529
20/11/29 16:35:53 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/29 16:35:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:53 INFO BlockManagerMasterEndpoint: Registering block manager anonymouspirates:35529 with 366.3 MB RAM, BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:53 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, anonymouspirates, 35529, None)
20/11/29 16:35:54 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/wintersoldier/Documents/tookitaki/sparkTest/spark-warehouse').
20/11/29 16:35:54 INFO SharedState: Warehouse path is 'file:/home/wintersoldier/Documents/tookitaki/sparkTest/spark-warehouse'.
20/11/29 16:35:54 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
20/11/29 16:35:56 INFO MicroBatchExecution: Starting [id = 3918834a-7b1d-41d6-9069-60bfb807019f, runId = 9a8756a8-de0e-4bc1-8e37-91fc02bfaeb0]. Use file:///home/wintersoldier/Desktop/checkpoint to store the query checkpoint.
20/11/29 16:35:56 INFO MicroBatchExecution: Using MicroBatchReader [KafkaV2[Subscribe[sample_topic]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@ebf74a2]
20/11/29 16:35:56 INFO MicroBatchExecution: Starting new streaming query.
20/11/29 16:35:56 INFO MicroBatchExecution: Stream started from {}
20/11/29 16:35:56 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-dba9525c-4dfe-4cde-bcc7-0d54fde3e897-810093942-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/11/29 16:35:56 INFO AppInfoParser: Kafka version : 2.0.0
20/11/29 16:35:56 INFO AppInfoParser: Kafka commitId : 3402a8361b734732