HERE OLP: How do I "subscribe to notifications" for a catalog?

fdbelqdn  posted on 2021-06-21 in Flink

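I am trying to subscribe to a catalog so that I receive notifications, but the compiler reports "cannot find symbol" for thenApply. Please help.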

// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi.subscribeToNotifications(consumerSettings)
        .thenApply(subscription -> {
            subscription
                .notifications()
                .runWith(Sink.foreach(notification ->
                    // this callback is called each time a new batch publication happens in catalog
                    System.out.printf("catalog %s has a new version %d\n", catalogHrn, notification.getCatalogVersion())
                ), myMaterializer);
            return subscription.subscriptionControl();
        });

[ERROR] COMPILATION ERROR: [INFO]

xvw2m8pv  #1

------
[ERROR] main.java:[41,25] cannot find symbol
  symbol:   method thenApply((subscript…;})
  location: interface org.apache.flink.streaming.api.functions.source.SourceFunction
[ERROR] main.java:[44,65] package akka.stream.javadsl does not exist
[ERROR] main.java:[47,40] cannot find symbol
  symbol:   variable myMaterializer

wi3ka0sx  #2

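Judging by your compilation errors, you appear to be subscribing to notifications from inside a Flink application. The Flink data client's QueryApi returns a SourceFunction, not a CompletionStage, so there is no thenApply to call on it. Instead, you can add it to the execution environment as a source, like this: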

StreamExecutionEnvironment
    .getExecutionEnvironment()
    .addSource(
        query.subscribeToNotifications(
            new NotificationConsumerSettings(
                "my-notification-consumer-group-1"
            )
        )
    )
    .addSink(
        notification -> System.out.printf(
            "catalog %s has a new version %d\n",
            STREAMING_INPUT_CATALOG_HRN,
            notification.getCatalogVersion()
        )
    );

agyaoht7  #3

If subscribeToNotifications is a blocking function, you can wrap its result using the completedFuture method of CompletableFuture.

CompletionStage<NotificationSubscriptionControl> controlStage =
        CompletableFuture.completedFuture(queryApi.subscribeToNotifications(consumerSettings))
            .thenApply(
                subscription -> {
                  subscription
                      .notifications()
                      .runWith(
                          Sink.foreach(
                              notification ->
                                  // this callback is called each time a new batch publication
                                  // happens in catalog
                                  System.out.printf(
                                      "catalog %s has a new version %d\n",
                                      catalogHrn, notification.getCatalogVersion())),
                          myMaterializer);
                  return subscription.subscriptionControl();
                });
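
To see the wrapping pattern in isolation, here is a minimal sketch that uses only the JDK. The names CompletedFutureSketch, blockingSubscribe and subscribeAsStage are hypothetical stand-ins invented for this illustration and are not part of the HERE data client; the String values merely take the place of the subscription and its control object.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompletedFutureSketch {

    // Hypothetical stand-in for a blocking call that returns a plain value
    // (not a CompletionStage). Not a real HERE data client method.
    static String blockingSubscribe(String consumerGroup) {
        return "subscription-for-" + consumerGroup;
    }

    // Wraps the already-computed result in a completed future so that
    // thenApply becomes available on it.
    static CompletionStage<String> subscribeAsStage(String consumerGroup) {
        return CompletableFuture
            .completedFuture(blockingSubscribe(consumerGroup))
            .thenApply(subscription -> subscription + ":control");
    }

    public static void main(String[] args) {
        String control = subscribeAsStage("my-notification-consumer-group-1")
            .toCompletableFuture()
            .join();
        // prints "subscription-for-my-notification-consumer-group-1:control"
        System.out.println(control);
    }
}
```

Note that completedFuture receives a value that has already been computed, so the blocking call still runs on the calling thread before the stage exists. If the call should run on another thread, CompletableFuture.supplyAsync is the usual alternative.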
