I created a simple Apache Flink project that reads data from a Kafka topic and writes it to an S3 bucket. When I run the project I get no errors, and it successfully reads every message from the Kafka topic, but nothing is ever written to my S3 bucket. Since there are no errors, it is hard to debug what is going on. Below are my project and configuration. This only happens when I use a StreamExecutionEnvironment; if I write to S3 using the regular batch execution environment, it works.
S3 test Java program
import java.util.UUID;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Test {

    public static void main(String[] args) throws Exception {
        // parse input arguments from the properties file given as the first argument
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                    "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // write kafka stream to standard out.
        //messageStream.print();

        String id = UUID.randomUUID().toString();
        messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);

        env.execute("Write to S3 Example");
    }
}
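For reference, the program reads its configuration from the properties file passed as its first argument; a hypothetical file covering the keys used above (all values are placeholders) could look like this:

kafka.topic=test-topic
bootstrap.servers=localhost:9092
zookeeper.connect=localhost:2181
group.id=flink-s3-test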
pom.xml file
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>
    <!-- Apache Kafka Dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
core-site.xml (Hadoop configuration)
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <!-- Comma-separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <!-- set your AWS access key ID (key name defined in org.apache.hadoop.fs.s3a.Constants) -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>***************</value>
    </property>
    <!-- set your AWS secret access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>****************</value>
    </property>
</configuration>
3 Answers
9rbhqvlz1#
Persisting a Kafka topic to S3 through Flink requires the use of the RollingSink. The RollingSink uses a Bucketer to specify the name of the directory that part files are saved into. DateTimeBucketer is the default, but you can also create a custom Bucketer. When the maximum batch size is reached, the current part file is saved and closed and a new part file is created. Code along the following lines works:
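In Flink 1.1.x the RollingSink ships in a separate connector artifact, flink-connector-filesystem_2.10 for the Scala-2.10 builds used here, so the pom would also need (version assumed to match the Flink version above):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.10</artifactId>
    <version>1.1.4</version>
</dependency>

A minimal sketch of the sink itself; the S3 path, date format, and batch size below are illustrative, not prescriptive:

import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

RollingSink<String> sink = new RollingSink<String>("s3://flink-data/kafka-output");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); // one subdirectory per time bucket
sink.setWriter(new StringWriter<String>());                 // write each record as a plain string line
sink.setBatchSize(1024 * 1024 * 64);                        // close the current part file after ~64 MB

// use the sink instead of messageStream.writeAsText(...)
messageStream.addSink(sink);

Note that the RollingSink relies on the checkpointing already enabled in the program above to move part files from the pending state to the finished state.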
p8ekf7hl2#
An easy way to get some debugging information is to enable logging on the S3 bucket that is supposed to receive the Kafka data. That will give you more information to help pin down the source of the error from the S3 side:
http://docs.aws.amazon.com/amazons3/latest/ug/managingbucketlogging.html
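If you prefer to do this from code, the AWS SDK already on this project's classpath (aws-java-sdk 1.7.4) can enable server access logging; a rough sketch, where the bucket names are hypothetical and the target bucket must grant write access to S3's log delivery group:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.BucketLoggingConfiguration;
import com.amazonaws.services.s3.model.SetBucketLoggingConfigurationRequest;

public class EnableBucketLogging {
    public static void main(String[] args) {
        AmazonS3 s3 = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
        // deliver access logs for "flink-data" into "flink-logs" under the given prefix
        // (hypothetical bucket names)
        BucketLoggingConfiguration logging =
                new BucketLoggingConfiguration("flink-logs", "s3-access/");
        s3.setBucketLoggingConfiguration(
                new SetBucketLoggingConfigurationRequest("flink-data", logging));
    }
}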
bpzcxfmw3#
IAM permissions: make sure the role (or user) the job runs as has permission to write to the S3 bucket.
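A minimal policy sketch for that role, using the bucket name from the question (trim the action list to what you actually need):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::flink-data",
        "arn:aws:s3:::flink-data/*"
      ]
    }
  ]
}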