我有一个应用程序,它执行大量的数据处理(一次大约130万次),这些数据是突发的。应用程序使用来自Kafka主题的数据。
我正在使用SpringCloudStreamStarterKafka的2.0.1版本来使用数据。
我的代码如下:
侦听器:
@Service
public class ListenerService {
@Autowired
private Application2<Foo> application;
@Override
@StreamListener(FooStreams.INPUT)
public void subscribe(@Payload Foo foo) {
application.sync(foo);
}
}
流:
public interface FooStreams {
String INPUT = "Foo";
@Input(value = INPUT)
SubscribableChannel subscribe();
}
在主应用程序中,我将流绑定到kafka,如下所示:
@SpringBootApplication
@EnableBinding({FooStreams.class})
public class Application {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
try {
SpringApplication.run(Application.class, args);
}
catch (Exception e) {
logger.error("Application failed to start");
}
}
}
有什么我不知道的吗?问题是,我可以看到内存利用率在数据处理期间会急剧上升,而在处理完成后不会下降。
暂无答案!
目前还没有任何答案,快来回答吧!