I am using the nats-spark-connector (https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced) to connect to NATS JetStream and consume and process messages with Spark Java code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            // .config("spark.logConf", "false")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            // .config("spark.executor.instances", "2")
            // .config("spark.cores.max", "4")
            // .config("spark.executor.memory", "2g")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream3")
            .option("nats.stream.subjects", "mysub3")
            // wait 90 seconds for an ack before resending a message
            .option("nats.msg.ack.wait.secs", 90)
            // .option("nats.num.listeners", 2)
            // Each listener will fetch 10 messages at a time
            // .option("nats.msg.fetch.batch.size", 10)
            .load();
    System.out.println("Successfully read nats stream !");

    StreamingQuery query;
    try {
        query = df.writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", false)
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Following the NATS JetStream walkthrough (https://docs.nats.io/nats-concepts/jetstream/js_walkthrough), I publish messages to the stream (subject name: mysub3) with the following command:
nats pub foo --count=1000 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}"
After publishing messages to the NATS stream, the output of the code is:
Successfully read nats stream !
Status change nats: connection opened
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 4
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 5
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 6
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
|mysub3 |10/31/2023 - 18:04:36 +0530|publication #6 @ 2023-10-31T18:04:36+05:30|
|mysub3 |10/31/2023 - 18:04:37 +0530|publication #7 @ 2023-10-31T18:04:37+05:30|
+-------+---------------------------+------------------------------------------+
It keeps printing the same, ever-growing set of messages in every batch. Since I use outputMode("append"), I expected each batch to print only newly published messages, but every batch also repeats all the messages already printed in previous batches. I also tried outputMode("update"); it produced the same output as append. How can I make sure that each batch prints/receives only the newly pushed messages?
1 Answer
Answered by wmtdaxz31:
Found the solution for this scenario! The messages were repeated in every batch because they were never acknowledged back to NATS after being consumed. You have to use a durable consumer and pass the consumer name as an option in the Spark code.
Instructions for creating a durable consumer: https://docs.nats.io/nats-concepts/jetstream/js_walkthrough#3.-creating-a-consumer
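The walkthrough creates the consumer interactively; a non-interactive equivalent can be sketched as follows. The consumer name `my_durable` and the flag values are assumptions on my part (flag availability depends on your natscli version, and the connector README documents the exact consumer settings it expects):

```shell
# Create a durable pull consumer named "my_durable" on stream my_stream3,
# with explicit acks so the connector's acks advance the consumer's position.
nats consumer add my_stream3 my_durable \
  --pull \
  --deliver all \
  --ack explicit \
  --filter mysub3 \
  --defaults
```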
Durable-consumer-related information: https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced
Pass this consumer name as an option using the key "nats.durable.name":
This ensures that only messages after the last acknowledged message are delivered to the client.
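To make the fix concrete, here is a minimal sketch of the resulting option set. `my_durable` is a hypothetical consumer name (use whatever name you created on the stream); in the real job the map would be passed to the reader via `spark.readStream().format("nats").options(opts).load()`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NatsReaderOptions {
    // Connector options from the question, plus the durable consumer name.
    static Map<String, String> connectorOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("nats.host", "localhost");
        opts.put("nats.port", "4222");
        opts.put("nats.stream.name", "my_stream3");
        opts.put("nats.stream.subjects", "mysub3");
        opts.put("nats.msg.ack.wait.secs", "90");
        // The key addition: ties the Spark reader to the durable consumer,
        // so acknowledged messages are not redelivered in the next batch.
        opts.put("nats.durable.name", "my_durable");
        return opts;
    }

    public static void main(String[] args) {
        System.out.println(connectorOptions().get("nats.durable.name"));
    }
}
```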