在使用scala的spark中,我需要分离出具有 _corrupt_record
.
我有以下代码:
在这里,我把数据读入一个df-这很好。
val data_frame_datasource0 = glueContext.getCatalogSource(database = "my-stream-database", tableName = "my-stream-table", tmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{"startingPosition": "TRIM_HORIZON", "inferSchema": "true"}""")).getDataFrame()
这里我想创建一个数据具有 _corrupt_record
与好数据分离,以便以后我可以将split\u valid\u corrupt\u df(0)转储到坏数据位置。
val split_valid_corrupt_df = data_frame_datasource0.splitRows(Seq("_corrupt_record"), transformationContext = "split_valid_corrupt_df", CallSite("Not provided", ""), stageThreshold = 10, totalThreshold = 100)
但是 _corrupt_record
没有分开。
这里面我缺了什么?我是新来的。我还需要别的吗 Seq
附加或非空过滤器 _corrupt_record
不知怎么的?
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!