dynamodb到elasticsearch?

63lcw9qa  于 2021-06-14  发布在  ElasticSearch
关注(0)|答案(1)|浏览(353)

我遇到了这个问题和答案,展示了如何将数据从dynamodb推送到elasticsearch以进行全文搜索索引。然而,我们的应用程序并没有使用lambdas。相反,我们使用apachecamel来捕获dynamodb流事件,并希望从那里将记录推送到elasticsearch。
因为我们使用的是aws sdk v2,所以我们没有捕获 DynamodbEvent 类别或相应 DynamodbStreamRecord 包含dynamodb记录的record类。相反,我们收到了 software.amazon.awssdk.services.dynamodb.model.Record 对象。既然如此,我们如何在elasticsearch中序列化并随后索引这些数据?在引用的另一个问题中,记录被转换为json字符串,然后发送到elasticsearch。有没有办法用v2做到这一点 Record 上课?这个 ItemUtils 答案中提到的类已经不存在了,所以我不知道还有什么方法可以序列化它。
非常感谢您的帮助!!

j9per5c4

j9per5c41#

与您提供的示例类似,您可以尝试以下操作:

public void processRecord(Record record, String index, String type, RestHighLevelClient esClient) throws Exception {
  // Get operation
  final OperationType operationType = record.eventName();
  // Obtain a reference to actual DynamoDB stream record
  final StreamRecord streamRecord = record.dynamodb();
  // Get ID. Assume single numeric attribute as partition key
  final Map<String,AttributeValue> keys = streamRecord.keys();
  final String recordId = keys.get("ID").n();

  switch (operationType) {
    case INSERT:
      if (!streamRecord.hasNewImage()) {
        throw new IllegalArgumentException("No new image when inserting");
      }
      Map<String,AttributeValue> newImage = streamRecord.newImage();
      // Where toJson is defined here https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java
      // and included below
      JsonObject jsonObject = toJson(newImage);
      IndexRequest indexRequest = new IndexRequest(index, type, recordId);
      indexRequest.source(jsonObject.toString(), XContentType.JSON);
      IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
      System.out.println("New content successfully indexed: " + indexResponse);
      break;
    case MODIFY:
      if (!streamRecord.hasNewImage()) {
        throw new IllegalArgumentException("No new image when updating");
      }
      Map<String,AttributeValue> newImage = streamRecord.newImage();
      JsonObject jsonObject = toJson(newImage);
      UpdateRequest updateRequest = new UpdateRequest(index, type, recordId);
      request.doc(jsonObject.toString(), XContentType.JSON);
      UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT);
      System.out.println("Content successfully updated: " + updateResponse);
      break;
    case REMOVE:
      DeleteRequest deleteRequest = new DeleteRequest(index, type, recordId);
      DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
      System.out.println("Successfully removed: " + deleteResponse);
      break;
    default:
      throw new UnsupportedOperationException("Operation type " + opetationType + " not supportd");  
  }
}

这个 toJson 方法定义为此类:https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/dynamodbutil.java
源代码复制如下:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.JsonStructure;
import javax.json.JsonValue;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/**This is a utility for converting DynamoDB AttributeValues to and from Java JSON-P objects */
public class DynamoDBUtil {

    public static void addList(String key, JsonObjectBuilder objectBuilder, List<JsonObject> items) {
        if (!items.isEmpty()) {
            JsonArrayBuilder builder = Json.createArrayBuilder();
            items.forEach(i -> builder.add(i));
            objectBuilder.add(key, builder.build());
        }

    }

    public static JsonArray toJson(List<AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonArrayBuilder valueBuilder = Json.createArrayBuilder();
        for (AttributeValue a : attributeValues) {
            add(toJson(a), valueBuilder);
        }
        return valueBuilder.build();
    }

    public static JsonObject toJson(Map<String, AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonObjectBuilder valueBuilder = Json.createObjectBuilder();
        for (Map.Entry<String, AttributeValue> a : attributeValues.entrySet()) {
            add(a.getKey(), toJson(a.getValue()), valueBuilder);
        }
        return valueBuilder.build();
    }

    public static void add(String key, Object value, JsonObjectBuilder object) {
        if (value instanceof JsonValue) {
            object.add(key, (JsonValue) value);
            // with json-p 1.0 can't create JsonString or JsonNumber so simply setting JsonValue not an option.
        } else if (value instanceof String) {
            object.add(key, (String) value);
        } else if (value instanceof BigDecimal) {
            object.add(key, (BigDecimal) value);
        } else if (value instanceof Boolean) {
            object.add(key, (Boolean) value);
        } else if (value == null || value.equals(JsonValue.NULL)) {
            object.addNull(key);
        }

    }

