将PySpark DataFrame作为对象ID写入MongoDB插入字段

iaqfqrcu  于 2022-10-22  发布在  Spark
关注(0)|答案(3)|浏览(165)

我试图向MongoDB写入一个Spark DataFrame,其中包含另一个集合的_id的对象ID的字符串表示。
问题是,PySpark不支持对象ID(这里解释了Scala和Java对象ID支持:https://github.com/mongodb/mongo-spark/blob/master/doc/1-sparkSQL.md),那么,如何使用Spark连接器将对象ID从PySpark插入到MongoDB中呢?

cuxqih21

cuxqih211#

如今,公认的答案似乎已经过时了。它真的让我找到了一个工作版本,谢谢你。
以下是我的代码的工作版本:

import pyspark.sql.functions as sfunc
from pyspark.sql.types import *

# This user defined function creates from an str ID like "5b8f7fe430c49e04fdb91599"

# the following Object : { "oid" : "5b8f7fe430c49e04fdb91599"}

# which will be recognized as an ObjectId by MongoDB

udf_struct_id = sfunc.udf(
    lambda x: tuple((str(x),)), 
    StructType([StructField("oid",  StringType(), True)])
)

df = df.withColumn('future_object_id_field', udf_struct_id('string_object_id_column'))

我的设置:MongoDB 4.0、Docker镜像for Spark gettyimages/spark:2.3.1-hadoop-3.0、python3.6
documentation for the pyspark mongo connector让我想到将字段称为oid,这是mongo将字段识别为OBJECTID类型所必需的。

oknwwptz

oknwwptz2#

我将列转换为Spark Structfield,插入到MongoDB中的Spark StructField将自动转换为ObjectID

import pyspark.sql.functions as sfunc
import pyspark.sql.types as stypes

udf_struct_id = sfunc.UserDefinedFunction(
    x: tuple((str(x),)), 
    StructType((stypes.StructField( stypes.StringType(), True),))
)

df = df.withColumn('future_object_id_field', udf_struct_id(df['string_object_id_column']))

然后,您可以对该DataFrame的Mongo执行SparkSession写入,Future_Object_id_field将成为一个对象ID。

注意:该字段必须为空的True,如果为False,则该字段将变成一个内部有字符串的对象。

nvbavucw

nvbavucw3#

出于某种原因,如果您从DICT创建 Dataframe 。

mongo_schema = StructType([
    StructField("SomeField", StringType()),
    StructField("ObjectIdField", StructType([StructField('oid', StringType())]))
])

some_dict = {'SomeField': some_field,
             'ObjectIdField': (object_id,)}

df = s.createDataFrame(data=some_dict, schema=mongo_schema)

相关问题