siddhi事件表不能与apachestorm正常工作

ygya80vv  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(427)

我已经尝试在我的执行计划中使用事件表,并尝试在带有apachestorm的分布式模式下将wso2 cepv4.1.0中的表与输入流连接起来。
这是我的siddhi查询。

@Plan:name('ExecutionPlan')

@Import('InputStream:1.0.0')
define stream InputStream (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);

@Export('outputStream:1.0.0')
define stream OutputStream (id string, param3 string);

@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep') 
define table cepTable (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);

@name('query1') 
@dist(parallel='2', execGroup='Filtering')
partition with ( id of InputStream )
begin
  from InputStream join cepTable 
  on cepTable.id == InputStream.id 
  select InputStream.id as id, InputStream.param3 as param3  
  insert into OutputStream;
end;

但它提供了以下例外情况

TID: [-1234] [] [2016-05-13 14:19:11,847] ERROR {org.wso2.carbon.event.processor.admin.EventProcessorAdminService} -  Error while initialising the connection, null 
org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException: Error while initialising the connection, null
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:154)
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.executeManualDeployment(EventProcessorDeployer.java:178)
    at org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker.save(EventProcessorConfigurationFilesystemInvoker.java:95)
    at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.editInactiveExecutionPlan(CarbonEventProcessorService.java:181)
    at org.wso2.carbon.event.processor.admin.EventProcessorAdminService.editInactiveExecutionPlan(EventProcessorAdminService.java:108)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.axis2.rpc.receivers.RPCUtil.invokeServiceClass(RPCUtil.java:212)
    at org.apache.axis2.rpc.receivers.RPCMessageReceiver.invokeBusinessLogic(RPCMessageReceiver.java:117)
    at org.apache.axis2.receivers.AbstractInOutMessageReceiver.invokeBusinessLogic(AbstractInOutMessageReceiver.java:40)
    at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:169)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:82)
    at org.wso2.carbon.core.transports.local.CarbonLocalTransportSender.finalizeSendWithToAddress(CarbonLocalTransportSender.java:45)
    at org.apache.axis2.transport.local.LocalTransportSender.invoke(LocalTransportSender.java:77)
    at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
    at org.apache.axis2.description.OutInAxisOperationClient.send(OutInAxisOperation.java:430)
    at org.apache.axis2.description.OutInAxisOperationClient.executeImpl(OutInAxisOperation.java:225)
    at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
    at org.wso2.carbon.event.processor.stub.EventProcessorAdminServiceStub.editInactiveExecutionPlan(EventProcessorAdminServiceStub.java:2473)
    at org.apache.jsp.eventprocessor.edit_005fexecution_005fplan_005fajaxprocessor_jsp._jspService(edit_005fexecution_005fplan_005fajaxprocessor_jsp.java:84)
    at org.apache.jasper.runtime.HttpJspBase.service(HttpJspBase.java:70)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.apache.jasper.servlet.JspServletWrapper.service(JspServletWrapper.java:432)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:395)
    at org.apache.jasper.servlet.JspServlet.service(JspServlet.java:339)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.wso2.carbon.ui.JspServlet.service(JspServlet.java:155)
    at org.wso2.carbon.ui.TilesJspServlet.service(TilesJspServlet.java:80)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.eclipse.equinox.http.helper.ContextPathServletAdaptor.service(ContextPathServletAdaptor.java:37)
    at org.eclipse.equinox.http.servlet.internal.ServletRegistration.service(ServletRegistration.java:61)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:128)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:68)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CSRFPreventionFilter.doFilter(CSRFPreventionFilter.java:88)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CRLFPreventionFilter.doFilter(CRLFPreventionFilter.java:59)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.tomcat.ext.filter.CharacterSetFilter.doFilter(CharacterSetFilter.java:61)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:504)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.continueInvocation(CompositeValve.java:99)
    at org.wso2.carbon.tomcat.ext.valves.CarbonTomcatValve$1.invoke(CarbonTomcatValve.java:47)
    at org.wso2.carbon.webapp.mgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:57)
    at org.wso2.carbon.event.receiver.core.internal.tenantmgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:48)
    at org.wso2.carbon.tomcat.ext.valves.TomcatValveContainer.invokeValves(TomcatValveContainer.java:47)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.invoke(CompositeValve.java:62)
    at org.wso2.carbon.tomcat.ext.valves.CarbonStuckThreadDetectionValve.invoke(CarbonStuckThreadDetectionValve.java:159)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
    at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:57)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:421)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1074)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:611)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1739)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1698)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException: Error while initialising the connection, null
    at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:79)
    at org.wso2.siddhi.extension.eventtable.RDBMSEventTable.init(RDBMSEventTable.java:119)
    at org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventTable(DefinitionParserHelper.java:99)
    at org.wso2.siddhi.core.util.ExecutionPlanRuntimeBuilder.defineTable(ExecutionPlanRuntimeBuilder.java:74)
    at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.defineTableDefinitions(ExecutionPlanParser.java:194)
    at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.parse(ExecutionPlanParser.java:140)
    at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:53)
    at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:61)
    at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.init(SiddhiBolt.java:104)
    at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.<init>(SiddhiBolt.java:86)
    at org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor.constructTopologyBuilder(StormTopologyConstructor.java:114)
    at org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager.submitTopology(StormTopologyManager.java:127)
    at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.addExecutionPlan(CarbonEventProcessorService.java:314)
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:124)
    ... 76 more
Caused by: java.sql.SQLException
    at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:254)
    at org.apache.tomcat.jdbc.pool.PooledConnection.connect(PooledConnection.java:182)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.createConnection(ConnectionPool.java:701)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.borrowConnection(ConnectionPool.java:635)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.getConnection(ConnectionPool.java:188)
    at org.apache.tomcat.jdbc.pool.DataSourceProxy.getConnection(DataSourceProxy.java:127)
    at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:73)
    ... 89 more
Caused by: java.lang.NullPointerException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:246)
    ... 95 more

原因是什么?

qnakjoqk

qnakjoqk1#

1) 请检查是否配置了mysql连接。要配置mysql连接,请检查以下链接:https://docs.wso2.com/display/cep410/setting+up+mysql
检查服务器是否可访问,服务器上的架构是否可用,以及需要的表(cep)。架构名称“mysql”是否正确。请检查一下。
2) 检查event-processor.xml文件中的风暴群集配置。有关配置的更多信息,请查看以下链接:https://docs.wso2.com/display/cluster44x/clustering+cep+4.1.0#clusteringcep4.1.0-分布式模型部署分布式cep

unhi4e5o

unhi4e5o2#

当执行计划部署在wso2cep服务器上时,有以下注解就足够了,

@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')

假设我们已经定义了一个名为 'MYSQL' 在wso2cep服务器的数据源配置中。
但是,当exceution计划部署在storm上时,必须内联指定数据源配置(因为wso2cep服务器中定义的数据源配置在storm中不可用)。具体操作如下:

@From(eventtable = 'rdbms' , jdbc.url='', username='', password='', driver.name='' , table.name = 'cep')

适当地填充注解元素。
希望这能解决你的问题。

相关问题