I built a Flink Kinesis connector with the following code:
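import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DemoKinesisCA {
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        // AWS region, credentials and the starting position for the Kinesis consumer
        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "cn-north-1");
        consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "${my_key}");
        consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "${my_secret_key}");
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        // Read each record as a UTF-8 string and print it to stdout
        DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
                "${my_stream}",
                new SimpleStringSchema(),
                consumerConfig));
        kinesis.print();
        see.execute();
    }
}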
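The code runs fine and no exceptions show up in the log. I set the initial position to TRIM_HORIZON,
which means consuming from the earliest available data, but I receive 0 bytes from Kinesis. I've been told the stream is not empty, so what is wrong?
Thanks for your help.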
The log is as follows:
2019-12-20 09:21:22,814 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 00b96cfdd8c98b09635926e3643210f3 for job c5449594afa59fde3826b82d6034a71b from resource manager with leader id 00000000000000000000000000000000.
2019-12-20 09:21:22,815 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 00b96cfdd8c98b09635926e3643210f3.
2019-12-20 09:21:22,815 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job c5449594afa59fde3826b82d6034a71b for job leader monitoring.
2019-12-20 09:21:22,815 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 with leader id 00000000-0000-0000-0000-000000000000.
2019-12-20 09:21:22,819 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
2019-12-20 09:21:22,819 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms)
2019-12-20 09:21:22,826 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Successful registration at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 for job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,826 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,826 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,830 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Activate slot 00b96cfdd8c98b09635926e3643210f3.
2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source -> Sink: Print to Std. Out (1/1).
2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from CREATED to DEPLOYING.
2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING]
2019-12-20 09:21:22,837 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
2019-12-20 09:21:22,838 INFO org.apache.flink.runtime.blob.BlobClient - Downloading c5449594afa59fde3826b82d6034a71b/p-c6e596cf391265b05e5e166e9448a1b36bc8fb74-756c152250698cb8c1c63a809f05d4b3 from localhost/127.0.0.1:44568
2019-12-20 09:21:23,011 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
2019-12-20 09:21:23,012 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from DEPLOYING to RUNNING.
2019-12-20 09:21:23,013 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2019-12-20 09:21:23,037 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber does not contain a setter for field sequenceNumber
2019-12-20 09:21:23,037 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2019-12-20 09:21:23,038 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - No restore state for FlinkKinesisConsumer.
2019-12-20 09:21:23,779 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
2019-12-20 09:21:23,780 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
1 answer (by ylamdve6)
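Welcome to the Java world: "${my_stream}" is not evaluated. Java performs no string interpolation, so the consumer receives that literal text as the stream name, which probably points to a stream that does not exist. If my_stream is a program argument, you need to use pt.get("my_stream").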
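The answer's point can be checked without Flink or AWS. The sketch below uses only the JDK. It shows that a Java string literal such as "${my_stream}" passes through verbatim, and it reads the stream name from --key value program arguments instead. ArgLookupDemo, parseArgs and lookup are hypothetical helpers standing in for Flink's ParameterTool, and their argument parsing is simplified. In the actual job you would call pt.get("my_stream"), or pt.getRequired("my_stream") to fail fast when the argument is missing, and launch the job with --my_stream <stream-name>. The same applies to "${my_key}" and "${my_secret_key}".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's ParameterTool.fromArgs(args).get(key),
// written with the JDK only so it runs without Flink on the classpath.
class ArgLookupDemo {

    // Parses "--key value" pairs into a map. This is a simplified assumption
    // about the argument format; ParameterTool itself accepts more forms.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.startsWith("--")) {
                String key = arg.substring(2);
                // A key followed by another "--" flag, or by nothing, gets an empty value here.
                boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
                String value = hasValue ? args[++i] : "";
                params.put(key, value);
            }
        }
        return params;
    }

    // Returns the value for key, or null if the key is absent.
    static String lookup(String[] args, String key) {
        return parseArgs(args).get(key);
    }

    public static void main(String[] ignored) {
        // Java has no string interpolation: this is the literal text "${my_stream}".
        String literal = "${my_stream}";
        System.out.println(literal);

        // Simulated command line, e.g. "... --my_stream orders --region cn-north-1"
        String[] args = {"--my_stream", "orders", "--region", "cn-north-1"};
        System.out.println(lookup(args, "my_stream"));
    }
}
```

Running main prints ${my_stream} on the first line and orders on the second, which is the difference between passing the placeholder text and passing the resolved argument value.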