我尝试在DataBricks环境中创建一个Streaming LIVE TABLE对象,使用一个S3桶,其中包含一组CSV文件作为源。
我使用的语法是:
CREATE OR REFRESH STREAMING LIVE TABLE t1
COMMENT "test table"
TBLPROPERTIES
(
"myCompanyPipeline.quality" = "bronze"
, 'delta.columnMapping.mode' = 'name'
, 'delta.minReaderVersion' = '2'
, 'delta.minWriterVersion' = '5'
)
AS
SELECT * FROM cloud_files
(
"/input/t1/"
,"csv"
,map
(
"cloudFiles.inferColumnTypes", "true"
, "delimiter", ","
, "header", "true"
)
)
示例源文件内容:
ROW_TS,ROW_KEY,CLASS_ID,EVENT_ID,CREATED_BY,CREATED_ON,UPDATED_BY,UPDATED_ON
31/07/2018 02:29,4c1a985c-0f98-46a6-9703-dd5873febbbb,HFK,XP017,test-user,02/01/2017 23:03,,
17/01/2021 21:40,3be8187e-90de-4d6b-ac32-1001c184d363,HTE,XP083,test-user,02/09/2017 12:01,,
08/11/2019 17:21,05fa881e-6c8d-4242-9db4-9ba486c96fa0,JG8,XP083,test-user,18/05/2018 22:40,,
运行关联的管道时,出现以下错误:
org.apache.spark.sql.AnalysisException:无法在配置单元元存储中创建其列名包含逗号的表。
由于某种原因,加载程序无法将逗号识别为列分隔符,并试图将整个内容加载到单个列中。
我已经花了好几个小时试图找到一个解决方案。用分号替换逗号(在源文件和“分隔符”选项中都是如此)没有帮助。
尝试手动将相同的文件上载到常规(即非流式)数据块表可以正常工作。该问题仅与流式表有关。
想法?
1条答案
按热度按时间u3r8eeie1#
不完全是我在这里所期望的解决方案的类型,但它似乎很有效,所以...
与使用SQL创建DLT不同,使用Python脚本有助于:
请注意,上述脚本需要通过DLT管道执行(直接从笔记本运行它将引发
ModuleNotFoundError
异常)