我有一个空的dataframe创建如下。模式:
table_schema = StructType([
StructField('bookingNumber', StringType(), True),
StructField('bookingProfile', StringType(), True),
StructField('json_string', StringType(), True),
StructField('json_ingestion_time', TimestampType(), True)
])
def prepare_empty_df(schema: StructType):
empty_rdd = spark.sparkContext.emptyRDD()
empty_df = spark.createDataFrame(empty_rdd, schema)
return empty_df
我的数据来自一个API调用。每个GET调用都将返回JSON,我正在将API响应(JSON)转换为文本。我正在解析这个JSON的一些属性,然后插入到表中。因为我有20万个JSON,所以我不想在我的表上运行20万个插入查询,我想把所有API JSON调用的结果附加到一个空的JavaScript框架中,然后简单地摄取JavaScript框架。我所做的API调用不是顺序的,而是并行线程。也就是说,我一次运行4个parallel API调用,并试图将4个输出附加到一个空的嵌套框中。下面是我如何转换API JSON并将其附加到空的JavaScript框架中。
主要方法:
if __name__ == '__main__':
spark = SparkSession.builder.appName('Raw_Check').getOrCreate()
batch_size = 4
booking_ids = []
initial_df = prepare_empty_df(schema=raw_table_schema)
initial_load = True
cquery = f'select booking_id from db.table limit 20'
booking_ids = get_booking_ids(spark=spark, query=cquery) # returns a list of bookings
for i in range(0, len(booking_ids), batch_size):
sub_list = booking_ids[i:i + batch_size]
threads = []
for index in range(batch_size):
t = threading.Thread(target=get_json, name=str(index), args=(spark, sub_list[index], initial_df))
threads.append(t)
t.start()
for index, thread in enumerate(threads):
thread.join()
print('Final Dataframe count')
print(initial_df.count())
print('-------------------------------------------------------------------------------------------------------------------------------------------------')
print('Final Dataframe contents')
initial_df.show()
print('-------------------------------------------------------------------------------------------------------------------------------------------------')
get_json方法:
def get_json(spark: SparkSession, booking_number: str, init_df: DataFrame):
headers = {"Content-type": "some_content_type"}
token = doing_something_to_get_token
token_headers = {'Authorization': f"Bearer {token}"}
api_response = requests.get(f'https://api_url?booking_number={booking_number}', headers=token_headers)
json_data = spark.sparkContext.parallelize([api_response.text])
df = spark.read.json(json_data)
api_df = (df.select('id').withColumnRenamed('id', 'bookingProfile')
.withColumn('bookingNumber', lit(booking_number))
.withColumn('json_string', lit(api_response.text))
.withColumn('json_ingestion_time', lit(current_timestamp()))
)
api_df.show()
init_df.unionAll(api_df)
我将API输出中的每一行联合到我在main方法中创建的initial_df
。当脚本运行时,由于api_df.show()
,我还可以看到来自API_df的数据。四个并行线程正在启动,我可以看到一次运行4个API调用。但是在最后,空的dataframe:我创建的initial_df
在脚本结束时仍然是空的。计数为零,当我显示它的内容时,它基本上打印NULL。
-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe count
0
-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe contents
+--------------+-------------------+-----------+------------------------+
|bookingNumber |bookingProfile |json_string| json_ingestion_time|
+--------------+-------------------+-----------+------------------------+
+--------------+-------------------+-----------+------------------------+
谁能告诉我我在这里犯了什么错误,我该如何纠正?任何帮助都非常感激。
8条答案
按热度按时间sxissh061#
MapPartitions
正如others已经指出的那样,Spark中惯用的方法是通过为每个
booking_id
调用REST API来将booking_id
Map到预期结果。但是,当将
booking_id
sMap到结果时,每个REST调用都有一定的开销,用于启动HTTP连接、执行SSL握手或获取访问API所需的访问令牌。这减慢了该过程。HTTP会话
requests库提供了一种解决方案,通过使用会话对象来减少这种开销。医生说:
因此,如果您向同一主机发出多个请求,底层的TCP连接将被重用,这可以显著提高性能
基本上,HTTP(s)连接在每次API调用后保持打开(使用keep-alive头),并在下一次调用中重用。唯一要求是请求对象从一个API调用传递到下一个调用。因此,与其发起三个HTTPS连接
仅创建并重用了一个会话:
如果请求的日志记录是enabled,我们在第一种情况下看到打开了三个HTTP连接,而在第二种情况下只有一个。
MapPartitions
Spark提供了一种重用session对象的机制:mapPartitions。这个想法是Map一个完整的行分区,而不是单个行。执行实际Map的函数获取一个包含分区所有行的迭代器,并返回一个包含结果分区的迭代器。
在开始使用迭代器之前,会话对象被创建,然后在迭代时被重用:
调用mapPartitions涉及到RDD的(小的)迂回,因为Pyspark API缺少以下功能:
再次启用请求日志记录,我们看到现在每个分区只创建一个连接,并且所有请求都包含头
Connection: keep-alive
。讨论
mapPartitions
的实现比直接使用map
稍微复杂一些,但表现出更好的性能特征。在我的测试中,根据Spark集群、REST API服务器和身份验证过程的性能,mapPartitions
至少比map
快20%。mapPartitions
可以减少REST API服务器上的压力,因为需要更少的连接和SSL握手。callRestApi
内部需要某种错误处理。当从REST API服务器获取和解析响应时,我们可能会看到连接问题或格式错误的json返回。应该处理这些问题,因为它们可能会中断对特定分区中所有行的处理。wko9yo5t2#
为什么不尝试将API响应存储到组合的JSON文件中。然后在dataframe中读取json并摄取到您想要的表中。在这一部分中,您-->将API输出中的每一行联合到initial_df。而不是dataframe将其附加到文件中。每次API调用完成后,您将拥有一个包含所有API调用的所有记录的JSON文件。将json读入一个dataframe,做任何你想要的转换并插入到所需的表中。
目前我使用类似的方法,我从API调用中阅读大约650 k的JSON行/记录,每个API调用批量读取10,000条记录。在每个API调用中,我提取所需的JSON并将其附加到JSON文件中。在所有的API调用完成后,我在JSON文件中留下了650 k的记录,接下来我用它来读取到JavaScript框架中,进行所需的扁平化和转换,并存储到一个表中。
6rqinv9w3#
将完整的逻辑移动到
udf
函数& spark将使用多线程从API获取数据。您可以使用
repartition(<batch_size>)
函数增加或减少并行线程。请检查下面的逻辑并根据您的要求修改。
群集
下面是我在集群中执行的屏幕截图
z5btuh9x4#
直接通过Spark执行API请求是有意义的。您可以使用parallelize和map的组合来实现这一点:
首先,将
get_json
函数缩减为只获取JSON:然后创建以下函数,它并行地从API获取数据:
并行度的大小还取决于您提供给Spark的资源。
aydmsdu95#
实际上我正在为这些类型的问题编写一些代码,你可以制作一个JSON模板,并在JSON中添加一个大的JSON字符串来解析,也许问题会出现在解析过程中,但pandas可以用JSON处理解析部分。
这个包叫做JsonDF,你可以从GitHub或者
pip install JsonDf
下载,如果有一些功能不存在的话,你可以编辑代码,这会很有帮助。我建议你使用的代码是:
以上将输出:
你可以用
tempalte.template
访问模板值,这是JsonDF处理Json对象的方式。你可以用它来创建一个json模板并添加到其中,或者甚至解析一个已经存在的json,只需将json传递到
Json()
对象的第二个参数中,它将解析它并从中创建一个对象,你将能够从json中插入,更新,搜索和删除。这将帮助你只处理JSON上的所有处理,然后传递最终的JSON以转换为dataframe。
希望对你有帮助。
4zcjmb1e6#
unionAll
操作不会修改调用该方法的DataFrame,而是返回一个新的DataFrame。由于您没有将unionAll
操作的结果赋值回initial_df
,因此更改不是持久性的。我不知道它是否完全正确,但请尝试下面的例子。另外,如果我没有弄错的话,我认为unionAll
是不推荐使用的,但你必须仔细检查。祝你好运!
获取json
main.py
41zrol4v7#
感谢大家的帮助,我采取了另一种方法,创建了一个类,并在构造函数中初始化了我的框架,如下所示:
下面是我如何从上面的类调用对象:
gk7wooem8#
我的想法是,不要把数组传给线程的参数,而是使用python * 多处理 * 库的 starmap。获取输出作为嵌套框架的列表
然后迭代数据框以合并空数据框中的结果
我希望这能解决你的问题