I am using Java 21 and Spring Boot 3.2.0 (SNAPSHOT). I want to send a Map<Short, List<Customer>> from a KafkaProducer to a KafkaConsumer.
The KafkaConsumerConfig class:
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return properties;
    }

    @Bean
    public ConsumerFactory<String, Map<Short, List<Customer>>> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Map<Short, List<Customer>>>> kafkaListenerContainerFactory(
            ConsumerFactory<String, Map<Short, List<Customer>>> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Map<Short, List<Customer>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
This is the Customer class on the KafkaConsumer side:
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import jakarta.persistence.*;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;

import java.util.List;
import java.util.Objects;

@Data
@NoArgsConstructor
@Entity
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class Customer {

    @JsonProperty("id")
    @Nullable
    @Id
    @SequenceGenerator(
            name = "customer_id_sequence",
            sequenceName = "customer_id_sequence"
    )
    @GeneratedValue(
            strategy = GenerationType.SEQUENCE,
            generator = "customer_id_sequence"
    )
    private Long ID;

    @JsonProperty("userName")
    @Column(unique = false)
    @Size(min = 5, max = 50, message = "User must be between 5 and 50 characters")
    private String userName;

    @JsonProperty("email")
    @NonNull
    @NotBlank(message = "The email can not be empty")
    @Size(min = 12, max = 50, message = "Email must be between 12 and 50 characters")
    private String email;

    @JsonProperty("message")
    private List<String> message;

    public Customer(Long ID, String userName, String email, List<String> message) {
        this.ID = ID;
        this.userName = userName;
        this.email = email;
        this.message = message;
    }

    public Customer(String userName, String email, List<String> message) {
        this.userName = userName;
        this.email = email;
        this.message = message;
    }

    public Long getID() {
        return ID;
    }

    public void setID(Long ID) {
        this.ID = ID;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public List<String> getMessage() {
        return message;
    }

    public void setMessage(List<String> message) {
        this.message = message;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Customer customer = (Customer) o;
        return Objects.equals(ID, customer.ID) && Objects.equals(userName, customer.userName) && Objects.equals(email, customer.email) && Objects.equals(message, customer.message);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ID, userName, email, message);
    }
}
This is the KafkaProducerConfig class:
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return properties;
    }

    @Bean
    public ProducerFactory<String, Map<Short, List<Customer>>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate(
            ProducerFactory<String, Map<Short, List<Customer>>> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
This is the Customer class on the KafkaProducer side:
import jakarta.annotation.Nullable;
import jakarta.persistence.*;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;

import java.util.List;
import java.util.Objects;

@Data
@NoArgsConstructor
@Entity
public class Customer {

    @Nullable
    @Id
    @SequenceGenerator(
            name = "customer_id_sequence",
            sequenceName = "customer_id_sequence"
    )
    @GeneratedValue(
            strategy = GenerationType.SEQUENCE,
            generator = "customer_id_sequence"
    )
    private Long ID;

    @Column(unique = false)
    @Size(min = 5, max = 50, message = "User must be between 5 and 50 characters")
    private String userName;

    @NonNull
    @NotBlank(message = "The email can not be empty")
    @Size(min = 12, max = 50, message = "Email must be between 12 and 50 characters")
    private String email;

    private List<String> message;

    public Customer(Long ID, String userName, String email, List<String> message) {
        this.ID = ID;
        this.userName = userName;
        this.email = email;
        this.message = message;
    }

    public Customer(String userName, String email, List<String> message) {
        this.userName = userName;
        this.email = email;
        this.message = message;
    }

    public Long getID() {
        return ID;
    }

    public void setID(Long ID) {
        this.ID = ID;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public List<String> getMessage() {
        return message;
    }

    public void setMessage(List<String> message) {
        this.message = message;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Customer customer = (Customer) o;
        return Objects.equals(ID, customer.ID) && Objects.equals(userName, customer.userName) && Objects.equals(email, customer.email) && Objects.equals(message, customer.message);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ID, userName, email, message);
    }
}
This is how I send from the producer to the consumer:
@Component
public class SendToDatabse {

    private final KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate;

    public SendToDatabse(KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendToDatabse(Map<Short, List<Customer>> map) {
        this.kafkaTemplate.send("databaseTopic", map);
    }
}
This is the Kafka listener:
@KafkaListener(topics = "databaseTopic", groupId = "groupID", containerFactory = "kafkaListenerContainerFactory")
void listener(Map<Short, List<Customer>> map) {
    map.forEach((k, v) -> {
        System.out.print(k + "\t");
        v.forEach(System.out::println);
    });
}
The error is:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void com.example.DatabseMicroservice.listener.Listener.listener(java.util.Map<java.lang.Short, java.util.List<com.example.DatabseMicroservice.model.Customer>>)]
Bean [com.example.DatabseMicroservice.listener.Listener@62dfe152]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2902) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2847) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2732) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.12.0-RC1.jar:1.12.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2730) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2582) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2468) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2110) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1465) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1429) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1304) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:402) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:380) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:380) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"200":[{"userName":"Bob Johnson","email":"[email protected]","message":["random messag"],....{"userName":"Peter Hall","email":"[email protected]","message":["messag"],"id":18},{"userName":"Noah Taylor","email":"[email protected]","message":["another random message","and another on"],"id":16}]}, headers={__ContentTypeId__=[B@3fddd565, kafka_offset=5, __KeyTypeId__=[B@1577f842, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@49c36e71, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=databaseTopic, kafka_receivedTimestamp=1699028747669, __TypeId__=[B@894c52a, kafka_groupId=groupID}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
... 15 common frames omitted
I don't know what to do here, because the code looks correct to me.
2 Answers
Answer 1 (gr8qqesn1):
See the @KafkaListener Javadocs.
So your configuration is actually using the auto-configured kafkaListenerContainerFactory, which is based on the plain String converter. Since your JSON factory bean is named factory, consider specifying that exact name in the @KafkaListener containerFactory attribute.
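To make this concrete, here is a minimal sketch of the kind of setup the answer is pointing at: a JSON-aware container factory that is actually registered as a bean, referenced from the listener by its exact bean name. This is my own illustration, not code from the original answer; the class and bean names (JsonKafkaConsumerConfig, jsonConsumerFactory, jsonKafkaListenerContainerFactory) and the TypeReference-based JsonDeserializer are assumptions for the example.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class JsonKafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public ConsumerFactory<String, Map<Short, List<Customer>>> jsonConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        // Deserialize into the exact generic target type instead of relying on the
        // __TypeId__ header, which cannot describe Map<Short, List<Customer>>.
        JsonDeserializer<Map<Short, List<Customer>>> valueDeserializer =
                new JsonDeserializer<>(new TypeReference<Map<Short, List<Customer>>>() { }, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    // @Bean is what was missing on the factory method in the question's config; without it
    // the auto-configured, String-based kafkaListenerContainerFactory is picked up instead.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Map<Short, List<Customer>>> jsonKafkaListenerContainerFactory(
            ConsumerFactory<String, Map<Short, List<Customer>>> jsonConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Map<Short, List<Customer>>> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory);
        return factory;
    }
}

The listener then names that bean explicitly:

@KafkaListener(topics = "databaseTopic", groupId = "groupID",
        containerFactory = "jsonKafkaListenerContainerFactory")
void listener(Map<Short, List<Customer>> map) {
    map.forEach((k, v) -> v.forEach(System.out::println));
}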
Answer 2 (qvk1mo1f2):
I managed to get around the problem, but something about it feels wrong; it seems more like an illusion than a proper fix.
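The code for this answer is not shown above. Purely as a hypothetical illustration of the kind of workaround that "works but feels wrong" (this is a guess, not the original answer's code), one common stopgap is to receive the value as a plain String through the default factory and map it manually with Jackson:

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;

// Hypothetical workaround: skip the JSON container factory entirely and
// deserialize the raw payload by hand.
@KafkaListener(topics = "databaseTopic", groupId = "groupID")
void listener(String payload) throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    Map<Short, List<Customer>> map = mapper.readValue(
            payload, new TypeReference<Map<Short, List<Customer>>>() { });
    map.forEach((k, v) -> {
        System.out.print(k + "\t");
        v.forEach(System.out::println);
    });
}

This sidesteps the container-factory configuration rather than fixing it, which is why it tends to feel like a hack compared with registering the JSON factory bean as described in the first answer.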