pandas: getting this error when running Python code in Databricks

Asked by wd2eg0qa on 2023-01-19, tagged Python

The error

Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

The code

import itertools

from pyspark.sql.functions import explode, col

# Read the JSON file from Databricks storage
df_json = spark.read.json("/mnt/BigData_JSONFiles/new_test.json")

# Disable Arrow so toPandas() uses the plain conversion path
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

# Convert the dataframe to a dictionary
data = df_json.toPandas().to_dict()

# Split the data into two parts: the first 8 fields and the rest
d1 = dict(itertools.islice(data.items(), 8))
d2 = dict(itertools.islice(data.items(), 8, len(data.items())))

# Convert the first part of the data back to a dataframe
df1 = spark.createDataFrame([d1])

# Write the first part of the data to a JSON file in Databricks storage
df1.write.format("json").save("/mnt/BigData_JSONFiles/new_test_header.json")

# Convert the second part of the data back to a dataframe
df2 = spark.createDataFrame([d2])

# Write the second part of the data to a JSON file in Databricks storage
df2.write.format("json").save("/mnt/BigData_JSONFiles/new_test_detail.json")
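As an aside, the pandas round-trip (toPandas() followed by createDataFrame()) collects the whole file onto the driver, which defeats the purpose for a genuinely large JSON file. Once the file parses correctly, the same header/detail split can be done column-wise in Spark itself. A minimal sketch, assuming the "header" is every top-level field except the large in_network array (field names match the sample file below; the output paths are reused from the code above purely for illustration):

from pyspark.sql.functions import explode, col

# Header: all top-level fields except the nested array
df_header = df_json.drop("in_network")
df_header.write.format("json").save("/mnt/BigData_JSONFiles/new_test_header.json")

# Detail: only the nested array, one record per array element
df_detail = df_json.select(explode(col("in_network")).alias("in_network"))
df_detail.write.format("json").save("/mnt/BigData_JSONFiles/new_test_detail.json")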

A sample of the large JSON file

{
  "reporting_entity_name": "launcher",
  "reporting_entity_type": "launcher",
  "plan_name": "launched",
  "plan_id_type": "hios",
  "plan_id": "1111111111",
  "plan_market_type": "individual",
  "last_updated_on": "2020-08-27",
  "version": "1.0.0",
  "in_network": [
    {
      "negotiation_arrangement": "ffs",
      "name": "Boosters",
      "billing_code_type": "CPT",
      "billing_code_type_version": "2020",
      "billing_code": "27447",
      "description": "Boosters On Demand",
      "negotiated_rates": [
        {
          "provider_groups": [
            {
              "npi": [
                0
              ],
              "tin": {
                "type": "ein",
                "value": "11-1111111"
              }
            }
          ],
          "negotiated_prices": [
            {
              "negotiated_type": "negotiated",
              "negotiated_rate": 123.45,
              "expiration_date": "2022-01-01",
              "billing_class": "organizational"
            }
          ]
        }
      ]
    }
  ]
}

Hello, I am trying to split a large JSON file into two formats, which is what the code above does. But it fails with the error above, which tells me to cache; I added .cache() at the end of the file load, but I still get the error. Please let me know how I can fix it.


e4eetjau (answer 1)

I was able to resolve this error by changing this

df_json = spark.read.json("/mnt/BigData_JSONFiles/new_test.json")

to this

df_json = spark.read.option("multiline","true").json("/mnt/BigData_JSONFiles/new_test.json")
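By default, spark.read.json expects JSON Lines input, i.e. one complete JSON object per physical line. A pretty-printed file like the sample above therefore fails to parse line by line, every row lands in the internal _corrupt_record column, and any query over that lone column trips the Spark 2.3 restriction quoted in the error. Setting multiline to true makes the reader parse the whole file as a single JSON document. A quick way to confirm the fix worked (same path as in the question):

df_json = spark.read.option("multiline", "true").json("/mnt/BigData_JSONFiles/new_test.json")
df_json.printSchema()  # should now list the real fields rather than a lone _corrupt_record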
