奇怪的“发射”数字行为/拓扑统计中的零统计数字(storm 1.0.3)

30byixjq  于 2023-06-21  发布在  Storm
关注(0)|答案(1)|浏览(332)

这就是我的storm ui数据。
问题是我不知道这些数字(发出的元组的数量)来自哪里。
我的拓扑结构非常简单:kafkaspout->bolt(将数据持久化到hbase中)
拓扑工作-当我将数据放入kafka主题时,我用bolt处理它们并将其持久化到hbase中,然后用hbase shell中的scan操作符进行验证(这样就插入了新的记录)
但是,每次我将新消息提交到kafka中,并且bolt将其持久化时,我的拓扑结构不会增加“1”发出的消息数。
我会定期把所有的数字增加20个,而不会向Kafka发送任何新的信息。i、 我的Kafka主题在几个小时内没有收到任何消息,但随着时间的推移,发出的元组数总是以20个为单位增加。我在hbase中仍然得到相同数量的记录。
我在apachestorm日志中没有任何异常/错误。
我没有在bolt实现中执行ack()或fail()任何元组(这是basicbolt类型自动执行ack)
我在bolt度量中的容量或延迟总是保持为零,即使我在kafka中加载了很多消息
我的Kafka偏移日志($kafka/kafka-run-class.sh kafka.tools.consumeroffsetchecker)显示所有消息都已处理,并且给定主题/组的Kafka延迟为0。

所以我的问题是:

随着时间的推移,喷口和螺栓中的“发射”元组会增加20秒,这些“隐形”元组是什么?
是否可以在storm ui中启用“调试”以查看这些元组是什么?
为什么bolt度量中的容量/延迟总是为零,而bolt被确认可以持久化数据?

环境详细信息

我使用的是Java8+ApacheStorm1.0.3

[devops@storm-wk1-prod]~/storm/supervisor/stormdist% storm version
Running: /usr/lib/jvm/jre-1.8.0-openjdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.3 -Dstorm.log.dir=/opt/apache-storm-1.0.3/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.3/lib/storm-core-1.0.3.jar:/opt/apache-storm-1.0.3/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.3/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.3/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.3/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.3/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.3/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.3/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.3/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.3/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.3/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.3/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.3/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.3/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.3/lib/storm-rename-hack-1.0.3.jar:/opt/storm/conf org.apache.storm.utils.VersionInfo
Storm 1.0.3
URL https://git-wip-us.apache.org/repos/asf/storm.git -r eac433b0beb3798c4723deb39b3c4fad446378f4
Branch (no branch)
Compiled by ptgoetz on 2017-02-07T20:22Z
From source with checksum c78e52de4b8a22d99551d45dfe9c1a4b

我的风暴.yaml:
我用storm supervisor运行了2个示例,每个示例都有以下配置:

storm.zookeeper.servers:
  - "10.138.0.8"
  - "10.138.0.9"
  - "10.138.0.16"

storm.zookeeper.port: 2181

nimbus.seeds: ["10.138.0.10"]

storm.local.dir: "/var/log/storm"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"

拓扑.yaml

nimbus.host: "10.138.0.10"

# In Storm 0.7.x, this is necessary in order to give workers time to

# initialize. In Storm 0.8.0 and later, it may not be necessary because Storm

# has added a separate, longer timeout for the initial launch of a worker.

supervisor.worker.timeout.secs: 60

topology.workers: 1

拓扑学

import tbolts
import tspouts

def create(builder):
    """Create toplogy through Petrel library
    """
    # spout getting data from kafka instance
    # we run 2 tasks of kafka spout
    builder.setSpout("kafka",
                     tspouts.KafkaSpout(), 2)

    # persistence bolt
    # we run 4 tasks of persistence bolt
    builder.setBolt("persistence",
                    tbolts.PersistenceBolt(), 4).shuffleGrouping("kafka")
xqnpmsa8

xqnpmsa81#

emit计数增加20的原因是因为storm每20个tuple buy default只采样一次,以更新其度量。此采样率由 topology.stats.sample.rate 配置变量,并且可以根据拓扑进行更改。所以你可以把这个设定为 1.0 (是的 0.05 默认情况下),您将获得准确的发射计数,但是这将引入大量的处理开销,并可能导致acker和/或metrics使用者示例过载。小心使用。

相关问题