我试图找到一种方法,以批处理的方式将mongodb数据库中的记录接收到pysparkDataframe中(即仅接收添加到集合中且尚未接收的最新/最新记录),以进行一些数据清理和分析。我以mongodb为例https://www.mongodb.com/blog/post/getting-started-with-mongodb-pyspark-and-jupyter-notebook 我可以阅读我感兴趣的全部藏品。
不同的是,我的收藏将作为一个“数据湖”作为Kafka流的消费者,并将随着每小时/每天越来越多的数据而增长(我打算为大约30天的记录设置一个tol)。您可以推断,每当此spark作业在当前设置下运行时,Dataframe将越来越大,因为它将接收整个集合,即使接收的记录已被清除/分析。
在pyspark和/或mongodb中是否有一种方法可以一次摄取100条最旧的记录,进行清理/分析,清除Dataframe,然后获取下100条最旧的记录,依此类推,直到获取最新的记录?
我从中提取数据的tweets集合确实有一个日期字段,它是创建tweet时的时间戳,所以这个时间戳可以用作过滤器或排序?
或者,去掉mongodb,仅仅使用spark结构化流媒体来阅读mongodb当前正在使用的kafka主题,会更容易/更明智吗?
这是我将mongodb集合的当前状态/大小转换为Dataframe的初始代码:
from pyspark.sql import SparkSession
spark = SparkSession.\
builder.\
appName("twitter-analysis-notebook").\
master("spark://spark-master:7077").\
config("spark.executor.memory", "1g").\
config("spark.mongodb.input.uri","mongodb://mongo1:27017/Twitter_Database.Tweets?replicaSet=rs0").\
config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
getOrCreate()
df = spark.read.format("mongo").load()
暂无答案!
目前还没有任何答案,快来回答吧!