How do I exit an Akka stream after receiving n elements?

Posted by qv7cva1a on 2022-11-06 in Other

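I'm new to Akka and am just trying to get the hang of it.
As an experiment, I want to read from a Kinesis stream, collect n messages, and then stop.
The only way I have found to stop reading records is Sink.head(), but it returns only a single record, and I want more than one.
I can't figure out how to stop reading from the stream after receiving n messages.
Here is the code I have tried so far: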

@Test
  public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
        .httpClient(AkkaHttpClient.builder()
            .withActorSystem(system).build())
        .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
//    Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
//        Flow.of(String.class).groupedWithin(
//            numberOfRecordsToRead, // group size
//            Duration.ofSeconds(60) // group time
//        );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
//        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
//    Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
//    Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
  }

  private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
  }

  private String debugPrint(String s) {
    System.out.println(s);
    return s;
  }

Thanks for any pointers.

Answer 1 (6jygbczu)

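I found that the answer is to use take(n) at the flow level. take(n) completes the stream after n elements and cancels the upstream, so the sink's CompletionStage completes and join() returns. Sink.takeLast, by contrast, only completes once the upstream itself completes, which never happens with a Kinesis source that keeps polling.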

...
Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .via(flowMapRecordToString)
    .via(flowPrinter)
    .via(flowTakeN);
...
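
To see why this works without an AWS account, the same idea can be sketched with plain java.util.stream. This is only an analogy and not Akka code: `limit(n)` stands in for Akka's `take(n)`, and the names `TakeNAnalogy`, `endlessMessages` and `readFirst` are made up for illustration. The source is unbounded, so the terminal operation returns only because the limit ends the stream after n elements.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TakeNAnalogy {

    // Hypothetical stand-in for the Kinesis source: an endless, lazily generated
    // sequence of messages ("message-1", "message-2", ...).
    static Stream<String> endlessMessages() {
        return Stream.iterate(1, i -> i + 1).map(i -> "message-" + i);
    }

    // limit(n) plays the role that take(n) plays in the Akka answer above:
    // it ends the stream after n elements, so collect() can return.
    static List<String> readFirst(int n) {
        return endlessMessages()
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [message-1, message-2, message-3]
        System.out.println(readFirst(3));
    }
}
```

Without the `limit(n)` line, the collect call would keep consuming the endless source instead of returning, which mirrors the join() call in the question hanging on a source that never completes.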

Answer 2 (8ulbf1ek)

To add to the answer you found, you can also express this more directly without using via:

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s))
    .take(10);
