I have a big problem, and I hope I can explain clearly what I want to do. I am building a streaming pipeline with pyspark (Spark Structured Streaming), and I want to update the same document whenever new scraped data arrives from Kafka.
Here is an example of the JSON that is sent to localhost and shown in MongoDB Compass:
{
  _id: ObjectId("28276465847392747")
  id: reply
  Company: reply
  Value: {
    Date: 20-05-2020
    Last_Hour_Contract: 09.12.25
    Last_Hour: 09.14.30
    Price: 16.08
    Quantity: 8000
    Medium_Price: 8.98
    Min_Price: 8.98
    Max_Price: 20.33
    News: {
      id_news: Reply_20-05-20
      title_news: "titolo news"
      text: "text"
      date: 20-05-2020
      hour: 09:13:00
      subject: Reply
    }
  }
}
{
  _id: ObjectId("28276465847392747")
  id: reply
  Company: reply
  Value: {
    Date: 20-05-2020
    Last_Hour_Contract: 09.12.25
    Last_Hour: 09.14.30
    Price: 17.78
    Quantity: 9000
    Medium_Price: 67.98
    Min_Price: 8.98
    Max_Price: 20.33
    News: {
      id_news: Reply_20-05-20
      title_news: "title_news"
      text: "text"
      date: 20-05-2020
      hour: 09:13:00
      subject: Reply
    }
  }
}
What I would like to achieve is that, when new data arrives, the various documents (matched on company_name = "name_company") are merged into a single document.
I would like the JSON document to end up structured like this:
{
  _id: ObjectId("3333884747656565"),
  id: reply
  Date: 21-05-2020
  Company: Reply
  Value: {
    Date: 20-05-2020
    Last_Hour_Contract: 09.12.25
    Last_Hour: 09.14.30
    Price: 16.08
    Quantity: 8000
    Medium_Price: 8.98
    Min_Price: 8.98
    Max_Price: 20.33
    News: {
      id_news: Reply_20-05-20
      title_news: "title news..."
      text: "text..."
      date: 20-05-2020
      hour: 09:13:00
      subject: Reply
    }
    Date: 21-05-2020
    Last_Hour_Contract: 09.12.25
    Last_Hour: 09.16.50
    Price: 16.68
    Quantity: 7000
    Medium_Price: 8.98
    Min_Price: 8.98
    Max_Price: 20.33
    News: {
      id_news: Reply_20-05-20
      title_news: "title news..."
      text: "text..."
      date: 21-05-2020
      hour: 09:14:00
      subject: Reply
    }
  }
}
I have also attached an image to make this clearer (I hope the two arrows are understandable):
How can I achieve this with pyspark? Thanks.
Here is my code:
from pyspark.sql import functions as F
from pyspark.sql.functions import expr

def writeStreamer(sdf):
    return sdf.select("id_Borsa", "NomeAzienda", "Valori_Di_Borsa") \
        .dropDuplicates(["id_Borsa", "NomeAzienda", "Valori_Di_Borsa"]) \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start()

def foreach_batch_function(sdf, epoch_id):
    sdf.write \
        .format("mongo") \
        .mode("append") \
        .option("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/DataManagement.Data") \
        .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/DataManagement.Data") \
        .save()  # "com.mongodb.spark.sql.DefaultSource"
df_borsa = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "latest") \
    .option("subscribe", "Reply_borsa") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df_news = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "latest") \
    .option("subscribe", "Reply_news") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df_borsa = df_borsa.withColumn(
    "Valori_Di_Borsa",
    F.struct(
        F.col("Data"), F.col("PrezzoUltimoContratto"), F.col("Var%"), F.col("VarAssoluta"),
        F.col("OraUltimoContratto"), F.col("QuantitaUltimo"), F.col("QuantitaAcquisto"),
        F.col("QuantitaVendita"), F.col("QuantitaTotale"), F.col("NumeroContratti"),
        F.col("MaxOggi"), F.col("MinOggi")
    )
)

df_news = df_news.withColumn(
    "News",
    F.struct(F.col("id_News"), F.col("TitoloNews"), F.col("TestoNews"), F.col("DataNews"), F.col("OraNews"))
)

# Apply watermarks on event-time columns
dfWithWatermark = df_borsa.select("id_Borsa", "NomeAzienda", "StartTime", "Valori_Di_Borsa") \
    .withWatermark("StartTime", "2 hours")  # maximal delay
df1WithWatermark = df_news.select("SoggettoNews", "EndTime") \
    .withWatermark("EndTime", "3 hours")  # maximal delay

# Join with event-time constraints
sdf = dfWithWatermark.join(
    df1WithWatermark,
    expr("""
        SoggettoNews = NomeAzienda AND
        EndTime >= StartTime AND
        EndTime <= StartTime + interval 1 hours
    """),
    "leftOuter"
).withColumn("Valori_Di_Borsa", F.struct(F.col("Valori_Di_Borsa.*"), F.col("News")))
query = writeStreamer(sdf)
spark.streams.awaitAnyTermination()
Output of sdf.printSchema():
1 Answer
What you need to do is use the $group stage to group the documents by the Company field, and push each grouped document's Value object into a newly formed array field (for example values) with the $push operator. On the MongoDB side the aggregation therefore looks something like this:
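A minimal sketch of that aggregation, written with pymongo so the example stays in Python (the connection string and the DataManagement.Data collection are taken from the question; the merged array field values and the output collection Data_Merged are assumptions):

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017")
collection = client["DataManagement"]["Data"]

pipeline = [
    # Group every document that belongs to the same company...
    {"$group": {
        "_id": "$Company",
        "values": {"$push": "$Value"}   # ...and push each document's Value object into one array
    }},
    # Write the merged documents to a separate collection (the name is hypothetical).
    {"$out": "Data_Merged"}
]

collection.aggregate(pipeline)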
You can quite easily translate the above aggregation into a pyspark implementation.
You would need to do something along the following lines:
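As a sketch only: collect_list plays the role of $push here, and the column names NomeAzienda and Valori_Di_Borsa are taken from the code in the question:

from pyspark.sql import functions as F

# Group the joined rows per company and collect every Valori_Di_Borsa struct
# into a single array column, mirroring the $group / $push aggregation above.
merged_df = sdf.groupBy("NomeAzienda") \
    .agg(F.collect_list("Valori_Di_Borsa").alias("Valori_Di_Borsa"))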
Note: I am not a pyspark expert, but you should be able to adapt this easily to the implementation you need.
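For example (again only a sketch), since each micro-batch handed to foreachBatch is a static DataFrame, the grouping could be applied inside your foreach_batch_function before writing to MongoDB:

from pyspark.sql import functions as F

def foreach_batch_function(batch_df, epoch_id):
    # Each micro-batch is a plain (non-streaming) DataFrame, so an aggregation
    # is allowed here even though the outer query runs in append mode.
    merged = batch_df.groupBy("id_Borsa", "NomeAzienda") \
        .agg(F.collect_list("Valori_Di_Borsa").alias("Valori_Di_Borsa"))

    merged.write \
        .format("mongo") \
        .mode("append") \
        .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/DataManagement.Data") \
        .save()

Be aware that mode("append") still inserts a new merged document per micro-batch; if you need to keep updating one document per company across batches, you would have to do the upsert yourself, for example with pymongo's update_one and $push inside foreach_batch_function.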