org.apache.edgent.topology.Topology.collection()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(15.8k)|赞(0)|评价(0)|浏览(114)

本文整理了Java中org.apache.edgent.topology.Topology.collection()方法的一些代码示例,展示了Topology.collection()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Topology.collection()方法的具体详情如下:
包路径:org.apache.edgent.topology.Topology
类名称:Topology
方法名:collection

Topology.collection介绍

[英]Declare a stream of constants from a collection. The returned stream will contain all the tuples in tuples.
[中]根据一个集合声明一个常量流。返回的流将包含参数 tuples 集合中的所有元组。

代码示例

代码示例来源:origin: apache/incubator-edgent

/**
 * Declares a stream of {@code Person} obtained by querying the {@code persons}
 * table once for every id in {@code personIdList}.
 *
 * @param t topology in which the streams are declared
 * @param db JDBC connector used to execute the SELECT statement
 * @param personIdList ids to look up; each id becomes one tuple on the id stream
 * @param delayMsec when non-zero, the id stream is routed through
 *        {@code PlumbingStreams.blockingOneShotDelay} with this delay (milliseconds)
 * @return the stream of persons built from the query results
 */
private TStream<Person> readPersonsTable(Topology t, JdbcStreams db, List<PersonId> personIdList, int delayMsec) {
  // Create a stream of Person from a stream of ids
  TStream<PersonId> personIds = t.collection(personIdList);
  if (delayMsec != 0) {
    personIds = PlumbingStreams.blockingOneShotDelay(personIds,
        delayMsec, TimeUnit.MILLISECONDS);
  }
  TStream<Person> rcvdPerson = db.executeStatement(personIds,
      () -> "SELECT id, firstname, lastname, gender, age"
          + " FROM persons WHERE id = ?",
      (tuple, stmt) -> stmt.setInt(1, tuple.id),
      (tuple, resultSet, exc, stream) -> {
        // Surface a failed statement explicitly instead of dereferencing
        // a result set that may not exist.
        if (exc != null) {
          throw new java.sql.SQLException(
              "Query failed for person id " + tuple.id, exc);
        }
        // Do not read columns unless the cursor is actually on a row.
        if (resultSet == null || !resultSet.next()) {
          throw new java.sql.SQLException(
              "No persons row found for id " + tuple.id);
        }
        int id = resultSet.getInt("id");
        String firstName = resultSet.getString("firstname");
        String lastName = resultSet.getString("lastname");
        String gender = resultSet.getString("gender");
        int age = resultSet.getInt("age");
        stream.accept(new Person(id, firstName, lastName, gender, age));
      }
      );
  return rcvdPerson;
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Sends one HTTP GET per status code in {200, 404, 202} to the httpbin status
 * endpoint and validates the resulting stream with
 * {@code streamContents(…, 200, 404, 202)}.
 */
@Test
public void testStatusCode() throws Exception {
  DirectProvider provider = new DirectProvider();
  Topology top = provider.newTopology();

  String url = "http://httpbin.org/status/";
  TStream<Integer> requestedCodes = top.collection(Arrays.asList(200, 404, 202));

  // Each tuple is the status code to request; the response is mapped back
  // to the status code actually returned.
  TStream<Integer> returnedCodes = HttpStreams.<Integer,Integer>requests(
      requestedCodes,
      HttpClients::noAuthentication,
      code -> HttpGet.METHOD_NAME,
      code -> url + Integer.toString(code),
      (code, response) -> response.getStatusLine().getStatusCode());

  Tester tester = top.getTester();
  Condition<List<Integer>> expectedCodes =
      tester.streamContents(returnedCodes, 200, 404, 202);

  tester.complete(provider, new JsonObject(), expectedCodes, 10, TimeUnit.SECONDS);

  assertTrue(expectedCodes.valid());
}

代码示例来源:origin: apache/incubator-edgent

topology.collection(Arrays.asList(request)),
HttpClients::noAuthentication,
t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()

代码示例来源:origin: apache/incubator-edgent

/**
 * Joins a delayed lookup stream of 0..99 against a stream of the same values
 * using the tuple itself as the key on both sides, and expects 100 join results.
 */
@Test
public void testJoinLastWithKeyer() throws Exception{
  Topology t = newTopology();
  
  List<Integer> ints = new ArrayList<>();
  for(int i = 0; i < 100; i++){
    ints.add(i);
  }
  
  TStream<Integer> intStream = t.collection(ints);
  
  // Wait until the window is populated, and then submit tuples
  TStream<Integer> lookupIntStream = t.source(() -> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      // Preserve the interrupt so the runtime can observe the cancellation
      // instead of silently losing it.
      Thread.currentThread().interrupt();
    }
    return ints;
  });
  
  TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
    // Both sides are keyed by identity, so the joined values must match.
    assertTrue(a.equals(b));
    return "0";
  });
  Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
  complete(t, tc);      
}

