org.apache.edgent.topology.Topology类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(154)

本文整理了Java中org.apache.edgent.topology.Topology类的一些代码示例,展示了Topology类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Topology类的具体详情如下:
包路径:org.apache.edgent.topology.Topology
类名称:Topology

Topology介绍

[英]A declaration of a topology of streaming data. This class provides some fundamental generic methods to create source streams, such as #source(Supplier), #poll(Supplier,long,TimeUnit), #strings(String...).

See Edgent Source Streams.
[中]流数据拓扑的声明。这个类提供了一些基本的通用方法来创建源流,例如#source(Supplier)、#poll(Supplier,long,TimeUnit)、#strings(String…)。
Edgent Source Streams

代码示例

代码示例来源:origin: org.apache.edgent/edgent-test-appservice-applications

@Override
  public BiConsumer<Topology, JsonObject> getBuilder() {
    return (t,c) -> t.strings(getName()).print();
  }     
}

代码示例来源:origin: apache/incubator-edgent

protected void waitUntilComplete(Topology t, TStream<String> s, String[] data) throws Exception {
    Condition<Long> tc = t.getTester().tupleCount(s, data.length);
    complete(t, tc);
    
    // Save the job.
    job = t.getTester().getJob();
    assertNotNull(job);
    
  }
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testBasics() {
  final Topology t = newTopology("T123");
  assertEquals("T123", t.getName());
  assertSame(t, t.topology());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Declares a stream populated by {@link JobRegistryService} events.
 * <p>
 * The job registry is passed as a runtime service. At startup 
 * {@code JobRegistryService#addListener()} is called by the 
 * runtime to subscribe an event listener.  The listener invokes the given 
 * {@code wrapper} function to construct a tuple from a job event
 * and submits the tuple on the returned stream.</p>
 * <p>
 * When the topology's execution is terminated, 
 * {@code JobRegistryServic#removeListener()}  in invoked to unsubscribe 
 * the tuple source from the job registry. 
 * </p>
 *
 * @param <T> Tuple type
 * @param topology the stream topology
 * @param wrapper constructs a tuple from a job event
 * @return new stream containing the tuples generated by the specified {@code wrapper}.
 * 
 * @see Topology#getRuntimeServiceSupplier() 
 * @see JobRegistryService#addListener(BiConsumer)
 * @see JobRegistryService#removeListener(BiConsumer)
 */
public static <T> TStream<T> source(
    Topology topology, 
    BiFunction<JobRegistryService.EventType, Job, T> wrapper) {
  Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
  return topology.events(new JobEventsSetup<T>(wrapper, rts));
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testStringContants() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  assertStream(t, s);
  Condition<Long> tc = t.getTester().tupleCount(s, 3);
  Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
  complete(t, tc);
  assertTrue(contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testSimple() throws Exception {
  Topology t = newTopology("testSimple");
  Map<String, Object> configMap = initRabbitmqConfig();
  MsgGenerator generator = new MsgGenerator(t.getName());
  String queue = "testQueue";
  List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
  TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
    t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
  RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
  TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
  RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
  TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
  completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
  assertNotNull(sink);
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testDeadtimeTooShort() throws Exception {
  Topology topology = newTopology("testDeadtimeTooShort");
  
  TStream<Integer> values = topology.of(1,2,3,4,5,6,7,8,9,10);
  
  // no deadtime due to < 1ms
  TStream<Integer> filtered = Filters.deadtime(values, 999, TimeUnit.MICROSECONDS);
  
  Condition<Long> count = topology.getTester().tupleCount(filtered, 10);
  Condition<List<Integer>> contents = topology.getTester().streamContents(filtered, 1,2,3,4,5,6,7,8,9,10 );
  complete(topology, count);
  
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testMetricsEverywhere() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  // Condition inserts a sink
  Condition<Long> tc = t.getTester().tupleCount(s, 3);
  Graph g = t.graph();
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  
  // Two vertices before submission
  assertEquals(2, vertices.size());
  complete(t, tc);
  // At least three vertices after submission
  // (provide may have added other oplets as well)
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
  assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 3);
  
  // There is exactly one vertex for a metric oplet
  int numOplets = 0;
  for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
    Oplet<?,?> oplet = v.getInstance();
    if (oplet instanceof CounterOp) {
      numOplets++;
    }
  }
  assertEquals(1, numOplets);
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testRuntimeServices() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("A");
  
  Supplier<RuntimeServices> serviceGetter =
      t.getRuntimeServiceSupplier();
  
  TStream<Boolean> b = s.map(tuple -> 
    serviceGetter.get().getService(ThreadFactory.class) != null
    && serviceGetter.get().getService(ScheduledExecutorService.class) != null
  );
  
  Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
  complete(t, tc);
  
  assertTrue(tc.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testDeadtime() throws Exception {
  Topology topology = newTopology("testDeadtime");
  
  int maxTupleCnt = 10;
  AtomicInteger cnt = new AtomicInteger();
  
  TStream<Integer> values = topology.poll(() -> {
    int curCnt = cnt.incrementAndGet();
    if (curCnt > maxTupleCnt)
      return null;
    return curCnt;
    }, 100, TimeUnit.MILLISECONDS);
  
  // use a deadtime value that causes filtering of every other tuple
  TStream<Integer> filtered = Filters.deadtime(values, 150, TimeUnit.MILLISECONDS);
  
  Condition<Long> count = topology.getTester().tupleCount(filtered, maxTupleCnt/2);
  Condition<List<Integer>> contents = topology.getTester().streamContents(filtered, 1, 3, 5, 7, 9 );
  complete(topology, count);
  
  assertTrue(contents.getResult().toString(), contents.valid());
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testString() throws Exception {
  Topology t = newTopology("testString");
  System.out.println("===== "+t.getName());
  
  startEchoer();  // before getConfig() so it gets the port
  
  Properties config = getConfig();
  WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
  
  String[] expected = new String[] { getStr1(), getStr2() };
  
  TStream<String> s = t.strings(expected);
  s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
  wsClient.sendString(s);
  
  TStream<String> rcvd = wsClient.receiveString();
  
  completeAndValidate("", t, rcvd, SEC_TMO, expected);
}

代码示例来源:origin: apache/incubator-edgent

TStream<Integer> values = top.of(resultTuples);
 });
Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
long expMinDuration = (resultTuples.length / width) * 100;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
 System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
else
 assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,

代码示例来源:origin: apache/incubator-edgent

@Test
public void testStatusCode() throws Exception {
  
  DirectProvider ep = new DirectProvider();
  
  Topology topology = ep.newTopology();
  
  String url = "http://httpbin.org/status/";
  
  TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
      topology.collection(Arrays.asList(200, 404, 202)),
      HttpClients::noAuthentication,
      t-> HttpGet.METHOD_NAME,
      t-> url + Integer.toString(t),
      (t,resp) -> resp.getStatusLine().getStatusCode());
  
  Tester tester =  topology.getTester();
  
  Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
  
  tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
  
  assertTrue(endCondition.valid());
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Subscribe to the specified topics and yield a stream of tuples from the published RabbitMQ records.
 *
 * @param toTupleFn A function that yields a tuple from a byte array,
 * @param queue the specified RabbitMQ queue
 * @param <T> A function that yields a tuple from a
 * @return stream of tuples
 */
public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
  return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void testGenerateRestart() throws Exception {
 Topology t = newTopology("testGenerateRestart");
 Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
 System.out.println("Test: "+t.getName()+" "+tempFile1);
 
 ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
 int NUM_RUNS = 3;
 List<String> expLines = new ArrayList<>();
 for (int i = 0; i < NUM_RUNS; i++) {
  expLines.addAll(Arrays.asList(getLines()));
 }
 
 // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
 TStream<String> s = CommandStreams.generate(t, cmd);
 
 completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
}

代码示例来源:origin: apache/incubator-edgent

@Test
public void metricsEverywhereMultiplePeek() throws Exception {
  Topology t = newTopology();
  Graph g = t.graph();
  TStream<String> s = t.strings("a", "b", "c");
  List<String> peekedValues = new ArrayList<>();
  TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
  TStream<String> speek2 = speek.peek(tuple -> peekedValues.add(tuple + "2nd"));
  TStream<String> speek3 = speek2.peek(tuple -> peekedValues.add(tuple + "3rd"));
  speek3.sink(tuple -> System.out.print("."));
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  assertEquals(5, vertices.size());
  Collection<Edge> edges = g.getEdges();
  assertEquals(4, edges.size());
  Metrics.counter(t);
  printGraph(g);
  // One single counter inserted after the 3rd peek 
  vertices = g.getVertices();
  assertEquals(6, vertices.size());
  edges = g.getEdges();
  assertEquals(5, edges.size());
}

代码示例来源:origin: apache/incubator-edgent

return topology.poll(commandReaderList(cmd), period, units);

代码示例来源:origin: apache/incubator-edgent

AtomicInteger n = new AtomicInteger(0);
@SuppressWarnings("unused")
TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 500, TimeUnit.MILLISECONDS);
Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = t.graph().getVertices();
PeriodicSource<?> src = null;
for (Vertex<? extends Oplet<?, ?>, ?, ?> v : vertices) {

代码示例来源:origin: apache/incubator-edgent

t.getRuntimeServiceSupplier().get()
   .getService(StreamScopeRegistry.class);
if (rgy == null)
t.graph().peekAll( 
  () -> { return new org.apache.edgent.streamscope.oplets.StreamScope<>(new StreamScope<>()); },
  Functions.alwaysTrue());

代码示例来源:origin: org.apache.edgent/edgent-apps-runtime

jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));

相关文章