Returns a PTransform that takes an input PCollection and returns a PCollection with elements that satisfy the given predicate. The predicate must be a SerializableFunction.
Example of use:
PCollection<String> wordList = ...;
See also #lessThan, #lessThanEq, #greaterThan, #greaterThanEq, which return elements satisfying various inequalities with the specified value based on the elements' natural ordering.
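The predicate has to be serializable because Beam's `SerializableFunction` extends `java.io.Serializable`, which lets a runner serialize user code and ship it to workers. The plain-Java sketch below (no Beam) illustrates the underlying Java rule: a lambda is serializable only when its target interface extends `Serializable`. `SerializablePredicate` is a hypothetical stand-in for `SerializableFunction<T, Boolean>`, and the in-memory round trip only approximates what a runner does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

public class SerializablePredicateDemo {
  // Hypothetical stand-in for Beam's SerializableFunction<T, Boolean>:
  // a functional interface that also extends Serializable.
  public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}

  // Writes an object with Java serialization and reads it back.
  public static Object roundTrip(Object o) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  // Returns false when Java serialization rejects the object.
  public static boolean canSerialize(Object o) {
    try {
      roundTrip(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) throws Exception {
    SerializablePredicate<String> longWord = w -> w.length() > 6;
    Predicate<String> plain = w -> w.length() > 6;

    @SuppressWarnings("unchecked")
    SerializablePredicate<String> copy = (SerializablePredicate<String>) roundTrip(longWord);
    System.out.println(copy.test("serialization")); // true
    System.out.println(canSerialize(plain)); // false
  }
}
```

If a predicate is not serializable, the equivalent failure in a Beam pipeline would typically surface when the pipeline is constructed or submitted, not while elements are being processed.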
Code example origin: jbonofre/beam-samples
public static PCollection<String> filterByCountry(PCollection<String> data, final String country) {
  return data.apply("FilterByCountry", Filter.by(new SerializableFunction<String, Boolean>() {
    @Override
    public Boolean apply(String row) {
      return getCountry(row).equals(country); // getCountry: helper defined elsewhere in the sample
    }
  }));
}
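`filterByCountry` is a `static` method, so its anonymous `SerializableFunction` captures only the `country` string. The plain-Java sketch below shows why that matters: an anonymous class that reads an instance field of its enclosing object keeps a hidden reference to that object, and if the enclosing class is not serializable, Java serialization fails. `SerializablePredicate` and `PipelineBuilder` are hypothetical names used only for this illustration; no Beam classes are involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

public class CaptureDemo {
  // Hypothetical stand-in for Beam's SerializableFunction<String, Boolean>.
  public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}

  // A non-serializable class that builds predicates, like a typical pipeline-building class.
  public static class PipelineBuilder {
    private final String country;

    public PipelineBuilder(String country) {
      this.country = country;
    }

    // The anonymous class reads an instance field, so it keeps a hidden reference
    // to the enclosing (non-serializable) PipelineBuilder.
    public SerializablePredicate<String> instanceFilter() {
      return new SerializablePredicate<String>() {
        @Override
        public boolean test(String row) {
          return row.endsWith(country);
        }
      };
    }

    // Static factory, like filterByCountry: only the String argument is captured.
    public static SerializablePredicate<String> staticFilter(final String country) {
      return new SerializablePredicate<String>() {
        @Override
        public boolean test(String row) {
          return row.endsWith(country);
        }
      };
    }
  }

  // Returns false when Java serialization rejects the object graph.
  public static boolean serializes(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(serializes(PipelineBuilder.staticFilter("FR"))); // true
    System.out.println(serializes(new PipelineBuilder("FR").instanceFilter())); // false
  }
}
```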
Code example origin: org.apache.beam/beam-sdks-java-core
/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are equal to a given value. Elements must be {@code
 * Comparable}.
 * <p>Example of use:
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> equalNumbers = listOfNumbers.apply(Filter.equal(1000));
 * }</pre>
 * <p>See also {@link #greaterThan}, {@link #lessThan}, {@link #lessThanEq} and {@link
 * #greaterThanEq}, which return elements satisfying various inequalities with the specified value
 * based on the elements' natural ordering.
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> equal(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0)
      .described(String.format("x == %s", value));
}
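`Filter.equal` keeps elements for which `compareTo(value) == 0`, which is not always the same test as `equals`. The plain-Java model below (no Beam; class and method names are illustrative) applies the same comparison to a `List` with `java.util.stream` and contrasts it with an `equals`-based filter. `BigDecimal` makes the difference visible because its `compareTo` ignores scale while its `equals` does not.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EqualFilterModel {
  // Same test as the predicate inside Filter.equal: compareTo(value) == 0.
  public static <T extends Comparable<T>> List<T> keepEqual(List<T> input, T value) {
    return input.stream().filter(x -> x.compareTo(value) == 0).collect(Collectors.toList());
  }

  // For contrast: a filter based on Object.equals instead of compareTo.
  public static <T> List<T> keepEqualsMethod(List<T> input, T value) {
    return input.stream().filter(x -> x.equals(value)).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<BigDecimal> prices =
        Arrays.asList(new BigDecimal("1.0"), new BigDecimal("1.00"), new BigDecimal("2.5"));
    System.out.println(keepEqual(prices, new BigDecimal("1.0"))); // [1.0, 1.00]
    System.out.println(keepEqualsMethod(prices, new BigDecimal("1.0"))); // [1.0]
  }
}
```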
Code example origin: org.apache.beam/beam-sdks-java-core
/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are less than or equal to a given value, based on the
 * elements' natural ordering. Elements must be {@code Comparable}.
 * <p>Example of use:
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> smallOrEqualNumbers =
 *     listOfNumbers.apply(Filter.lessThanEq(10));
 * }</pre>
 * <p>See also {@link #lessThan}, {@link #greaterThanEq}, {@link #equal} and {@link #greaterThan},
 * which return elements satisfying various inequalities with the specified value based on the
 * elements' natural ordering.
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
      .described(String.format("x ≤ %s", value));
}
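"Natural ordering" depends on the element type. The plain-Java model of the `lessThanEq` predicate below (no Beam; names are illustrative) contrasts numeric order for `Integer` with lexicographic order for `String`, where `"10"` sorts before `"9"` because the first characters are compared first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LessThanEqModel {
  // Same test as the predicate inside Filter.lessThanEq: compareTo(value) <= 0.
  public static <T extends Comparable<T>> List<T> keepLessThanEq(List<T> input, T value) {
    return input.stream().filter(x -> x.compareTo(value) <= 0).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Integers use numeric order.
    System.out.println(keepLessThanEq(Arrays.asList(10, 9, 95, 5), 9)); // [9, 5]
    // Strings use lexicographic order, so "10" is kept and "95" is dropped.
    System.out.println(keepLessThanEq(Arrays.asList("10", "9", "95", "5"), "9")); // [10, 9, 5]
  }
}
```

If numeric semantics are intended, parsing the strings into numbers before filtering avoids this surprise.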
Code example origin: org.apache.beam/beam-sdks-java-core
/**
 * Returns a {@code PTransform} that takes an input {@link PCollection} and returns a {@link
 * PCollection} with elements that are less than a given value, based on the elements' natural
 * ordering. Elements must be {@code Comparable}.
 * <p>Example of use:
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> smallNumbers =
 *     listOfNumbers.apply(Filter.lessThan(10));
 * }</pre>
 * <p>See also {@link #lessThanEq}, {@link #greaterThanEq}, {@link #equal} and {@link
 * #greaterThan}, which return elements satisfying various inequalities with the specified value
 * based on the elements' natural ordering.
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0)
      .described(String.format("x < %s", value));
}
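`Filter.lessThan` always uses natural ordering. When a different order is wanted, such as case-insensitive comparison, one option is `Filter.by` with a comparator-based predicate. The plain-Java sketch below models both cases on a `List`; the class and method names are illustrative and not part of Beam.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class LessThanModel {
  // Same test as the predicate inside Filter.lessThan: natural ordering, compareTo(value) < 0.
  public static <T extends Comparable<T>> List<T> keepLessThan(List<T> input, T value) {
    return input.stream().filter(x -> x.compareTo(value) < 0).collect(Collectors.toList());
  }

  // Custom-ordering variant: the kind of predicate one could pass to Filter.by instead.
  public static <T> List<T> keepLessThan(List<T> input, T value, Comparator<? super T> order) {
    return input.stream().filter(x -> order.compare(x, value) < 0).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("apple", "Zebra", "cherry");
    // Natural String order compares UTF-16 code units: uppercase 'Z' sorts before lowercase 'b'.
    System.out.println(keepLessThan(words, "b")); // [apple, Zebra]
    System.out.println(keepLessThan(words, "b", String.CASE_INSENSITIVE_ORDER)); // [apple]
  }
}
```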
Code example origin: org.apache.beam/beam-sdks-java-core
/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are greater than a given value, based on the elements'
 * natural ordering. Elements must be {@code Comparable}.
 * <p>Example of use:
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> largeNumbers =
 *     listOfNumbers.apply(Filter.greaterThan(1000));
 * }</pre>
 * <p>See also {@link #greaterThanEq}, {@link #lessThan}, {@link #equal} and {@link #lessThanEq},
 * which return elements satisfying various inequalities with the specified value based on the
 * elements' natural ordering.
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0)
      .described(String.format("x > %s", value));
}
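Because the predicate calls `input.compareTo(value)`, a `null` element would throw a `NullPointerException`. Whether a PCollection can carry nulls at all depends on its coder (Beam's `NullableCoder`, for instance, wraps another coder to allow them). The plain-Java model below, with illustrative names and no Beam, shows the failure and a null-safe predicate of the kind one might pass to `Filter.by` instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class GreaterThanNullModel {
  // Same test as the predicate inside Filter.greaterThan: compareTo(value) > 0.
  // Calling compareTo on a null element throws NullPointerException.
  public static <T extends Comparable<T>> List<T> keepGreaterThan(List<T> input, T value) {
    return input.stream().filter(x -> x.compareTo(value) > 0).collect(Collectors.toList());
  }

  // Null-safe variant: nulls are dropped before compareTo is called.
  public static <T extends Comparable<T>> List<T> keepGreaterThanSkippingNulls(
      List<T> input, T value) {
    return input.stream()
        .filter(x -> x != null && x.compareTo(value) > 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(500, null, 1500, 2000);
    System.out.println(keepGreaterThanSkippingNulls(numbers, 1000)); // [1500, 2000]
    try {
      keepGreaterThan(numbers, 1000);
    } catch (NullPointerException e) {
      System.out.println("compareTo on a null element threw NullPointerException");
    }
  }
}
```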
Code example origin: org.apache.beam/beam-sdks-java-core
/**
 * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
 * PCollection<T>} with elements that are greater than or equal to a given value, based on the
 * elements' natural ordering. Elements must be {@code Comparable}.
 * <p>Example of use:
 * <pre>{@code
 * PCollection<Integer> listOfNumbers = ...;
 * PCollection<Integer> largeOrEqualNumbers =
 *     listOfNumbers.apply(Filter.greaterThanEq(1000));
 * }</pre>
 * <p>See also {@link #greaterThan}, {@link #lessThan}, {@link #equal} and {@link #lessThanEq},
 * which return elements satisfying various inequalities with the specified value based on the
 * elements' natural ordering.
 * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
 */
public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
  return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
      .described(String.format("x ≥ %s", value));
}
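The inequality filters differ only at the boundary: `greaterThanEq` keeps an element equal to `value`, while `greaterThan` drops it. Applying two filters in sequence keeps only elements that pass both, so `numbers.apply(Filter.greaterThanEq(1000)).apply(Filter.lessThan(2000))` should select the half-open range [1000, 2000). The plain-Java sketch below models that composition with `java.util.function.Predicate`; it does not use Beam, and the names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class RangeFilterModel {
  // Same test as Filter.greaterThanEq: compareTo(value) >= 0.
  public static <T extends Comparable<T>> Predicate<T> greaterThanEq(T value) {
    return x -> x.compareTo(value) >= 0;
  }

  // Same test as Filter.lessThan: compareTo(value) < 0.
  public static <T extends Comparable<T>> Predicate<T> lessThan(T value) {
    return x -> x.compareTo(value) < 0;
  }

  // Two filters applied one after another keep elements that pass both,
  // i.e. the half-open range [low, high).
  public static <T extends Comparable<T>> List<T> inRange(List<T> input, T low, T high) {
    return input.stream()
        .filter(greaterThanEq(low))
        .filter(lessThan(high))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(999, 1000, 1500, 2000, 2500);
    System.out.println(inRange(numbers, 1000, 2000)); // [1000, 1500]
  }
}
```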
Code example origin: com.google.cloud.genomics/google-genomics-dataflow (the same code also appears in googlegenomics/dataflow-java)
/**
 * Compute a PCollection of reference allele frequencies for SNPs of interest.
 * The SNPs all have only a single alternate allele, and neither the
 * reference nor the alternate allele has a population frequency below minFreq.
 * The results are returned in a PCollection indexed by Position.
 * @param variants a set of variant calls for a reference population
 * @param minFreq the minimum allele frequency for the set
 * @return a PCollection mapping Position to AlleleFreq
 */
static PCollection<KV<Position, AlleleFreq>> getFreq(
    PCollection<Variant> variants, double minFreq) {
  return variants.apply("PassingFilter", Filter.by(VariantFunctions.IS_PASSING))
      .apply("OnChromosomeFilter", Filter.by(VariantFunctions.IS_ON_CHROMOSOME))
      .apply("NotLowQualityFilter", Filter.by(VariantFunctions.IS_NOT_LOW_QUALITY))
      .apply("SNPFilter", Filter.by(VariantFunctions.IS_SINGLE_ALTERNATE_SNP))
      .apply(ParDo.of(new GetAlleleFreq()))
      .apply(Filter.by(new FilterFreq(minFreq)));
}
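The Javadoc condition can be restated as one inequality. Assuming a biallelic SNP whose alternate-allele frequency is `1 - p` for reference-allele frequency `p` (an assumption; missing calls would break it), "neither allele below `minFreq`" means `minFreq <= p <= 1 - minFreq`. `FilterFreq` itself is not shown in this snippet, so the check below is a hypothetical sketch of such a predicate, not its actual implementation.

```java
public class AlleleFreqFilterSketch {
  // Hypothetical check in the spirit of the FilterFreq step (its real code is not shown).
  // Assumes a biallelic SNP, so the alternate-allele frequency is 1 - refFreq.
  public static boolean passes(double refFreq, double minFreq) {
    double altFreq = 1.0 - refFreq;
    return refFreq >= minFreq && altFreq >= minFreq;
  }

  public static void main(String[] args) {
    double minFreq = 0.05;
    System.out.println(passes(0.50, minFreq)); // true
    System.out.println(passes(0.97, minFreq)); // false: alternate allele is rare
    System.out.println(passes(0.02, minFreq)); // false: reference allele is rare
  }
}
```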
Code example origin: org.apache.beam/beam-sdks-java-io-redis
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
  // Reparallelize, mimicking the same behavior as in JdbcIO:
  // breaks fusion with the upstream step.
  PCollectionView<Iterable<KV<String, String>>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<KV<String, String>> materialized =
      input.apply(ParDo.of(
          new DoFn<KV<String, String>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
              context.output(context.element());
            }
          }).withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
Code example origin: org.apache.beam/beam-sdks-java-core
public void testNoFilterByPredicateWithLambda() {
  PCollection<Integer> output = p.apply(Create.of(1, 2, 4, 5)).apply(Filter.by(i -> false));
  PAssert.that(output).empty();
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
public void testIdentityFilterByPredicateWithLambda() {
  PCollection<Integer> output =
      p.apply(Create.of(591, 11789, 1257, 24578, 24799, 307)).apply(Filter.by(i -> true));
  PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
public void testFilterByPredicateWithLambda() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(i -> i % 2 == 0));
  PAssert.that(output).containsInAnyOrder(2, 4, 6);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
* Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
* not useful. If this test ever fails there may be simplifications available to us.
public void testFilterParDoOutputTypeDescriptorRawWithLambda() throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
PCollection<String> output = p.apply(Create.of("hello")).apply(Filter.by(s -> true));
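The comment above says a lambda leaves Beam with a raw output type. Plain Java reflection shows the JDK behavior behind this: an anonymous class records its parameterized interface in its class metadata, while the class the JVM generates for a lambda implements only the raw interface. This is a plain-Java illustration with illustrative names; it does not reproduce Beam's own type inference.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaTypeInfoDemo {
  // True when the object's class declares its first interface with type arguments.
  public static boolean hasTypeArguments(Object fn) {
    Type iface = fn.getClass().getGenericInterfaces()[0];
    return iface instanceof ParameterizedType;
  }

  public static void main(String[] args) {
    // Anonymous class: the compiler records Function<String, Integer> in the class file.
    Function<String, Integer> anon = new Function<String, Integer>() {
      @Override
      public Integer apply(String s) {
        return s.length();
      }
    };
    // Lambda: the generated class implements the raw Function interface.
    Function<String, Integer> lambda = s -> s.length();

    System.out.println(hasTypeArguments(anon)); // true
    System.out.println(hasTypeArguments(lambda)); // false
  }
}
```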
Code example origin: org.apache.beam/beam-sdks-java-core
public void testIdentityFilterByPredicate() {
  PCollection<Integer> output =
      p.apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
          .apply(Filter.by(new TrivialFn(true)));
  PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
public void testNoFilterByPredicate() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 4, 5)).apply(Filter.by(new TrivialFn(false)));
  PAssert.that(output).empty();
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
public void testFilterByPredicate() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenFn()));
  PAssert.that(output).containsInAnyOrder(2, 4, 6);
  p.run();
}
Code example origin: org.apache.beam/beam-sdks-java-core
public void testFilterByMethodReferenceWithLambda() {
  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenFilter()::isEven));
  PAssert.that(output).containsInAnyOrder(2, 4, 6);
  p.run();
}
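The tests above pass three shapes of predicate to `Filter.by`: a lambda, an instance of a named class (`TrivialFn`, `EvenFn`), and a method reference (`new EvenFilter()::isEven`). Those helper classes are not shown here, so the plain-Java sketch below uses hypothetical versions of `EvenFn` and `EvenFilter`, plus a hypothetical `SerializableFn` interface standing in for Beam's `SerializableFunction`, to show that the three forms select the same elements.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PredicateFormsDemo {
  // Hypothetical stand-in for Beam's SerializableFunction<InputT, OutputT>.
  public interface SerializableFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

  // Hypothetical named class in the style of the tests' EvenFn.
  public static class EvenFn implements SerializableFn<Integer, Boolean> {
    @Override
    public Boolean apply(Integer elem) {
      return elem % 2 == 0;
    }
  }

  // Hypothetical helper in the style of the tests' EvenFilter, used via a method reference.
  public static class EvenFilter implements Serializable {
    public boolean isEven(int elem) {
      return elem % 2 == 0;
    }
  }

  // Keeps the elements for which the predicate returns true, as Filter.by does.
  public static List<Integer> filter(List<Integer> input, SerializableFn<Integer, Boolean> fn) {
    return input.stream().filter(fn::apply).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    System.out.println(filter(numbers, i -> i % 2 == 0)); // [2, 4, 6]
    System.out.println(filter(numbers, new EvenFn())); // [2, 4, 6]
    System.out.println(filter(numbers, new EvenFilter()::isEven)); // [2, 4, 6]
  }
}
```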
Code example origin: org.apache.beam/beam-examples-java
/** A basic smoke test that ensures there is no crash at pipeline construction time. */
public void testMinimalWordCount() throws Exception {
.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
.apply(Filter.by((String word) -> !word.isEmpty()))
(KV<String, Long> wordCount) ->
wordCount.getKey() + ": " + wordCount.getValue()))
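`Filter.by((String word) -> !word.isEmpty())` is needed because `String.split` can emit empty strings: a separator match at the very start of a line yields a leading empty token, and an empty line yields a single empty token (trailing empty strings are dropped). The plain-Java sketch below, without Beam and with illustrative names, applies the same regular expression and filter to single lines of text.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplitEmptyTokensDemo {
  // Same split as the word-count example: runs of non-letter characters are separators.
  public static List<String> tokens(String line) {
    return Arrays.asList(line.split("[^a-zA-Z']+"));
  }

  // Same effect as Filter.by((String word) -> !word.isEmpty()) on the tokens.
  public static List<String> nonEmptyTokens(String line) {
    return tokens(line).stream().filter(w -> !w.isEmpty()).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(tokens("  Hello, world!")); // [, Hello, world]
    System.out.println(nonEmptyTokens("  Hello, world!")); // [Hello, world]
    System.out.println(tokens("").size()); // 1
  }
}
```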
Code example origin: org.apache.beam/beam-sdks-java-core
@Category({ValidatesRunner.class, UsesSchema.class})
public void testSchemasPassedThrough() {
  List<InferredPojo> pojoList =
      Arrays.asList(
          new InferredPojo("a", 1), new InferredPojo("b", 2), new InferredPojo("c", 3));
  PCollection<InferredPojo> out = pipeline.apply(Create.of(pojoList)).apply(Filter.by(e -> true));
  assertTrue(out.hasSchema());
  pipeline.run();
}
Code example origin: org.apache.beam/beam-examples-java
/** Test the filtering. */
public void testUserScoresFilter() throws Exception {
  final Instant startMinTimestamp = new Instant(1447965680000L);
  PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
  PCollection<KV<String, Integer>> output =
      input
          .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
          .apply(
              Filter.by(
                  (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
          // run a map to access the fields in the result.
          .apply(
              MapElements.into(
                      TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                  .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));