这就是我的storm ui数据。
问题是我不知道这些数字(发出的元组的数量)来自哪里。
我的拓扑结构非常简单:kafkaspout->bolt(将数据持久化到hbase中)
拓扑工作-当我将数据放入kafka主题时,我用bolt处理它们并将其持久化到hbase中,然后用hbase shell中的scan操作符进行验证(这样就插入了新的记录)
但是,每次我将新消息提交到kafka中,并且bolt将其持久化时,我的拓扑结构不会增加“1”发出的消息数。
我会定期把所有的数字增加20个,而不会向Kafka发送任何新的信息。i、 我的Kafka主题在几个小时内没有收到任何消息,但随着时间的推移,发出的元组数总是以20个为单位增加。我在hbase中仍然得到相同数量的记录。
我在apachestorm日志中没有任何异常/错误。
我没有在bolt实现中执行ack()或fail()任何元组(这是basicbolt类型自动执行ack)
我在bolt度量中的容量或延迟总是保持为零,即使我在kafka中加载了很多消息
我的Kafka偏移日志($kafka/kafka-run-class.sh kafka.tools.consumeroffsetchecker)显示所有消息都已处理,并且给定主题/组的Kafka延迟为0。
所以我的问题是:
随着时间的推移,喷口和螺栓中的“发射”元组会增加20秒,这些“隐形”元组是什么?
是否可以在storm ui中启用“调试”以查看这些元组是什么?
为什么bolt度量中的容量/延迟总是为零,而bolt被确认可以持久化数据?
环境详细信息
我使用的是Java8+ApacheStorm1.0.3
[devops@storm-wk1-prod]~/storm/supervisor/stormdist% storm version
Running: /usr/lib/jvm/jre-1.8.0-openjdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.3 -Dstorm.log.dir=/opt/apache-storm-1.0.3/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.3/lib/storm-core-1.0.3.jar:/opt/apache-storm-1.0.3/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.3/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.3/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.3/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.3/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.3/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.3/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.3/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.3/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.3/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.3/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.3/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.3/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.3/lib/storm-rename-hack-1.0.3.jar:/opt/storm/conf org.apache.storm.utils.VersionInfo
Storm 1.0.3
URL https://git-wip-us.apache.org/repos/asf/storm.git -r eac433b0beb3798c4723deb39b3c4fad446378f4
Branch (no branch)
Compiled by ptgoetz on 2017-02-07T20:22Z
From source with checksum c78e52de4b8a22d99551d45dfe9c1a4b
我的风暴.yaml:
我用storm supervisor运行了2个示例,每个示例都有以下配置:
storm.zookeeper.servers:
- "10.138.0.8"
- "10.138.0.9"
- "10.138.0.16"
storm.zookeeper.port: 2181
nimbus.seeds: ["10.138.0.10"]
storm.local.dir: "/var/log/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"
拓扑.yaml
nimbus.host: "10.138.0.10"
# In Storm 0.7.x, this is necessary in order to give workers time to
# initialize. In Storm 0.8.0 and later, it may not be necessary because Storm
# has added a separate, longer timeout for the initial launch of a worker.
supervisor.worker.timeout.secs: 60
topology.workers: 1
拓扑学
import tbolts
import tspouts
def create(builder):
"""Create toplogy through Petrel library
"""
# spout getting data from kafka instance
# we run 2 tasks of kafka spout
builder.setSpout("kafka",
tspouts.KafkaSpout(), 2)
# persistence bolt
# we run 4 tasks of persistence bolt
builder.setBolt("persistence",
tbolts.PersistenceBolt(), 4).shuffleGrouping("kafka")
1条答案
按热度按时间xqnpmsa81#
emit计数增加20的原因是因为storm每20个tuple buy default只采样一次,以更新其度量。此采样率由
topology.stats.sample.rate
配置变量,并且可以根据拓扑进行更改。所以你可以把这个设定为1.0
(是的0.05
默认情况下),您将获得准确的发射计数,但是这将引入大量的处理开销,并可能导致acker和/或metrics使用者示例过载。小心使用。