flink作业不生成输出

clj7thdc  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(502)

我使用了apachebahir的amqsource连接器来侦听activemq,但是当我运行flink作业来使用activemq中的数据时,不会生成任何输出。
例如,连接器正在侦听包含4条消息的activemq,但是当我运行flink作业时,没有数据被占用。

val brokerURL = "tcp://localhost:61616"
val destinationName = "TEST.FOO"
val filePath = "C:\\output" + System.currentTimeMillis + ".csv"

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new MemoryStateBackend(1000, false))

val config = new AMQSourceConfig.AMQSourceConfigBuilder[String]()
  .setConnectionFactory(new ActiveMQConnectionFactory(brokerURL))
  .setDestinationName(destinationName)
  .setDeserializationSchema(new SimpleStringSchema)
  .setDestinationType(DestinationType.QUEUE)
  .setRunningChecker(new RunningChecker).build
val amqSource = new AMQSource[String](config)

val stream = env.addSource(amqSource)

stream.map(/*Some MapFunction*/)

stream.writeAsText(filePath)

stream.print

env.execute
rn0zuynd

rn0zuynd1#

amqsource要求消息为字节,请参阅amqsource.class下run方法的代码:

Message message = this.consumer.receive(1000L);
if (!(**message instanceof BytesMessage**)) {
LOG.warn("Active MQ source received non bytes message: {}", message);
return;
}

向activemq生成数据而不是文本消息时:

val message = session.createTextMessage(text)

使用字节消息:

val message = session.createBytesMessage()
message.writeBytes(text.getBytes)

相关问题