Elasticsearch document is updated even though IfSeqNo and IfPrimaryTerm do not match the assigned sequence number and primary term

Posted by nnt7mjpx on 2023-04-20 in ElasticSearch

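I am trying to verify that an Elasticsearch document is updated correctly by sending an index request whose sequence number and primary term differ from the ones assigned to the document. According to the documentation, if the IfSeqNo and IfPrimaryTerm values do not match the document's sequence number and primary term, ES throws a VersionConflictEngineException. In my test code, however, the document is updated and its sequence number is incremented even though I set an old sequence number in the index request. I expected a VersionConflictEngineException, but none is thrown. Please let me know what is wrong with my code. Thanks in advance.
My Java code is attached below; the test method is the entry point. I tried the same operation from the Kibana client against the same ES cluster, and there it works as expected (the exception is thrown).
The Elasticsearch version is 6.8; the Java client version is elasticsearch-6.6.1.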

/**
     * only perform this indexing request if the document was last modification was assigned the given
     * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
     *
     * If the document last modification was assigned a different sequence number a
     * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
     */
    public IndexRequest setIfSeqNo(long seqNo) { }
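
To make the expected behavior concrete, here is a toy in-memory model of the compare-and-write semantics the Javadoc describes. It is only a sketch: `SeqNoStore`, `Meta`, `VersionConflict`, `index` and `indexIf` are hypothetical names invented for illustration, the primary term is assumed to stay constant, and none of this is Elasticsearch code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of a conditional write; NOT Elasticsearch code. */
final class SeqNoStore {

    /** Sequence number and primary term recorded for a document's last write. */
    static final class Meta {
        final long seqNo;
        final long primaryTerm;

        Meta(long seqNo, long primaryTerm) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
        }
    }

    /** Hypothetical stand-in for VersionConflictEngineException. */
    static final class VersionConflict extends RuntimeException {
        VersionConflict(String message) {
            super(message);
        }
    }

    private final Map<String, Meta> docs = new HashMap<>();
    private final long primaryTerm = 1L; // assumed constant for this sketch
    private long nextSeqNo = 0L;

    /** Unconditional write: always succeeds and assigns a fresh sequence number. */
    Meta index(String id) {
        Meta meta = new Meta(nextSeqNo++, primaryTerm);
        docs.put(id, meta);
        return meta;
    }

    /** Conditional write: succeeds only if the last write matches both expected values. */
    Meta indexIf(String id, long ifSeqNo, long ifPrimaryTerm) {
        Meta current = docs.get(id);
        if (current == null || current.seqNo != ifSeqNo || current.primaryTerm != ifPrimaryTerm) {
            throw new VersionConflict("version conflict, required seqNo [" + ifSeqNo
                    + "], primary term [" + ifPrimaryTerm + "]");
        }
        return index(id);
    }
}
```

In this model a stale sequence number passed to `indexIf` raises the conflict, while `index` succeeds no matter what the caller believes the current sequence number is. A request whose condition never reaches the server would behave like the unconditional case.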

Kibana client example

POST /table_index/_doc/606635778254_db_dcs_int_test4_test_table_6SjooYiji3?if_seq_no=1574635&if_primary_term=6
{
    "name" : "test_table_6SjooYiji3",
    "description" : null,
    "catalogId" : "606635778254",
    "databaseName" : "db_dcs_int_test4"
}

Response from Kibana

{
   "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3]: version conflict, required seqNo [1574635], primary term [6]. current document has seqNo [1574661] and primary term [6]",
        "index_uuid": "jisjayWfTTe7Rd-Ll48qWw",
        "shard": "0",
        "index": "table_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3]: version conflict, required seqNo [1574635], primary term [6]. current document has seqNo [1574661] and primary term [6]",
    "index_uuid": "jisjayWfTTe7Rd-Ll48qWw",
    "shard": "0",
    "index": "table_index"
   },
   "status": 409
}

Java test code

private void test() {
    String catalogId="606635778254";
    String databaseName="db_dcs_int_test4";
    String tableName="test_table_6SjooYiji3";
    com.amazon.spektrdatacataloglambda.DBTable dbTable = new com.amazon.spektrdatacataloglambda.DBTable();
    dbTable.setName(tableName);
    dbTable.setCatalogId(catalogId);
    dbTable.setMetadata(new HashMap<String, String>(){{ put("test","singhjfi");}});
    elasticSearchUtils.upsertTableToTableIndexInES(catalogId, databaseName, dbTable, true, null);
}
    
    
public void upsertTableToTableIndexInES(final String catalogId, final String databaseName, final DBTable table,
                                        final Boolean isOverwriteESTableDoc,
                                        final GetResponse tableESDocument) {
    IndexRequest indexRequest = new IndexRequest(ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE);

    if (isOverwriteESTableDoc) {
        // doing to prevent any data loss because of multiple update call from other DCS clients
        // TODO: remove after testing
        indexRequest = indexRequest.setIfSeqNo(1574633);
        indexRequest = indexRequest.setIfPrimaryTerm(6);
        log.info("setting sequence number and primary term");
    }

    final Map<String, Object> payload = getMapFromObject(table);
    //Append table, database for search and actual payload information as string (no search on it).
    payload.put(ES_TABLE_INDEX_DATABASE_NAME_KEY, databaseName);
    payload.put(ES_DOC_KEY_CATALOG_ID, catalogId);

    final String tableName = table.getName();
    final String id = getESTableId(catalogId, databaseName, tableName);

    indexRequest.source(payload).id(id);

    log.info("Adding {} to elasticsearch index:{} as type {} with id {}", GSON.toJson(payload),
            ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE, id);

    performIndexRequest(indexRequest);

    log.info("Added {} to elasticsearch index:{} as type {} with id {}", GSON.toJson(payload),
            ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE, id);
}
    
protected void performIndexRequest (IndexRequest indexRequest) {
    try {
        log.info("seq_num={}, primary_term={}", indexRequest.ifSeqNo(), indexRequest.ifPrimaryTerm());
        log.info("Initiating Index Request {} from ElasticSearch", indexRequest);
        IndexResponse indexResponse = elasticSearchClient.index(indexRequest, RequestOptions.DEFAULT);
        log.info("Index response {}", indexResponse);
    } catch (IOException e) {
        String ioError = String.format("I/O Error when performing index request %s on ElasticSearch.", indexRequest);
        log.error(ioError, e);
        throw new DataCatalogException(ioError + " " + e.getMessage(), e);
    }
}

Logs

[java] 11 Apr 2023 17:05:46,736 [INFO] [] (main) Utils:44 Initializing AppConfig with args:--root=build--realm=us-east-1--domain=beta
[java] 11 Apr 2023 17:05:46,742 [INFO] [] (main) AppConfigTree:1556 Using config root at build/brazil-config
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1703 Found domain = beta
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1704 Found realm = us-east-1
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1743 Determined appname = SpektrDataCatalogLambda
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1759 Determined appgroup = SpektrDataCatalogLambda
[java] 11 Apr 2023 17:05:46,744 [INFO] [] (main) AppConfigTree:1952 Parsing config file: build/brazil-config/app/SpektrDataCatalogLambda.cfg
[java] 11 Apr 2023 17:05:46,749 [INFO] [] (main) MixedCaseTableUtils:116 tempDirPrefix=/tmp/beta
[java] 11 Apr 2023 17:05:46,794 [INFO] [] (main) Utils:70 creating DCS client for awsAccountId=606635778254, realm=us-east-1, dcsEndpoint=https://datacatalogv2.beta.spektr.a2z.com
[java] 11 Apr 2023 17:05:50,746 [INFO] [] (main) Utils:57 Getting your credentials from Conduit.
[java] 11 Apr 2023 17:05:52,650 [INFO] [] (main) ElasticSearchClientFactory:46 Initialized ES URL as search-spektr-es-datacatalog-6-8-cq2riup3kroeai2hokcrfxh5me.us-east-1.es.amazonaws.com
[java] 11 Apr 2023 17:05:52,909 [INFO] [] (main) ElasticSearchUtils:121 setting sequence number and primary term
[java] 11 Apr 2023 17:05:53,009 [INFO] [] (main) ElasticSearchUtils:134 Adding {"name":"test_table_6SjooYiji3","catalogId":"606635778254","metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"databaseName":"db_dcs_int_test4"} to elasticsearch index:table_index as type gluetable with id 606635778254_db_dcs_int_test4_test_table_6SjooYiji3
[java] 11 Apr 2023 17:05:53,010 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1971 seq_num=1574633, primary_term=6
[java] 11 Apr 2023 17:05:53,011 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1972 Initiating Index Request index {[table_index][gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3], source[{"name":"test_table_6SjooYiji3","description":null,"catalogId":"606635778254","owner":null,"partitionKeys":null,"storageDescriptor":null,"storageDetails":null,"parameters":null,"lastRefreshedTime":null,"encryptedData":null,"versioned":null,"versionColumns":null,"metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"retention":null,"autoApproveReadRequest":null,"databaseName":"db_dcs_int_test4"}]} from ElasticSearch
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1974 Index response IndexResponse[index=table_index,type=gluetable,id=606635778254_db_dcs_int_test4_test_table_6SjooYiji3,version=35,result=updated,seqNo=1574661,primaryTerm=6,shards={"total":2,"successful":1,"failed":0}]
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) ElasticSearchUtils:139 Added {"name":"test_table_6SjooYiji3","catalogId":"606635778254","metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"databaseName":"db_dcs_int_test4"} to elasticsearch index:table_index as type gluetable with id 606635778254_db_dcs_int_test4_test_table_6SjooYiji3
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) MixedCaseTableUtils:143 Done operation=test
Answer 1 (xnifntxz)

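The biggest difference I see is that in Kibana you are using the _doc type, whereas in your code the mapping type is gluetable, so you are not updating the document you think you are updating.
In Kibana, you are indexing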

/table_index/_doc/606635778254_db_dcs_int_test4_test_table_6SjooYiji3

whereas in your test code you are indexing

/table_index/gluetable/606635778254_db_dcs_int_test4_test_table_6SjooYiji3

These are two different documents.
Make sure that ES_TABLE_INDEX_DOC_TYPE = "_doc" in your code, and you should get the same error.

Answer 2 (rbl8hiat)

After upgrading the ES Java client package from 6.6.1 to 7.9.3, the exception is thrown as expected.
I found a hint in the client source code:

if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
    out.writeZLong(ifSeqNo);
    out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
    assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
    throw new IllegalStateException(
        "sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
            "Stream version [" + out.getVersion() + "]");
}
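
The version gate in that snippet can be modeled in isolation. What follows is a toy sketch rather than the client's code: `VersionGatedWriter`, the integer version encoding (60600 standing for 6.6.0), the sentinel values, and the fixed-width encoding of the two longs are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;

/** Toy model of version-gated serialization; NOT the Elasticsearch client code. */
final class VersionGatedWriter {
    // Sentinels meaning "no condition set"; the values are assumptions of this sketch.
    static final long UNASSIGNED_SEQ_NO = -2L;
    static final long UNASSIGNED_PRIMARY_TERM = 0L;
    // Hypothetical integer encoding of the first stream version that carries the fields.
    static final int MIN_SUPPORTED_VERSION = 60600;

    /** Serializes the two condition fields only when the stream version supports them. */
    static byte[] write(int streamVersion, long ifSeqNo, long ifPrimaryTerm) {
        if (streamVersion >= MIN_SUPPORTED_VERSION) {
            // Newer peer: both fields go on the wire.
            return ByteBuffer.allocate(2 * Long.BYTES)
                    .putLong(ifSeqNo)
                    .putLong(ifPrimaryTerm)
                    .array();
        }
        if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
            // Older peer and a condition was requested: refuse instead of dropping it silently.
            throw new IllegalStateException(
                    "conditional write not supported on stream version [" + streamVersion + "]");
        }
        // Older peer and no condition requested: nothing to write.
        return new byte[0];
    }
}
```

The point of the gate is that a condition is either transmitted or rejected loudly. A code path that omits the fields without such a check would turn a conditional write into an unconditional one, which is the kind of behavior described in the question.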
