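I'm new to Akka and just trying to get the hang of it.
As an experiment, I want to read from a Kinesis stream, collect n messages, and then stop.
The only way I've found to stop reading records is Sink.head(), but it returns only one record and I'd like to get more.
I can't quite figure out how to stop reading from the stream after receiving n messages.
This is the code I have tried so far: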
@Test
public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);
    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();
    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
            .credentialsProvider(profileCredentialsProvider)
            .region(Region.US_WEST_2)
            .httpClient(AkkaHttpClient.builder()
                    .withActorSystem(system).build())
            .build();
    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";
    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
    // Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
    //         Flow.of(String.class).groupedWithin(
    //                 numberOfRecordsToRead, // group size
    //                 Duration.ofSeconds(60) // group time
    //         );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
            .via(flowMapRecordToString)
            .via(flowPrinter);
            // .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
    // Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
    // Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
            .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
}

private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
}

private String debugPrint(String s) {
    System.out.println(s);
    return s;
}
Thanks for any pointers.
2 Answers
6jygbczu1#
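I found that the answer is to use take(n) at the stream level, that is, on the Source itself before it reaches the Sink.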
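A minimal sketch of why bounding the stream helps. It uses only the JDK's java.util.stream as a stand-in for Akka Streams, an assumption made so the example runs without Akka or Kinesis; the names TakeNSketch, endlessRecords and readFirst are hypothetical. A source that never ends can only be collected into a list once something upstream ends it, and limit(n) here plays the role that take(n) plays in Akka Streams.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TakeNSketch {

    // Hypothetical stand-in for the Kinesis source: an unbounded stream of fake records.
    static Stream<String> endlessRecords() {
        return Stream.iterate(1, i -> i + 1).map(i -> "record-" + i);
    }

    // Cap the stream at n elements *before* collecting. Without limit(n),
    // collect(...) would never return because the stream never ends.
    static List<String> readFirst(int n) {
        return endlessRecords()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(readFirst(3)); // prints [record-1, record-2, record-3]
    }
}
```

In the question's pipeline, the corresponding change would presumably be to call take(numberOfRecordsToRead) on sourceStringsFromKinesisRecords before runWith. Once the stream completes after n elements, a collecting sink such as Sink.seq() should be able to complete its CompletionStage. That variant has not been run against Kinesis here.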
8ulbf1ek2#
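To add to the answer you found, you can also express this more directly without using via, for example by calling operators such as map directly on the Source.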