在Flink中使用正则表达式消费多个主题

jchrr9hc  于 2023-02-01  发布在  Apache
关注(0)|答案(1)|浏览(325)

我知道flink能够使用正则表达式来使用多个主题,在这里输入链接描述。
我有以下主题名称,例如

sclee-10343434
sclee-10342432
sclee-34234
sclee-3343423432424
....

在本例中,当我使用正则表达式sclee-[\\d+]将值设置为如下所示时,它会给我一个异常。
在我的例子中,正则表达式的情况是正确的吗?还有,Flink真的支持它吗?

val source = new FlinkKafkaConsumer[T](
  java.util.regex.Pattern.compile("sclee-[\\d+]"),
  deserializer,
  consumerProps
)

错误如下。

Caused by: java.lang.RuntimeException: Unable to retrieve any partitions with KafkaTopicsDescriptor: Topic Regex Pattern (dev-plexer-10507689[\d+])
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:153)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:553)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:750)
rn0zuynd

rn0zuynd1#

我认为这里的问题是正则表达式不会匹配提供的主题。提供的正则表达式sclee-[\\d+]匹配sclee-后跟一个数字或+符号。
在这种情况下,您最可能需要:sclee-[\\d]+.

相关问题