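This article collects code examples for the Java class org.apache.edgent.topology.Topology and shows how the Topology class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the Topology class are as follows: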
Package path: org.apache.edgent.topology.Topology
Class name: Topology
A declaration of a topology of streaming data. This class provides some fundamental generic methods to create source streams, such as #source(Supplier), #poll(Supplier,long,TimeUnit), and #strings(String...).
See Edgent Source Streams.
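The periodic source named above, #poll(Supplier,long,TimeUnit), can be pictured with plain JDK classes. The following is a minimal sketch and not Edgent's implementation: the class name PollSketch and the method pollFor are hypothetical, and it assumes that a null returned by the supplier produces no tuple for that invocation (the testDeadtime example on this page relies on that behavior).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical plain-JDK model of a polled source; not part of the Edgent API.
class PollSketch {

    // Calls data.get() once per period, keeps the non-null results, and
    // stops after the given number of invocations.
    static <T> List<T> pollFor(Supplier<T> data, long period, TimeUnit unit, int invocations) {
        List<T> tuples = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(invocations);
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // enough invocations already happened
            }
            T value = data.get();
            if (value != null) {
                tuples.add(value); // assumption: null means "no tuple this time"
            }
            remaining.countDown();
        }, 0, period, unit);
        try {
            remaining.await(); // sketch only: assumes the supplier never throws
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdownNow();
        }
        return tuples;
    }

    public static void main(String[] args) {
        AtomicInteger cnt = new AtomicInteger();
        Supplier<Integer> data = () -> {
            int cur = cnt.incrementAndGet();
            return cur > 5 ? null : Integer.valueOf(cur);
        };
        // Eight invocations, of which only the first five yield a value.
        System.out.println(pollFor(data, 10, TimeUnit.MILLISECONDS, 8)); // prints [1, 2, 3, 4, 5]
    }
}
```

In a real topology the runtime owns the scheduling and the stream stays open until the job is closed; the invocation limit here exists only so the sketch terminates.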
Code example source: org.apache.edgent/edgent-test-appservice-applications
@Override
public BiConsumer<Topology, JsonObject> getBuilder() {
return (t,c) -> t.strings(getName()).print();
}
Code example source: apache/incubator-edgent
protected void waitUntilComplete(Topology t, TStream<String> s, String[] data) throws Exception {
Condition<Long> tc = t.getTester().tupleCount(s, data.length);
complete(t, tc);
// Save the job.
job = t.getTester().getJob();
assertNotNull(job);
}
Code example source: apache/incubator-edgent
@Test
public void testBasics() {
final Topology t = newTopology("T123");
assertEquals("T123", t.getName());
assertSame(t, t.topology());
}
Code example source: apache/incubator-edgent
/**
* Declares a stream populated by {@link JobRegistryService} events.
* <p>
* The job registry is passed as a runtime service. At startup
* {@code JobRegistryService#addListener()} is called by the
* runtime to subscribe an event listener. The listener invokes the given
* {@code wrapper} function to construct a tuple from a job event
* and submits the tuple on the returned stream.</p>
* <p>
* When the topology's execution is terminated,
* {@code JobRegistryService#removeListener()} is invoked to unsubscribe
* the tuple source from the job registry.
* </p>
*
* @param <T> Tuple type
* @param topology the stream topology
* @param wrapper constructs a tuple from a job event
* @return new stream containing the tuples generated by the specified {@code wrapper}.
*
* @see Topology#getRuntimeServiceSupplier()
* @see JobRegistryService#addListener(BiConsumer)
* @see JobRegistryService#removeListener(BiConsumer)
*/
public static <T> TStream<T> source(
Topology topology,
BiFunction<JobRegistryService.EventType, Job, T> wrapper) {
Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
return topology.events(new JobEventsSetup<T>(wrapper, rts));
}
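The Javadoc above describes a listener-based source: subscribe a listener at startup, wrap each event into a tuple, submit it, and unsubscribe on termination. A minimal plain-JDK sketch of that lifecycle follows. Every name in it (EventSourceSketch, Registry, Source, demo) is hypothetical, and the registry is a simplified stand-in that publishes two strings rather than Edgent's JobRegistryService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical model of a listener-backed tuple source; not Edgent code.
class EventSourceSketch {

    // Simplified stand-in for a registry that publishes (eventType, jobName) events.
    static class Registry {
        private final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();

        void addListener(BiConsumer<String, String> listener) {
            listeners.add(listener);
        }

        boolean removeListener(BiConsumer<String, String> listener) {
            return listeners.remove(listener);
        }

        void publish(String eventType, String jobName) {
            listeners.forEach(l -> l.accept(eventType, jobName));
        }
    }

    // Subscribes on construction ("startup") and unsubscribes on close ("termination").
    static class Source<T> implements AutoCloseable {
        private final Registry registry;
        private final BiConsumer<String, String> listener;

        Source(Registry registry, BiFunction<String, String, T> wrapper, Consumer<T> submitter) {
            this.registry = registry;
            // The listener builds a tuple from the event and submits it downstream.
            this.listener = (type, job) -> submitter.accept(wrapper.apply(type, job));
            registry.addListener(listener);
        }

        @Override
        public void close() {
            registry.removeListener(listener);
        }
    }

    static List<String> demo() {
        Registry registry = new Registry();
        List<String> stream = new ArrayList<>();
        try (Source<String> src = new Source<>(registry, (type, job) -> type + ":" + job, stream::add)) {
            registry.publish("ADD", "job-1");
            registry.publish("REMOVE", "job-1");
        }
        registry.publish("ADD", "job-2"); // after close: no listener, so no tuple
        return stream;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ADD:job-1, REMOVE:job-1]
    }
}
```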
Code example source: apache/incubator-edgent
@Test
public void testStringContants() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("a", "b", "c");
assertStream(t, s);
Condition<Long> tc = t.getTester().tupleCount(s, 3);
Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
complete(t, tc);
assertTrue(contents.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testSimple() throws Exception {
Topology t = newTopology("testSimple");
Map<String, Object> configMap = initRabbitmqConfig();
MsgGenerator generator = new MsgGenerator(t.getName());
String queue = "testQueue";
List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
assertNotNull(sink);
}
Code example source: apache/incubator-edgent
@Test
public void testDeadtimeTooShort() throws Exception {
Topology topology = newTopology("testDeadtimeTooShort");
TStream<Integer> values = topology.of(1,2,3,4,5,6,7,8,9,10);
// no deadtime due to < 1ms
TStream<Integer> filtered = Filters.deadtime(values, 999, TimeUnit.MICROSECONDS);
Condition<Long> count = topology.getTester().tupleCount(filtered, 10);
Condition<List<Integer>> contents = topology.getTester().streamContents(filtered, 1,2,3,4,5,6,7,8,9,10 );
complete(topology, count);
assertTrue(contents.getResult().toString(), contents.valid());
}
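The comment "no deadtime due to < 1ms" in the test above suggests that the deadtime period is handled at millisecond granularity. Assuming the period is converted the way java.util.concurrent.TimeUnit converts it, which truncates toward zero, 999 microseconds becomes 0 ms and nothing is filtered. The helper name deadtimeMillis is hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating millisecond truncation; not Edgent code.
class DeadtimeUnitsSketch {

    // Converts a deadtime period to whole milliseconds, dropping any remainder.
    static long deadtimeMillis(long period, TimeUnit unit) {
        return unit.toMillis(period);
    }

    public static void main(String[] args) {
        System.out.println(deadtimeMillis(999, TimeUnit.MICROSECONDS));  // prints 0
        System.out.println(deadtimeMillis(1000, TimeUnit.MICROSECONDS)); // prints 1
        System.out.println(deadtimeMillis(150, TimeUnit.MILLISECONDS));  // prints 150
    }
}
```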
Code example source: apache/incubator-edgent
@Test
public void testMetricsEverywhere() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("a", "b", "c");
// Condition inserts a sink
Condition<Long> tc = t.getTester().tupleCount(s, 3);
Graph g = t.graph();
Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
// Two vertices before submission
assertEquals(2, vertices.size());
complete(t, tc);
// At least three vertices after submission
// (provide may have added other oplets as well)
Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 3);
// There is exactly one vertex for a metric oplet
int numOplets = 0;
for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
Oplet<?,?> oplet = v.getInstance();
if (oplet instanceof CounterOp) {
numOplets++;
}
}
assertEquals(1, numOplets);
}
Code example source: apache/incubator-edgent
@Test
public void testRuntimeServices() throws Exception {
Topology t = newTopology();
TStream<String> s = t.strings("A");
Supplier<RuntimeServices> serviceGetter =
t.getRuntimeServiceSupplier();
TStream<Boolean> b = s.map(tuple ->
serviceGetter.get().getService(ThreadFactory.class) != null
&& serviceGetter.get().getService(ScheduledExecutorService.class) != null
);
Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
complete(t, tc);
assertTrue(tc.valid());
}
Code example source: apache/incubator-edgent
@Test
public void testDeadtime() throws Exception {
Topology topology = newTopology("testDeadtime");
int maxTupleCnt = 10;
AtomicInteger cnt = new AtomicInteger();
TStream<Integer> values = topology.poll(() -> {
int curCnt = cnt.incrementAndGet();
if (curCnt > maxTupleCnt)
return null;
return curCnt;
}, 100, TimeUnit.MILLISECONDS);
// use a deadtime value that causes filtering of every other tuple
TStream<Integer> filtered = Filters.deadtime(values, 150, TimeUnit.MILLISECONDS);
Condition<Long> count = topology.getTester().tupleCount(filtered, maxTupleCnt/2);
Condition<List<Integer>> contents = topology.getTester().streamContents(filtered, 1, 3, 5, 7, 9 );
complete(topology, count);
assertTrue(contents.getResult().toString(), contents.valid());
}
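Why a 150 ms deadtime on a 100 ms poll passes every other tuple can be checked with a small simulation. This is a sketch under stated assumptions, not Edgent's Filters.deadtime code: it assumes a tuple passes when it arrives at or after the end of the current deadtime window, that passing a tuple starts a new window, and that tuples arrive at exact multiples of the poll interval. The names DeadtimeSketch and filter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a deadtime filter on evenly spaced tuples.
class DeadtimeSketch {

    // Tuple number i + 1 arrives at time i * intervalMs; returns the tuples that pass.
    static List<Integer> filter(int tupleCount, long intervalMs, long deadtimeMs) {
        List<Integer> passed = new ArrayList<>();
        long nextPassTime = 0; // the first tuple always passes
        for (int i = 0; i < tupleCount; i++) {
            long now = i * intervalMs;
            if (now >= nextPassTime) {
                passed.add(i + 1);
                nextPassTime = now + deadtimeMs; // start a new deadtime window
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(filter(10, 100, 150)); // prints [1, 3, 5, 7, 9]
        System.out.println(filter(10, 100, 0));   // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Tuple 1 passes at 0 ms and blocks until 150 ms, so tuple 2 at 100 ms is dropped and tuple 3 at 200 ms passes; the pattern then repeats. Real timing has jitter, which is why the test picks a deadtime well between one and two poll periods.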
Code example source: apache/incubator-edgent
@Test
public void testString() throws Exception {
Topology t = newTopology("testString");
System.out.println("===== "+t.getName());
startEchoer(); // before getConfig() so it gets the port
Properties config = getConfig();
WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
String[] expected = new String[] { getStr1(), getStr2() };
TStream<String> s = t.strings(expected);
s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
wsClient.sendString(s);
TStream<String> rcvd = wsClient.receiveString();
completeAndValidate("", t, rcvd, SEC_TMO, expected);
}
Code example source: apache/incubator-edgent
TStream<Integer> values = top.of(resultTuples);
});
Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
long expMinDuration = (resultTuples.length / width) * 100;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
else
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
Code example source: apache/incubator-edgent
@Test
public void testStatusCode() throws Exception {
DirectProvider ep = new DirectProvider();
Topology topology = ep.newTopology();
String url = "http://httpbin.org/status/";
TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
topology.collection(Arrays.asList(200, 404, 202)),
HttpClients::noAuthentication,
t-> HttpGet.METHOD_NAME,
t-> url + Integer.toString(t),
(t,resp) -> resp.getStatusLine().getStatusCode());
Tester tester = topology.getTester();
Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
assertTrue(endCondition.valid());
}
Code example source: apache/incubator-edgent
/**
* Subscribe to the specified topics and yield a stream of tuples from the published RabbitMQ records.
*
* @param toTupleFn A function that yields a tuple from a byte array
* @param queue the specified RabbitMQ queue
* @param <T> tuple type
* @return stream of tuples
*/
public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
}
Code example source: apache/incubator-edgent
@Test
public void testGenerateRestart() throws Exception {
Topology t = newTopology("testGenerateRestart");
Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
System.out.println("Test: "+t.getName()+" "+tempFile1);
ProcessBuilder cmd = new ProcessBuilder(mkCatFileCmd(tempFile1.toString()));
int NUM_RUNS = 3;
List<String> expLines = new ArrayList<>();
for (int i = 0; i < NUM_RUNS; i++) {
expLines.addAll(Arrays.asList(getLines()));
}
// N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
TStream<String> s = CommandStreams.generate(t, cmd);
completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
}
Code example source: apache/incubator-edgent
@Test
public void metricsEverywhereMultiplePeek() throws Exception {
Topology t = newTopology();
Graph g = t.graph();
TStream<String> s = t.strings("a", "b", "c");
List<String> peekedValues = new ArrayList<>();
TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
TStream<String> speek2 = speek.peek(tuple -> peekedValues.add(tuple + "2nd"));
TStream<String> speek3 = speek2.peek(tuple -> peekedValues.add(tuple + "3rd"));
speek3.sink(tuple -> System.out.print("."));
Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
assertEquals(5, vertices.size());
Collection<Edge> edges = g.getEdges();
assertEquals(4, edges.size());
Metrics.counter(t);
printGraph(g);
// One single counter inserted after the 3rd peek
vertices = g.getVertices();
assertEquals(6, vertices.size());
edges = g.getEdges();
assertEquals(5, edges.size());
}
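The vertex and edge counts asserted above follow from the shape of the pipeline: a linear chain of n stages has n - 1 connections, and splicing one stage into the chain adds one vertex and one edge. A minimal sketch of that arithmetic follows, with the chain modeled as a list of stage names. The names ChainGraphSketch, edgeCount, and insertAfter are hypothetical, and the position of the counter is taken from the test's comment rather than from Edgent's Metrics code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a linear pipeline graph; not Edgent's Graph API.
class ChainGraphSketch {

    // A linear chain of n stages has n - 1 edges.
    static int edgeCount(List<String> chain) {
        return Math.max(0, chain.size() - 1);
    }

    // Returns a copy of the chain with a new stage spliced in after an existing one.
    static List<String> insertAfter(List<String> chain, String existing, String added) {
        List<String> copy = new ArrayList<>(chain);
        int index = copy.indexOf(existing);
        if (index < 0) {
            throw new IllegalArgumentException("unknown stage: " + existing);
        }
        copy.add(index + 1, added);
        return copy;
    }

    public static void main(String[] args) {
        List<String> chain = Arrays.asList("strings", "peek1", "peek2", "peek3", "sink");
        System.out.println(chain.size() + " vertices, " + edgeCount(chain) + " edges");
        // prints 5 vertices, 4 edges

        List<String> withCounter = insertAfter(chain, "peek3", "counter");
        System.out.println(withCounter.size() + " vertices, " + edgeCount(withCounter) + " edges");
        // prints 6 vertices, 5 edges
    }
}
```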
Code example source: apache/incubator-edgent
return topology.poll(commandReaderList(cmd), period, units);
Code example source: apache/incubator-edgent
AtomicInteger n = new AtomicInteger(0);
@SuppressWarnings("unused")
TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 500, TimeUnit.MILLISECONDS);
Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = t.graph().getVertices();
PeriodicSource<?> src = null;
for (Vertex<? extends Oplet<?, ?>, ?, ?> v : vertices) {
Code example source: apache/incubator-edgent
t.getRuntimeServiceSupplier().get()
.getService(StreamScopeRegistry.class);
if (rgy == null)
t.graph().peekAll(
() -> { return new org.apache.edgent.streamscope.oplets.StreamScope<>(new StreamScope<>()); },
Functions.alwaysTrue());
Code example source: org.apache.edgent/edgent-apps-runtime
jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));