Python: reading Parquet in MLRun fails with "Unable to infer schema for Parquet. It must be specified manually."

woobm2wo posted on 2023-01-29 in Python

I get this issue when I ingest/write data into a FeatureSet (part of the MLRun FeatureStore) and then read the data back via PySpark (the path does not seem to be valid Parquet).

AnalysisException                         Traceback (most recent call last)
<ipython-input-8-a8c688f9ceb5> in <module>
----> 1 newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
      2 newDF1.show()

/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
    299                        int96RebaseMode=int96RebaseMode)
    300 
--> 301         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    302 
    303     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,

/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323 
   1324         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Unable to infer schema for Parquet. It must be specified manually.

The key part of the source code (the part that triggers the exception):

...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False), NoSqlTarget(name="s2")], with_defaults=False)
feature_set1.save()
fstore.ingest(f"store://feature-sets/{project_name}/FS-ingest", sparkDF, spark_context=spark, overwrite=True)
...
# reading the ingested data back raises the AnalysisException above
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()

Have you seen a similar issue?
Note: the Parquet path does contain parquet files (all of them valid), which means the ingest itself succeeded.
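
For reference, the FeatureSet root can be listed to see what Spark actually scans. A minimal sketch using the Hadoop FileSystem API through Spark's Py4J gateway; spark and project_name are the names used in the snippet above:

# Sketch: list the FeatureSet root to see which subdirectories Spark scans.
# `spark` and `project_name` are the same names as in the snippet above.
jvm = spark.sparkContext._jvm
root = jvm.org.apache.hadoop.fs.Path(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
fs = root.getFileSystem(spark.sparkContext._jsc.hadoopConfiguration())
for status in fs.listStatus(root):
    print(status.getPath())  # both a nosql/ and a parquet/ subdirectory appear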


l7wslrjt1#

The source code (the Parquet read) contains a mistake. The FeatureSet uses two targets, an online and an offline store, and in this case spark.read.parquet also scans the online store, whose format is not Parquet. I see two possible solutions.

1. Update the Parquet read path

This is the simple way to fix the issue: just extend the current path with /parquet, see the updated code:

...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest/parquet")
newDF1.show()
...
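
If you prefer not to hardcode the /parquet suffix, the offline target path can also be resolved from the feature set itself. A minimal sketch, assuming your MLRun version exposes get_feature_set() and FeatureSet.get_target_path(); "s1" is the ParquetTarget name from the definition above:

# Sketch: resolve the offline (Parquet) target path from the feature set
# instead of hardcoding it. get_feature_set()/get_target_path() availability
# depends on your MLRun version; "s1" is the ParquetTarget name.
import mlrun.feature_store as fstore

fs_ingest = fstore.get_feature_set(f"store://feature-sets/{project_name}/FS-ingest")
parquet_path = fs_ingest.get_target_path(name="s1")
newDF1 = spark.read.parquet(parquet_path)
newDF1.show()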

2. Remove the online/NoSql target

This updates the FeatureSet definition (removing NoSqlTarget(name="s2")) while keeping the spark.read.parquet part, see the updated code:

...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
# only the offline Parquet target remains; no NoSqlTarget
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False)], with_defaults=False)
feature_set1.save()

newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest/parquet")
newDF1.show()
...
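
A third option is to stay entirely within MLRun and read the data back through the feature store's offline API instead of raw Spark, which resolves the offline target for you. A minimal sketch; the feature-vector name "fs-ingest-vec" is hypothetical:

# Sketch: retrieve the ingested data via MLRun's offline API rather than
# spark.read.parquet. The vector name "fs-ingest-vec" is hypothetical;
# "FS-ingest.*" selects all features of the feature set from the question.
import mlrun.feature_store as fstore

vector = fstore.FeatureVector("fs-ingest-vec", ["FS-ingest.*"])
vector.save()
resp = fstore.get_offline_features(vector)
df = resp.to_dataframe()  # pandas DataFrame with the joined features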

**BTW:** the same solutions also apply to this other exception, which describes the problem more precisely (given the different paths of the online and offline stores):

Py4JJavaError: An error occurred while calling o3233.parquet.
: java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
    v3io://projects/spark-parquet-test2/featurestore/FS-ingest/nosql/sets/FS-ingest/1674747966078_84
    v3io://projects/spark-parquet-test2/featurestore/FS-ingest/parquet/sets/FS-ingest/1674747966078_84

If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:178)
    at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:110)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:158)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:73)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)
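
Note that the basePath option suggested in the error message only helps when all the conflicting directories really are partitions of one Parquet table; here the nosql path holds a different format entirely, so pointing the read at the parquet subdirectory remains the fix. For reference, this is what basePath looks like for a genuinely partitioned Parquet layout (a sketch; the path is hypothetical):

# Sketch: Spark's basePath option for a genuinely partitioned Parquet table.
# It does NOT fix the MLRun case above (the nosql target is not Parquet);
# shown only to clarify the hint in the error message. Path is hypothetical.
df = (
    spark.read.option("basePath", "v3io://projects/my-project/data/table")
    .parquet("v3io://projects/my-project/data/table/year=2023")
)
df.show()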
