我是PyFlink的新手,我已经用Java做过正式的培训练习:https://github.com/apache/flink-training
但是,我正在做的项目必须使用Python作为编程语言。我想知道是否可以使用“SourceFunction”编写一个数据生成器。在旧的PyFlink版本中,使用Jython可以做到这一点:https://nightlies.apache.org/flink/flink-docs-release-1.7/dev/stream/python.html#streaming-program-example
在更新的例子中, Dataframe 包含一个有限的数据集,它从来没有被扩展过。我没有在PyFlink中找到任何数据生成器的例子,例如https://github.com/apache/flink-training/blob/master/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
我不确定接口Source和SinkFunction提供了哪些功能。它可以在python中使用吗?或者只能与其他管道或jar文件结合使用吗?看起来方法“run()”和“cancel()”没有实现,因此它不能像其他类那样通过重载来使用。
如果它不能用在Python中,有没有其他的方法来使用它?有人可能会提供一个简单的例子。
如果不可能使用它,有没有其他方法可以用OOP风格编写数据生成器?举个例子:https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/datastream_tutorial/这里split()方法被用来分离流。基本上,我想通过一个额外的类来实现这一点,并且只是扩展流,这在Java TaxiRide示例中是通过“ctx.collect()"完成的。我正在努力避免使用Java,另一个管道框架,以及Jython。如果能得到一个简短的示例代码就好了,但是我很感激任何提示和建议。
我试图直接使用SourceFunction,但正如已经提到的,我认为这是一种完全错误的方式,导致了一个错误:AttributeError: 'DataGenerator' object has no attribute '_get_object_id'
class DataGenerator(SourceFunction):
def __init__(self):
super().__init__(self)
self._num_iters = 1000
self._running = True
def run(self, ctx):
counter = 0
while self._running and counter < self._num_iters:
ctx.collect('Hello World')
counter += 1
def cancel(self):
self._running = False
1条答案
按热度按时间jpfvwuh41#
**解决方案:**在查找了一些使用类Source和SinkFunction的旧代码之后,我找到了一个解决方案。这里使用了一个用Java编写的Kafka连接器。python代码可以作为如何使用pyflink的Source和SinkFuntion的示例。
我只为SourceFunction编写了一个示例:
init类中的第二行很难找到,我原以为会在第一行中得到一个对象。
在构建this project之后,您必须创建一个jar文件。我已经输入了路径,直到我看到文件夹“org”:
将jar文件复制到pyflink/lib文件夹,通常是在你的python环境下,例如flinkenv/lib/python3.8/site-packages/pyflink/lib,然后启动脚本。