What is the correct way to iterate over Kafka messages in Clojure?

Posted by kpbwa7wx on 2021-06-07 in Kafka

The code below just blocks. It seems that for some reason map cannot return a lazy sequence, and I don't understand why.

(ns testt.consumer
  (:require
    ;internal
    ;external
    [clojure.walk   :refer [stringify-keys]]
    [clojure.pprint :as pprint])
  (:import
    [kafka.consumer         ConsumerConfig Consumer KafkaStream ]
    [kafka.javaapi.consumer ConsumerConnector                   ]
    [kafka.message          MessageAndMetadata                  ]
    [java.util              Properties                          ])
  (:gen-class))

; internal

(defn hashmap-to-properties
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

; external

(defn consumer-connector
  [h]
  (let [config (ConsumerConfig. (hashmap-to-properties h))]
    (Consumer/createJavaConsumerConnector config)))

(defn message-stream
  [^ConsumerConnector consumer topic thread-pool-size]
  ;this is dealing only with the first stream, needs to be fixed to support multiple streams
  (let [ stream (first (.get (.createMessageStreams consumer {topic thread-pool-size}) topic)) ]
    (map #(.message %) (iterate (.next (.iterator ^KafkaStream stream))))))

Configuration expected by the connector:

{  :zookeeper.connect              "10.0.0.1:2181"
   :group.id                       "test-0"
   :thread.pool.size               "1"
   :topic                          "test_topic"
   :zookeeper.session.timeout.ms   "1000"
   :zookeeper.sync.time.ms         "200"
   :auto.commit.interval.ms        "1000"
   :auto.offset.reset              "smallest"
   :auto.commit.enable             "true" }
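Before reaching ConsumerConfig, this map goes through hashmap-to-properties, where stringify-keys turns each keyword into its name, so :zookeeper.connect becomes the string "zookeeper.connect". A minimal check of that conversion, runnable without Kafka on the classpath, reusing the question's function on an illustrative subset of the configuration above:

```clojure
(ns config-check
  (:require [clojure.walk :refer [stringify-keys]])
  (:import [java.util Properties]))

;; Same conversion as in the question: keyword keys become plain strings.
(defn hashmap-to-properties
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

;; Illustrative subset of the connector configuration shown above.
(def props
  (hashmap-to-properties {:zookeeper.connect "10.0.0.1:2181"
                          :group.id          "test-0"
                          :auto.offset.reset "smallest"}))

(.getProperty props "zookeeper.connect") ;=> "10.0.0.1:2181"
```

Keeping every value a string (even "1" and "1000") matters here: java.util.Properties.getProperty returns nil for any value that is not a String.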
Answer by t3psigkw

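Actually, you don't need iterate over Kafka's iterator at all, because Clojure has a better way to handle streams. KafkaStream implements java.lang.Iterable, so you can treat the stream as a sequence and simply do: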

(doseq [^kafka.message.MessageAndMetadata message stream]
  ; do-some-stuff stands for your own message handler
  (do-some-stuff message))

This is probably the most efficient way to do it.
More code can be found here:
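The question's version blocks because (.next (.iterator stream)) is an ordinary argument expression: it is evaluated before iterate (and therefore map) is ever called, and Kafka's iterator waits until a message is available. Even after a message arrived, iterate would fail, since it takes two arguments, a function and a seed. doseq sidesteps this by walking the stream as a plain java.lang.Iterable. The sketch below runs without a broker by using a java.util.ArrayList as a hypothetical stand-in for the KafkaStream and a hypothetical handle function in place of do-some-stuff:

```clojure
(ns doseq-demo)

;; Hypothetical stand-in for a KafkaStream: any java.lang.Iterable is
;; walked the same way by doseq.
(def fake-stream (java.util.ArrayList. ["m1" "m2" "m3"]))

;; Hypothetical handler in place of do-some-stuff; records what it receives.
(def seen (atom []))

(defn handle [message]
  (swap! seen conj message))

;; doseq calls seq on the Iterable and runs the body once per element,
;; purely for side effects.
(doseq [message fake-stream]
  (handle message))

@seen ;=> ["m1" "m2" "m3"]
```

With a real KafkaStream the loop does not finish on its own, because the iterator keeps waiting for new messages, so it is usually run on a dedicated thread.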
https://github.com/l1x/shovel
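If a lazy sequence is still wanted, as the question intended, one option is to build it with lazy-seq so that .hasNext and .next are called only when an element is realized. This is a hedged sketch: message-seq and counting-iterator are hypothetical names, and the demo uses an in-memory counting iterator instead of a KafkaStream's. A hand-written lazy-seq is used rather than iterator-seq because, as far as I can tell, since Clojure 1.7 iterator-seq (and seq on an Iterable, which doseq relies on) builds chunked sequences that read up to 32 elements at a time; on a blocking Kafka iterator that can mean waiting for several messages before the first one is handed over.

```clojure
(ns lazy-demo)

;; Hypothetical helper (not part of Kafka or clojure.core): an unchunked lazy
;; seq over a java.util.Iterator, applying f to each element. Nothing is read
;; until an element is requested, and elements are read one at a time.
(defn message-seq
  [^java.util.Iterator it f]
  (lazy-seq
    (when (.hasNext it)
      (cons (f (.next it)) (message-seq it f)))))

;; Demo scaffolding: wraps a collection's iterator and counts calls to .next,
;; standing in for a KafkaStream's blocking iterator.
(def reads (atom 0))

(defn counting-iterator
  [xs]
  (let [^java.util.Iterator it (.iterator ^Iterable xs)]
    (reify java.util.Iterator
      (hasNext [_] (.hasNext it))
      (next [_]
        (swap! reads inc)
        (.next it)))))

(def msgs (message-seq (counting-iterator (vec (range 100))) inc))

@reads       ;=> 0 (building the seq reads nothing)
(first msgs) ;=> 1
@reads       ;=> 1 (only one element has been read)
```

Applied to the question's code, the call would look roughly like (message-seq (.iterator ^KafkaStream stream) #(.message ^MessageAndMetadata %)). Realizing each element still blocks until a message arrives, which is the expected behavior for a consumer.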
