基于条件的Apache Flink Kafka流处理

igetnqfo  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(114)

我正在使用apache-flink构建一个 Package 器库,我在其中侦听(使用)多个主题,并且我有一组应用程序希望处理来自这些主题的消息。
示例:我有10个应用程序app1,app2,app3 ... app10(每个都是同一个内部项目的java库部分,即所有10个jar都是同一个. war文件的一部分),其中只有5个应用程序应该使用到达使用者组的消息。我可以在过滤器函数的帮助下对5个应用程序进行过滤。
挑战在于strStream.process(executionServiceInterface)函数,其中app1提供ExceucionServiceInterface的实现类ExecutionServiceApp1Impl,类似的app2提供ExecutionServiceApp2Impl
当有多个实现可用时,Spring希望我们提供@Qualifier注解,或者必须在实现(ExecutionServiceApp1Impl、ExecutionServiceApp2Impl)上标记@Primary
但是我并不想这么做,因为我构建了一个通用的 Package 器库,它应该支持任何这样的应用程序(app1,app2等),并且所有这些应用程序都应该能够实现自己的实现逻辑(ExecutionServiceApp1Impl,ExecutionServiceApp2Impl)。
有人能帮我吗?怎么解决这个问题?
下面是代码供参考。
第一个
非常感谢!

6tdlim6h

6tdlim6h1#

使用反映解决了这个问题,下面是解决这个问题的程式码。
注意:这要求我知道完全限定类名和方法名的列表沿着它们的参数。

@Component
public class SampleJobExecutor extends ProcessFunction<String, String> {

@Autowired
MyAppProperties myAppProperties;

    @Override
    public void processElement(String inputMessage, ProcessFunction<String, String>.Context context,
            Collector<String> collector) throws Exception {
        
        String className = null;
        String methodName = null;
        try {   
            
            Map<String, List<String>> map = myAppProperties.getMapOfImplementors();
            
            JSONObject json = new JSONObject(inputMessage);
            if (json != null && json.has("appName")) {
                className = map.get(json.getString("appName")).get(0);
                methodName = map.get(json.getString("appName")).get(1);
            }
            
            
            Class<?> forName = Class.forName(className);
            Object job = forName.newInstance();
            Method method = forName.getDeclaredMethod(methodName, String.class);
            method.invoke(job , inputMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

相关问题