Message producer using the Spring Cloud Stream Kafka binder:
@Component
public static class PageViewEventSource implements ApplicationRunner {
private final MessageChannel pageViewsOut;
private final Log log = LogFactory.getLog(getClass());
public PageViewEventSource(AnalyticsBinding binding) {
this.pageViewsOut = binding.pageViewsOut();
}
@Override
public void run(ApplicationArguments args) throws Exception {
List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
Runnable runnable = () -> {
String rPage = pages.get(new Random().nextInt(pages.size()));
String rName = names.get(new Random().nextInt(names.size())); // pick from names, not pages
PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
Serializer<PageViewEvent> serializer = new JsonSerde<>(PageViewEvent.class).serializer();
byte[] m = serializer.serialize(null, pageViewEvent);
Message<byte[]> message = MessageBuilder
.withPayload(m).build();
try {
this.pageViewsOut.send(message);
log.info("sent " + message);
} catch (Exception e) {
log.error(e);
}
};
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
}
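As an aside, note that rName must be drawn from the names list; drawing it from pages is why the exception payload further down shows a page name ("facebook") in the userId field. A minimal, self-contained sketch of the corrected picking logic (plain JDK, no Spring or Kafka dependencies):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class RandomPick {
    // Pick one element uniformly at random from a list.
    static String pick(List<String> items, Random rnd) {
        return items.get(rnd.nextInt(items.size()));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
        List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
        Random rnd = new Random();
        String rName = pick(names, rnd); // from names, not pages
        String rPage = pick(pages, rnd);
        System.out.println(names.contains(rName) && pages.contains(rPage)); // prints "true"
    }
}
```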
The binder uses the serialization configuration below:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$BytesSerde
I am trying to consume these messages in a separate consumer application via Spring Kafka's @KafkaListener:
@Service
public class PriceEventConsumer {
private static final Logger LOG = LoggerFactory.getLogger(PriceEventConsumer.class);
@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(Bytes data){
//public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
LOG.info("Message received");
LOG.info("received data='{}'", data);
}
Container factory configuration:
@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
BytesDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
With this configuration, the consumer does not receive any messages (Bytes). If I change the KafkaListener to accept String, I get the following exception:
@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(String data){
LOG.info("Message received");
LOG.info("received data='{}'", data);
}
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload={"userId":"facebook","page":"about","duration":10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}], failedMessage=GenericMessage [payload={"userId":"facebook","page":"about","duration":10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}]
... 23 more
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload={"userId":"facebook","page":"about","duration":10}, headers={kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}]
	at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
	... 22 more
Any hints would be much appreciated.
UPDATE: POJO part --
@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
LOG.info("Message received");
LOG.info("received data='{}'", data);
}
Container factory configuration:
@Bean
public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(priceEventConsumerFactory());
return factory;
}
Producer --
@Override
public void run(ApplicationArguments args) throws Exception {
List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
Runnable runnable = () -> {
String rPage = pages.get(new Random().nextInt(pages.size()));
String rName = names.get(new Random().nextInt(names.size())); // pick from names, not pages
PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
Message<PageViewEvent> message = MessageBuilder
.withPayload(pageViewEvent).build();
try {
this.pageViewsOut.send(message);
log.info("sent " + message);
} catch (Exception e) {
log.error(e);
}
};
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
}
2 Answers

kmb7vmvb1#
You can deserialize the record from Kafka into a POJO. For versions before 2.2.x, use a MessageConverter. Starting with version 2.2, you can configure the deserializer explicitly to use the supplied target type and ignore type information in headers, via one of the overloaded constructors that takes a boolean, or you can use a MessageConverter:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
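Alternatively, if the consumer is a Spring Boot application, roughly the same 2.2-style explicit deserializer setup can be expressed in application.properties. This is a sketch, not taken from the question: the property keys assume spring-kafka 2.2+ (names may vary slightly by exact version), and com.example.PageViewEvent is a placeholder for the real fully-qualified class name:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.PageViewEvent
spring.kafka.consumer.properties.spring.json.use.type.headers=false

The last line corresponds to the boolean constructor argument mentioned above: it tells the deserializer to always use the configured target type and ignore any type headers the producer may have set.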
5ssjco0h2#
Use the JsonDeserializer from the org.springframework.kafka.support.serializer package, and also trust the package containing your POJO for JSON deserialization:

props.put(JsonDeserializer.TRUSTED_PACKAGES, "<package-name>");
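In Spring Boot properties form, the same trusted-packages setting can be passed through as a consumer property. A sketch, assuming com.example stands in for the package that contains PageViewEvent:

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example

Without this, JsonDeserializer rejects classes outside its default trusted packages and deserialization fails even when the target type is otherwise configured correctly.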