无法删除kafka流应用程序ide中的状态目录

qojgxg4l  于 2021-06-06  发布在  Kafka
关注(0)|答案(4)|浏览(374)

我正在开发一个简单的kafka流应用程序,它从一个主题中提取消息,并在转换后将其放入另一个主题中。我正在使用intelij进行我的开发。
当我调试/运行这个应用程序时,如果我的ide和kafka服务器位于同一台机器上,那么它工作得非常好
(即,使用引导程序\u服务器\u配置=localhost:9092 and 架构\注册表\ url \配置=localhost:8081)
但是,当我尝试使用另一台机器进行开发时
(即,使用引导程序\u servers\u config=.。:9092 and schema\u registry\u url\u config=.。:8081 where ..是我的Kafka的ip地址),
调试过程在第一次运行时没有问题。但是,在重置偏移量后第二次运行时,收到以下错误:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:

如果我改变了 my_application_id 作为 my_application_id2 ,然后运行它,它在第一次运行时再次工作,但是如果我再次运行它,它将再次收到错误。
我在申请表的最后一句话中有以下代码:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

如何解决这个问题有什么建议吗?
更新:
我已经检查了在我的开发机器(windows平台)中创建的状态目录,如果我在第二次运行之前手动删除这些目录,没有发现错误。我尝试以管理员身份运行ide,因为我认为这可能与文件夹的权限有关。然而,这并没有帮助。
完整堆栈供参考:
信息Kafka版本:1.1.0(org.apache.kafka.common.utils.appinfoparser:109)Kafka委员会信息:fdcf75ea326b8e07(org.apache.kafka.common.utils.appinfoparser:110)信息流线程[main]正在删除任务0\u 0的状态目录0\u 0,用户正在调用清除(org.apache.kafka.streams.processor.internals.statedirectory:281)已断开与目标vm的连接,地址:'127.0.0.1:16552',传输:线程“main”org.apache.kafka.streams.errors.streamsexception:java.nio.file.directorynotemptyexception:c:\workspace\bennychan\kafka streams\my\u application\u 001\0\u 0 at org.apache.kafka.streams.processor.internals.stateditory.clean(stateditory)。java:231)在org.apache.kafka.streams.kafkastreams.cleanup(kafkastreams。java:931)在com.macroviewhk.financialreport.simplestream.start(simplestream。java:60)请访问com.macroviewhk.financialreport.simplestream.main(simplestream.com)。java:45)原因:java.nio.file.directorynotemptyexception:c:\workspace\bennychan\kafka streams\my\u application\u 001\0\u 0 atsun.nio.fs.windowsfilesystemprovider.impldelete(windowsfilesystemprovider。java:266)在sun.nio.fs.abstractfilesystemprovider.delete(abstractfilesystemprovider。java:103)在java.nio.file.files.delete(文件)。java:1126)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:651)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:634)在java.nio.file.files.walkfiletree(文件。java:2688)在java.nio.file.files.walkfiletree(文件。java:2742)在org.apache.kafka.common.utils.utils.delete(utils。java:634)错误流线程[main]未能删除状态目录(org.apache.kafka.streams.processor.internals.statedirectory:297)在org.apache.kafka.streams.processor.internals.statedirectory.cleanremovedtasks(statedirectory。java:287)java.nio.file.directorynotemptyexception:c:\workspace\bennychan\kafka streams\my\u application\u 001\0\u 0位于org.apache.kafka.streams.processor.internals.statedirectory.clean(statedirectory。java:228)在sun.nio.fs.windowsfilesystemprovider.impldelete(windowsfilesystemprovider。java:266) ... 在sun.nio.fs.abstractfilesystemprovider.delete(abstractfilesystemprovider。java:103)在java.nio.file.files.delete(文件)。java:1126)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:651)在org.apache.kafka.common.utils.utils$1.postvisitdirectory(utils。java:634)在java.nio.file.files.walkfiletree(文件。java:2688)在java.nio.file.files.walkfiletree(文件。java:2742)在org.apache.kafka.common.utils.utils.delete(utils。java:634)在org.apache.kafka.streams.processor.internals.statedirectory.cleanremovedtasks(statedirectory。java:287)在org.apache.kafka.streams.processor.internals.statedirectory.clean(statedirectory。java:228)在org.apache.kafka.streams.kafkastreams.cleanup(kafkastreams。java:931)在com.macroviewhk.financialreport.simplestream.start(simplestream。java:60)请访问com.macroviewhk.financialreport.simplestream.main(simplestream.com)。java:45)
更新2:在另一个详细检查之后,下面的行抛出异常

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {

该行位于kafka-clients-1.1.0.jar org.apache.kafka.common.utilsutils.class
可能这是windows系统的问题(对不起,我不是一个有经验的java程序员)。

qnyhuwrf

qnyhuwrf1#

这是我们在windows上实现的。这是用Kotlin写的。
使用版本:Kafka流测试-utils:2.3.0.
关键是捕捉异常。只要捕获由引发的异常,测试就会通过 testDriver.close() 即使你不删除目录。但是,清理目录会使单元测试独立且可重复。

val directory = "test"

@BeforeEach
fun setup(){
    //other code omitted for setting the props
    props.setProperty(StreamsConfig.STATE_DIR_CONFIG,directory)
}

@AfterEach
fun tearDown(){
    try{
        testDriver.close()
    }catch(exception: Exception){
        FileUtils.deleteDirectory(File(directory)) //there is a bug on Windows that does not delete the state directory properly. In order for the test to pass, the directory must be deleted manually
    }
}
6yt4nkrj

6yt4nkrj2#

对谷歌来说。。
我目前正在使用这个scala代码来帮助windows人员处理状态存储的删除。

if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  try {
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
    FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
  } catch {
    case e: Exception => logger.error(e.toString)
  }
}
else {
  streams.cleanUp()
}
pgky5nke

pgky5nke3#

我同意@ideano1,它似乎与https://issues.apache.org/jira/browse/kafka-6647 --你可以尝试的是,显式地调用 KafkaStreams#cleanUp() 测试之间。目前尚不清楚windowos为何会出现问题。所有的测试都在linux上进行。

ztyzrc3y

ztyzrc3y4#

对于测试(但不仅是在你负担得起的情况下),你可以使用 IN_MEMORY("in-memory") 为每个存储 KTable 创建(直接或间接,如聚合);这样可以避免创建任何目录,从而不再发生错误。

相关问题