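I'm trying to display data from another class, `Transaction`, in my `Application` class. I create a `Transaction` and pass it to the `sendUserNotification` method, but when I call the getters on it, they return null.

`Application Class`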
```
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Application {
    private static final String TOPIC = "suspicious-transactions";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) {
        Application kafkaConsumerApp = new Application();
        String consumerGroup = "user-notification-service";
        if (args.length == 1) {
            consumerGroup = args[0];
        }
        System.out.println("Consumer is part of consumer group " + consumerGroup);
        Consumer<String, Transaction> kafkaConsumer = kafkaConsumerApp.createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
        kafkaConsumerApp.consumeMessages(TOPIC, kafkaConsumer);
    }

    public static void consumeMessages(String topic, Consumer<String, Transaction> kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, Transaction> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (consumerRecords.isEmpty()) {
                //do something
            }
            for (ConsumerRecord<String, Transaction> record : consumerRecords) {
                Transaction transaction = new Transaction();
                sendUserNotification(transaction);
                System.out.println(String.format("Received record (key: %s, value %s, partition: %d, offset: %d",
                        record.key(), record.value(), record.partition(), record.offset()));
            }
        }
    }

    public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroups) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroups);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new KafkaConsumer<>(properties);
    }

    private static void sendUserNotification(Transaction transaction) {
        // Print transaction information to the console
        String h = transaction.getUser();
        System.out.println(String.format("Sending user %s notification about a suspicious transaction of %f in their account originating in %s", h, transaction.getAmount(), transaction.getTransactionLocation()));
    }
}
```

`Transaction Class`

```
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

public class Transaction {
    private String user;
    private double amount;
    private String transactionLocation;

    public String getUser() {
        return user;
    }

    public double getAmount() {
        return amount;
    }

    public String getTransactionLocation() {
        return transactionLocation;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public void setTransactionLocation(String transactionLocation) {
        this.transactionLocation = transactionLocation;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "user='" + user + '\'' +
                ", amount=" + amount +
                ", transactionLocation='" + transactionLocation + '\'' +
                '}';
    }

    /**
     * Kafka Deserializer implementation.
     * Deserializes a Transaction from JSON to a {@link Transaction} object
     */
    public static class TransactionDeserializer implements Deserializer<Transaction> {
        @Override
        public Transaction deserialize(String topic, byte[] data) {
            ObjectMapper mapper = new ObjectMapper();
            Transaction transaction = null;
            try {
                transaction = mapper.readValue(data, Transaction.class);
            } catch (Exception e) {
                // Returns null if the payload cannot be parsed as a Transaction
                e.printStackTrace();
            }
            return transaction;
        }
    }
}
```

`Output`

```
Consumer is part of consumer group user-notification-service
Sending user null notification about a suspicious transaction of 0.000000 in their account originating in null
Received record (key: dkelly9283, value Transaction{user='dkelly9283', amount=1653.32, transactionLocation='China'}, partition: 0, offset: 12
Sending user null notification about a suspicious transaction of 0.000000 in their account originating in null
Received record (key: msmith2015, value Transaction{user='msmith2015', amount=50.43, transactionLocation='California'}, partition: 1, offset: 12
```

1 Answer
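You need to assign values to the fields of the `Transaction` you pass in. In Java, reference-type fields (here, `String user` and `String transactionLocation`) are initialized to null by default, and a primitive `double` field such as `amount` is initialized to 0.0, which is exactly what your output shows. You can populate the fields through a constructor, or by calling `setUser()` and the other setters after creating the new `Transaction` instance.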
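A minimal, stand-alone sketch of the point above, with no Kafka dependency. `TransactionDemo`, `describe()`, and the three-argument constructor are hypothetical names added for illustration; the `Transaction` here is a trimmed copy of the class from the question, and the sample values are taken from the question's output.

```java
import java.util.Locale;

// Hypothetical demo class; not part of the original code.
class TransactionDemo {

    // Trimmed copy of the question's Transaction class.
    static class Transaction {
        private String user;                 // reference type: defaults to null
        private double amount;               // primitive double: defaults to 0.0
        private String transactionLocation;  // reference type: defaults to null

        Transaction() { }

        // Illustrative convenience constructor (an assumption, not in the original).
        Transaction(String user, double amount, String transactionLocation) {
            this.user = user;
            this.amount = amount;
            this.transactionLocation = transactionLocation;
        }

        String getUser() { return user; }
        double getAmount() { return amount; }
        String getTransactionLocation() { return transactionLocation; }

        void setUser(String user) { this.user = user; }
        void setAmount(double amount) { this.amount = amount; }
        void setTransactionLocation(String location) { this.transactionLocation = location; }
    }

    // Builds the same message that sendUserNotification prints in the question.
    static String describe(Transaction t) {
        return String.format(Locale.ROOT,
                "Sending user %s notification about a suspicious transaction of %f in their account originating in %s",
                t.getUser(), t.getAmount(), t.getTransactionLocation());
    }

    public static void main(String[] args) {
        // Freshly constructed object: every field still holds its default value.
        Transaction empty = new Transaction();
        System.out.println(describe(empty));

        // Option 1: populate the fields with setters after construction.
        Transaction viaSetters = new Transaction();
        viaSetters.setUser("dkelly9283");
        viaSetters.setAmount(1653.32);
        viaSetters.setTransactionLocation("China");
        System.out.println(describe(viaSetters));

        // Option 2: populate the fields through a constructor.
        Transaction viaConstructor = new Transaction("msmith2015", 50.43, "California");
        System.out.println(describe(viaConstructor));
    }
}
```

In the consumer loop from the question, the populated object most likely already exists: the printed output shows `record.value()` rendering as a `Transaction`, so, assuming the value deserializer is configured as `Transaction.TransactionDeserializer`, passing `record.value()` to `sendUserNotification` instead of a new empty `Transaction` should give the expected notification.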