docker KStreams应用程序状态,dir -检查点文件编号

5lwkijsr  于 2023-04-29  发布在  Docker
关注(0)|答案(2)|浏览(62)

我有一个在Docker容器中运行的KStreams应用程序,它使用持久性键值存储。我的运行环境是Docker 1。RHEL 7上的13.1。
我将state.dir配置为/tmp/kafka-streams(默认值)。
当我使用“docker run”启动这个容器时,我将/tmp/kafka-streams挂载到我的主机上的一个目录,例如/mnt/storage/kafka-streams
我的application.id是“myapp”。我的输入主题中有288个分区,这意味着我的状态存储/更改日志主题也将有这么多分区。因此,当启动我的Docker容器时,我看到有一个分区编号为0_1,0_2的文件夹。...0_288在/mnt/storage/kafka-streams/myapp/
当我关闭我的应用程序时,我看不到任何分区目录中的任何.checkpoint文件。
当我重新启动应用程序时,它开始从changelog主题中获取记录,而不是从本地磁盘阅读。我怀疑这是因为在任何分区目录中都没有.checkpoint文件。(注意:我可以在分区目录中看到.lockrocksdb子目录)
这是我在启动日志中看到的。它似乎是从更新日志主题i引导整个状态存储。e.执行网络I/O而不是从磁盘上阅读:

2022-05-31T12:08:02.791 [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread [myapp-f6900c0a-50ca-43a0-8a4b-95eaa
d9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitte
d data in stores we have to treat it as a task corruption error and wipe out the local state of task 0_170 before re-bootstrapping
2022-05-31T12:08:02.791 [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad
9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are corrupted and hence needs to be re-initialized
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

1.当我关闭我的应用程序时,我应该在/mnt/storage/kafka-streams/myapp/下的每个分区目录中看到一个.checkpoint文件吗?
1.这是一个问题,因为我在docker容器中运行我的KStreams应用程序?如果有权限问题,那么我希望在创建其他文件时看到问题,如。lock或rocksdb文件夹(及其内容)。
1.如果我在我的Windows笔记本电脑上运行这个应用程序作为一个独立的/可运行的Springboot JAR。例如,不在Docker容器中,我可以看到它创建了。检查点文件如预期。

knsnq2tg

knsnq2tg1#

我的Java应用程序在Docker容器中通过入口点脚本运行。看起来如果我停止容器,那么它不会向我的java进程发送$TERM信号,因此不会完全关闭java KStreams应用程序。
所以,我需要做的就是找到一种方法,以某种方式向容器内的Java应用程序发送一个$TERM信号。
目前,我只是ssh'艾德到容器中,并为我的java进程做了一个kill -s TERM <pid>。一旦我这样做,它导致了一个干净的关闭,因此创建了.checpoint文件以及。

oxf4rvwz

oxf4rvwz2#

我假设您已经将DockerFile中的ENTRYPOINT配置为脚本,对吗?
类似于:

ENTRYPOINT ["/bin/run.sh"]

在该脚本中,您将调用java:

#!/bin/bash

java <cp, and thing to run>

如果是这种情况,那么$TERM信号将不会被转发到您的Java进程。你需要使用exec:

#!/bin/bash

exec java <cp, and thing to run>

使用exec,将转发TERM信号。
为什么?
Java进程没有收到$TERM信号的原因是Docker正在向ENTRYPOINT发送$TERM信号。即运行脚本的shell进程,脚本不会将其转发到Java进程。
exec命令用Java进程 * 替换 * shell进程,因此现在向它发送了$TERM信号。

相关问题