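This article collects code examples for the Java method org.apache.edgent.topology.Topology.collection() and shows how Topology.collection() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of Topology.collection() are as follows: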
Package: org.apache.edgent.topology
Class name: Topology
Method name: collection
Description: Declare a stream of constants from a collection. The returned stream will contain all the tuples in tuples.
Code example source: apache/incubator-edgent
private TStream<Person> readPersonsTable(Topology t, JdbcStreams db, List<PersonId> personIdList, int delayMsec) {
    // Create a stream of Person from a stream of ids
    TStream<PersonId> personIds = t.collection(personIdList);
    if (delayMsec != 0) {
        personIds = PlumbingStreams.blockingOneShotDelay(personIds,
                delayMsec, TimeUnit.MILLISECONDS);
    }
    TStream<Person> rcvdPerson = db.executeStatement(personIds,
            () -> "SELECT id, firstname, lastname, gender, age"
                    + " FROM persons WHERE id = ?",
            (tuple, stmt) -> stmt.setInt(1, tuple.id),
            (tuple, resultSet, exc, stream) -> {
                resultSet.next();
                int id = resultSet.getInt("id");
                String firstName = resultSet.getString("firstname");
                String lastName = resultSet.getString("lastname");
                String gender = resultSet.getString("gender");
                int age = resultSet.getInt("age");
                stream.accept(new Person(id, firstName, lastName, gender, age));
            }
    );
    return rcvdPerson;
}
Code example source: apache/incubator-edgent
@Test
public void testStatusCode() throws Exception {
    DirectProvider ep = new DirectProvider();
    Topology topology = ep.newTopology();
    String url = "http://httpbin.org/status/";
    TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
            topology.collection(Arrays.asList(200, 404, 202)),
            HttpClients::noAuthentication,
            t -> HttpGet.METHOD_NAME,
            t -> url + Integer.toString(t),
            (t, resp) -> resp.getStatusLine().getStatusCode());
    Tester tester = topology.getTester();
    Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
    tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
    assertTrue(endCondition.valid());
}
Code example source: apache/incubator-edgent
topology.collection(Arrays.asList(request)),
HttpClients::noAuthentication,
t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
Code example source: apache/incubator-edgent
@Test
public void testJoinLastWithKeyer() throws Exception {
    Topology t = newTopology();
    List<Integer> ints = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        ints.add(i);
    }
    TStream<Integer> intStream = t.collection(ints);
    // Wait until the window is populated, and then submit tuples
    TStream<Integer> lookupIntStream = t.source(() -> {
        try {
            Thread.sleep(500);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ints;
    });
    TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
        assertTrue(a.equals(b));
        return "0";
    });
    Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
    complete(t, tc);
}
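The test above joins every lookup tuple with the most recent tuple that has the same key on the other stream, which is why 100 joins are expected once the 100 integers have been seen. The following is a minimal plain-Java sketch of that idea, with no Edgent dependency. The class and method names (JoinLastModel, joinLast) are hypothetical, the "last" stream is assumed to be fully consumed before any lookup arrives (the role the 500 ms sleep plays in the test), and skipping lookups without a match is an assumption of this model rather than a statement about Edgent's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical plain-Java model of a "join with last tuple per key"; not Edgent code.
class JoinLastModel {

    static <T, U, K, J> List<J> joinLast(List<T> lookups,
                                         Function<T, K> keyer,
                                         List<U> lastTuples,
                                         Function<U, K> lastKeyer,
                                         BiFunction<T, U, J> joiner) {
        // Remember only the most recent tuple for each key.
        Map<K, U> lastByKey = new HashMap<>();
        for (U u : lastTuples) {
            lastByKey.put(lastKeyer.apply(u), u);
        }
        // Join each lookup tuple with the remembered tuple of the same key.
        List<J> joined = new ArrayList<>();
        for (T t : lookups) {
            U match = lastByKey.get(keyer.apply(t));
            if (match != null) { // assumption: unmatched lookups produce nothing
                joined.add(joiner.apply(t, match));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> out = JoinLastModel.<Integer, Integer, Integer, String>joinLast(
                Arrays.asList(1, 2, 5), t -> t,
                Arrays.asList(1, 2, 3), u -> u,
                (a, b) -> a + ":" + b);
        System.out.println(out); // lookups 1 and 2 find a match, 5 does not
    }
}
```

With identity keyers on both sides, as in the test, a lookup joins only with an equal value, which is what the assertTrue(a.equals(b)) check relies on.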
Code example source: apache/incubator-edgent
t.collection(getPersonIdList()), 3, TimeUnit.SECONDS);
TStream<Person> rcvdPerson = db.executeStatement(personIds,
(cn) -> cn.prepareStatement("SELECT id, firstname, lastname, gender, age"
Code example source: apache/incubator-edgent
@Test
public void jobDone2() throws Exception {
    final int NUM_TUPLES = 1000000;
    Integer[] data = new Integer[NUM_TUPLES];
    AtomicInteger numTuples = new AtomicInteger();
    for (int i = 0; i < data.length; i++) {
        data[i] = Integer.valueOf(i);
    }
    Topology t = newTopology();
    TStream<Integer> ints = t.collection(Arrays.asList(data));
    ints.sink(tuple -> numTuples.incrementAndGet());
    Job job = awaitCompleteExecution(t);
    Thread.sleep(1500); // wait for numTuples visibility
    assertEquals(NUM_TUPLES, numTuples.get());
    assertEquals("job.getCurrentState() must be RUNNING", Job.State.RUNNING, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
    job.stateChange(Job.Action.CLOSE);
    assertEquals("job.getCurrentState() must be CLOSED", Job.State.CLOSED, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
}
Code example source: apache/incubator-edgent
@Test
public void testSimple() throws Exception {
    Topology t = newTopology("testSimple");
    Map<String, Object> configMap = initRabbitmqConfig();
    MsgGenerator generator = new MsgGenerator(t.getName());
    String queue = "testQueue";
    List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
    TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
            t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
    TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
    RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
    TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
    completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
    assertNotNull(sink);
}
Code example source: apache/incubator-edgent
@Test
public void testWindowSum() throws Exception {
    Topology t = newTopology();
    TStream<Integer> integers = t.collection(Arrays.asList(1, 2, 3, 4));
    TWindow<Integer, Integer> window = integers.last(4, unpartitioned());
    assertSame(unpartitioned(), window.getKeyFunction());
    TStream<Integer> sums = window.aggregate((tuples, key) -> {
        assertEquals(Integer.valueOf(0), key);
        int sum = 0;
        for (Integer tuple : tuples)
            sum += tuple;
        return sum;
    });
    Condition<Long> tc = t.getTester().tupleCount(sums, 4);
    Condition<List<Integer>> contents = t.getTester().streamContents(sums, 1, 3, 6, 10);
    complete(t, tc);
    assertTrue(contents.valid());
}
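The expected contents 1, 3, 6, 10 in the test above follow from a count-based window that holds at most the last 4 tuples and is aggregated each time a tuple arrives, as the expected output suggests. The sketch below models that behaviour in plain Java with no Edgent dependency. The names SlidingWindowSum and sums are hypothetical, and this is an illustration of the arithmetic, not Edgent's window implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of a "last N tuples" window; not Edgent code.
class SlidingWindowSum {

    // Returns the sum of the window contents after each input value is inserted.
    static List<Integer> sums(List<Integer> input, int size) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> out = new ArrayList<>();
        int running = 0;
        for (int value : input) {
            window.addLast(value);
            running += value;
            if (window.size() > size) {
                // Evict the oldest value once the window exceeds its capacity.
                running -= window.removeFirst();
            }
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sums(Arrays.asList(1, 2, 3, 4), 4)); // prints [1, 3, 6, 10]
        System.out.println(sums(Arrays.asList(1, 2, 3, 4), 2)); // prints [1, 3, 5, 7]
    }
}
```

With a window of 4 and only 4 inputs nothing is ever evicted, so the output is a plain running sum. The second call uses a window of 2 to show the effect of eviction.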
Code example source: apache/incubator-edgent
@Test
public void testKeyedWindowSum() throws Exception {
    Topology t = newTopology();
    TStream<Integer> integers = t.collection(Arrays.asList(1, 2, 3, 4, 4, 3, 4, 4, 3));
    TWindow<Integer, Integer> window = integers.last(9, identity());
    assertSame(identity(), window.getKeyFunction());
    assertSame(t, window.topology());
    assertSame(integers, window.feeder());
    TStream<Integer> sums = window.aggregate((tuples, key) -> {
        // All tuples in a partition are equal due to identity
        assertEquals(1, new HashSet<>(tuples).size());
        int sum = 0;
        for (Integer tuple : tuples)
            sum += tuple;
        return sum;
    });
    Condition<Long> tc = t.getTester().tupleCount(sums, 9);
    Condition<List<Integer>> contents = t.getTester().streamContents(sums,
            1, 2, 3, 4, 8, 6, 12, 16, 9);
    complete(t, tc);
    assertTrue(contents.valid());
}
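The expected sequence 1, 2, 3, 4, 8, 6, 12, 16, 9 in the keyed test is easier to see when the window is viewed as one independent partition per key: with identity() as the key function, each distinct value gets its own partition, and each arriving tuple triggers a sum over its own partition only. The following plain-Java sketch reproduces that arithmetic. The names KeyedWindowSum and sums are hypothetical, and the per-insertion aggregation is inferred from the test's expected output rather than taken from Edgent's source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of a partitioned "last N tuples" window; not Edgent code.
class KeyedWindowSum {

    // Returns, for each input value, the sum of its key's partition after insertion.
    static <K> List<Integer> sums(List<Integer> input, int size, Function<Integer, K> keyFn) {
        Map<K, Deque<Integer>> partitions = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (Integer value : input) {
            Deque<Integer> window =
                    partitions.computeIfAbsent(keyFn.apply(value), k -> new ArrayDeque<>());
            window.addLast(value);
            if (window.size() > size) {
                window.removeFirst(); // keep at most `size` tuples per partition
            }
            int sum = 0;
            for (int v : window) {
                sum += v;
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 4, 3, 4, 4, 3);
        // Identity key: one partition per distinct value.
        System.out.println(KeyedWindowSum.<Integer>sums(input, 9, v -> v));
        // Constant key: a single partition, i.e. an unpartitioned window.
        System.out.println(KeyedWindowSum.<Integer>sums(Arrays.asList(1, 2, 3, 4), 4, v -> 0));
    }
}
```

For example, the fifth input (the second 4) lands in the partition that already holds one 4, giving 8, while the last input (the third 3) gives 3 + 3 + 3 = 9.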
Code example source: apache/incubator-edgent
@Test
public void testAutoClientId() throws Exception {
    Topology top = newTopology("testAutoClientId");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test with auto-generated clientId
    MqttConfig config = newConfig(getServerURI(), null/*clientId*/);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate("some-auto-clientId", top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
@Test
public void testQoS1() throws Exception {
    Topology top = newTopology("testQoS1");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 1;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // TODO something to verify that we actually provide
    // the QoS semantics.
    // Test publish(TStream<String>, topic, qos)
    // Test TStream<String> subscribe(topic, qos)
    MqttConfig config = newConfig(getServerURI(), clientId);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
@Test
public void testMultipleServerURL() throws Exception {
    Topology top = newTopology("testMultipleServerURL");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test server URL selection - supply a bogus 1st URL.
    String[] serverURLs = new String[] {"tcp://localhost:31999", getServerURI()};
    MqttConfig config = newConfig(serverURLs[0], clientId);
    config.setServerURLs(serverURLs);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
@Test
public void testStringPublish() throws Exception {
    Topology top = newTopology("testStringPublish");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test publish(TStream<String>, topic, qos)
    // Test TStream<String> subscribe(topic, qos)
    MqttConfig config = newConfig(getServerURI(), clientId);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    TSink<String> sink = mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
    assertNotNull(sink);
}
Code example source: apache/incubator-edgent
@Test
public void testActionTime() throws Exception {
    Topology top = newTopology("testActionTime");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test publish(TStream<String>, topic, qos)
    // Test TStream<String> subscribe(topic, qos)
    MqttConfig config = newConfig(getServerURI(), clientId);
    config.setActionTimeToWaitMillis(3 * 1000);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
@Test
public void testSslClientAuth() throws Exception {
    Topology top = newTopology("testSslClientAuth");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test publish(TStream<String>, topic, qos)
    // Test TStream<String> subscribe(topic, qos)
    // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
    setSslAuthInfo("sslClientAuth");
    MqttConfig config = newConfig(getSslClientAuthServerURI(), clientId);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    TSink<String> sink = mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
    assertNotNull(sink);
}
Code example source: apache/incubator-edgent
@Test
public void testSsl() throws Exception {
    Topology top = newTopology("testSsl");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test publish(TStream<String>, topic, qos)
    // Test TStream<String> subscribe(topic, qos)
    // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
    setSslAuthInfo("ssl");
    MqttConfig config = newConfig(getSslServerURI(), clientId);
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    TSink<String> sink = mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
    assertNotNull(sink);
}
Code example source: apache/incubator-edgent
@Test
public void testSimple() throws Exception {
    Topology t = newTopology("testSimple");
    MsgGenerator mgen = new MsgGenerator(t.getName());
    String topic = getKafkaTopics()[0];
    String groupId = newGroupId(t.getName());
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    Map<String,Object> pConfig = newProducerConfig();
    KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
    TSink<String> sink = producer.publish(s, topic);
    Map<String,Object> cConfig = newConsumerConfig(groupId);
    KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
    TStream<String> rcvd = consumer.subscribe(
            rec -> rec.value(),
            topic);
    completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
    assertNotNull(sink);
}
Code example source: apache/incubator-edgent
@Test
public void testQoS2() throws Exception {
    Topology top = newTopology("testQoS2");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 2;
    boolean retain = false;
    String clientId = newClientId(top.getName());
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // TODO something to verify that we actually provide
    // the QoS semantics.
    // Also improve code coverage with persistence override
    // Test publish(TStream<String>, topic, qos)
    // Test TStream<String> subscribe(topic, qos)
    MqttConfig config = newConfig(getServerURI(), clientId);
    config.setPersistence(new MemoryPersistence());
    MqttStreams mqtt = new MqttStreams(top, () -> config);
    mqtt.publish(s, topic, qos, retain);
    TStream<String> rcvd = mqtt.subscribe(topic, qos);
    completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
@Test
public void testMultiConnector() throws Exception {
    Topology top = newTopology("testMultiConnector");
    MsgGenerator mgen = new MsgGenerator(top.getName());
    int qos = 0;
    boolean retain = false;
    String pubClientId = newClientId(top.getName()) + "_pub";
    String subClientId = newClientId(top.getName()) + "_sub";
    String topic = getMqttTopics()[0];
    List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
    TStream<String> s = PlumbingStreams.blockingOneShotDelay(
            top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
    // Test separate connectors for pub and sub
    MqttConfig config = newConfig(getServerURI(), pubClientId);
    MqttStreams mqttPub = new MqttStreams(top, () -> config);
    mqttPub.publish(s, topic, qos, retain);
    MqttConfig configSub = newConfig(getServerURI(), subClientId);
    MqttStreams mqttSub = new MqttStreams(top, () -> configSub);
    TStream<String> rcvd = mqttSub.subscribe(topic, qos);
    completeAndValidate(pubClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);