我有几个动态Kafka消费者(基于部门id等…),你可以找到下面的代码。
基本上,我想记录每个 onMessage()
方法调用,因此我创建了 @LogExecutionTime
方法级自定义注解,并将其添加到 onMessage()
方法。但是我的 logExecutionTime()
的 LogExecutionTimeAspect
即使我的朋友 onMessage()
每当主题上有消息时就会调用,其他一切都正常。
你能帮我弄清楚我遗漏了什么吗 LogExecutionTimeAspect
上课让它开始工作?
日志执行时间:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LogExecutionTime {
}
logexecutiontimeaspect类:
@Aspect
@Component
public class LogExecutionTimeAspect {
@Around("within(com.myproject..*) && @annotation(LogExecutionTime)")
public Object logExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable {
long startTime = System.currentTimeMillis();
Object object = joinPoint.proceed();
long endTime = System.currentTimeMillis();
System.out.println(" Time taken by Listener ::"+(endTime-startTime)+"ms");
return object;
}
}
departmentsmessageconsumer类:
@Component
public class DepartmentsMessageConsumer implements MessageListener {
@Value(value = "${spring.kafka.bootstrap-servers}" )
private String bootstrapAddress;
@PostConstruct
public void init() {
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "DEPT_ID_HERE");
ContainerProperties containerProperties =
new ContainerProperties("com.myproj.depts.topic");
containerProperties.setMessageListener(this);
DefaultKafkaConsumerFactory<String, Greeting> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProperties,
new StringDeserializer(),
new JsonDeserializer<>(Department.class));
ConcurrentMessageListenerContainer container =
new ConcurrentMessageListenerContainer<>(consumerFactory,
containerProperties);
container.start();
}
@Override
@LogExecutionTime
public void onMessage(Object message) {
ConsumerRecord record = (ConsumerRecord) message;
Department department = (Department)record.value();
System.out.println(" department :: "+department);
}
}
applicationlauncher类:
@SpringBootApplication
@EnableKafka
@EnableAspectJAutoProxy
@ComponentScan(basePackages = { "com.myproject" })
public class ApplicationLauncher extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(ApplicationLauncher.class, args);
}
}
编辑:
我试过了 @EnableAspectJAutoProxy(exposeProxy=true)
,但不起作用。
1条答案
按热度按时间dvtswwa31#
您应该考虑在
@EnableAspectJAutoProxy
:另一方面,有这样的东西,比aop更好:
更新
@EnableAspectJAutoProxy(exposeProxy=true)
不起作用,我知道我可以使用拦截器,但我想让它与aop一起工作。那我建议你考虑分开
DepartmentsMessageConsumer
以及ConcurrentMessageListenerContainer
. 我的意思是移动那个ConcurrentMessageListenerContainer
进入单独的@Configuration
班级。这个ApplicationLauncher
他是个很好的候选人。让它成为一个@Bean
依靠你的DepartmentsMessageConsumer
注射用。关键是你需要给一个aop一个机会来测试你的DepartmentsMessageConsumer
,但在@PostConstruct
,现在从Kafka开始示例化和消费还为时过早。