Does Kafka Streams support the same topic as both sink and source, together with a join?

ljo96ir5 posted on 2021-06-04 in Kafka

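I have a complex Kafka Streams application with two fully stateful flows in the same topology:
- It uses an Execution topic as a source, enriches the message and republishes it back to the same Execution topic.
- It joins another topic, WorkerTaskResult, adds the result to the Execution and publishes it back to the Execution topic.
The main goal is to provide a workflow system.
The detailed logic is as follows:
- An Execution is a list of TaskRuns.
- The Execution looks at the current state of all its TaskRuns to find the next one to execute.
- If one is found, the Execution alters its TaskRun list by adding the next one and publishes it back to Kafka; at the same time, it sends the task to be done to another queue (WorkerTask).
- The WorkerTask is processed outside Kafka Streams, and the result is published back to another queue (WorkerTaskResult) by a simple Kafka consumer and producer.
- The WorkerTaskResult alters the current TaskRun in the current Execution, changing its state (mostly RUNNING/SUCCESS/FAILED), and is also published back to the Execution queue (with Kafka Streams).
As you can see, the Execution (with its TaskRun list) is the state of the application.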
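A minimal, Kafka-free sketch of this state model may help pin down what "the state" is. The names TaskRun, Execution, withTaskRun and nextToCreate are hypothetical and only mirror the question's vocabulary; the rule for picking the next task (a fixed sequential list where a task is created once the previous one succeeded) is an assumption, and the parent TaskRun from the log is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical model mirroring the question's names; not the application's actual classes.
final class TaskRun {
    final String id;
    final String taskId;
    final String state; // CREATED, RUNNING, SUCCESS or FAILED

    TaskRun(String id, String taskId, String state) {
        this.id = id;
        this.taskId = taskId;
        this.state = state;
    }
}

// An Execution is an immutable list of TaskRuns: every change yields a new version.
final class Execution {
    final List<TaskRun> taskRunList;

    Execution(List<TaskRun> taskRunList) {
        this.taskRunList = Collections.unmodifiableList(new ArrayList<>(taskRunList));
    }

    // Applies a WorkerTaskResult by replacing the TaskRun with the same id.
    Execution withTaskRun(TaskRun updated) {
        List<TaskRun> copy = new ArrayList<>();
        for (TaskRun run : taskRunList) {
            copy.add(run.id.equals(updated.id) ? updated : run);
        }
        return new Execution(copy);
    }

    // Assumed rule: tasks run one after another in the order given by `flow`;
    // the next task is due only once the last TaskRun has succeeded.
    Optional<String> nextToCreate(List<String> flow) {
        if (taskRunList.isEmpty()) {
            return flow.isEmpty() ? Optional.<String>empty() : Optional.of(flow.get(0));
        }
        TaskRun last = taskRunList.get(taskRunList.size() - 1);
        int nextIndex = taskRunList.size();
        if (last.state.equals("SUCCESS") && nextIndex < flow.size()) {
            return Optional.of(flow.get(nextIndex));
        }
        return Optional.empty();
    }
}
```

Every change produces a new Execution from a given input version, so whichever version is used as the input decides what survives the update.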
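The stream works well when all messages are sequential (no concurrency, so only one alteration of the TaskRun list at a time). When the workflow becomes parallel (concurrent WorkerTaskResults can be joined), my Execution state seems to be overridden, producing a kind of rollback.
Example log output: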

2020-04-20 08:05:44,830 INFO  reamThread-1 afkaExecutor Stream in with 3264792750: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created 
  ] 
)
2020-04-20 08:05:44,881 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker send running state
2020-04-20 08:05:44,882 INFO  reamThread-1 afkaExecutor Stream out  with 1805535461 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 save the running state
  ] 
)
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker send success
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor Stream out  with 578845055 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
  ] 
)
2020-04-20 08:05:45,153 INFO  reamThread-1 afkaExecutor Stream in with 1805535461: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
  ] 
)
2020-04-20 08:05:45,157 INFO  reamThread-1 afkaExecutor Stream out  with 1889889916 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ] 
)
2020-04-20 08:05:45,209 WARN  reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO  reamThread-1 afkaExecutor Stream in with 1889889916: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ] 
)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor Stream out  with 3651399223 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
  ] 
)
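The rollback in this log has the shape of a classic lost update: a new version is derived from a stale copy of the Execution and then overwrites a newer one. The following plain-Java replay is only a sketch of that sequence, assuming a list of maps (taskId to state) stands in for the Execution topic and that the last record per key is what the KTable keeps; the decision to create t2 is copied from the log rather than modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Replays the log with plain maps (taskId -> state); no Kafka involved.
final class LostUpdateDemo {
    // Returns every version written to the "Execution" topic, in write order.
    static List<Map<String, String>> replay() {
        List<Map<String, String>> topic = new ArrayList<>();

        Map<String, String> created = new LinkedHashMap<>();
        created.put("t1", "CREATED");
        topic.add(created);

        // Join path: WorkerTaskResult(t1=RUNNING) applied to the latest version.
        Map<String, String> running = new LinkedHashMap<>(created);
        running.put("t1", "RUNNING");
        topic.add(running);

        // Join path again: WorkerTaskResult(t1=SUCCESS) applied to the version it just wrote.
        Map<String, String> success = new LinkedHashMap<>(running);
        success.put("t1", "SUCCESS");
        topic.add(success);

        // Loop path: only now consumes the RUNNING record it reads back from the topic
        // and derives the next version from that stale copy (t2 created, SUCCESS lost).
        Map<String, String> next = new LinkedHashMap<>(running);
        next.put("t2", "CREATED");
        topic.add(next);

        return topic;
    }

    // What a KTable built from the topic ends up holding: the last record wins.
    static Map<String, String> latest() {
        List<Map<String, String>> topic = replay();
        return topic.get(topic.size() - 1);
    }
}
```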

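I also get some warnings in the console: Detected out-of-order KTable update for execution at offset 10, partition 7. The full source can be found here.
I have also tried many different approaches, such as this one:
- Put Execution and WorkerTaskResult on the same topic with the same key, to make sure that only one message is processed at a time for a given execution.
- Keep the last Execution myself in a state store (in order to join WorkerTaskResult and Execution).
- But it sounds like I re-implemented a KTable, and it didn't work any better.
Or this one:
- Basically the same as the previous one (keep the last Execution myself in a state store).
- But using two KStream-to-KStream joins (removing the KTable).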
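Both attempts rest on the same property: if Execution and WorkerTaskResult records share the execution id as key and land in the same partition, a single stream task sees all updates for that execution in one order. The sketch below illustrates this with a stand-in partitioner based on String.hashCode(); Kafka's default partitioner hashes the serialized key with murmur2 instead, so real partition numbers differ, but any deterministic function of the key gives the same guarantee. The SameKeyRouting and Event types are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SameKeyRouting {
    // Both kinds of records are keyed by the execution id.
    static final class Event {
        final String executionId;
        final String kind; // "Execution" or "WorkerTaskResult"

        Event(String executionId, String kind) {
            this.executionId = executionId;
            this.kind = kind;
        }
    }

    // Stand-in partitioner: depends only on the key (Kafka's default uses murmur2 on the key bytes).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each event to its partition, preserving production order within a partition.
    static List<List<Event>> route(List<Event> events, int numPartitions) {
        List<List<Event>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Event event : events) {
            partitions.get(partitionFor(event.executionId, numPartitions)).add(event);
        }
        return partitions;
    }
}
```

Ordering alone does not prevent the rollback if updates are still computed from different copies of the Execution; it only guarantees that one processor sees the inputs for a key in sequence.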
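My questions are:
- Is this pattern supported by Kafka Streams (it is not a DAG, since we loop back to the same topic)?
- What is a good way to design this stream so that it is concurrency-safe?
Any clue is much appreciated; I'm completely stuck. Thanks!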
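Edit 1:
Here is some additional information:
- Only the Kafka Streams application publishes new events to Execution; no external application publishes to this topic. The only case where an external application publishes an execution is the first event (i.e., the creation of the execution).
- There is a WorkerApp (an external application, a simple consumer/producer) that consumes from WorkerTask (the work to be done) and publishes the result on WorkerTaskResult (mostly the current state of the application).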
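The WorkerApp's contract can be sketched without any Kafka client code. Assuming, as the log suggests, that it reports RUNNING when it starts a task and SUCCESS or FAILED when the task ends, each WorkerTask yields two WorkerTaskResult records for the same execution in quick succession; with parallel tasks, results for one execution interleave. The WorkerSketch and Result names and the Predicate standing in for the task body are hypothetical; a real worker would poll WorkerTask with a KafkaConsumer and send each result with a KafkaProducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class WorkerSketch {
    // One record for the WorkerTaskResult topic (hypothetical shape).
    static final class Result {
        final String executionId;
        final String taskRunId;
        final String state;

        Result(String executionId, String taskRunId, String state) {
            this.executionId = executionId;
            this.taskRunId = taskRunId;
            this.state = state;
        }
    }

    // Handles one WorkerTask and returns the results to produce, in order:
    // RUNNING first, then SUCCESS or FAILED. `body` stands in for the real task.
    static List<Result> process(String executionId, String taskRunId, Predicate<String> body) {
        List<Result> results = new ArrayList<>();
        results.add(new Result(executionId, taskRunId, "RUNNING"));
        boolean ok;
        try {
            ok = body.test(taskRunId);
        } catch (RuntimeException e) {
            ok = false; // a crashing task is reported as FAILED
        }
        results.add(new Result(executionId, taskRunId, ok ? "SUCCESS" : "FAILED"));
        return results;
    }
}
```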
Here is a simplified version of the actual stream:

Builder 
  -> Stream 1
     - from KStream<WorkerTaskResult> 
     - join KTable<Execution>
     - to Execution topic 
  -> Stream 2 
     - from KTable<Execution> (same as previous)
     - multiple output 
       - to WorkerTaskResult topic (if found an end) 
       - to Execution & to WorkerTask topic (if found a next task)
       - to Execution topic (if detect an Execution end)

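The Kafka Streams application is mostly an executor state application that finds the next WorkerTask and evaluates whether the flow has ended, so that it can:
- Create a new TaskRun, or change the state of the current TaskRun by joining the WorkerTaskResult
- Evaluate the whole execution and find failed tasks (based on dependencies)
- Change the execution state and publish the final SUCCESS or FAILED state, which breaks the "infinite loop"
What I really don't understand in this version is what Detected out-of-order KTable update means in the real world. Does it mean that a KTable must have a single producer per partition and per key in order to keep the order on the topic?
Edit 2:
In the meantime, I found a new way to think about the stream application that seems to work. The unit tests pass with no more Detected out-of-order warnings. Here is the simplified new flow: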

Builder 
  - from KTable<Execution> 
  - leftJoin KTable<WorkerTaskResult> 
  - Branch 
    - If Join > to Execution topic
    - If not joined > continue the flow
      - Multiple outputs (same as previous)
        - to WorkerTaskResult topic (if found an end) 
        - to Execution & to WorkerTask topic (if found a next task)
        - to Execution topic (if detect an Execution end)

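What I think makes sense:
- WorkerTaskResult is now a KTable, so I only keep the last version of the result.
- I have a single output to Execution (I think this is the most important part of the fix).
- The whole system seems to produce exactly one output per input (1 new value on Execution produces 1 new value on the Execution topic).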
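The branch after the table-table leftJoin can be sketched as a pure function. A KTable-KTable join emits a new result whenever either side is updated, so the latest WorkerTaskResult is also joined with every later Execution version; something has to distinguish a new result from one that is already merged. The sketch assumes a simple check (the TaskRun already has that state); the JoinBranch, WorkerTaskResult and Joined types are hypothetical, and a null result stands for the missing right side of a left join.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JoinBranch {
    // Latest WorkerTaskResult for an execution (hypothetical shape).
    static final class WorkerTaskResult {
        final String taskRunId;
        final String state;

        WorkerTaskResult(String taskRunId, String state) {
            this.taskRunId = taskRunId;
            this.state = state;
        }
    }

    // Output of the joiner: which branch to take, and the Execution to pass on.
    static final class Joined {
        final boolean toExecutionTopic;
        final Map<String, String> execution; // taskRunId -> state

        Joined(boolean toExecutionTopic, Map<String, String> execution) {
            this.toExecutionTopic = toExecutionTopic;
            this.execution = execution;
        }
    }

    // `result` is null when the left join finds no WorkerTaskResult for the key.
    static Joined join(Map<String, String> execution, WorkerTaskResult result) {
        if (result == null
                || !execution.containsKey(result.taskRunId)
                || result.state.equals(execution.get(result.taskRunId))) {
            // No result, unknown TaskRun, or result already applied: continue the flow.
            return new Joined(false, execution);
        }
        Map<String, String> updated = new LinkedHashMap<>(execution);
        updated.put(result.taskRunId, result.state);
        return new Joined(true, updated); // branch "If Join": write to the Execution topic
    }
}
```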
Here is the new topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
      --> KTABLE-SOURCE-0000000001
    Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
      --> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
      <-- KSTREAM-SOURCE-0000000000
    Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
      --> KTABLE-SOURCE-0000000005
    Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
      --> KTABLE-JOINOTHER-0000000008
      <-- KSTREAM-SOURCE-0000000004
    Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
      --> KTABLE-MERGE-0000000006
      <-- KTABLE-SOURCE-0000000005
    Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
      --> KTABLE-MERGE-0000000006
      <-- KTABLE-SOURCE-0000000001
    Processor: KTABLE-MERGE-0000000006 (stores: [])
      --> KTABLE-TOSTREAM-0000000009
      <-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
      --> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
      <-- KTABLE-MERGE-0000000006
    Processor: KSTREAM-FILTER-0000000015 (stores: [])
      --> KSTREAM-MAPVALUES-0000000016
      <-- KTABLE-TOSTREAM-0000000009
    Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
      --> KSTREAM-MAPVALUES-0000000017
      <-- KSTREAM-FILTER-0000000015
    Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
      --> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
      <-- KSTREAM-MAPVALUES-0000000016
    Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000042 (stores: [])
      --> KSTREAM-MAPVALUES-0000000043
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000030 (stores: [])
      --> KSTREAM-MAPVALUES-0000000031
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000055 (stores: [])
      --> KSTREAM-MAPVALUES-0000000056
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
      --> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
      <-- KSTREAM-FILTER-0000000042
    Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
      --> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
      <-- KSTREAM-FILTER-0000000030
    Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
      --> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
      <-- KSTREAM-FILTER-0000000055
    Processor: KSTREAM-FILTER-0000000024 (stores: [])
      --> KSTREAM-MAPVALUES-0000000025
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000032 (stores: [])
      --> KSTREAM-MAPVALUES-0000000033
      <-- KSTREAM-MAPVALUES-0000000031
    Processor: KSTREAM-FILTER-0000000044 (stores: [])
      --> KSTREAM-MAPVALUES-0000000045
      <-- KSTREAM-MAPVALUES-0000000043
    Processor: KSTREAM-FILTER-0000000057 (stores: [])
      --> KSTREAM-MAPVALUES-0000000058
      <-- KSTREAM-MAPVALUES-0000000056
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-MAPVALUES-0000000011
      <-- KTABLE-TOSTREAM-0000000009
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-MAPVALUES-0000000020
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000050 (stores: [])
      --> KSTREAM-MAPVALUES-0000000051
      <-- KSTREAM-MAPVALUES-0000000043
    Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000026
      <-- KSTREAM-FILTER-0000000024
    Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
      --> KSTREAM-MAPVALUES-0000000034
      <-- KSTREAM-FILTER-0000000032
    Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
      --> KSTREAM-MAPVALUES-0000000046
      <-- KSTREAM-FILTER-0000000044
    Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
      --> KSTREAM-MAPVALUES-0000000059
      <-- KSTREAM-FILTER-0000000057
    Processor: KSTREAM-FILTER-0000000026 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAPVALUES-0000000025
    Processor: KSTREAM-FILTER-0000000038 (stores: [])
      --> KSTREAM-MAPVALUES-0000000039
      <-- KSTREAM-MAPVALUES-0000000031
    Processor: KSTREAM-FILTER-0000000063 (stores: [])
      --> KSTREAM-MAPVALUES-0000000064
      <-- KSTREAM-MAPVALUES-0000000056
    Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
      --> KSTREAM-FILTER-0000000012
      <-- KSTREAM-FILTER-0000000010
    Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
      --> KSTREAM-FILTER-0000000021
      <-- KSTREAM-FILTER-0000000019
    Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
      --> KSTREAM-FILTER-0000000035
      <-- KSTREAM-MAPVALUES-0000000033
    Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
      --> KSTREAM-FILTER-0000000047
      <-- KSTREAM-MAPVALUES-0000000045
    Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
      --> KSTREAM-FILTER-0000000052
      <-- KSTREAM-FILTER-0000000050
    Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
      --> KSTREAM-FILTER-0000000060
      <-- KSTREAM-MAPVALUES-0000000058
    Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
      --> KSTREAM-FILTER-0000000068
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000012 (stores: [])
      --> KSTREAM-PEEK-0000000013
      <-- KSTREAM-MAPVALUES-0000000011
    Processor: KSTREAM-FILTER-0000000021 (stores: [])
      --> KSTREAM-PEEK-0000000022
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-PEEK-0000000028
      <-- KSTREAM-FILTER-0000000026
    Processor: KSTREAM-FILTER-0000000035 (stores: [])
      --> KSTREAM-PEEK-0000000036
      <-- KSTREAM-MAPVALUES-0000000034
    Processor: KSTREAM-FILTER-0000000047 (stores: [])
      --> KSTREAM-PEEK-0000000048
      <-- KSTREAM-MAPVALUES-0000000046
    Processor: KSTREAM-FILTER-0000000052 (stores: [])
      --> KSTREAM-PEEK-0000000053
      <-- KSTREAM-MAPVALUES-0000000051
    Processor: KSTREAM-FILTER-0000000060 (stores: [])
      --> KSTREAM-PEEK-0000000061
      <-- KSTREAM-MAPVALUES-0000000059
    Processor: KSTREAM-FILTER-0000000068 (stores: [])
      --> KSTREAM-PEEK-0000000069
      <-- KSTREAM-MAPVALUES-0000000067
    Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
      --> KSTREAM-FILTER-0000000040
      <-- KSTREAM-FILTER-0000000038
    Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
      --> KSTREAM-TRANSFORM-0000000065
      <-- KSTREAM-FILTER-0000000063
    Processor: KSTREAM-FILTER-0000000040 (stores: [])
      --> KSTREAM-SINK-0000000041
      <-- KSTREAM-MAPVALUES-0000000039
    Processor: KSTREAM-PEEK-0000000013 (stores: [])
      --> KSTREAM-SINK-0000000014
      <-- KSTREAM-FILTER-0000000012
    Processor: KSTREAM-PEEK-0000000022 (stores: [])
      --> KSTREAM-SINK-0000000023
      <-- KSTREAM-FILTER-0000000021
    Processor: KSTREAM-PEEK-0000000028 (stores: [])
      --> KSTREAM-SINK-0000000029
      <-- KSTREAM-FILTER-0000000027
    Processor: KSTREAM-PEEK-0000000036 (stores: [])
      --> KSTREAM-SINK-0000000037
      <-- KSTREAM-FILTER-0000000035
    Processor: KSTREAM-PEEK-0000000048 (stores: [])
      --> KSTREAM-SINK-0000000049
      <-- KSTREAM-FILTER-0000000047
    Processor: KSTREAM-PEEK-0000000053 (stores: [])
      --> KSTREAM-SINK-0000000054
      <-- KSTREAM-FILTER-0000000052
    Processor: KSTREAM-PEEK-0000000061 (stores: [])
      --> KSTREAM-SINK-0000000062
      <-- KSTREAM-FILTER-0000000060
    Processor: KSTREAM-PEEK-0000000069 (stores: [])
      --> KSTREAM-SINK-0000000070
      <-- KSTREAM-FILTER-0000000068
    Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
      --> KSTREAM-SINK-0000000066
      <-- KSTREAM-MAPVALUES-0000000064
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> log-executionStream
      <-- KTABLE-SOURCE-0000000001
    Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000013
    Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000022
    Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000028
    Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000036
    Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
      <-- KSTREAM-FILTER-0000000040
    Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000048
    Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000053
    Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000061
    Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
      <-- KSTREAM-TRANSFORM-0000000065
    Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000069
    Processor: log-executionStream (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000002

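For now, I'm not sure whether this solution is resilient to every kind of concurrency, or whether I could hit the failure again at some other point (meaning the execution is rolled back to a previous version, causing the same task to be executed multiple times).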

Answer from qni6mghb:

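> Is this pattern supported by Kafka Streams (it is not a DAG, since we loop back to the same topic)?
In general, yes. You only need to make sure that you don't end up in an "infinite loop", i.e., at some point an input record should "terminate" and not produce anything to the output topic any longer. For your case, an Execution must eventually stop creating new Tasks (via the feedback loop).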
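The termination requirement can be illustrated by driving a feedback loop by hand: each output is fed back as the next input until a step emits nothing. The workflowStep function is a hypothetical stand-in for the application logic; the point is only that the terminal state (SUCCESS here) must map to no output. The maxSteps guard exists only to make a non-terminating step fail fast in this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class FeedbackLoop {
    // Feeds each output back in as the next input until a step emits nothing.
    static <T> List<T> run(T initial, Function<T, Optional<T>> step, int maxSteps) {
        List<T> history = new ArrayList<>();
        T current = initial;
        history.add(current);
        for (int i = 0; i < maxSteps; i++) {
            Optional<T> next = step.apply(current);
            if (!next.isPresent()) {
                return history; // the record "terminated": nothing is written back
            }
            current = next.get();
            history.add(current);
        }
        throw new IllegalStateException("no termination after " + maxSteps + " steps");
    }

    // Hypothetical workflow step: CREATED -> task-1 -> ... -> task-N -> SUCCESS -> (nothing).
    static Optional<String> workflowStep(String state, int tasks) {
        if (state.equals("SUCCESS")) {
            return Optional.empty();
        }
        int done = state.equals("CREATED") ? 0 : Integer.parseInt(state.substring("task-".length()));
        return Optional.of(done < tasks ? "task-" + (done + 1) : "SUCCESS");
    }
}
```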
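> What is a good way to design this stream so that it is concurrency-safe?
It always depends on the concrete application... For your case, if I understand the design of the application correctly, you basically have two input topics (Execution and WorkerTaskResult) and two output topics (Execution and WorkerTask). When processing the input topics, messages from each input can modify shared state (i.e., the state of the tasks).
In addition, there is an "outside application" that reads from the WorkerTask topic and writes to the WorkerTaskResult topic? Hence, there is actually a second loop in your overall data flow? I assume there are also other upstream applications that push new data into the Execution topic?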

                             +-----------------+
                             |                 |
                             v                 |
upstream producers ---> "Execution" --+        |
                                      |        |
                                      v        |  
                                      KS-App --+
                                      ^        |
                                      |        |
            +--> "WorkerTaskResult" --+        +--> "WorkerTask" --+
            |                                                      |
            +------------------------ outside app <----------------+

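What is unclear to me:
- Which state changes are propagated directly from the KS app back into Execution?
- Which state changes are propagated via the "outside app" through WorkerTaskResult?
Maybe you can update your question, and I can try to update my answer accordingly.
Update (based on Edits 1 and 2)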
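> to Execution & to WorkerTask topic (if found a next task)
This step seems to introduce a race condition? When writing back to the Execution topic, the state is only updated when you read it back. In parallel, the execution of the task might finish first (i.e., before the Execution update is read back and processed), and hence the Execution update for the finished task might be written first and update the state?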
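The race described here can be made concrete with a deterministic interleaving. In this plain-Java sketch (no Kafka; the two input names and the in-memory table are assumptions), the new Execution with t2 CREATED and the WorkerTask for t2 are emitted together. If the WorkerTaskResult for t2 is joined before the re-read Execution record, the join sees a version without t2. The sketch simply drops the result in that case; applying it to the stale version and writing it back would lose it just the same once the re-read record replaces the whole value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FeedbackRace {
    // Processes the two pending inputs in the given order; returns the table's final value.
    //   "EXECUTION": the re-read Execution record that adds t2=CREATED
    //   "RESULT":    the WorkerTaskResult for t2 (t2=RUNNING)
    static Map<String, String> process(List<String> order) {
        Map<String, String> table = new LinkedHashMap<>();
        table.put("t1", "SUCCESS"); // version currently held by the KTable

        // Written to Execution at the same time as the WorkerTask for t2 is sent.
        Map<String, String> emitted = new LinkedHashMap<>(table);
        emitted.put("t2", "CREATED");

        for (String input : order) {
            if (input.equals("EXECUTION")) {
                table = new LinkedHashMap<>(emitted); // a table update replaces the whole value
            } else if (table.containsKey("t2")) {
                table.put("t2", "RUNNING"); // the join finds t2 and applies the result
            }
            // Otherwise the join sees a version without t2; this sketch drops the result.
        }
        return table;
    }
}
```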
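> What I really don't understand in this version is what Detected out-of-order KTable update means in the real world. Does it mean that a KTable must have a single producer per partition and per key in order to keep the order on the topic?
You could say that. For each input record, the table() operator compares the timestamp of the input record with the timestamp of the current entry in the table. If the input record has a smaller timestamp, a warning is logged (the update is still applied). The reason for the warning is that the table stores only one entry per key and expects to only move forward in time; out-of-order updates may lead to unexpected results, hence the warning log. Using a single producer per partition, or per key, avoids out-of-order data per key (assuming the producer only sends ordered data).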
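The check described above can be modeled in a few lines. This is a simplified illustration, not the Kafka Streams implementation: the table keeps one value and one timestamp per key, records a warning when an update is older than the stored entry, and applies the update anyway. The class and method names are hypothetical; the message text mirrors the warning from the question, where "execution" appears to be the store name shown in the topology.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a source KTable's timestamp check (not the Kafka Streams code).
final class TimestampedTable<V> {
    private final String storeName;
    private final Map<String, V> values = new HashMap<>();
    private final Map<String, Long> timestamps = new HashMap<>();
    final List<String> warnings = new ArrayList<>();

    TimestampedTable(String storeName) {
        this.storeName = storeName;
    }

    void update(String key, V value, long timestamp, long offset, int partition) {
        Long current = timestamps.get(key);
        if (current != null && timestamp < current) {
            // The table only expects to move forward in time: warn, but apply anyway.
            warnings.add("Detected out-of-order KTable update for " + storeName
                    + " at offset " + offset + ", partition " + partition + ".");
        }
        values.put(key, value);
        timestamps.put(key, timestamp);
    }

    V get(String key) {
        return values.get(key);
    }
}
```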
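I am not 100% sure that I fully understand the new version of your application, but in general, you want to make sure to avoid data races and to linearize the updates to an Execution.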
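One way to linearize the updates, sketched under assumptions: let a single processor own the latest Execution per key and compute every change as a read-modify-write against that version, deriving WorkerTasks only from it. In Kafka Streams this would roughly correspond to one Processor with a key-value state store reading inputs co-partitioned on the execution id; here a HashMap and sequential calls stand in for the store and the single-threaded stream task. The flow list, the use of the task id as the TaskRun id, and the rule that SUCCESS/FAILED TaskRuns never change again are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Single owner of the latest Execution per key; every change is a read-modify-write on it.
final class LinearizedExecutor {
    private final List<String> flow; // hypothetical sequential workflow: task ids in order
    // Latest Execution per execution id (taskId -> state); stand-in for a state store.
    private final Map<String, Map<String, String>> store = new HashMap<>();
    // WorkerTasks to send, always derived from the current version.
    final List<String> workerTasks = new ArrayList<>();

    LinearizedExecutor(List<String> flow) {
        this.flow = flow;
    }

    Map<String, String> start(String executionId) {
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put(flow.get(0), "CREATED");
        store.put(executionId, initial);
        workerTasks.add(executionId + "/" + flow.get(0));
        return new LinkedHashMap<>(initial);
    }

    // Returns the version to write to the Execution topic (one output per input).
    Map<String, String> onResult(String executionId, String taskId, String state) {
        Map<String, String> current = new LinkedHashMap<>(
                store.getOrDefault(executionId, Collections.<String, String>emptyMap()));
        String previous = current.get(taskId);
        if (previous == null || previous.equals("SUCCESS") || previous.equals("FAILED")) {
            return current; // unknown TaskRun, or already terminal: nothing changes
        }
        current.put(taskId, state);
        int index = flow.indexOf(taskId);
        boolean lastCreated = index == current.size() - 1;
        if (state.equals("SUCCESS") && lastCreated && index + 1 < flow.size()) {
            String next = flow.get(index + 1);
            current.put(next, "CREATED");
            workerTasks.add(executionId + "/" + next);
        }
        store.put(executionId, current);
        return current;
    }
}
```

Replaying the log's sequence (t1 RUNNING, then SUCCESS, then a late or duplicate result) against this executor keeps t1 at SUCCESS and creates t2 exactly once, because no update is computed from any copy other than the stored one.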
