我使用了apachebahir的amqsource连接器来侦听activemq,但是当我运行flink作业来使用activemq中的数据时,不会生成任何输出。
例如,连接器正在侦听包含4条消息的activemq,但是当我运行flink作业时,没有数据被占用。
val brokerURL = "tcp://localhost:61616"
val destinationName = "TEST.FOO"
val filePath = "C:\\output" + System.currentTimeMillis + ".csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new MemoryStateBackend(1000, false))
val config = new AMQSourceConfig.AMQSourceConfigBuilder[String]()
.setConnectionFactory(new ActiveMQConnectionFactory(brokerURL))
.setDestinationName(destinationName)
.setDeserializationSchema(new SimpleStringSchema)
.setDestinationType(DestinationType.QUEUE)
.setRunningChecker(new RunningChecker).build
val amqSource = new AMQSource[String](config)
val stream = env.addSource(amqSource)
stream.map(/*Some MapFunction*/)
stream.writeAsText(filePath)
stream.print
env.execute
1条答案
按热度按时间rn0zuynd1#
amqsource要求消息为字节,请参阅amqsource.class下run方法的代码:
向activemq生成数据而不是文本消息时:
使用字节消息: