我有一个简单的代码
var count = event_stream
.groupBy("value").count()
event_stream.join(count,"value").printSchema() //get error on this line
事件流和计数模式如下
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- value: binary (nullable = true)
|-- count: long (nullable = false)
两个问题
为什么会出现此错误以及如何修复?
为什么groupby.count会删除所有其他列?
误差如下
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Failure when resolving conflicting references in Join:
'Join Inner
:- AnalysisBarrier
: +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
+- AnalysisBarrier
+- Aggregate [value#8], [value#8, count(1) AS count#46L]
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
Conflicting attributes: value#8
编辑:是的!更改列的名称是有效的。但是现在,如果我使用join,我必须使用outputmode.append,为此,我需要向流中添加水印。
我想要的是提取resultingdf中的count和topic(从上面打印的模式中),并将其写入某个sink。
两个问题
有没有其他更好的方法?
我是否可以执行多个agg,比如count(),然后再添加另一个字符串类型的列,即topic是这种情况?
2条答案
按热度按时间2fjabf4q1#
错误原因是用于联接的列名。你可以像这样操作。
niwlg2el2#
为什么会出现此错误以及如何修复?
我认为您得到了这个错误,因为最终连接的模式包含两个值字段,连接的每一侧一个。要解决此问题,请重命名两个连接流之一的“值”字段,如下所示:
为什么groupby.count会删除所有其他列?
groupBy
操作基本上是将字段划分为两个列表。要用作键的字段列表和要聚合的字段列表。关键字段只会在最终结果中显示,但任何不在列表中的字段都需要定义聚合操作才能在结果中显示。否则spark无法知道如何组合该字段的多个值!你想数一数吗?你想要最大值吗?是否要查看所有不同的值?要指定如何汇总字段,可以在.agg(..)调用中定义它。例子: