非字符串kafka消息头总是空的。已正确提取类型为string datatype的消息头。
package com.test.KafkaSimpleProject;
import org.apache.camel.CamelContext;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
/**
* Hello world!
*
*/
public class App2 {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
try {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
//producer route
from("timer:foo?period=3000&repeatCount=1000&delay=3000")
.setBody()
.constant("This is kafka message")
.setHeader("name")
.simple("ApacheCamelkafka", String.class)
.setHeader("contact")
.simple("123123", Integer.class)
.setHeader("trueFalse")
.simple("false", Boolean.class)
.to("kafka:test?brokers=localhost:9092");
//Consumer route
from("kafka:test?brokers=localhost:9092")
.process(new org.apache.camel.Processor() {
public void process(org.apache.camel.Exchange exchange)
throws Exception {
Message in = exchange.getIn();
String name = in .getHeader("name", String.class);
Integer contact = in .getHeader("contact", Integer.class);
Boolean trueFalse = in .getHeader("trueFalse", Boolean.class);
System.out.println("Name:" + name);
System.out.println("contact:" + contact);
System.out.println("trueFalse:" + trueFalse);
}
});
}
});
context.start();
Thread.sleep(20000000);
} finally {
context.stop();
}
}
}
预期产量:
姓名:阿帕切Kafka
联系人:123123
真假:假
实际输出:
姓名:阿帕切Kafka
联系人:空
truefalse:空
暂无答案!
目前还没有任何答案,快来回答吧!