clojure-core.async接口

wfsdck30  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(339)

我正在使用 clj-kafka ,我正在努力 core.async 它在repl中的接口。
我收到了一些消息,但我的结构感觉不对:我要么无法停止接收消息,要么必须启动 go 再次执行例程以接收更多消息。
以下是我的尝试:

(defn consume [topic]
  (let [consume-chan (chan)]
    (with-resource [c (consumer config)]
      shutdown
      (go (doseq [m (messages c "test")]
                   (>! chan message) ;; should I check the return value?
                   )))
    consume-chan)) ;; is it the right place to return a channel ?

  (def consume-chan (consume "test"))
  ;;(close! consume-chan)

  (go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already

  (def cons-ch (go
                 (with-resource [c (consumer config)]
                   shutdown
                   (doseq [m (messages c "test")]
                     (>! consume-chan m)))))  ;; should I check something here ?

  ;;(close! cons-ch)

  (def go-ch
    (go-loop []
      (if-let [km (<! consume-chan)]
        (do  (println "Got a value in this loop:" km)
              (recur))
        (do (println "Stop recurring - channel closed")))))

  ;;(close! go-ch)

如何使用 core.async 接口?

wbrvyc0a

wbrvyc0a1#

我会这么做: >! 以及 <! 如果通道已关闭,则返回nil,因此请确保发生这种情况时循环退出—这样您就可以通过关闭通道从外部轻松结束循环。
使用try/catch检查go块中的异常,并将任何异常作为返回值,这样它们就不会丢失。
检查读取值是否存在异常,以捕获通道内的任何内容。
go块返回一个通道,块内代码的返回值(如上面的异常)将放在通道上。检查这些通道是否存在异常,可能是为了重新触发。
您现在可以这样写入通道:

(defn write-seq-to-channel
  [channel
   values-seq]
  (a/go
    (try
      (loop [values values-seq]
        (when (seq values)
          (when (a/>! channel (first values))
            (recur (rest values)))))
      (catch Throwable e
        e))))

你读的是这样的:

(defn read-from-channel-and-print
  [channel]
  (a/go
    (try
      (loop []
        (let [value (a/<! channel)]
          (when value
            (when (instance? Throwable value)
              (throw value))
            (println "Value read:" value)
            (recur))))
      (catch Throwable e
        e))))

现在您将有两个频道,所以使用类似 alts! 或者 alts!! 检查循环是否退出。完成后关闭频道。

相关问题