I am reading 2 streams (stream1 and stream2) from Kafka in Spark Structured Streaming (PySpark). I have to compute the difference between the offsets of stream1 and stream2.
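For context, the two streams are read from Kafka roughly like this (a minimal sketch; the bootstrap server and topic names are placeholders, and the parsing that produces the columns shown in the schemas below is left out):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("offset-diff").getOrCreate()

# Placeholder broker and topics; the real job subscribes to the actual topics.
stream1 = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "topic1")
           .load())

stream2 = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "topic2")
           .load())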
I tried to do it as follows. The schemas of stream1 and stream2 are shown first, then the line that computes the difference:
<class 'pyspark.sql.dataframe.DataFrame'>
root
|-- timestamp: timestamp (nullable = true)
|-- value: string (nullable = true)
|-- offset: double (nullable = true)
|-- string_val: string (nullable = true)
|-- ping: double (nullable = true)
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- offset_v1: double (nullable = true)
|-- date_time: string (nullable = true)
|-- date_format: timestamp (nullable = true)
<class 'pyspark.sql.dataframe.DataFrame'>
root
|-- Mean: double (nullable = true)
|-- pingTime: timestamp (nullable = true)
|-- Std_Deviation: double (nullable = true)
|-- devTime: timestamp (nullable = true)
|-- offset_v2: double (nullable = true)
|-- upperBound: double (nullable = true)
|-- lowerBound: double (nullable = true)
stream2 = stream2.withColumn('difference', stream2.offset_v2 - stream1.offset_v1)
It throws this error:
pyspark.sql.utils.AnalysisException: u'Resolved attribute(s) offset_v1#95 missing from upperBound#182,Std_Deviation#149,lowerBound#189,Mean#133,pingTime#129-T30000ms,devTime#144-T30000ms,offset_v2#155 in operator !Project [Mean#133, pingTime#129-T30000ms, Std_Deviation#149, devTime#144-T30000ms, offset_v2#155, upperBound#182, lowerBound#189, (offset_v2#155 - offset_v1#95) AS difference#233]
1 Answer
As Venki said, you need to join the two streams first so that you are comparing related rows. Do you have any columns you could join on? A date and an id would do. Assuming both DataFrames have a column called join_col, see the sketch below.
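A minimal sketch of that join (join_col is the assumed key; offset_v1 and offset_v2 come from the schemas above, and the s1/s2 aliases are only for illustration):

from pyspark.sql import functions as F

# Join the two streams on the shared key so related rows land in the same record.
joined = stream1.alias("s1").join(stream2.alias("s2"), on="join_col", how="inner")

# Both offsets are now available on the same row, so the subtraction resolves.
result = joined.withColumn("difference",
                           F.col("s2.offset_v2") - F.col("s1.offset_v1"))

Keep in mind that a stream-stream join makes Spark buffer past input as state; adding watermarks on both streams keeps that state bounded.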
If you cannot find a suitable join key, then you are left with the problem of comparing columns of different lengths, which I believe is what you are dealing with here.