PyFlink中源函数和汇函数的使用

w46czmvw  于 2023-01-28  发布在  Apache
关注(0)|答案(1)|浏览(191)

我是PyFlink的新手,我已经用Java做过正式的培训练习:https://github.com/apache/flink-training
但是,我正在做的项目必须使用Python作为编程语言。我想知道是否可以使用“SourceFunction”编写一个数据生成器。在旧的PyFlink版本中,使用Jython可以做到这一点:https://nightlies.apache.org/flink/flink-docs-release-1.7/dev/stream/python.html#streaming-program-example
在更新的例子中, Dataframe 包含一个有限的数据集,它从来没有被扩展过。我没有在PyFlink中找到任何数据生成器的例子,例如https://github.com/apache/flink-training/blob/master/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
我不确定接口Source和SinkFunction提供了哪些功能。它可以在python中使用吗?或者只能与其他管道或jar文件结合使用吗?看起来方法“run()”和“cancel()”没有实现,因此它不能像其他类那样通过重载来使用。
如果它不能用在Python中,有没有其他的方法来使用它?有人可能会提供一个简单的例子。
如果不可能使用它,有没有其他方法可以用OOP风格编写数据生成器?举个例子:https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/datastream_tutorial/这里split()方法被用来分离流。基本上,我想通过一个额外的类来实现这一点,并且只是扩展流,这在Java TaxiRide示例中是通过“ctx.collect()"完成的。我正在努力避免使用Java,另一个管道框架,以及Jython。如果能得到一个简短的示例代码就好了,但是我很感激任何提示和建议。
我试图直接使用SourceFunction,但正如已经提到的,我认为这是一种完全错误的方式,导致了一个错误:AttributeError: 'DataGenerator' object has no attribute '_get_object_id'

class DataGenerator(SourceFunction):
    def __init__(self):
        super().__init__(self)
        self._num_iters = 1000
        self._running = True

    def run(self, ctx):
        counter = 0
        while self._running and counter < self._num_iters:
            ctx.collect('Hello World')
            counter += 1

    def cancel(self):
        self._running = False
jpfvwuh4

jpfvwuh41#

**解决方案:**在查找了一些使用类Source和SinkFunction的旧代码之后,我找到了一个解决方案。这里使用了一个用Java编写的Kafka连接器。python代码可以作为如何使用pyflink的Source和SinkFuntion的示例。

我只为SourceFunction编写了一个示例:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import SourceFunction
from pyflink.java_gateway import get_gateway

class TaxiRideGenerator(SourceFunction):

    def __init__(self):
        java_src_class = get_gateway().jvm.org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
        java_src_obj = java_src_class()
        super(TaxiRideGenerator, self).__init__(java_src_obj)

def show(ds, env):
    # this is just a little helper to show the output of the pipeline
    ds.print()
    env.execute()

def streaming():
    # arm the flink ExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    taxi_src = TaxiRideGenerator()
    ds = env.add_source(taxi_src)
    show(ds, env)

if __name__ == "__main__":
    streaming()

init类中的第二行很难找到,我原以为会在第一行中得到一个对象。
在构建this project之后,您必须创建一个jar文件。我已经输入了路径,直到我看到文件夹“org”:

$ cd flink-training/flink-training/common/build/classes/java/main
flink-training/common/build/classes/java/main$ ls
flink-training/common/build/classes/java/main$ org
flink-training/common/build/classes/java/main$ jar cvf flink-training.jar org/apache/flink/training/exercises/common/**/*.class

将jar文件复制到pyflink/lib文件夹,通常是在你的python环境下,例如flinkenv/lib/python3.8/site-packages/pyflink/lib,然后启动脚本。

相关问题