我从官方文件中了解到,当使用同一个消费群体时,2个消费者不能阅读同一条信息。如果仍要处理同一条消息,则必须使用不同的使用者组。
我试着用2个简单的spark结构化流媒体应用程序来测试这是否正确。它只是从集线器读取所有消息并将它们写入内存中的一个表中。以下是我的两个应用程序代码:
ehConf = {
'eventhubs.connectionString' : connectionString,
'eventhubs.consumerGroup': "group1"
}
df = spark.readStream.format("eventhubs").options(**ehConf).load()
df.writeStream.format("memory").queryName("sink").outputMode("append").start()
然后我试着用这个向中心发送一些消息
dfnum = spark.range(1, 8)
dfnum = dfnum.select(col('id').cast('string')).withColumnRenamed('id', 'body')
# dfnum.show()
ehWriteConf = {
'eventhubs.connectionString' : connectionString
}
ds = dfnum.select("body").write.format("eventhubs").options(**ehWriteConf).option("checkpointLocation", "tmp/checkpoint").save()
问题是,所有spark结构化流媒体应用程序仍然可以读取eventhub的所有消息,这与官方文件所说的不同。
以下是Spark应用1的结果:
+----+------+---------+--------------------+---------+------------+
|body|offset|partition| enqueuedTime|publisher|partitionKey|
+----+------+---------+--------------------+---------+------------+
| 4| 1248| 0|2020-05-31 14:00:...| null| null|
| 1| 1296| 0|2020-05-31 14:00:...| null| null|
| 3| 1152| 1|2020-05-31 14:00:...| null| null|
| 2| 1200| 1|2020-05-31 14:00:...| null| null|
| 5| 1344| 0|2020-05-31 14:01:...| null| null|
| 6| 1248| 1|2020-05-31 14:01:...| null| null|
| 7| 1296| 1|2020-05-31 14:01:...| null| null|
+----+------+---------+--------------------+---------+------------+
以下是Spark应用2的结果:
+----+------+---------+--------------------+---------+------------+
|body|offset|partition| enqueuedTime|publisher|partitionKey|
+----+------+---------+--------------------+---------+------------+
| 4| 1248| 0|2020-05-31 14:00:...| null| null|
| 3| 1152| 1|2020-05-31 14:00:...| null| null|
| 1| 1296| 0|2020-05-31 14:00:...| null| null|
| 2| 1200| 1|2020-05-31 14:00:...| null| null|
| 5| 1344| 0|2020-05-31 14:01:...| null| null|
| 6| 1248| 1|2020-05-31 14:01:...| null| null|
| 7| 1296| 1|2020-05-31 14:01:...| null| null|
+----+------+---------+--------------------+---------+------------+
顺序有点不同,但内容都一样。
我误解了什么吗?
暂无答案!
目前还没有任何答案,快来回答吧!