Apache Drill: "Failed to construct kafka consumer"

mo49yndu  posted on 2021-06-06  in Kafka

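I am using the Apache Drill (1.14) JDBC driver in my application, which consumes data from Kafka. The application works fine for a while, but after a few iterations it fails with a "Too many open files" error. I have made sure there is no file handle leak in my own code, but I still cannot tell why this happens.
The problem appears to occur inside the Apache Drill library while it constructs the Kafka consumer. Can anyone help me resolve this?
The problem goes away when I restart Apache Drill, but it soon comes back. I ran ulimit -a | wc -l and lsof -a -p <PID> | wc -l before and after restarting the Drill process, and the Drill process appears to hold a large number of file descriptors. I also tried raising the file descriptor limit on the system, but still no luck.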
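For anyone who wants to track descriptor growth between queries without shelling out to lsof, the count can also be read from /proc. The following is only a minimal sketch, assuming Linux and a user that is allowed to read the target process's /proc entry; the names FdCount and countOpenFds are made up for illustration and are not part of Drill or Kafka. As a side note, ulimit -n prints the open-file limit for the current shell, and /proc/<PID>/limits shows the limit that actually applies to a running process.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; Linux-specific because it reads /proc.
class FdCount {

    /**
     * Counts the entries under /proc/<pid>/fd, i.e. the descriptors the
     * process currently holds. Pass "self" to inspect the current JVM.
     */
    static long countOpenFds(String pid) throws IOException {
        Path fdDir = Paths.get("/proc", pid, "fd");
        // Files.list keeps a directory handle open, so close the stream when done.
        try (Stream<Path> entries = Files.list(fdDir)) {
            return entries.count();
        }
    }

    public static void main(String[] args) throws IOException {
        String pid = args.length > 0 ? args[0] : "self";
        System.out.println("open file descriptors for " + pid + ": " + countOpenFds(pid));
    }
}
```

Running it with the Drillbit PID as the only argument before and after each query should show whether the count keeps climbing.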
To configure the Kafka plugin in Apache Drill, I followed the Apache Drill storage plugin documentation: https://drill.apache.org/docs/kafka-storage-plugin/
Any help with this issue is greatly appreciated. Thanks.
JDBC URL: jdbc:drill:drillbit=localhost:31010;schema=kafka
Note: I am pushing down the filter in my query: SELECT * FROM myKafkaTopic WHERE kafkaMsgTimestamp > 1560210931626
```
org.apache.drill.common.exceptions.UserException: DATA_READ ERROR: Failed to fetch start/end offsets of the topic myKafkaTopic

Failed to construct kafka consumer

[Error Id: 73f896a7-09d4-425b-8cd5-f269c3a6e69a ]
at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633) ~[drill-common-1.14.0.jar:1.14.0]
at org.apache.drill.exec.store.kafka.KafkaGroupScan.init(KafkaGroupScan.java:198) [drill-storage-kafka-1.14.0.jar:1.14.0]
at org.apache.drill.exec.store.kafka.KafkaGroupScan.<init>(KafkaGroupScan.java:98) [drill-storage-kafka-1.14.0.jar:1.14.0]
at org.apache.drill.exec.store.kafka.KafkaStoragePlugin.getPhysicalScan(KafkaStoragePlugin.java:83) [drill-storage-kafka-1.14.0.jar:1.14.0]
at org.apache.drill.exec.store.AbstractStoragePlugin.getPhysicalScan(AbstractStoragePlugin.java:111) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.logical.DrillTable.getGroupScan(DrillTable.java:99) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:89) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:69) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:62) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.logical.DrillScanRule.onMatch(DrillScanRule.java:38) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:652) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.transform(DefaultSqlHandler.java:429) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.transform(DefaultSqlHandler.java:369) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.convertToRawDrel(DefaultSqlHandler.java:255) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.convertToDrel(DefaultSqlHandler.java:318) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.getPlan(DefaultSqlHandler.java:180) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.sql.DrillSqlWorker.getQueryPlan(DrillSqlWorker.java:145) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.planner.sql.DrillSqlWorker.getPlan(DrillSqlWorker.java:83) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.work.foreman.Foreman.runSQL(Foreman.java:567) [drill-java-exec-1.14.0.jar:1.14.0]
at org.apache.drill.exec.work.foreman.Foreman.run(Foreman.java:266) [drill-java-exec-1.14.0.jar:1.14.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765) ~[kafka-clients-0.11.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633) ~[kafka-clients-0.11.0.1.jar:na]
at org.apache.drill.exec.store.kafka.KafkaGroupScan.init(KafkaGroupScan.java:168) [drill-storage-kafka-1.14.0.jar:1.14.0]
... 23 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:129) ~[kafka-clients-0.11.0.1.jar:na]
at org.apache.kafka.common.network.Selector.<init>(Selector.java:156) ~[kafka-clients-0.11.0.1.jar:na]
at org.apache.kafka.common.network.Selector.<init>(Selector.java:160) ~[kafka-clients-0.11.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701) ~[kafka-clients-0.11.0.1.jar:na]
... 25 common frames omitted
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method) ~[na:1.8.0_181]
at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130) ~[na:1.8.0_181]
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69) ~[na:1.8.0_181]
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.8.0_181]
at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.8.0_181]
at org.apache.kafka.common.network.Selector.<init>(Selector.java:127) ~[kafka-clients-0.11.0.1.jar:na]
```

q35jwt9p  #1

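This happens because the Kafka storage plugin never explicitly closes its connections to Kafka. It relies on the server's idle timeout (connections.max.idle.ms), which defaults to 10 minutes. I saw exactly the same problem while running a query every few minutes; we reduced the default timeout to verify that this was indeed the cause.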
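The stack trace in the question fails in java.nio.channels.Selector.open(), so the general mechanism can be reproduced with the JDK alone. The sketch below is not Drill's or Kafka's code; it only illustrates that every selector left open keeps descriptors allocated until close() is called. It assumes a Unix JVM (it casts to com.sun.management.UnixOperatingSystemMXBean), and the names SelectorLeakDemo, openFds and fdsHeldBy are hypothetical.

```java
import com.sun.management.UnixOperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

// Illustration only: shows descriptors accumulating while selectors stay open.
class SelectorLeakDemo {

    /** Number of descriptors currently open in this JVM (Unix JVMs only). */
    static long openFds() {
        UnixOperatingSystemMXBean os =
                (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        return os.getOpenFileDescriptorCount();
    }

    /**
     * Opens the given number of selectors without closing them, reports how
     * many extra descriptors that cost, then releases them all.
     */
    static long fdsHeldBy(int selectors) throws IOException {
        List<Selector> held = new ArrayList<>();
        long before = openFds();
        try {
            for (int i = 0; i < selectors; i++) {
                held.add(Selector.open()); // each call allocates at least one descriptor
            }
            return openFds() - before;
        } finally {
            for (Selector s : held) {
                s.close(); // explicit close is what frees the descriptors
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("extra descriptors while 50 selectors are open: " + fdsHeldBy(50));
        // try-with-resources closes the selector as soon as the block exits.
        try (Selector s = Selector.open()) {
            System.out.println("selector open: " + s.isOpen());
        }
    }
}
```

If the plugin cannot be changed, lowering connections.max.idle.ms on the broker, as described above, shortens how long idle connections linger.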
