storm和spring4集成

mmvthczy  于 2021-06-21  发布在  Storm
关注(0)|答案(4)|浏览(349)

我有一个原型storm应用程序,它读取stomp流并将输出存储在hbase上。它可以工作,但不是很灵活,我正试图让它设置在一个更一致的方式与我们的其他应用程序,但没有多少运气弄清楚如何与风暴目前的工作方式。我们使用springjms类,但不是以标准spring方式使用它们,而是在运行时创建它们,并手动设置依赖项。
本项目:https://github.com/granthenke/storm-spring 看起来很有希望,但自从storm jar被带进apache孵化器并重新 Package 后,它已经有几年没有被碰过了,而且不能正常构建。
是我遗漏了什么,还是把这些东西整合起来不值得?

k4aesqcs

k4aesqcs1#

@zenbeni已经回答了这个问题,但是我想告诉你关于我的实现,很难像springbeans那样制作喷口/螺栓。但是要在喷口/螺栓中使用其他springbean,可以声明一个全局变量&在execute方法中检查whtether变量是否为null。如果为null,则必须从应用程序上下文获取bean。创建一个类,其中包含一个方法来初始化bean(如果尚未初始化)。查看applicationcontextaware接口以获取更多信息(springbean重用)。
示例代码:
螺栓等级:

public class Class1 implements IRichBolt{
    Class2 class2Object;

    public void prepare() {
        if (class2Object== null) {          
            class2Object= (Class2) Util
                .initializeContext("class2");
        }
    }
}

用于初始化bean(如果尚未初始化)的util类:

public class Util{
    public static Object initializeContext(String beanName) {
        Object bean = null;
        try {
            synchronized (Util.class) {
                if (ApplicationContextUtil.getAppContext() == null) {
                    ApplicationContext appContext = new ClassPathXmlApplicationContext("beans.xml");
                    bean = ApplicationContextUtil.getAppContext().getBean(
                        beanName);
                } else {
                    bean = ApplicationContextUtil.getAppContext().getBean(
                        beanName);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return bean;
    }
}

应用程序上下文更改的侦听器:

@Component
public class ApplicationContextUtil implements ApplicationContextAware {

    private static ApplicationContext appContext;

    public void setApplicationContext(ApplicationContext applicationContext)
        throws BeansException {
        appContext = applicationContext;
    }

    public static ApplicationContext getAppContext() {
        return appContext;
    }
}

注意:每个worker将初始化spring上下文,因为它在不同的jvm中运行。
更新
如果你想使用一个Springbean类,在这个类中你有一些先前分配的值,试试这个,
注意:将当前类传递给bolt的构造函数
已包含值的类(拓扑创建类):

public class StormTopologyClass implements ITopologyBuilder, Serializable {
    public Map<String, String> attributes = new HashMap<String, String>();

    TopologyBuilder builder=new TopologyBuilder();
    builder.setBolt("Class1",new Class1(this));
    builder.createTopology();
}

使用单参数构造函数进行螺栓连接:

public class Class1 implements IRichBolt{
    StormTopologyClass topology;

    public Class1 (StormTopologyClass topology) {
        this.topology = topology;
    }
}

现在可以使用attributes变量&它在bolt类中的值。

inkz8wg9

inkz8wg92#

事实上,storm spring似乎是您正在寻找的,但是它没有更新,并且有一些限制(例如不能定义螺栓/喷口上的任务等)。也许你应该自己整合?
不要忘记你的目标:一个有很多工人的集群。当您将使用StormAPI(例如重新平衡)在一个或多个worker上部署拓扑时,spring的行为如何?这是否意味着它必须在启动时在worker jvm上示例化一个新的spring上下文,然后storm才能部署目标bolt/spouts并定义执行器?
如果您在spring配置中只定义storm组件,那么它应该可以工作(拓扑的启动配置然后storm只管理对象),但是如果您依赖spring来管理其他组件(springjms似乎是这样),那么它可能会在拓扑重新平衡上变得混乱,例如(每个worker/jvm的单例?或者整个拓扑结构。
由您来决定是否值得麻烦,我对spring配置的担心是您很容易忘记storm拓扑(它似乎是一个jvm,但可以是更多)。就我个人而言,我为每个类装入器定义了自己的单例(例如static final,如果需要延迟的示例,则使用双重检查锁定),因为它不会隐藏(中-高)复杂性。

5lwkijsr

5lwkijsr3#

也许这个教程可以帮助你。
http://spring.io/guides/gs/messaging-stomp-websocket/

pcrecxhr

pcrecxhr4#

我意识到这是事后诸葛亮,但是您想过使用apachecamel来处理jms连接吗?camel不是ioc或di,但它确实为企业集成模式建模。也许这就是你要找的?
尼克。

相关问题