python 我如何将数据传递给dagster中不同模块中的操作?

wlsrxk51  于 2022-12-28  发布在  Python
关注(0)|答案(2)|浏览(129)

我是dagster的新手,很难解决这个问题。我必须在dagster管道中定义作业,并且希望将数据从一个作业的操作传递到另一个作业的操作
我的设置如下(简化示例)
job1.py

@op()
def generate_num():
    return 3
@op()
def increase_num(generate_num):
    return generate_num + 1
@job()
def increment_up():
    increase_num(generate_num))

job2.py

@op()
def decrease_num(generate_num)
    generate_num - 1
@op()
def multiple_num(decrease_num)
    decrease_num * 2
@job()
def get_multiple():
    multiple_num(decrease_num())

其中从“generate_num”返回的值被传递给job2.py。这样做完全是错误的吗?

vjhs03f7

vjhs03f71#

有什么理由不能在第二个作业中重用generate_num吗?

from job1 import generate_num

@op()
def decrease_num(generate_num)
    generate_num - 1
@op()
def multiple_num(decrease_num)
    decrease_num * 2
@job()
def get_multiple():
    multiple_num(decrease_num(generate_num()))
9lowa7mx

9lowa7mx2#

在Dagster中,考虑这个问题的典型方式是资产。即generate_num在第一个作业中产生并在第二个作业中使用的值将位于任何作业运行范围之外的持久存储中的某个地方。资产是持久存储中的对象,如文件或表。
以下是共享一个资源的两个作业的示例:

from dagster import Definitions, AssetSelection, asset, define_asset_job

@asset
def num():
    return 3

@asset
def num_plus_one(num):
    return num + 1

@asset
def num_multiplied(num):
    return num + 2

defs = Definitions(
    assets=[num, num_plus_one, num_multiplied],
    jobs=[
        define_asset_job("inc_job", AssetSelection.assets(num, num_plus_one)),
        define_asset_job("multi_job", AssetSelection.assets(num_multiplied)),
    ],
)

运行第一个作业时,将为numnum_plus_one创建一个文件。运行第二个作业时,它将使用num文件的内容来计算num_multiplied

相关问题