我正在开发一个简单的kafka流应用程序,它从一个主题中提取消息,并在转换后将其放入另一个主题中。我正在使用intelij进行我的开发。
当我调试/运行这个应用程序时,如果我的ide和kafka服务器位于同一台机器上,那么它工作得非常好
(即,使用引导程序\u服务器\u配置=localhost:9092 and 架构\注册表\ url \配置=localhost:8081)
但是,当我尝试使用另一台机器进行开发时
(即,使用引导程序\u servers\u config=.。:9092 and schema\u registry\u url\u config=.。:8081 where ..是我的Kafka的ip地址),
调试过程在第一次运行时没有问题。但是,在重置偏移量后第二次运行时,收到以下错误:
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
如果我改变了 my_application_id
作为 my_application_id2
,然后运行它,它在第一次运行时再次工作,但是如果我再次运行它,它将再次收到错误。
我在申请表的最后一句话中有以下代码:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
如何解决这个问题有什么建议吗?
更新:
我已经检查了在我的开发机器(windows平台)中创建的状态目录,如果我在第二次运行之前手动删除这些目录,没有发现错误。我尝试以管理员身份运行ide,因为我认为这可能与文件夹的权限有关。然而,这并没有帮助。
完整堆栈供参考:
信息Kafka版本:1.1.0(org.apache.kafka.common.utils.appinfoparser:109)Kafka委员会信息:fdcf75ea326b8e07(org.apache.kafka.common.utils.appinfoparser:110)信息流线程[main]正在删除任务0\u 0的状态目录0\u 0,用户正在调用清除(org.apache.kafka.streams.processor.internals.statedirectory:281)已断开与目标vm的连接,地址:'127.0.0.1:16552',传输:线程“main”org.apache.kafka.streams.errors.streamsexception:java.nio.file.directorynotemptyexception:c:\workspace\bennychan\kafka streams\my\u application\u 001\0\u 0 at org.apache.kafka.streams.processor.internals.stateditory.clean(stateditory)。java:231)在org.apache.kafka.streams.kafkastreams.cleanup(kafkastreams。java:931)在com.macroviewhk.financialreport.simplestream.start(simplestream。java:60)请访问com.macroviewhk.financialreport.simplestream.main(simplestream.com)。java:45)原因:java.nio.file.directorynotemptyexception:c:\workspace\bennychan\kafka streams\my\u application\u 001\0\u 0 atsun.nio.fs.windowsfilesystemprovider.impldelete(windowsfilesystemprovider。java:266)在sun.nio.fs.abstractfilesystemprovider.delete(abstractfilesystemprovider。java:103)在java.nio.file.files.delete(文件)。java:1126)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:651)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:634)在java.nio.file.files.walkfiletree(文件。java:2688)在java.nio.file.files.walkfiletree(文件。java:2742)在org.apache.kafka.common.utils.utils.delete(utils。java:634)错误流线程[main]未能删除状态目录(org.apache.kafka.streams.processor.internals.statedirectory:297)在org.apache.kafka.streams.processor.internals.statedirectory.cleanremovedtasks(statedirectory。java:287)java.nio.file.directorynotemptyexception:c:\workspace\bennychan\kafka streams\my\u application\u 001\0\u 0位于org.apache.kafka.streams.processor.internals.statedirectory.clean(statedirectory。java:228)在sun.nio.fs.windowsfilesystemprovider.impldelete(windowsfilesystemprovider。java:266) ... 在sun.nio.fs.abstractfilesystemprovider.delete(abstractfilesystemprovider。java:103)在java.nio.file.files.delete(文件)。java:1126)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:651)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:634)在java.nio.file.files.walkfiletree(文件。java:2688)在java.nio.file.files.walkfiletree(文件。java:2742)在org.apache.kafka.common.utils.utils.delete(utils。java:634)在org.apache.kafka.streams.processor.internals.statedirectory.cleanremovedtasks(statedirectory。java:287)在org.apache.kafka.streams.processor.internals.statedirectory.clean(statedirectory。java:228)在org.apache.kafka.streams.kafkastreams.cleanup(kafkastreams。java:931)在com.macroviewhk.financialreport.simplestream.start(simplestream。java:60)请访问com.macroviewhk.financialreport.simplestream.main(simplestream.com)。java:45)
更新2:在另一个详细检查之后,下面的行抛出异常
Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
该行位于kafka-clients-1.1.0.jar org.apache.kafka.common.utilsutils.class
可能这是windows系统的问题(对不起,我不是一个有经验的java程序员)。
4条答案
按热度按时间qnyhuwrf1#
这是我们在windows上实现的。这是用Kotlin写的。
使用版本:Kafka流测试-utils:2.3.0.
关键是捕捉异常。只要捕获由引发的异常,测试就会通过
testDriver.close()
即使你不删除目录。但是,清理目录会使单元测试独立且可重复。6yt4nkrj2#
对谷歌来说。。
我目前正在使用这个scala代码来帮助windows人员处理状态存储的删除。
pgky5nke3#
我同意@ideano1,它似乎与https://issues.apache.org/jira/browse/kafka-6647 --你可以尝试的是,显式地调用
KafkaStreams#cleanUp()
测试之间。目前尚不清楚windowos为何会出现问题。所有的测试都在linux上进行。ztyzrc3y4#
对于测试(但不仅是在你负担得起的情况下),你可以使用
IN_MEMORY("in-memory")
为每个存储KTable
创建(直接或间接,如聚合);这样可以避免创建任何目录,从而不再发生错误。