各位开发者,
我正在创建动态定长文件读取函数——模式将来自json文件:我的代码语言是:scala,因为大多数现有代码已经用scala编写。
在浏览时,我找到了我需要的用pyspark编写的精确代码。你能帮我把它转换成相应的spark scala代码吗?特别是字典部分和循环部分
主要参考:使用pyspark中json文件的模式读取固定宽度的文件
SchemaFile.json
===========================
{"Column":"id","From":"1","To":"3"}
{"Column":"date","From":"4","To":"8"}
{"Column":"name","From":"12","To":"3"}
{"Column":"salary","From":"15","To":"5"}
File = spark.read\
.format("csv")\
.option("header","false")\
.load("C:\Temp\samplefile.txt")
SchemaFile = spark.read\
.format("json")\
.option("header","true")\
.json('C:\Temp\schemaFile\schema.json')
sfDict = map(lambda x: x.asDict(), SchemaFile.collect())
print(sfDict)
# [{'Column': u'id', 'From': u'1', 'To': u'3'},
# {'Column': u'date', 'From': u'4', 'To': u'8'},
# {'Column': u'name', 'From': u'12', 'To': u'3'},
# {'Column': u'salary', 'From': u'15', 'To': u'5'}
from pyspark.sql.functions import substring
File.select(
*[
substring(
str='_c0',
pos=int(row['From']),
len=int(row['To'])
).alias(row['Column'])
for row in sfDict
]
).show()
1条答案
按热度按时间e4yzc0pl1#
检查以下代码。