我有一个storm拓扑(1 worker)设置,其中spout(在java中)从redis中退出(使用blpop)事件并传输到Bolt。但有一个观察结果是,当有超过200万的队列并且在storm nimbus/主管/Zookeeper/工作人员日志中未发现警告/异常时,bolt未收到一些事件(在clojure中,6个喷口螺纹,50个螺栓螺纹)。
在本地,此场景不使用虚拟数据进行复制。集群中没有网络延迟/数据包丢失。平均处理延迟是100毫秒。如何找到原因来解决它的生产。
(ns event-processor
(:import [backtype.storm StormSubmitter LocalCluster]
java.util.UUID
storm_jedis.RedisQueueSpout
)
(:use [backtype.storm clojure config])
(:require [clojure.tools.logging :as log])
(:require [clj-redis.client :as redis])
(:import (redis.clients.jedis Jedis JedisPool JedisPoolConfig))
(:gen-class))
(defmacro process-event [tuple]
(log/info "processing event")
)
(defbolt execute-ls-closure ["word"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [
timestart (. System currentTimeMillis)
tuple-message (.get (get tuple "message") 0)
string-to-emit (process-event tuple)
]
(emit-bolt! collector [string-to-emit] :anchor tuple)
(ack! collector tuple)
)))))
(defn mk-topology []
(topology
;{"1" (spout-spec sentence-spout)
{"1" (spout-spec redis-spout :p 6)
}
{"3" (bolt-spec {"1" :shuffle }
execute-ls-closure
:p 50)
}))
(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)
))
(defn submit-topology! [name]
(StormSubmitter/submitTopology
name
{TOPOLOGY-DEBUG true
TOPOLOGY-WORKERS 1}
(mk-topology)))
(defn -main
([]
(run-local!))
([name]
(submit-topology! name)))
1条答案
按热度按时间bybem2ql1#
如果它不会太慢您的拓扑结构,您可以使用启用调试日志记录
Config.setDebug(true)
https://github.com/apache/storm/blob/f2ced23fa4e3f699558663baef4ee582ee148fa2/storm-client/src/jvm/org/apache/storm/config.java#l1763.否则,我会尝试在bolt中添加一些调试日志记录,并为redis喷口启用日志记录,以确定元组是在风暴中丢失还是在redis集成中丢失。
我还注意到你使用的是旧的风暴版本。你可以尝试升级。