Flink application sink KafkaProducer throws Java heap space error (OutOfMemoryError)

Posted by cs7cruho on 2021-06-05 in Kafka

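I have created a Flink application that takes a DataStream of strings and sinks it to Kafka. The DataStream is built from a simple collection of strings with fromCollection.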

List<String> listOfStrings = new ArrayList<>();
listOfStrings.add("testkafka1");
listOfStrings.add("testkafka2");
listOfStrings.add("testkafka3");
listOfStrings.add("testkafka4");

DataStream<String> testStringStream = env.fromCollection(listOfStrings);

Flink runs on Kubernetes with parallelism 1 and one TaskManager. The job fails as soon as it starts, with the following error.

ERROR org.apache.kafka.common.utils.KafkaThread                     - Uncaught exception in kafka-producer-network-thread | producer-1: 
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:337)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:204)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
    at java.lang.Thread.run(Thread.java:748)

My TaskManager configuration (taken from the TaskManager log):
Starting Task Manager config file:

jobmanager.rpc.address: component-app-adb71002-tm-5c6f4d58bd-rtblz
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
blob.server.port: 6125
fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.DefaultAWSCredentialsProviderChain
jobmanager.heap.size: 524288k
jobmanager.rpc.port: 6123
jobmanager.web.port: 8081
metrics.internal.query-service.port: 50101
metrics.reporter.dghttp.apikey: f52362263f032f2ebc3622cafc0171cd
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.tags: componentingestion,dev
query.server.port: 6124
taskmanager.heap.size: 1048576k
taskmanager.numberOfTaskSlots: 1
web.upload.dir: /opt/flink
jobmanager.rpc.address: component-app-adb71002
taskmanager.host: 10.42.6.6
Starting taskexecutor as a console application on host component-app-adb71002-tm-5c6f4d58bd-rtblz.
2020-02-11 15:19:20,519 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - --------------------------------------------------------------------------------
2020-02-11 15:19:20,520 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Starting TaskManager (Version: 1.9.2, Rev:c9d2c90, Date:24.01.2020 @ 08:44:30 CST)
2020-02-11 15:19:20,520 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  OS current user: flink
2020-02-11 15:19:20,520 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2020-02-11 15:19:20,520 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.242-b08
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Maximum heap size: 922 MiBytes
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  JAVA_HOME: /usr/local/openjdk-8
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  No Hadoop Dependency available
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  JVM Options:
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -XX:+UseG1GC
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Xms922M
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Xmx922M
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -XX:MaxDirectMemorySize=8388607T
2020-02-11 15:19:20,521 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2020-02-11 15:19:20,522 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2020-02-11 15:19:20,522 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Program Arguments:
2020-02-11 15:19:20,522 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     --configDir
2020-02-11 15:19:20,522 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     /opt/flink/conf
2020-02-11 15:19:20,522 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Classpath: /opt/flink/lib/flink-metrics-datadog-1.9.2.jar:/opt/flink/lib/flink-table-blink_2.12-1.9.2.jar:/opt/flink/lib/flink-table_2.12-1.9.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.9.2.jar:::

The Kafka producer configuration I get is:

acks = 1
batch.size = 16384
bootstrap.servers = [XXXXXXXXXXXXXXXX] ---I masked it intentionally
buffer.memory = 33554432
client.id = 
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = Source: Collection Source -> Sink: Unnamed-eb99017e0f9125fa6648bf56123bdcf7-19
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Most of the producer settings are defaults. Am I missing something here, or is something wrong with the configuration?

Answer #1 (iyfamqjs)

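As Dominik suggested, this problem is not related to the heap.
This exception is thrown when the broker is set up with SSL authentication and the client is not configured for SSL.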
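
To illustrate why a protocol mismatch can surface as a heap error: the Kafka client reads the first four bytes of a response as a big-endian message size and allocates a buffer of that size, which is the ByteBuffer.allocate frame under NetworkReceive in the stack trace. The sketch below is not Kafka code. It assumes, purely for illustration, that the broker answers the plaintext client with a TLS alert record (content type 0x15, version 0x03 0x03); the actual bytes depend on the broker and TLS version. The class and method names are made up for this example.

```java
import java.nio.ByteBuffer;

public class SizePrefixDemo {

    // Interprets the first four bytes as a big-endian int, the way a
    // reader of a length-prefixed protocol would.
    public static int sizePrefix(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumption: the start of a TLS alert record sent by an SSL listener.
        // 0x15 = alert, 0x03 0x03 = TLS 1.2, then the record length (0x00 0x02).
        byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00, 0x02};

        int size = sizePrefix(tlsAlertHeader);

        // A plaintext client would try to allocate this many bytes.
        System.out.println(size + " bytes (~" + (size >> 20) + " MiB)");
    }
}
```

With these assumed bytes the program prints `352518912 bytes (~336 MiB)`, so a single bogus "response" asks for a buffer of several hundred megabytes, which may be more than the free heap of a TaskManager started with -Xmx922M.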
This is a Kafka bug.
https://issues.apache.org/jira/browse/kafka-4090
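
If the broker listener really is SSL, the fix on the client side is to stop using security.protocol = PLAINTEXT (as shown in the producer config dump in the question). A minimal sketch of the producer properties follows, assuming a truststore is available to the job. The host name, port, truststore path, and password are placeholders, and the class and method names are made up for this example. The property keys themselves are standard Kafka client settings.

```java
import java.util.Properties;

public class SslProducerProps {

    // Builds producer properties for a broker that expects SSL connections.
    // All argument values are supplied by the caller; nothing is read from disk here.
    public static Properties sslProducerProperties(String bootstrapServers,
                                                   String truststorePath,
                                                   String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Replaces the PLAINTEXT default seen in the question's config dump.
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the original post.
        Properties props = sslProducerProperties(
                "broker.example.com:9093",
                "/opt/flink/conf/kafka.truststore.jks",
                "changeit");
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

The resulting Properties object would be the one handed to the Flink Kafka producer sink. If the broker also requires client authentication, the ssl.keystore.location, ssl.keystore.password, and ssl.key.password settings would be needed as well.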