代码示例来源:origin: apache/incubator-edgent

t.collection(getPersonIdList()), 3, TimeUnit.SECONDS);
TStream<Person> rcvdPerson = db.executeStatement(personIds,
    (cn) -> cn.prepareStatement("SELECT id, firstname, lastname, gender, age"

代码示例来源:origin: apache/incubator-edgent

/**
 * Runs a topology that sinks one million integers, then checks the job's
 * state, health and last error before and after an explicit CLOSE action.
 */
@Test
public void jobDone2() throws Exception {
  final int NUM_TUPLES = 1000000;
  Integer[] data = new Integer[NUM_TUPLES];
  AtomicInteger numTuples = new AtomicInteger();
  for (int i = 0; i < data.length; i++) {
    // Integer.valueOf replaces the deprecated Integer(int) constructor.
    data[i] = Integer.valueOf(i);
  }
  Topology t = newTopology();
  TStream<Integer> ints = t.collection(Arrays.asList(data));
  ints.sink(tuple -> numTuples.incrementAndGet());
  Job job = awaitCompleteExecution(t);
  Thread.sleep(1500); // wait for numTuples visibility 
  assertEquals(NUM_TUPLES, numTuples.get());
  assertEquals("job.getCurrentState() must be RUNNING", Job.State.RUNNING, job.getCurrentState());
  assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
  assertEquals("", job.getLastError());
  job.stateChange(Job.Action.CLOSE);
  assertEquals("job.getCurrentState() must be CLOSED", Job.State.CLOSED, job.getCurrentState());
  assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
  assertEquals("", job.getLastError());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes two generated messages to a RabbitMQ queue after a one-shot delay
 * and validates that the subscriber on the same queue receives them.
 */
@Test
public void testSimple() throws Exception {
  Topology t = newTopology("testSimple");
  Map<String, Object> configMap = initRabbitmqConfig();
  MsgGenerator generator = new MsgGenerator(t.getName());
  String queue = "testQueue";
  List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
  TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
    t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
  RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
  // Encode and decode with an explicit charset so the round trip does not
  // depend on the platform default encoding.
  TStream<String> receivedStream = consumer.subscribe(
      (byte[] bytes) -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8), queue);
  RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
  TSink<String> sink = producer.publish(stream, queue,
      (String s) -> s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
  completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
  assertNotNull(sink);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Sums the contents of an unpartitioned last-4 window over the stream
 * {@code 1,2,3,4}; expects four result tuples with contents {@code 1, 3, 6, 10}.
 */
@Test
public void testWindowSum() throws Exception {
  Topology topology = newTopology();

  TStream<Integer> source = topology.collection(Arrays.asList(1,2,3,4));
  TWindow<Integer, Integer> lastFour = source.last(4, unpartitioned());
  assertSame(unpartitioned(), lastFour.getKeyFunction());

  TStream<Integer> sums = lastFour.aggregate((contents, partitionKey) -> {
    // The test expects every tuple of an unpartitioned window under key 0.
    assertEquals(Integer.valueOf(0), partitionKey);
    return contents.stream().mapToInt(Integer::intValue).sum();
  });

  Condition<Long> countReached = topology.getTester().tupleCount(sums, 4);
  Condition<List<Integer>> expectedSums =
      topology.getTester().streamContents(sums, 1, 3, 6, 10);
  complete(topology, countReached);
  assertTrue(expectedSums.valid());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Sums the contents of a last-9 window partitioned by {@code identity()} over
 * {@code 1,2,3,4,4,3,4,4,3}; expects nine result tuples with contents
 * {@code 1, 2, 3, 4, 8, 6, 12, 16, 9}.
 */
@Test
public void testKeyedWindowSum() throws Exception {
  Topology topology = newTopology();

  TStream<Integer> source = topology.collection(Arrays.asList(1,2,3,4,4,3,4,4,3));
  TWindow<Integer, Integer> keyedWindow = source.last(9, identity());

  // The window must expose the key function, topology and feeder it was built from.
  assertSame(identity(), keyedWindow.getKeyFunction());
  assertSame(topology, keyedWindow.topology());
  assertSame(source, keyedWindow.feeder());

  TStream<Integer> sums = keyedWindow.aggregate((contents, partitionKey) -> {
    // Keying by identity means a partition only ever holds one distinct value.
    assertEquals(1, new HashSet<>(contents).size());
    int total = 0;
    for (Integer value : contents) {
      total += value;
    }
    return total;
  });

  Condition<Long> countReached = topology.getTester().tupleCount(sums, 9);
  Condition<List<Integer>> expectedSums = topology.getTester().streamContents(sums,
      1, 2, 3, 4, 8, 6, 12, 16, 9);
  complete(topology, countReached);
  assertTrue(expectedSums.valid());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes and subscribes over MQTT using a config created with a
 * {@code null} client id, then validates the received messages.
 */
@Test
public void testAutoClientId() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testAutoClientId");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Passing null as the client id exercises the auto-generated client id path.
  MqttConfig config = newConfig(getServerURI(), null/*clientId*/);
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate("some-auto-clientId", topology, received, generator, SEC_TIMEOUT, expected);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes and subscribes over MQTT with QoS 1 and validates the received
 * messages.
 */
@Test
public void testQoS1() throws Exception {
  final int qos = 1;
  final boolean retain = false;

  Topology topology = newTopology("testQoS1");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // TODO something to verify that we actually provide
  // the QoS semantics.
  // Covers publish(TStream<String>, topic, qos) and
  // TStream<String> subscribe(topic, qos).

  MqttConfig config = newConfig(getServerURI(), clientId);
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Configures MQTT with two server URLs, the first of which is bogus, and
 * validates that publish/subscribe still delivers the messages.
 */
@Test
public void testMultipleServerURL() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testMultipleServerURL");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Server URL selection: the first entry is deliberately bogus, the second
  // is the server URI supplied by the test fixture.
  String[] serverURLs = {"tcp://localhost:31999", getServerURI()};
  MqttConfig config = newConfig(serverURLs[0], clientId);
  config.setServerURLs(serverURLs);

  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes a stream of strings over MQTT, subscribes to the same topic,
 * validates the received messages and checks that publish returned a sink.
 */
@Test
public void testStringPublish() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testStringPublish");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Covers publish(TStream<String>, topic, qos) and
  // TStream<String> subscribe(topic, qos).
  MqttConfig config = newConfig(getServerURI(), clientId);
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  TSink<String> publishSink = mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);

  assertNotNull(publishSink);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes and subscribes over MQTT with the config's action wait time set
 * to 3000 ms and validates the received messages.
 */
@Test
public void testActionTime() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testActionTime");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Covers publish(TStream<String>, topic, qos) and
  // TStream<String> subscribe(topic, qos).
  MqttConfig config = newConfig(getServerURI(), clientId);
  config.setActionTimeToWaitMillis(3*1000);
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes and subscribes over MQTT against the SSL client-auth server URI
 * after installing the "sslClientAuth" auth info, validates the received
 * messages and checks that publish returned a sink.
 */
@Test
public void testSslClientAuth() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testSslClientAuth");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Covers publish(TStream<String>, topic, qos) and
  // TStream<String> subscribe(topic, qos).

  // Debugging aid:
  // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list

  setSslAuthInfo("sslClientAuth");
  MqttConfig config = newConfig(getSslClientAuthServerURI(), clientId);
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  TSink<String> publishSink = mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);

  assertNotNull(publishSink);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes and subscribes over MQTT against the SSL server URI after
 * installing the "ssl" auth info, validates the received messages and checks
 * that publish returned a sink.
 */
@Test
public void testSsl() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testSsl");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Covers publish(TStream<String>, topic, qos) and
  // TStream<String> subscribe(topic, qos).

  // Debugging aid:
  // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list

  setSslAuthInfo("ssl");
  MqttConfig config = newConfig(getSslServerURI(), clientId);
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  TSink<String> publishSink = mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);

  assertNotNull(publishSink);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes two generated messages to a Kafka topic after a one-shot delay,
 * subscribes to the same topic mapping each record to its value, validates
 * the received messages and checks that publish returned a sink.
 */
@Test
public void testSimple() throws Exception {
  Topology topology = newTopology("testSimple");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String topic = getKafkaTopics()[0];
  String groupId = newGroupId(topology.getName());
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Producer side
  Map<String,Object> producerConfig = newProducerConfig();
  KafkaProducer producer = new KafkaProducer(topology, () -> producerConfig);
  TSink<String> publishSink = producer.publish(toPublish, topic);

  // Consumer side
  Map<String,Object> consumerConfig = newConsumerConfig(groupId);
  KafkaConsumer consumer = new KafkaConsumer(topology, () -> consumerConfig);
  TStream<String> received = consumer.subscribe(record -> record.value(), topic);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate("", topology, received, generator, SEC_TIMEOUT, expected);

  assertNotNull(publishSink);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Publishes and subscribes over MQTT with QoS 2, using a config whose
 * persistence is overridden with a {@code MemoryPersistence}, and validates
 * the received messages.
 */
@Test
public void testQoS2() throws Exception {
  final int qos = 2;
  final boolean retain = false;

  Topology topology = newTopology("testQoS2");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String clientId = newClientId(topology.getName());
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // TODO something to verify that we actually provide
  // the QoS semantics.
  // Also improves code coverage via the persistence override.
  // Covers publish(TStream<String>, topic, qos) and
  // TStream<String> subscribe(topic, qos).

  MqttConfig config = newConfig(getServerURI(), clientId);
  config.setPersistence(new MemoryPersistence());
  MqttStreams mqtt = new MqttStreams(topology, () -> config);
  mqtt.publish(toPublish, topic, qos, retain);
  TStream<String> received = mqtt.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(clientId, topology, received, generator, SEC_TIMEOUT, expected);
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Uses two separate MQTT connectors in one topology, one publishing and one
 * subscribing, each with its own client id, and validates the received messages.
 */
@Test
public void testMultiConnector() throws Exception {
  final int qos = 0;
  final boolean retain = false;

  Topology topology = newTopology("testMultiConnector");
  MsgGenerator generator = new MsgGenerator(topology.getName());
  String pubClientId = newClientId(topology.getName())+"_pub";
  String subClientId = newClientId(topology.getName())+"_sub";
  String topic = getMqttTopics()[0];
  List<String> messages = createMsgs(generator, topic, getMsg1(), getMsg2());

  TStream<String> initial = topology.collection(messages);
  TStream<String> toPublish = PlumbingStreams.blockingOneShotDelay(
      initial, PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

  // Publishing connector
  MqttConfig pubConfig = newConfig(getServerURI(), pubClientId);
  MqttStreams publisher = new MqttStreams(topology, () -> pubConfig);
  publisher.publish(toPublish, topic, qos, retain);

  // Subscribing connector
  MqttConfig subConfig = newConfig(getServerURI(), subClientId);
  MqttStreams subscriber = new MqttStreams(topology, () -> subConfig);
  TStream<String> received = subscriber.subscribe(topic, qos);

  String[] expected = messages.toArray(new String[0]);
  completeAndValidate(pubClientId, topology, received, generator, SEC_TIMEOUT, expected);
}

代码示例来源:origin: apache/incubator-edgent

top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

相关文章