analysisexception:解析join中的冲突引用时失败:“join inner”

oknrviil  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(446)

我有一个简单的代码

var count = event_stream
      .groupBy("value").count()

event_stream.join(count,"value").printSchema() //get error on this line

事件流和计数模式如下

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- value: binary (nullable = true)
 |-- count: long (nullable = false)

两个问题
为什么会出现此错误以及如何修复?
为什么groupby.count会删除所有其他列?
误差如下

Exception in thread "main" org.apache.spark.sql.AnalysisException: 
Failure when resolving conflicting references in Join:
'Join Inner
:- AnalysisBarrier
:     +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
+- AnalysisBarrier
      +- Aggregate [value#8], [value#8, count(1) AS count#46L]
         +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

Conflicting attributes: value#8

编辑:是的!更改列的名称是有效的。但是现在,如果我使用join,我必须使用outputmode.append,为此,我需要向流中添加水印。
我想要的是提取resultingdf中的count和topic(从上面打印的模式中),并将其写入某个sink。
两个问题
有没有其他更好的方法?
我是否可以执行多个agg,比如count(),然后再添加另一个字符串类型的列,即topic是这种情况?

2fjabf4q

2fjabf4q1#

错误原因是用于联接的列名。你可以像这样操作。

var count = event_stream
      .groupBy("value").count()

event_stream.join(count,Seq("value"))
niwlg2el

niwlg2el2#

为什么会出现此错误以及如何修复?
我认为您得到了这个错误,因为最终连接的模式包含两个值字段,连接的每一侧一个。要解决此问题,请重命名两个连接流之一的“值”字段,如下所示:

var count = event_stream.
    groupBy("value").count().
    withColumnRenamed("value", "join_id")

event_stream.join(count, $"value" === $"join_id").
    drop("join_id").  
    printSchema()

为什么groupby.count会删除所有其他列? groupBy 操作基本上是将字段划分为两个列表。要用作键的字段列表和要聚合的字段列表。关键字段只会在最终结果中显示,但任何不在列表中的字段都需要定义聚合操作才能在结果中显示。否则spark无法知道如何组合该字段的多个值!你想数一数吗?你想要最大值吗?是否要查看所有不同的值?要指定如何汇总字段,可以在.agg(..)调用中定义它。
例子:

val input = Seq(
    (1, "Bob", 4),
    (1, "John", 5)
).toDF("key", "name", "number")

input.groupBy("key").
    agg(collect_set("name") as "names",
        max("number") as "maxnum").
    show

+---+-----------+------+
|key|name       |maxnum|
+---+-----------+------+
|  1|[Bob, John]|     5|
+---+-----------+------+

相关问题