csv DLT:逗号被视为列名的一部分

yyyllmsg  于 2022-12-06  发布在  其他
关注(0)|答案(1)|浏览(135)

我尝试在DataBricks环境中创建一个Streaming LIVE TABLE对象,使用一个S3桶,其中包含一组CSV文件作为源。
我使用的语法是:

CREATE OR REFRESH STREAMING LIVE TABLE t1
COMMENT "test table"
 TBLPROPERTIES
 (
   "myCompanyPipeline.quality" = "bronze"
   , 'delta.columnMapping.mode' = 'name'
   , 'delta.minReaderVersion' = '2'
   , 'delta.minWriterVersion' = '5'
 )
AS
SELECT * FROM cloud_files
(
  "/input/t1/"
  ,"csv"
  ,map
   (
    "cloudFiles.inferColumnTypes", "true"
   , "delimiter", ","
   , "header", "true"
   )
)

示例源文件内容:

ROW_TS,ROW_KEY,CLASS_ID,EVENT_ID,CREATED_BY,CREATED_ON,UPDATED_BY,UPDATED_ON
31/07/2018 02:29,4c1a985c-0f98-46a6-9703-dd5873febbbb,HFK,XP017,test-user,02/01/2017 23:03,,
17/01/2021 21:40,3be8187e-90de-4d6b-ac32-1001c184d363,HTE,XP083,test-user,02/09/2017 12:01,,
08/11/2019 17:21,05fa881e-6c8d-4242-9db4-9ba486c96fa0,JG8,XP083,test-user,18/05/2018 22:40,,

运行关联的管道时,出现以下错误:
org.apache.spark.sql.AnalysisException:无法在配置单元元存储中创建其列名包含逗号的表。
由于某种原因,加载程序无法将逗号识别为列分隔符,并试图将整个内容加载到单个列中。
我已经花了好几个小时试图找到一个解决方案。用分号替换逗号(在源文件和“分隔符”选项中都是如此)没有帮助。
尝试手动将相同的文件上载到常规(即非流式)数据块表可以正常工作。该问题仅与流式表有关。
想法?

u3r8eeie

u3r8eeie1#

不完全是我在这里所期望的解决方案的类型,但它似乎很有效,所以...
与使用SQL创建DLT不同,使用Python脚本有助于:

import dlt

@dlt.table
def t1():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/input/t1/")
  )

请注意,上述脚本需要通过DLT管道执行(直接从笔记本运行它将引发ModuleNotFoundError异常)

相关问题