    public static void add(Object value, JsonArrayBuilder array) {
        if (value instanceof JsonValue) {
            array.add((JsonValue) value);
        } else if (value instanceof String) {
            array.add((String) value);
        } else if (value instanceof BigDecimal) {
            array.add((BigDecimal) value);
        } else if (value instanceof Boolean) {
            array.add((Boolean) value);
        } else if (value.equals(JsonValue.NULL)) {
            array.addNull();
        }

    }

    public static Object toJson(AttributeValue attributeValue) {
        // with json-p 1.1 Json.createValue() can be used.

        if (attributeValue == null) {
            return null;
        }
        if (attributeValue.s() != null) {
            return attributeValue.s();
        }
        if (attributeValue.n() != null) {
            return new BigDecimal(attributeValue.n());
        }
        if (attributeValue.bool() != null) {
            // return attributeValue.bool() ? JsonValue.TRUE : JsonValue.FALSE;
            return attributeValue.bool();
        }

        if (attributeValue.b() != null) {
            // return Base64.getEncoder().encodeToString(attributeValue.b().array());
            return null;
        }

        if (attributeValue.nul() != null && attributeValue.nul()) {
            return JsonValue.NULL;
        }

        if (!attributeValue.m().isEmpty()) {
            return toJson(attributeValue.m());
        }
        if (!attributeValue.l().isEmpty()) {
            return toJson(attributeValue.l());
        }

        if (!attributeValue.ss().isEmpty()) {
            return attributeValue.ss();
        }

        if (!attributeValue.ns().isEmpty()) {
            return attributeValue.ns();
        }

        if (!attributeValue.bs().isEmpty()) {
            //return attributeValue.bs();
            return null;
        }
        return null;
    }

    public static Map<String, AttributeValue> toAttribute(JsonObject jsonObject) {
        Map<String, AttributeValue> attribute = new HashMap<>();
        jsonObject.entrySet().forEach(e -> {
            attribute.put(e.getKey(), toAttribute(e.getValue()));
        });
        return attribute;
    }

    public static List<AttributeValue> toAttribute(JsonArray jsonArray) {
        List<AttributeValue> attributes = new LinkedList<>();
        jsonArray.forEach(e -> {
            attributes.add(toAttribute(e));
        });
        return attributes;
    }

    public static AttributeValue toAttribute(JsonValue jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        switch (jsonValue.getValueType()) {
        case STRING:
            return AttributeValue.builder().s(((JsonString) jsonValue).getString()).build();
        case OBJECT:
            return AttributeValue.builder().m(toAttribute((JsonObject) jsonValue)).build();
        case ARRAY:
            return AttributeValue.builder().l(toAttribute((JsonArray) jsonValue)).build();
        case NUMBER:
            return AttributeValue.builder().n(((JsonNumber) jsonValue).toString()).build();
        case TRUE:
            return AttributeValue.builder().bool(true).build();
        case FALSE:
            return AttributeValue.builder().bool(false).build();
        case NULL:
            return AttributeValue.builder().nul(true).build();
        }

        return null;
    }

    public static AttributeValue compress(Map<String, AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    public static AttributeValue compress(List<AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    public static AttributeValue compress(JsonStructure jsonStructure) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Json.createWriter(outputStream).write(jsonStructure);
        outputStream.close();
        byte[] jsonBinary = outputStream.toByteArray();

        outputStream = new ByteArrayOutputStream();
        Deflater deflater = new Deflater();
        deflater.setInput(jsonBinary);
        deflater.finish();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int count = deflater.deflate(buffer); // returns the generated code... index
            outputStream.write(buffer, 0, count);
        }
        outputStream.close();
        jsonBinary = outputStream.toByteArray();

        return AttributeValue.builder().b(SdkBytes.fromByteArray(jsonBinary)).build();
    }

    public static JsonStructure decompress(AttributeValue attributeValue) throws IOException, DataFormatException {
        Inflater inflater = new Inflater();
        byte[] jsonBinary = attributeValue.b().asByteArray();
        inflater.setInput(jsonBinary);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(jsonBinary.length);
        byte[] buffer = new byte[1024];
        while (!inflater.finished()) {
            int count = inflater.inflate(buffer);
            outputStream.write(buffer, 0, count);
        }
        outputStream.close();
        byte[] output = outputStream.toByteArray();
        ByteArrayInputStream bis = new ByteArrayInputStream(output);
        return Json.createReader(bis).read();
    }

}

这个类是这个要点中最初介绍的类的更新版本。
这篇文章还提供了一个链接到Jackson的 AtributeValue 序列化程序,如果您喜欢使用该库进行json序列化。

相关问题