spark架构管理

vxbzzdmp  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(388)

问题

管理spark表的模式的最佳方法是什么?你看到选项2有什么缺点吗?你能提出更好的选择吗?

我看到的解决方案

选项1:为代码和元存储保留单独的定义
这种方法的缺点是您必须不断地保持它们的同步(容易出错)。另一个缺点是,如果表有500列,它会变得很麻烦。
创建\u some \u table.sql[第一个定义]

-- Databricks syntax (internal metastore)
CREATE TABLE IF NOT EXISTS some_table (
  Id int,
  Value string,
  ...
  Year int
)
USING PARQUET
PARTITION BY (Year)
OPTIONS (
  PATH 'abfss://...'
)

某个job.py[第二个定义]

def run():
   df = spark.read.table('input_table')  # 500 columns
   df = transorm(df)
   # this logic should be in `transform`, but anycase it should be
   df = df.select(
     'Id', 'Year', F.col('Value').cast(StringType()).alias('Value')  # actually another schema definition: you have to enumerate all output columns
   )
   df.write.saveAsTable('some_table')

测试\u some \u job.py[第三个定义]

def test_some_job(spark):
   output_schema = ...  # another definition
   expected = spark.createDataFrame([...], output_schema)

选项2:在代码中只保留一个定义(structtype)
可以动态生成模式。这种方法的好处是简单性和模式定义在一个地方。你看到什么缺点了吗?

def run(input: Table, output: Table):
   df = spark.read.table(input.name)
   df = transform(df)
   save(df, output)    

def save(df: DataFrame, table: Table): 
    df \
        .select(table.schema.fieldNames()) \
        .write \
        .partitionBy(table.partition_by) \
        .option('path', table.path) \
        .saveAsTable(table.name)
    # In case table doesn't exists, Databricks will automatically generate table definition

class Table(NamedTuple):
    name: str
    path: str
    partition_by: List[str]
    schema: StructType
uxh89sit

uxh89sit1#

在我的公司,我们有一个非常小的团队,所以我们构建了一个基于c#模板的生成器,它从sql server数据库读取信息,并生成我们所有的笔记本。为整个系统生成ddl模式和处理代码所需的所有信息都在该数据库中。我们现在唯一手工编写的代码是DIM和facts加载过程的一部分。所有的锅炉板代码都被抽象掉了。对于新表,我们进入元数据库,输入新的表和列信息并运行生成器。这将导致您在生成的代码中所说的重复,但现在我们只是维护元数据库信息,而不必查找所有需要更新的位置。如果你已经建立了一个系统,那么这种方法将很难适应,而且说服高层管理人员也是一个挑战,因为前面还有很多开发。但是,如果您从零开始就遵循“自动化”这一信条,一个流程一个流程地工作,了解如何使其通用化,您将获得解决问题和稳定平台的无与伦比的能力。我在野外看到的唯一一款能够在大数据领域做类似事情的产品是wherescape,如果我没记错的话,它的价格非常昂贵,每年每个座位35000美元。所以如果你有钱的话,我强烈建议你去看看[https://www.wherescape.com/][1] 可能还有其他人,但那是我唯一知道的。
举几个例子说明这种方法是如何带来好处的。
我们自己有两个开发人员,另外一个开发人员,我们能够在4个月内完全将130个staging和40个dim和facts从u-sql datalake analytics切换到databricks。在我们进行的过程中关闭所有生成的进程。
假设我们在加载暂存表时发现了一个处理问题—我们更新了流程模板—重新生成笔记本并重新部署。这一过程现在更新了所有130个表的半天到一天的工作,可能已经花了几个星期来修复和测试,否则。

8dtrkrch

8dtrkrch2#

让我先谈几点,然后再提出建议。
数据比代码寿命长得多。
上面描述的代码是创建和写入数据的代码,也有读取和使用数据的代码需要考虑。
还有第三个选项,用数据存储数据(模式)的定义。通常被称为“自我描述格式”
数据的结构会随着时间的推移而改变。
此问题带有 databricks 以及 aws-glue Parquet地板是自我描述文件的基础上。
delta-lake表使用parquet数据文件,但是还将模式嵌入到事务日志中,因此对整个表和模式进行版本控制。
数据需要被广泛的工具生态系统使用,因此数据需要是可发现的,模式不应该被锁定在一个计算引擎中。
建议:
以开放格式存储包含数据的架构
使用delta-lake格式(结合了Parquet和事务日志)
改变 USING PARQUETUSING DELTA 将元存储指向aws glue catalog,glue catalog将存储表名和位置
使用者将从delta-lake表事务日志解析模式
模式可以随着编写器代码的发展而发展。
结果:
您的编写器创建了模式,并且可以选择性地发展模式
所有使用者都会在delta lake中找到模式(与表版本配对)(\u delta\u log dir是特定的)

相关问题