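This article collects code examples of the Java method org.apache.beam.sdk.transforms.Filter.by() and shows how Filter.by() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms. They were extracted from selected projects and should serve as a useful reference. Details of the Filter.by() method: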
Package path: org.apache.beam.sdk.transforms.Filter
Class name: Filter
Method name: by
Returns a PTransform that takes an input PCollection and returns a PCollection with elements that satisfy the given predicate. The predicate must be a SerializableFunction.
Example of use:
PCollection<String> wordList = ...;
See also #lessThan, #lessThanEq, #greaterThan, #greaterThanEq, which return elements satisfying various inequalities with the specified value based on the elements' natural ordering.
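Filter.by() requires a SerializableFunction because the runner serializes the predicate and ships it to the workers, where it is applied to each element independently. The plain-Java sketch below models that contract without a Beam dependency, under stated assumptions: `SerializablePredicate` is a hypothetical stand-in for `SerializableFunction<T, Boolean>`, and `filterBy` is a hypothetical helper that round-trips the predicate through Java serialization and then keeps each element for which it returns true.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Beam's SerializableFunction<T, Boolean>; not a Beam type.
interface SerializablePredicate<T> extends Function<T, Boolean>, Serializable {}

class FilterByModel {
  // Serialize and deserialize an object with Java serialization,
  // roughly what happens when user code is shipped to workers.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("could not serialize " + obj, e);
    }
  }

  // Hypothetical model of Filter.by(): keep each element for which the
  // deserialized predicate returns true (a real PCollection has no order).
  static <T> List<T> filterBy(List<T> input, SerializablePredicate<T> predicate) {
    SerializablePredicate<T> shipped = roundTrip(predicate);
    List<T> output = new ArrayList<>();
    for (T element : input) {
      if (Boolean.TRUE.equals(shipped.apply(element))) {
        output.add(element);
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> wordList = Arrays.asList("apple", "banana", "pear", "strawberry");
    // Keep words longer than five characters.
    List<String> longWords = filterBy(wordList, word -> word.length() > 5);
    System.out.println(longWords); // [banana, strawberry]
  }
}
```

A lambda is serializable only when its target interface extends Serializable, which is why the predicate type matters and not just its behavior.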
Code example origin: jbonofre/beam-samples
public static PCollection<String> filterByCountry(PCollection<String> data, final String country) {
  return data.apply("FilterByCountry", Filter.by(new SerializableFunction<String, Boolean>() {
    @Override
    public Boolean apply(String row) {
      return getCountry(row).equals(country);
    }
  }));
}
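In filterByCountry the anonymous SerializableFunction captures the `country` string, and because String is serializable the function can be shipped to workers. Everything a predicate captures must be serializable as well. The plain-Java sketch below illustrates this without Beam: `SerializablePredicate`, `CountryLookup` and the `name,country` row layout behind `getCountry` are all hypothetical (the original getCountry is not shown). A predicate that captures only a String serializes; one that captures a non-serializable helper fails with NotSerializableException. In a Beam pipeline a failure like this would typically surface as an error when the pipeline is built or run.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in for Beam's SerializableFunction<T, Boolean>.
interface SerializablePredicate<T> extends Function<T, Boolean>, Serializable {}

// Hypothetical helper that does NOT implement Serializable.
class CountryLookup {
  String countryOf(String row) {
    return row.split(",")[1];
  }
}

class CaptureCheck {
  // Hypothetical row layout "name,country"; the original getCountry(row) is not shown.
  static String getCountry(String row) {
    return row.split(",")[1];
  }

  // True if Java serialization accepts the object, false on NotSerializableException.
  static boolean isSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  // Captures only a String, like the anonymous class in filterByCountry.
  static SerializablePredicate<String> byCountry(final String country) {
    return row -> getCountry(row).equals(country);
  }

  // Captures a non-serializable helper, so serializing the lambda fails.
  static SerializablePredicate<String> byCountryWithLookup(
      final CountryLookup lookup, final String country) {
    return row -> lookup.countryOf(row).equals(country);
  }

  public static void main(String[] args) {
    System.out.println(isSerializable(byCountry("FR"))); // true
    System.out.println(isSerializable(byCountryWithLookup(new CountryLookup(), "FR"))); // false
  }
}
```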
Code example origin: org.apache.beam/beam-sdks-java-core
/**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} with elements that equals to a given value. Elements must be {@code
* Comparable}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<Integer> listOfNumbers = ...;
* PCollection<Integer> equalNumbers = listOfNumbers.apply(Filter.equal(1000));
* }</pre>
*
* <p>See also {@link #greaterThan}, {@link #lessThan}, {@link #lessThanEq} and {@link
* #greaterThanEq}, which return elements satisfying various inequalities with the specified value
* based on the elements' natural ordering.
*
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> equal(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0)
      .described(String.format("x == %s", value));
}
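Filter.equal() tests `compareTo(value) == 0`, not `equals(value)`. The two usually agree, but not always: BigDecimal.compareTo ignores scale, while BigDecimal.equals does not. The plain-Java sketch below applies both tests to the same values; `keepIf` is a hypothetical helper, not Beam API.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class EqualSemantics {
  // Hypothetical helper: keep elements matching the predicate, in input order.
  static <T> List<T> keepIf(List<T> input, Predicate<T> predicate) {
    List<T> out = new ArrayList<>();
    for (T x : input) {
      if (predicate.test(x)) {
        out.add(x);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    BigDecimal value = new BigDecimal("1.0");
    List<BigDecimal> input =
        Arrays.asList(new BigDecimal("1"), new BigDecimal("1.00"), new BigDecimal("2"));

    // Same test as Filter.equal(value): compareTo ignores scale, so 1 and 1.00 match.
    List<BigDecimal> byCompareTo = keepIf(input, x -> x.compareTo(value) == 0);
    // equals also compares scale, so neither 1 nor 1.00 equals 1.0.
    List<BigDecimal> byEquals = keepIf(input, x -> x.equals(value));

    System.out.println(byCompareTo); // [1, 1.00]
    System.out.println(byEquals); // []
  }
}
```

So `Filter.equal(new BigDecimal("1.0"))` would keep both `1` and `1.00`; if exact equals() semantics are needed, Filter.by with an explicit predicate is the safer choice.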
Code example origin: org.apache.beam/beam-sdks-java-core
/**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} with elements that are less than or equal to a given value, based on the
* elements' natural ordering. Elements must be {@code Comparable}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<Integer> listOfNumbers = ...;
* PCollection<Integer> smallOrEqualNumbers =
* listOfNumbers.apply(Filter.lessThanEq(10));
* }</pre>
*
* <p>See also {@link #lessThan}, {@link #greaterThanEq}, {@link #equal} and {@link #greaterThan},
* which return elements satisfying various inequalities with the specified value based on the
* elements' natural ordering.
*
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
      .described(String.format("x ≤ %s", value));
}
Code example origin: org.apache.beam/beam-sdks-java-core
/**
* Returns a {@code PTransform} that takes an input {@link PCollection} and returns a {@link
* PCollection} with elements that are less than a given value, based on the elements' natural
* ordering. Elements must be {@code Comparable}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<Integer> listOfNumbers = ...;
* PCollection<Integer> smallNumbers =
* listOfNumbers.apply(Filter.lessThan(10));
* }</pre>
*
* <p>See also {@link #lessThanEq}, {@link #greaterThanEq}, {@link #equal} and {@link
* #greaterThan}, which return elements satisfying various inequalities with the specified value
* based on the elements' natural ordering.
*
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0)
      .described(String.format("x < %s", value));
}
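Because these filters use the elements' natural ordering, for String elements `Filter.lessThan("b")` keeps strings that sort before "b" lexicographically by UTF-16 code unit. That ordering is case-sensitive: every uppercase ASCII letter sorts before every lowercase one. The plain-Java sketch below applies the same `compareTo(value) < 0` test that lessThan uses. The `StringOrdering` class is illustrative only, not Beam code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StringOrdering {
  // Same per-element test that Filter.lessThan(value) applies.
  static List<String> lessThan(List<String> input, String value) {
    return input.stream().filter(s -> s.compareTo(value) < 0).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("apple", "Banana", "cherry", "Zebra", "b");
    // "Banana" and "Zebra" start with uppercase letters, which sort before 'b';
    // "b" itself is dropped because the comparison is strict.
    System.out.println(lessThan(words, "b")); // [apple, Banana, Zebra]
  }
}
```

For a case-insensitive comparison, a Filter.by predicate built on something like `String.CASE_INSENSITIVE_ORDER` would be needed instead.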
Code example origin: org.apache.beam/beam-sdks-java-core
/**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} with elements that are greater than a given value, based on the elements'
* natural ordering. Elements must be {@code Comparable}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<Integer> listOfNumbers = ...;
* PCollection<Integer> largeNumbers =
* listOfNumbers.apply(Filter.greaterThan(1000));
* }</pre>
*
* <p>See also {@link #greaterThanEq}, {@link #lessThan}, {@link #equal} and {@link #lessThanEq},
* which return elements satisfying various inequalities with the specified value based on the
* elements' natural ordering.
*
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0)
      .described(String.format("x > %s", value));
}
Code example origin: org.apache.beam/beam-sdks-java-core
/**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} with elements that are greater than or equal to a given value, based on the
* elements' natural ordering. Elements must be {@code Comparable}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<Integer> listOfNumbers = ...;
* PCollection<Integer> largeOrEqualNumbers =
* listOfNumbers.apply(Filter.greaterThanEq(1000));
* }</pre>
*
* <p>See also {@link #greaterThan}, {@link #lessThan}, {@link #equal} and {@link #lessThanEq},
* which return elements satisfying various inequalities with the specified value based on the
* elements' natural ordering.
*
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
      .described(String.format("x ≥ %s", value));
}
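The four inequality filters and Filter.equal differ only in how they test the result of `compareTo(value)`. The plain-Java sketch below models those predicates (it is not Beam code; `compareFilter` and `allFilters` are illustrative names) and applies each one to the same input with value 10, so the boundary behavior can be compared side by side.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

class InequalityFilters {
  // Keep x when test(x.compareTo(value)) is true, as the Filter comparison methods do.
  static <T extends Comparable<T>> List<T> compareFilter(
      List<T> input, T value, IntPredicate test) {
    return input.stream().filter(x -> test.test(x.compareTo(value))).collect(Collectors.toList());
  }

  // One entry per Filter method, each using that method's comparison.
  static Map<String, List<Integer>> allFilters(List<Integer> input, int value) {
    Map<String, List<Integer>> results = new LinkedHashMap<>();
    results.put("lessThan", compareFilter(input, value, c -> c < 0));
    results.put("lessThanEq", compareFilter(input, value, c -> c <= 0));
    results.put("equal", compareFilter(input, value, c -> c == 0));
    results.put("greaterThanEq", compareFilter(input, value, c -> c >= 0));
    results.put("greaterThan", compareFilter(input, value, c -> c > 0));
    return results;
  }

  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(5, 9, 10, 11, 15);
    allFilters(numbers, 10).forEach((name, kept) -> System.out.println(name + "(10): " + kept));
  }
}
```

The value 10 itself is kept only by lessThanEq, equal and greaterThanEq. The lists here preserve input order only because they are in-memory lists; a PCollection carries no such ordering.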
Code example origin: com.google.cloud.genomics/google-genomics-dataflow (the same code also appears in googlegenomics/dataflow-java)
/**
* Compute a PCollection of reference allele frequencies for SNPs of interest.
* The SNPs all have only a single alternate allele, and neither the
* reference nor the alternate allele have a population frequency < minFreq.
* The results are returned in a PCollection indexed by Position.
*
* @param variants a set of variant calls for a reference population
* @param minFreq the minimum allele frequency for the set
* @return a PCollection mapping Position to AlleleFreq
*/
static PCollection<KV<Position, AlleleFreq>> getFreq(
    PCollection<Variant> variants, double minFreq) {
  return variants.apply("PassingFilter", Filter.by(VariantFunctions.IS_PASSING))
      .apply("OnChromosomeFilter", Filter.by(VariantFunctions.IS_ON_CHROMOSOME))
      .apply("NotLowQualityFilter", Filter.by(VariantFunctions.IS_NOT_LOW_QUALITY))
      .apply("SNPFilter", Filter.by(VariantFunctions.IS_SINGLE_ALTERNATE_SNP))
      .apply(ParDo.of(new GetAlleleFreq()))
      .apply(Filter.by(new FilterFreq(minFreq)));
}
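getFreq applies four Filter.by steps in a row. Each step only drops elements, so the chain keeps exactly the elements that pass every predicate. That makes it equivalent to a single Filter.by whose predicate is the logical AND of the four. The plain-Java sketch below checks this equivalence using hypothetical integer predicates as stand-ins for VariantFunctions.IS_PASSING and the others, whose definitions are not shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ChainedFilters {
  // Apply each predicate as its own filtering step, like chained Filter.by transforms.
  static <T> List<T> chained(List<T> input, List<Predicate<T>> steps) {
    Stream<T> stream = input.stream();
    for (Predicate<T> step : steps) {
      stream = stream.filter(step);
    }
    return stream.collect(Collectors.toList());
  }

  // Apply one filter whose predicate is the AND of all steps.
  static <T> List<T> combined(List<T> input, List<Predicate<T>> steps) {
    Predicate<T> all = steps.stream().reduce(x -> true, Predicate::and);
    return input.stream().filter(all).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
    // Hypothetical stand-ins for the VariantFunctions predicates.
    Predicate<Integer> isEven = x -> x % 2 == 0;
    Predicate<Integer> greaterThanThree = x -> x > 3;
    Predicate<Integer> notEight = x -> x != 8;
    List<Predicate<Integer>> steps = Arrays.asList(isEven, greaterThanThree, notEight);
    System.out.println(chained(input, steps)); // [4, 6, 10, 12]
    System.out.println(combined(input, steps)); // [4, 6, 10, 12]
  }
}
```

Keeping separate named steps, as the original does, makes each stage visible in the pipeline graph. Runners commonly fuse consecutive element-wise steps like these, so splitting them is not necessarily more expensive.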
Code example origin: org.apache.beam/beam-sdks-java-io-redis
@Override
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
  // Reparallelize, mimicking the same behavior as in JdbcIO: the filter drops every
  // element, so `empty` is always empty, but consuming it as a side input forces
  // `input` to be materialized, which breaks fusion with the upstream steps.
  PCollectionView<Iterable<KV<String, String>>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<KV<String, String>> materialized =
      input.apply(
          "Identity",
          ParDo.of(
                  new DoFn<KV<String, String>, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      context.output(context.element());
                    }
                  })
              .withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testNoFilterByPredicateWithLambda() {
  PCollection<Integer> output = p.apply(Create.of(1, 2, 4, 5)).apply(Filter.by(i -> false));
  PAssert.that(output).empty();
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testIdentityFilterByPredicateWithLambda() {
  PCollection<Integer> output =
      p.apply(Create.of(591, 11789, 1257, 24578, 24799, 307)).apply(Filter.by(i -> true));
  PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testFilterByPredicateWithLambda() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(i -> i % 2 == 0));
  PAssert.that(output).containsInAnyOrder(2, 4, 6);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
/**
* Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
* not useful. If this test ever fails there may be simplifications available to us.
*/
@Test
public void testFilterParDoOutputTypeDescriptorRawWithLambda() throws Exception {
  @SuppressWarnings({"unchecked", "rawtypes"})
  PCollection<String> output = p.apply(Create.of("hello")).apply(Filter.by(s -> true));
  thrown.expect(CannotProvideCoderException.class);
  p.getCoderRegistry().getCoder(output.getTypeDescriptor());
}
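The test above relies on a property of Java lambdas: the class generated for a lambda does not record the generic type arguments of the interface it implements, whereas an anonymous class does. Beam's type inference leans on such generic signatures, so a lambda leaves it with only a raw type. The plain-Java sketch below inspects both kinds of function with getGenericInterfaces(). `TypeInfoCheck` is illustrative and uses java.util.function.Function rather than Beam's SerializableFunction.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class TypeInfoCheck {
  // True if the object's class records type arguments for the first interface it implements.
  static boolean hasGenericInterface(Object fn) {
    Type[] interfaces = fn.getClass().getGenericInterfaces();
    return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
  }

  // Anonymous class: the compiler records Function<String, Boolean> in its signature.
  static Function<String, Boolean> anonymous() {
    return new Function<String, Boolean>() {
      @Override
      public Boolean apply(String s) {
        return true;
      }
    };
  }

  // Lambda: the generated class implements only the raw Function interface.
  static Function<String, Boolean> lambda() {
    return s -> true;
  }

  public static void main(String[] args) {
    System.out.println("anonymous class: " + hasGenericInterface(anonymous())); // true
    System.out.println("lambda: " + hasGenericInterface(lambda())); // false
  }
}
```

This is why Beam code that uses lambdas often supplies the type explicitly, as the MapElements.into(TypeDescriptors.strings()) calls in the word-count example on this page do.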
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testIdentityFilterByPredicate() {
  PCollection<Integer> output =
      p.apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
          .apply(Filter.by(new TrivialFn(true)));
  PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testNoFilterByPredicate() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 4, 5)).apply(Filter.by(new TrivialFn(false)));
  PAssert.that(output).empty();
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testFilterByPredicate() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenFn()));
  PAssert.that(output).containsInAnyOrder(2, 4, 6);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(NeedsRunner.class)
public void testFilterByMethodReferenceWithLambda() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenFilter()::isEven));
  PAssert.that(output).containsInAnyOrder(2, 4, 6);
  p.run();
}
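`new EvenFilter()::isEven` is a bound method reference, so the EvenFilter instance is captured and serialized together with the predicate. EvenFilter's definition is not shown here. The plain-Java sketch below uses hypothetical `SerializableEvenFilter` and `PlainEvenFilter` receivers, plus a hypothetical `SerializablePredicate` stand-in for Beam's SerializableFunction, to show that the method reference is serializable only when its receiver is.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in for Beam's SerializableFunction<T, Boolean>.
interface SerializablePredicate<T> extends Function<T, Boolean>, Serializable {}

// Hypothetical receiver that implements Serializable.
class SerializableEvenFilter implements Serializable {
  boolean isEven(Integer i) {
    return i % 2 == 0;
  }
}

// Hypothetical receiver that does not implement Serializable.
class PlainEvenFilter {
  boolean isEven(Integer i) {
    return i % 2 == 0;
  }
}

class MethodRefCheck {
  // True if Java serialization accepts the object, false on NotSerializableException.
  static boolean isSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  // The bound receiver is captured, so it is serialized along with the reference.
  static SerializablePredicate<Integer> fromSerializableReceiver() {
    return new SerializableEvenFilter()::isEven;
  }

  static SerializablePredicate<Integer> fromPlainReceiver() {
    return new PlainEvenFilter()::isEven;
  }

  public static void main(String[] args) {
    System.out.println(isSerializable(fromSerializableReceiver())); // true
    System.out.println(isSerializable(fromPlainReceiver())); // false
  }
}
```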
Code example origin: org.apache.beam/beam-examples-java
/** A basic smoke test that ensures there is no crash at pipeline construction time. */
@Test
public void testMinimalWordCount() throws Exception {
  p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
  p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
      .apply(
          FlatMapElements.into(TypeDescriptors.strings())
              .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.perElement())
      .apply(
          MapElements.into(TypeDescriptors.strings())
              .via(
                  (KV<String, Long> wordCount) ->
                      wordCount.getKey() + ": " + wordCount.getValue()))
      .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
}
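The `Filter.by((String word) -> !word.isEmpty())` step is needed because String.split can return empty strings. When a line starts with a character outside `[a-zA-Z']`, such as leading spaces or punctuation, split returns an empty first token; trailing empty tokens are dropped, but leading ones are not. The plain-Java sketch below reproduces the split, filter and count steps on an in-memory line, using streams instead of Beam transforms. `WordCountModel` is illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class WordCountModel {
  // Same regex as the FlatMapElements step in the test.
  static List<String> tokens(String line) {
    return Arrays.asList(line.split("[^a-zA-Z']+"));
  }

  // Split, drop empty tokens (the Filter.by step), then count occurrences per word.
  static Map<String, Long> countWords(List<String> lines) {
    return lines.stream()
        .flatMap(line -> tokens(line).stream())
        .filter(word -> !word.isEmpty())
        .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    String line = "  To be, or not to be!";
    System.out.println(tokens(line)); // [, To, be, or, not, to, be]
    System.out.println(countWords(Arrays.asList(line)));
  }
}
```

Without the filter, every line that begins with whitespace or punctuation would contribute an empty "word" to the counts.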
Code example origin: org.apache.beam/beam-sdks-java-core
@Test
@Category({ValidatesRunner.class, UsesSchema.class})
public void testSchemasPassedThrough() {
  List<InferredPojo> pojoList =
      Lists.newArrayList(
          new InferredPojo("a", 1), new InferredPojo("b", 2), new InferredPojo("c", 3));
  PCollection<InferredPojo> out = pipeline.apply(Create.of(pojoList)).apply(Filter.by(e -> true));
  assertTrue(out.hasSchema());
  pipeline.run();
}
Code example origin: org.apache.beam/beam-examples-java
/** Test the filtering. */
@Test
@Category(ValidatesRunner.class)
public void testUserScoresFilter() throws Exception {
  final Instant startMinTimestamp = new Instant(1447965680000L);
  PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
  PCollection<KV<String, Integer>> output =
      input
          .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
          .apply(
              "FilterStartTime",
              Filter.by(
                  (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
          // run a map to access the fields in the result.
          .apply(
              MapElements.into(
                      TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                  .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
  PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
  p.run().waitUntilFinish();
}