flink asyncdatastream如何传递和使用配置参数

ss2ws0br  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(448)

在main()函数中有以下代码:

DataStream<OutputObject> asyncResultStream = AsyncDataStream.orderedWait(
            listOfData,
            new CustomAsyncConnector(),
            5,
            TimeUnit.SECONDS,
            10).setParallelism(3).startNewChain().uid("customUid");

这是在1.2中使用AsyncDataStream的简单格式。customasyncconnector中的代码与您将在其核心找到的每个示例一样:

public class CustomAsyncConnector extends RichAsyncFunction<CustomObject, ResultObject> {

private transient Session client;

@Override
public void open(Configuration parameters) throws Exception {
    client = Cluster.builder().addContactPoint("<CustomUrl>")
            .withPort(1234)
            .build()
            .connect("<thisKeyspace>");
}
@Override
public void close() throws Exception {
    client.close();
}
@Override
public void asyncInvoke(final CustomObject ctsa, final AsyncCollector<ResultObject> asyncCollector) throws Exception {

    //Custom code here...

}

}
下面是我的问题:1)在customasyncconnector()中向open()函数传递“参数”的正确方法是什么?在main()函数中从哪里调用它。2.)在open()函数中,应该如何使用参数来设置与客户端的连接?
我猜第一个问题是在main中创建一个新的customasyncconnector()对象示例,然后直接调用open()函数并将parameters对象传递给它,然后将该示例放入asysdatastream的代码中。但是,我不确定这是否是设置配置类型对象中字段的最佳方法,更重要的是,是否是正确的方法(同样,假设执行“configparameters.setstring(“contactpointurl”,“127.0.0.1”)”是正确的,但我不确定)。这就引出了我的第二个,也是最重要的问题。
关于我的第二个问题,我想传递给open()函数的参数是contactpointurl、portnumber和要放入.connect()的键空间。但是,我似乎无法通过执行“.addcontactpoint(parameters.getstring(“contactpointurl”)”之类的操作来访问它们。我还试着看看是否应该或应该在哪里执行cluster.builder().getconfiguration(参数),但我在暗中猜测它甚至属于什么地方,或者根本不知道参数名是否必须是特定的,等等。
所以我希望我没有说得太差,但任何和所有的帮助将不胜感激。
提前谢谢!

zf9nrax1

zf9nrax11#

这就是最终的结果。仍然不知道如何将配置参数传递给.open()方法,但是很好。
已将此添加到customasyncconnector类:

private final CustomProps props;

public CustomAsyncConnector(CustomProps props) {
    super();
    this.props = props;
}

以及我在main()方法中传递的内容:

AsyncDataStream
                .unorderedWait(
                        dataToProcess,
                        new CustomAsyncConnector(props),
                        5,
                        TimeUnit.SECONDS,
                        10);

并使用了.open()方法中的道具,就像我想使用参数一样。

相关问题