How to avoid continuous "Resetting offset" and "Seeking to LATEST offset" logs?

sg24os4d  posted on 2021-07-13  in Spark

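I am trying to follow this guide: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html but I don't understand why, most of the time, no data is written to the console, and why the log is spammed with messages from the stream execution thread.
Do I need to configure something? This is my code: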

SparkSession spark = SparkSession
  .builder()
  .appName("Testing")
  .config("spark.master", "local")
  .getOrCreate();

StructType recordSchema = new StructType()
  .add("description", "string")
  .add("location", "string")
  .add("id", "string")
  .add("title", "string")
  .add("company", "string")
  .add("place", "string")
  .add("date", "string")
  .add("senorityLevel", "string")
  .add("function", "string")
  .add("employmentType", "string")
  .add("industries", "string");

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "linkedin-producer")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id","test")
  .load();

StreamingQuery query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select(from_json(col("value").cast("string"), recordSchema).as("data"))
  .select("data.*")
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("console")
  .start();

try {
  query.awaitTermination(10000);
} catch (StreamingQueryException e) {
  e.printStackTrace();
}

Sometimes I do get the df printed in the console, but my console is flooded with output like this:

[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1613492229792
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Subscribed to partition(s): linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 0 for partition linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-3, groupId=test] Cluster ID: N88wfukWTIS-ycMeSGhhng
[task-result-getter-0] INFO org.apache.spark.network.client.TransportClientFactory - Successfully created connection to /10.0.0.9:44237 after 76 ms (0 ms spent in bootstraps)
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 500 for partition linkedin-producer-0
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0 (TID 0) in 1069 ms on 10.0.0.9 (executor driver) (1/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_0 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 909 for partition linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Commit authorized for partition 1 (task 1, attempt 0, stage 0.0)
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 1 (task 1, attempt 0, stage 0.0)
[Executor task launch worker for task 1] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_1 stored as bytes in memory (estimated size 2.9 MiB, free 845.5 MiB)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_1 in memory on 10.0.0.9:44237 (size: 2.9 MiB, free: 845.5 MiB)
[Executor task launch worker for task 1] INFO org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID 1). 3003495 bytes result sent via BlockManager)
[dispatcher-event-loop-1] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0 (TID 2, 10.0.0.9, executor driver, partition 2, PROCESS_LOCAL, 8103 bytes)
[Executor task launch worker for task 2] INFO org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
[task-result-getter-1] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0 (TID 1) in 304 ms on 10.0.0.9 (executor driver) (2/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_1 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [127.0.0.1:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-4
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1613492230087
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Subscribed to partition(s): linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 0 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-4, groupId=test] Cluster ID: N88wfukWTIS-ycMeSGhhng
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 500 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 905 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Commit authorized for partition 2 (task 2, attempt 0, stage 0.0)
[Executor task launch worker for task 2] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 2 (task 2, attempt 0, stage 0.0)
[Executor task launch worker for task 2] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_2 stored as bytes in memory (estimated size 2.9 MiB, free 845.5 MiB)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_2 in memory on 10.0.0.9:44237 (size: 2.9 MiB, free: 845.5 MiB)
[Executor task launch worker for task 2] INFO org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID 2). 3001144 bytes result sent via BlockManager)
[task-result-getter-2] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0 (TID 2) in 240 ms on 10.0.0.9 (executor driver) (3/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_2 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[task-result-getter-2] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (start at Spark.java:73) finished in 1.730 s
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Killing all running tasks in stage 0: Stage finished
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 finished: start at Spark.java:73, took 1.768779 s
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@52eea1c3 is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 29.841333 ms
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 30.563541 ms
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
|         description|location|        id|               title|             company|               place|      date|   senorityLevel|            function|employmentType|          industries|
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
|Job Summary We ar...|  Israel|2406654159|       Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...|     Full-time|Marketing and Adv...|
|We're looking for...|  Israel|2404180635|  Personal Assistant|            Lemonade|Tel Aviv, Tel Avi...|2021-01-07|     Entry level|      Administrative|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2398561147|Retail intern -12...|The Walt Disney C...|    Tel Aviv, Israel|2021-02-10|      Internship|           Marketing|     Full-time|       Entertainment|
|We're looking for...|  Israel|2404180635|  Personal Assistant|            Lemonade|Tel Aviv, Tel Avi...|2021-01-07|     Entry level|      Administrative|     Full-time|Marketing and Adv...|
|We're looking for...|  Israel|2404180635|  Personal Assistant|            Lemonade|Tel Aviv, Tel Avi...|2021-01-07|     Entry level|      Administrative|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2406654159|       Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2398561147|Retail intern -12...|The Walt Disney C...|    Tel Aviv, Israel|2021-02-10|      Internship|           Marketing|     Full-time|       Entertainment|
|At CrowdStrike we...|  Israel|2406653801|       HR Generalist|         CrowdStrike|Ramat Gan, Tel Av...|2021-02-11|       Associate|     Human Resources|     Full-time|Information Techn...|
|Job Description W...|  Israel|2406699205|HR Administrator ...| Akamai Technologies|Tel Aviv, Tel Avi...|2021-02-11|  Not Applicable|     Human Resources|     Full-time|Computer Networki...|
|JOB PURPOSE To as...|  Israel|2403563715|Research, Campaig...|Amnesty Internati...|Jerusalem Municip...|2021-02-09|     Entry level|            Research|      Contract|Nonprofit Organiz...|
|Job Description A...|  Israel|2383126490|Receptionist – Pa...|    Ceragon Networks|Tel Aviv, Tel Avi...|2021-02-01|  Not Applicable|      Administrative|     Full-time|Computer Networki...|
|Fiverr is looking...|  Israel|2419715658|        Data Analyst|        About Fiverr|    Tel Aviv, Israel|2021-02-11|Mid-Senior level|Information Techn...|     Full-time|            Internet|
|חברת AlfaCloud - ...|  Israel|2400094107|     Project Manager|AlfaCloud - ERP S...|    Tel Aviv, Israel|2021-02-11|     Entry level|Project Managemen...|     Full-time|   Computer Software|
|טדי הפקות מחפשת א...|  Israel|2396568054|       Booking Agent|    Tedy Productions|    Tel Aviv, Israel|2021-02-09|     Entry level|Design, Art/Creat...|     Full-time|                    |
|The Norman Tel Av...|  Israel|2418149015|    Front Desk Staff| The Norman Tel Aviv|    Tel Aviv, Israel|2021-02-10|     Entry level|      Administrative|     Full-time|         Hospitality|
|Are you a stellar...|  Israel|2405797088|Regional Operatio...|                Wolt|Tel Aviv, Tel Avi...|2021-02-11|        Director|          Management|     Full-time|Marketing and Adv...|
|About CXBuzz Inte...|  Israel|2400078284|   Journalism Intern|              CXBuzz|    Tel Aviv, Israel|2021-02-11|      Internship| Education, Training|    Internship|          Publishing|
|Job Summary We ar...|  Israel|2406654159|       Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2398561147|Retail intern -12...|The Walt Disney C...|    Tel Aviv, Israel|2021-02-10|      Internship|           Marketing|     Full-time|       Entertainment|
|At CrowdStrike we...|  Israel|2406653801|       HR Generalist|         CrowdStrike|Ramat Gan, Tel Av...|2021-02-11|       Associate|     Human Resources|     Full-time|Information Techn...|
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
only showing top 20 rows

[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@52eea1c3 committed.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Writing atomically to file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/0 using temp file file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/.0.0cdf78cd-795c-4c3c-94d1-91341e38187f.tmp
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Renamed temp file file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/.0.0cdf78cd-795c-4c3c-94d1-91341e38187f.tmp to file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
  "id" : "9d193cbf-379e-495e-87e3-18f9f09145ea",
  "runId" : "2e9f6d84-23af-4b23-89cd-73ecef66d290",
  "name" : null,
  "timestamp" : "2021-02-16T16:17:06.949Z",
  "batchId" : 0,
  "numInputRows" : 3813,
  "processedRowsPerSecond" : 1035.5784899511136,
  "durationMs" : {
    "addBatch" : 2786,
    "getBatch" : 22,
    "latestOffset" : 446,
    "queryPlanning" : 363,
    "triggerExecution" : 3681,
    "walCommit" : 23
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[linkedin-producer]]",
    "startOffset" : null,
    "endOffset" : {
      "linkedin-producer" : {
        "2" : 1269,
        "1" : 1272,
        "0" : 1272
      }
    },
    "numInputRows" : 3813,
    "processedRowsPerSecond" : 1035.5784899511136
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@793ec5d7",
    "numOutputRows" : 3813
  }
}
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-2 to position FetchPosition{offset=1269, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-1 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
  "id" : "9d193cbf-379e-495e-87e3-18f9f09145ea",
  "runId" : "2e9f6d84-23af-4b23-89cd-73ecef66d290",
  "name" : null,
  "timestamp" : "2021-02-16T16:17:10.664Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 3,
    "triggerExecution" : 4
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[linkedin-producer]]",
    "startOffset" : {
      "linkedin-producer" : {
        "2" : 1269,
        "1" : 1272,
        "0" : 1272
      }
    },
    "endOffset" : {
      "linkedin-producer" : {
        "2" : 1269,
        "1" : 1272,
        "0" : 1272
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@793ec5d7",
    "numOutputRows" : 0
  }
}
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-2 to position FetchPosition{offset=1269, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-1 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
.
.
.

pom.xml file:

<!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
yi0zb3m4  #1

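As far as I understand, you only want the job to run for 10 seconds, since you are passing a timeout to awaitTermination.
To stop your job from constantly checking whether new messages are available in Kafka, you can set a trigger on the writeStream: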

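// requires: import org.apache.spark.sql.streaming.Trigger;
.writeStream()
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "/tmp/java/checkpoint/spark")
  .trigger(Trigger.ProcessingTime("1 second")) // look for new Kafka data once per second
  //.trigger(Trigger.Once()) // alternative: process the available data once, then stop
  .format("console")
  .start();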

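With this trigger the query checks Kafka for new messages only once per second, so you will see the "Seeking" and "Resetting" log lines only once per second.
You may also be interested in Trigger.Once() as an alternative.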
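
As for why nothing is written to the console most of the time: the progress events in your log suggest that batch 0 already consumed everything in the topic, so later batches have nothing new to print. Here is a minimal sketch of that arithmetic in plain Java, using the offsets from your log. The class and method names are made up for illustration, and treating the null startOffset of batch 0 as offset 0 is an assumption based on the "Seeking to offset 0" lines.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetDelta {
    // Rows available to a micro-batch: sum over partitions of (endOffset - startOffset).
    // A partition missing from 'start' is assumed to begin at offset 0.
    static long newRows(Map<Integer, Long> start, Map<Integer, Long> end) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : end.entrySet()) {
            long from = start.getOrDefault(e.getKey(), 0L);
            total += e.getValue() - from;
        }
        return total;
    }

    public static void main(String[] args) {
        // endOffset of topic linkedin-producer, copied from the progress events in the question
        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 1272L);
        end.put(1, 1272L);
        end.put(2, 1269L);

        // Batch 0: startOffset is null, modeled here as an empty map
        System.out.println(newRows(new HashMap<>(), end)); // prints 3813
        // Batch 1: startOffset equals endOffset
        System.out.println(newRows(end, end));             // prints 0
    }
}
```

The first value matches numInputRows of batch 0 and the second matches batch 1. Until the producer writes new messages to the topic, there is nothing new for the console sink to show, and only the offset lookups appear in the log.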

tjjdgumg  #2

You are seeing the INFO messages from the loggers because you are running with the default log level, INFO. Set the log level to WARN with spark.sparkContext.setLogLevel("WARN") .
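
Since the question uses Java, the call would be written spark.sparkContext().setLogLevel("WARN"). The same thing can be done with a configuration file. The sketch below assumes log4j 1.x is the active logging backend (the one Spark 3.0.0 ships with) and that the file is saved as log4j.properties on the classpath, for example under src/main/resources. If a different SLF4J binding is on your classpath, this file will probably not be picked up.

```properties
# Root logger: only warnings and errors, printed to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Alternative (assumption: you want to keep Spark's own INFO output):
# set the root logger back to INFO and silence only the Kafka client
# log4j.logger.org.apache.kafka=WARN
```

Note that this only hides the messages. The query still polls Kafka just as often unless you also set a trigger.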
