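I am building a topology to read device status from an IoT Hub in Azure, but about a minute after it starts reading device status I get the exception below.
I have set the number of workers to 4.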
java.lang.RuntimeException: com.microsoft.eventhubs.client.EventHubException: org.apache.qpid.amqp_1_0.client.ConnectionErrorException: At least one receiver for the endpoint is created with epoch of '12', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. TrackingId:f650897e-9cda-4b12-86ed-6e0b08cd4d43_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:EventHub:trinityhub-operationmonitoring~29490, Timestamp:11/29/2016 7:25:33 AM Reference:0204db86-8f3f-4365-9b8f-bca97e78b3fa, TrackingId:82395b51-0004-4833-ba2e-b16dc308f5db_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:eventhub:trinityhub-operationmonitoring~29490|$default, Timestamp:11/29/2016 7:25:33 AM TrackingId:0d6a85ffa5a0494cbec2b2bc309776e5_G4, SystemTracker:gateway6, Timestamp:11/29/2016 7:25:34 AM
at org.apache.storm.eventhubs.spout.EventHubSpout.open(EventHubSpout.java:156) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$fn__5624$fn__5639.invoke(executor.clj:564) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:477) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: com.microsoft.eventhubs.client.EventHubException: org.apache.qpid.amqp_1_0.client.ConnectionErrorException: At least one receiver for the endpoint is created with epoch of '12', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. TrackingId:f650897e-9cda-4b12-86ed-6e0b08cd4d43_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:EventHub:trinityhub-operationmonitoring~29490, Timestamp:11/29/2016 7:25:33 AM Reference:0204db86-8f3f-4365-9b8f-bca97e78b3fa, TrackingId:82395b51-0004-4833-ba2e-b16dc308f5db_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:eventhub:trinityhub-operationmonitoring~29490|$default, Timestamp:11/29/2016 7:25:33 AM TrackingId:0d6a85ffa5a0494cbec2b2bc309776e5_G4, SystemTracker:gateway6, Timestamp:11/29/2016 7:25:34 AM
at com.microsoft.eventhubs.client.EventHubReceiver.ensureReceiverCreated(EventHubReceiver.java:112) ~[eventhubs-client-0.9.1.jar:?]
at com.microsoft.eventhubs.client.EventHubReceiver.<init>(EventHubReceiver.java:65) ~[eventhubs-client-0.9.1.jar:?]
at com.microsoft.eventhubs.client.EventHubConsumerGroup.createReceiver(EventHubConsumerGroup.java:48) ~[eventhubs-client-0.9.1.jar:?]
at com.microsoft.eventhubs.client.ResilientEventHubReceiver.initialize(ResilientEventHubReceiver.java:63) ~[eventhubs-client-0.9.1.jar:?]
at org.apache.storm.eventhubs.spout.EventHubReceiverImpl.open(EventHubReceiverImpl.java:74) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at org.apache.storm.eventhubs.spout.SimplePartitionManager.open(SimplePartitionManager.java:77) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at org.apache.storm.eventhubs.spout.EventHubSpout.preparePartitions(EventHubSpout.java:134) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at org.apache.storm.eventhubs.spout.EventHubSpout.open(EventHubSpout.java:153) ~[storm-eventhubs-0.10.0.jar:0.10.0]
... 4 more
Caused by: org.apache.qpid.amqp_1_0.client.ConnectionErrorException: At least one receiver for the endpoint is created with epoch of '12', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. TrackingId:f650897e-9cda-4b12-86ed-6e0b08cd4d43_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:EventHub:trinityhub-operationmonitoring~29490, Timestamp:11/29/2016 7:25:33 AM Reference:0204db86-8f3f-4365-9b8f-bca97e78b3fa, TrackingId:82395b51-0004-4833-ba2e-b16dc308f5db_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:eventhub:trinityhub-operationmonitoring~29490|$default, Timestamp:11/29/2016 7:25:33 AM TrackingId:0d6a85ffa5a0494cbec2b2bc309776e5_G4, SystemTracker:gateway6, Timestamp:11/29/2016 7:25:34 AM
at org.apache.qpid.amqp_1_0.client.Receiver.<init>(Receiver.java:223) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
at org.apache.qpid.amqp_1_0.client.Session.createReceiver(Session.java:281) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
at org.apache.qpid.amqp_1_0.client.Session.createReceiver(Session.java:260) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
at org.apache.qpid.amqp_1_0.client.Session.createReceiver(Session.java:185) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
at com.microsoft.eventhubs.client.EventHubReceiver.ensureReceiverCreated(EventHubReceiver.java:108) ~[eventhubs-client-0.9.1.jar:?]
at com.microsoft.eventhubs.client.EventHubReceiver.<init>(EventHubReceiver.java:65) ~[eventhubs-client-0.9.1.jar:?]
at com.microsoft.eventhubs.client.EventHubConsumerGroup.createReceiver(EventHubConsumerGroup.java:48) ~[eventhubs-client-0.9.1.jar:?]
at com.microsoft.eventhubs.client.ResilientEventHubReceiver.initialize(ResilientEventHubReceiver.java:63) ~[eventhubs-client-0.9.1.jar:?]
at org.apache.storm.eventhubs.spout.EventHubReceiverImpl.open(EventHubReceiverImpl.java:74) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at org.apache.storm.eventhubs.spout.SimplePartitionManager.open(SimplePartitionManager.java:77) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at org.apache.storm.eventhubs.spout.EventHubSpout.preparePartitions(EventHubSpout.java:134) ~[storm-eventhubs-0.10.0.jar:0.10.0]
at org.apache.storm.eventhubs.spout.EventHubSpout.open(EventHubSpout.java:153) ~[storm-eventhubs-0.10.0.jar:0.10.0]
... 4 more
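The rule quoted in the error message can be modelled roughly as follows. This is only a toy sketch of the behaviour the message describes, not Azure's implementation: `EpochGate`, its methods, and the `"$default/0"` key are hypothetical names made up for illustration, and accepting an equal epoch is an assumption. It shows why a receiver created without an epoch (as the spout does through `createReceiver` in the trace above) is refused while some other receiver holds epoch 12 on the same consumer group.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the epoch rule from the error message.
// Hypothetical names throughout; this is not the Event Hubs service code.
final class EpochGate {
    // Epoch currently held per "consumerGroup/partition" key.
    private final Map<String, Long> activeEpoch = new HashMap<>();

    /** A receiver with an epoch connects; returns true if it is accepted. */
    boolean connectWithEpoch(String key, long epoch) {
        Long current = activeEpoch.get(key);
        if (current != null && epoch < current) {
            return false; // a receiver with a higher epoch already holds the key
        }
        // Assumption: an equal or higher epoch takes over from the current holder.
        activeEpoch.put(key, epoch);
        return true;
    }

    /** A receiver without an epoch connects; refused while an epoch receiver is active. */
    boolean connectWithoutEpoch(String key) {
        return !activeEpoch.containsKey(key);
    }

    /** All epoch receivers on this key have been closed or disconnected. */
    void closeEpochReceivers(String key) {
        activeEpoch.remove(key);
    }

    public static void main(String[] args) {
        EpochGate gate = new EpochGate();
        String key = "$default/0";
        gate.connectWithEpoch(key, 12);                      // some other reader takes epoch 12
        System.out.println(gate.connectWithoutEpoch(key));   // false: the case in the trace
        System.out.println(gate.connectWithEpoch(key, 13));  // true: "reconnect with a higher epoch"
        gate.closeEpochReceivers(key);
        System.out.println(gate.connectWithoutEpoch(key));   // true: epoch receivers are gone
    }
}
```

If the model is right, the trace suggests that something else is reading the same endpoint and consumer group with an epoch, which matches the two remedies the message itself lists.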
Spout: creating a bolt and attaching it to the spout:
// Set the WASB bolt to read from the parser output
/* topologyBuilder.setBolt("wasbbolt", wasbBolt, 10)
       .shuffleGrouping("EventHubsSpout")
       .setNumTasks(spoutConfig.getPartitionCount()); */
// Parse the data from the JSON format in the Event Hub into tuples
topologyBuilder.setBolt("healthDataBolt", new HealthDataBolt(), spoutConfig.getPartitionCount())
    .shuffleGrouping("EventHubsSpout")
    .setNumTasks(spoutConfig.getPartitionCount());
Bolt: this is the bolt that parses the status packet and stores it in the Postgres database.
@Override
public void execute(Tuple tuple) {
    String data = tuple.getString(0);
    JSONObject jsonObject = new JSONObject(data);
    Iterator keys = jsonObject.keys();
    Date now = new Date();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //sdf.setTimeZone(TimeZone.getTimeZone("IST"));
    String currentTime = sdf.format(now);
    while (keys.hasNext()) {
        String key = (String) keys.next();
        if (key.equals("deviceId")) {
            String deviceId = (String) jsonObject.get("deviceId");
            String operationName = (String) jsonObject.get("operationName");
            String sql = "update iot.device_configuration set connection_status=?,current_status_time=?,status_packet=? where licence_key=?";
            try {
                PreparedStatement prepareStatement = connectPostgres.prepareStatement(sql);
                prepareStatement.setString(1, operationName);
                prepareStatement.setString(2, currentTime);
                prepareStatement.setString(3, data);
                prepareStatement.setString(4, deviceId);
                boolean execute = prepareStatement.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    HTable parsedPacket = null;
    try {
        parsedPacket = new HTable(hbaseConfig, "device_health_table");
    } catch (IOException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
    Put data1 = new Put(Bytes.toBytes(currentTime.toString()));
    data1.add(Bytes.toBytes("details"), Bytes.toBytes("packet"),
            Bytes.toBytes(data.toString()));
    data1.add(Bytes.toBytes("details"), Bytes.toBytes("updated_time"),
            Bytes.toBytes(currentTime.toString()));
    try {
        parsedPacket.put(data1);
    } catch (RetriesExhaustedWithDetailsException | InterruptedIOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
```
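A side note on the bolt: it opens a `PreparedStatement` and an `HTable` for every tuple and never closes them. This is probably unrelated to the epoch error, but a minimal sketch of the try-with-resources pattern that would release them looks like this. `FakeResource` and `CloseDemo` are hypothetical stand-ins so the example runs without a database; `PreparedStatement` is `AutoCloseable`, and the sketch assumes the `HTable` in use can be closed the same way.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseDemo {
    // Hypothetical stand-in for a PreparedStatement or an HTable.
    static final class FakeResource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        FakeResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        void use(boolean fail) {
            if (fail) {
                throw new IllegalStateException(name + " failed");
            }
            log.add("use " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Uses the resource once and returns the sequence of events that occurred. */
    static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        // The resource is closed automatically, before the catch block runs.
        try (FakeResource r = new FakeResource("statement", log)) {
            r.use(fail);
        } catch (IllegalStateException e) {
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [open statement, use statement, close statement]
        System.out.println(run(true));  // [open statement, close statement, caught statement failed]
    }
}
```

In the bolt this would mean wrapping `connectPostgres.prepareStatement(sql)` in the `try (...)` header, and either doing the same for the table or creating it once in `prepare` and closing it in `cleanup`.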