I am trying to anonymize a few columns in a Spark dataframe. The dataframe has columns such as:
FirstName
LastName
Email
I want to mask these while still producing meaningful-looking values, so I am using Faker.
But if I use
df.withColumn('FirstName', lit(fake.first_name()))
it adds the same name to every row: each row ends up with an identical FirstName value. Ideally I would like a different Faker value for each row rather than a constant. How can I achieve this?
Update 1:
I looked at Steven's suggestion, and here is my latest code:
import logging
from faker import Faker
from pyspark.sql import functions as F

MSG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(format=MSG_FORMAT, datefmt=DATETIME_FORMAT)
logger = logging.getLogger("[SFDC-GLUE-LOG]")

fake = Faker()

source_df = (
    spark.read.format("jdbc")
    .option("url", connection_url)
    .option("query", query)
    .option("driver", driver_name)
    .option("user", user_name)
    .option("password", password)
    .option("StmtCallLimit", 0)
    .load()
)

fake_firstname = F.udf(fake.first_name)
masked_df = source_df.withColumn("FirstName", fake_firstname())
Now I get:
Traceback (most recent call last):
File "script_2020-08-05-17-15-26.py", line 52, in <module>
masked_df=source_df.withColumn("FirstName", fake_firstname())
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 189, in wrapper
return self(*args)
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 167, in __call__
judf = self._judf
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 151, in _judf
self._judf_placeholder = self._create_judf()
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 160, in _create_judf
wrapped_func = _wrap_function(sc, self.func, self.returnType)
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 35, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/serializers.py", line 600, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle weakref objects
2 Answers

fcg9iug31#
You need to use a UDF:
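A minimal sketch of that approach, assuming the question's source_df. Unlike lit(), which evaluates fake.first_name() once on the driver and bakes in a constant, a plain Python UDF is invoked once per row on the executors, so every row gets a fresh Faker value:

from faker import Faker
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

fake = Faker()

# Wrap the bound method as a zero-argument UDF; Spark calls it per row.
fake_firstname = F.udf(fake.first_name, StringType())

masked_df = source_df.withColumn('FirstName', fake_firstname())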
svmlkihl2#
I had the same problem. The solution below worked for me.
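A sketch of one common fix for the "Could not serialize object ... can't pickle weakref objects" error, again assuming the question's source_df: construct the Faker instance inside the UDF body, so that only the function itself is pickled and the unpicklable Faker object never leaves the driver.

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def fake_first_name():
    # Import and build Faker inside the function: Spark pickles only this
    # function when shipping it to the executors, not a Faker instance.
    from faker import Faker
    return Faker().first_name()

fake_firstname_udf = F.udf(fake_first_name, StringType())
masked_df = source_df.withColumn('FirstName', fake_firstname_udf())

Constructing a Faker on every call is slow on large tables; caching the instance in a module-level variable the first time the function runs keeps the closure picklable while avoiding repeated construction.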