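I am trying to verify that an Elasticsearch document is updated correctly by sending an index request whose sequence number and primary term differ from the ones assigned to the document. According to the documentation, if the IfSeqNo and IfPrimaryTerm values do not match the document's sequence number and primary term, ES throws a VersionConflictEngineException. In my test code, however, the document is updated and its sequence number is incremented even though I set an old sequence number on the index request. I expected a VersionConflictEngineException, but none is thrown. Please let me know what is wrong with my code. Thanks in advance.
My Java code is attached below; the test method is the entry point. I tried the same thing from the Kibana client against the same ES cluster, and there it works as expected (the exception is thrown).
The Elasticsearch version is 6.8 and the Java client version is elasticsearch-6.6.1.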
/**
* only perform this indexing request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public IndexRequest setIfSeqNo(long seqNo) { }
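To make the expected behaviour concrete, here is a toy in-memory sketch of the check the Javadoc describes. It is not Elasticsearch code: `ToySeqNoStore`, `ToyVersionConflictException`, `NO_CONDITION` and `PRIMARY_TERM` are hypothetical names made up for illustration, and the model assumes a single shard whose primary term never changes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a version conflict; not an Elasticsearch class. */
class ToyVersionConflictException extends RuntimeException {
    ToyVersionConflictException(String message) {
        super(message);
    }
}

/** Toy in-memory model of a single shard; not Elasticsearch code. */
class ToySeqNoStore {
    /** Hypothetical marker meaning "no condition was set on this request". */
    static final long NO_CONDITION = -1L;
    /** The toy never fails over, so the primary term stays fixed. */
    static final long PRIMARY_TERM = 1L;

    // Sequence number assigned to the last write of each document id.
    private final Map<String, Long> seqNoById = new HashMap<>();
    // Next sequence number to hand out; shared by all documents in the toy shard.
    private long nextSeqNo = 0L;

    /** Writes the document and returns the sequence number assigned to this write. */
    long index(String id, long ifSeqNo, long ifPrimaryTerm) {
        if (ifSeqNo != NO_CONDITION) {
            Long current = seqNoById.get(id);
            boolean matches = current != null
                    && current == ifSeqNo
                    && ifPrimaryTerm == PRIMARY_TERM;
            if (!matches) {
                // The condition was supplied but does not match the stored document.
                throw new ToyVersionConflictException(
                        "version conflict, required seqNo [" + ifSeqNo
                                + "], primary term [" + ifPrimaryTerm + "]");
            }
        }
        long assigned = nextSeqNo++;
        seqNoById.put(id, assigned);
        return assigned;
    }
}
```

In this model a write with a stale `ifSeqNo` is rejected, while a write that carries `NO_CONDITION` always succeeds and advances the sequence number. The second case resembles the behaviour I observe from the Java client, though the toy cannot say why the condition would be missing.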
Kibana client example
POST /table_index/_doc/606635778254_db_dcs_int_test4_test_table_6SjooYiji3?if_seq_no=1574635&if_primary_term=6
{
"name" : "test_table_6SjooYiji3",
"description" : null,
"catalogId" : "606635778254",
"databaseName" : "db_dcs_int_test4"
}
Response from Kibana
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3]: version conflict, required seqNo [1574635], primary term [6]. current document has seqNo [1574661] and primary term [6]",
"index_uuid": "jisjayWfTTe7Rd-Ll48qWw",
"shard": "0",
"index": "table_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3]: version conflict, required seqNo [1574635], primary term [6]. current document has seqNo [1574661] and primary term [6]",
"index_uuid": "jisjayWfTTe7Rd-Ll48qWw",
"shard": "0",
"index": "table_index"
},
"status": 409
}
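When comparing Kibana with the Java client, it can help to spell out the request line that a conditional index call is expected to produce and check it against whatever the client actually sends (for example, in request logs). The helper below is a hypothetical sketch that only assembles that string; `ConditionalIndexPath` is not part of any Elasticsearch library, and it assumes the index, type and id contain only URL-safe characters, so it does no escaping.

```java
/** Hypothetical helper; builds the REST path for a conditional index request. */
final class ConditionalIndexPath {
    private ConditionalIndexPath() {
    }

    /**
     * Returns the path plus query string, in the same shape as the Kibana
     * request above. Assumes all arguments are already URL-safe.
     */
    static String build(String index, String type, String id,
                        long ifSeqNo, long ifPrimaryTerm) {
        return "/" + index + "/" + type + "/" + id
                + "?if_seq_no=" + ifSeqNo
                + "&if_primary_term=" + ifPrimaryTerm;
    }
}
```

Calling `ConditionalIndexPath.build("table_index", "_doc", "606635778254_db_dcs_int_test4_test_table_6SjooYiji3", 1574635L, 6L)` reproduces the path used in the Kibana request. If the request emitted by the Java client lacks the two query parameters, the server has nothing to compare against.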
private void test() {
    String catalogId = "606635778254";
    String databaseName = "db_dcs_int_test4";
    String tableName = "test_table_6SjooYiji3";
    com.amazon.spektrdatacataloglambda.DBTable dbTable = new com.amazon.spektrdatacataloglambda.DBTable();
    dbTable.setName(tableName);
    dbTable.setCatalogId(catalogId);
    dbTable.setMetadata(new HashMap<String, String>(){{ put("test","singhjfi");}});
    elasticSearchUtils.upsertTableToTableIndexInES(catalogId, databaseName, dbTable, true, null);
}

public void upsertTableToTableIndexInES(final String catalogId, final String databaseName, final DBTable table,
                                        final Boolean isOverwriteESTableDoc,
                                        final GetResponse tableESDocument) {
    IndexRequest indexRequest = new IndexRequest(ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE);
    if (isOverwriteESTableDoc) {
        // doing to prevent any data loss because of multiple update call from other DCS clients
        // TODO: remove after testing
        indexRequest = indexRequest.setIfSeqNo(1574633);
        indexRequest = indexRequest.setIfPrimaryTerm(6);
        log.info("setting sequence number and primary term");
    }
    final Map<String, Object> payload = getMapFromObject(table);
    // Append table, database for search and actual payload information as string (no search on it).
    payload.put(ES_TABLE_INDEX_DATABASE_NAME_KEY, databaseName);
    payload.put(ES_DOC_KEY_CATALOG_ID, catalogId);
    final String tableName = table.getName();
    final String id = getESTableId(catalogId, databaseName, tableName);
    indexRequest.source(payload).id(id);
    log.info("Adding {} to elasticsearch index:{} as type {} with id {}", GSON.toJson(payload),
            ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE, id);
    performIndexRequest(indexRequest);
    log.info("Added {} to elasticsearch index:{} as type {} with id {}", GSON.toJson(payload),
            ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE, id);
}

protected void performIndexRequest(IndexRequest indexRequest) {
    try {
        log.info("seq_num={}, primary_term={}", indexRequest.ifSeqNo(), indexRequest.ifPrimaryTerm());
        log.info("Initiating Index Request {} from ElasticSearch", indexRequest);
        IndexResponse indexResponse = elasticSearchClient.index(indexRequest, RequestOptions.DEFAULT);
        log.info("Index response {}", indexResponse);
    } catch (IOException e) {
        String ioError = String.format("I/O Error when performing index request %s on ElasticSearch.", indexRequest);
        log.error(ioError, e);
        throw new DataCatalogException(ioError + " " + e.getMessage(), e);
    }
}
Logs
[java] 11 Apr 2023 17:05:46,736 [INFO] [] (main) Utils:44 Initializing AppConfig with args:--root=build--realm=us-east-1--domain=beta
[java] 11 Apr 2023 17:05:46,742 [INFO] [] (main) AppConfigTree:1556 Using config root at build/brazil-config
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1703 Found domain = beta
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1704 Found realm = us-east-1
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1743 Determined appname = SpektrDataCatalogLambda
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1759 Determined appgroup = SpektrDataCatalogLambda
[java] 11 Apr 2023 17:05:46,744 [INFO] [] (main) AppConfigTree:1952 Parsing config file: build/brazil-config/app/SpektrDataCatalogLambda.cfg
[java] 11 Apr 2023 17:05:46,749 [INFO] [] (main) MixedCaseTableUtils:116 tempDirPrefix=/tmp/beta
[java] 11 Apr 2023 17:05:46,794 [INFO] [] (main) Utils:70 creating DCS client for awsAccountId=606635778254, realm=us-east-1, dcsEndpoint=https://datacatalogv2.beta.spektr.a2z.com
[java] 11 Apr 2023 17:05:50,746 [INFO] [] (main) Utils:57 Getting your credentials from Conduit.
[java] 11 Apr 2023 17:05:52,650 [INFO] [] (main) ElasticSearchClientFactory:46 Initialized ES URL as search-spektr-es-datacatalog-6-8-cq2riup3kroeai2hokcrfxh5me.us-east-1.es.amazonaws.com
[java] 11 Apr 2023 17:05:52,909 [INFO] [] (main) ElasticSearchUtils:121 setting sequence number and primary term
[java] 11 Apr 2023 17:05:53,009 [INFO] [] (main) ElasticSearchUtils:134 Adding {"name":"test_table_6SjooYiji3","catalogId":"606635778254","metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"databaseName":"db_dcs_int_test4"} to elasticsearch index:table_index as type gluetable with id 606635778254_db_dcs_int_test4_test_table_6SjooYiji3
[java] 11 Apr 2023 17:05:53,010 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1971 seq_num=1574633, primary_term=6
[java] 11 Apr 2023 17:05:53,011 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1972 Initiating Index Request index {[table_index][gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3], source[{"name":"test_table_6SjooYiji3","description":null,"catalogId":"606635778254","owner":null,"partitionKeys":null,"storageDescriptor":null,"storageDetails":null,"parameters":null,"lastRefreshedTime":null,"encryptedData":null,"versioned":null,"versionColumns":null,"metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"retention":null,"autoApproveReadRequest":null,"databaseName":"db_dcs_int_test4"}]} from ElasticSearch
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1974 Index response IndexResponse[index=table_index,type=gluetable,id=606635778254_db_dcs_int_test4_test_table_6SjooYiji3,version=35,result=updated,seqNo=1574661,primaryTerm=6,shards={"total":2,"successful":1,"failed":0}]
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) ElasticSearchUtils:139 Added {"name":"test_table_6SjooYiji3","catalogId":"606635778254","metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"databaseName":"db_dcs_int_test4"} to elasticsearch index:table_index as type gluetable with id 606635778254_db_dcs_int_test4_test_table_6SjooYiji3
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) MixedCaseTableUtils:143 Done operation=test
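2 Answers
xnifntxz #1
The biggest difference I see is that in Kibana you are using the _doc type, whereas in your code the mapping type is gluetable, so you are not updating what you think you are updating. In Kibana you are indexing into the _doc type, while in your test code you are indexing into the gluetable type. These are two different documents.
Make sure that ES_TABLE_INDEX_DOC_TYPE = "_doc" in your code, and you should get the same error.
rbl8hiat #2
After upgrading the ES Java client package from 6.6.1 to 7.9.3, the exception is thrown as expected.
I found the hint in the client source code.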