我正在尝试将flink与ElasticSearch2.1.1集成,我正在使用maven依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch2_2.10</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
这是我从kafka队列中读取事件的java代码(工作正常),但不知怎么的,这些事件没有在elasticsearch中发布,而且也没有错误,在下面的代码中,如果我更改与端口、主机名和,elasticsearch的群集名或索引名,然后我立即看到一个错误,但目前它没有显示任何错误,也没有在elasticsearch中创建任何新文档
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.print();
Map<String, String> config = new HashMap<>();
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");
config.put("cluster.name", "FlinkDemo");
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));
env.execute();
}
private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
private static final long serialVersionUID = 1L;
public IndexRequest createIndexRequest(String element) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("flink").id("hash"+element).source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
2条答案
按热度按时间t1qtbnec1#
我找到了flink&elasticsearch连接器的一个很好的例子
第一个maven依赖项:
第二个示例java代码
abithluo2#
我确实在本地机器上运行它并进行调试,但是,我唯一缺少的是正确配置日志记录,因为大多数弹性问题都在“log.warn”语句中描述。问题是elasticsearch-2.2.1客户端api中“bulkrequesthandler.java”内部出现异常,引发错误-“org.elasticsearch.action.actionrequestvalidationexception:validation failed:1:type is missing因为我已经创建了索引,但不是一个我觉得很奇怪的类型,因为它应该主要关注索引,并在默认情况下创建类型。