版本\u冲突\u引擎\u异常,带有\u update \u by \u查询

pbwdgjma  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(335)

我在flink中使用elasticsearch update by query api,flink并行度为1。但是我得到了版本冲突引擎异常,这是我在flink richsink函数中的代码,如下所示:

UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
        builder.abortOnVersionConflict(true);
        builder.source(indexName);
        builder.filter(filter);
        builder.setMaxRetries(MAX_RETRIES);
        builder.refresh(true);

        String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
                .format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);

        Map<String, Object> params = Maps.newHashMap();
        params.put("fieldName", fieldName);
        params.put("updateTime", updateTime);
        params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
        })));

        builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
        BulkByScrollResponse response = builder.get();

我可以肯定,只有这个应用程序访问elasticsearch,flink parallelism是1就像在单线程调用updatebyqueryapi?为什么我有版本冲突引擎异常?如何做到一次又一次?

wgmfuz8q

wgmfuz8q1#

我看到两种可能性:
正在运行可以更新文档的其他程序。
flink的elasticsearch接收器至少提供一次保证,这意味着在发生故障时,接收器有时会在恢复期间执行重复写入。可能这会导致尝试使用过期的版本号更新文档。

相关问题