如何使用 AWS SDK v2 将 DynamoDB 中的数据推送到 Elasticsearch?

DynamoDB to Elasticsearch with AWS SDK v2?

我看到了另一个问题,其中展示了如何将数据从 DynamoDB 推送到 Elasticsearch 以建立全文搜索索引。然而,我们的应用程序没有使用 Lambda 函数,而是使用 Apache Camel 来捕获 DynamoDB Streams 事件,并希望从那里将记录推送到 Elasticsearch。

由于我们使用的是 AWS SDK v2,我们拿到的并不是包含 DynamoDB 记录的 DynamodbEvent 类或相应的 DynamodbStreamRecord 类,而是一个 software.amazon.awssdk.services.dynamodb.model.Record 对象。既然如此,我们该如何序列化这些数据,并随后将其索引到 Elasticsearch 中?在上面引用的那个问题中,记录先被转换为 JSON 字符串,然后再发送到 Elasticsearch。用 v2 的 Record 类能做到这一点吗?答案中提到的 ItemUtils 类在 v2 中已经不存在了,而我也不知道还有什么其他方法可以序列化它。

非常感谢您提供的任何帮助!!

与您提供的示例类似,您可以尝试以下操作:

/**
 * Mirrors a single DynamoDB Streams record (AWS SDK v2 {@code Record}) into an Elasticsearch index.
 *
 * <p>INSERT records are indexed, MODIFY records are applied as a partial-document update, and
 * REMOVE records delete the document. The document ID is taken from the item's key.
 *
 * @param record the stream record received from DynamoDB Streams
 * @param index the target Elasticsearch index
 * @param type the Elasticsearch mapping type passed to the request constructors
 * @param esClient the Elasticsearch client used to send the requests
 * @throws IllegalArgumentException if the record has no numeric {@code "ID"} key, or an INSERT/MODIFY
 *     record carries no new image
 * @throws UnsupportedOperationException for any other operation type
 * @throws Exception errors raised by the Elasticsearch client calls are propagated unchanged
 */
public void processRecord(Record record, String index, String type, RestHighLevelClient esClient) throws Exception {
  // Operation that produced this stream record.
  final OperationType operationType = record.eventName();
  // Obtain a reference to the actual DynamoDB stream record.
  final StreamRecord streamRecord = record.dynamodb();
  // Document ID: assumes the partition key is a single numeric attribute named "ID".
  final Map<String, AttributeValue> keys = streamRecord.keys();
  final AttributeValue idAttribute = keys.get("ID");
  if (idAttribute == null || idAttribute.n() == null) {
    // Fail with a clear message instead of an NPE or a null document ID.
    throw new IllegalArgumentException("Stream record has no numeric \"ID\" key: " + keys);
  }
  final String recordId = idAttribute.n();

  // Each case is wrapped in braces: all case labels share one scope, so locals such as
  // newImage/jsonObject would otherwise be declared twice and fail to compile.
  switch (operationType) {
    case INSERT: {
      if (!streamRecord.hasNewImage()) {
        throw new IllegalArgumentException("No new image when inserting");
      }
      Map<String, AttributeValue> newImage = streamRecord.newImage();
      // toJson is DynamoDBUtil.toJson(Map) from
      // https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java
      // (reproduced below); it must be statically imported or otherwise in scope here.
      JsonObject jsonObject = toJson(newImage);
      IndexRequest indexRequest = new IndexRequest(index, type, recordId);
      indexRequest.source(jsonObject.toString(), XContentType.JSON);
      IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
      System.out.println("New content successfully indexed: " + indexResponse);
      break;
    }
    case MODIFY: {
      if (!streamRecord.hasNewImage()) {
        throw new IllegalArgumentException("No new image when updating");
      }
      Map<String, AttributeValue> newImage = streamRecord.newImage();
      JsonObject jsonObject = toJson(newImage);
      // NOTE(review): a partial-document update merges these fields into the existing document, so
      // attributes removed from the DynamoDB item stay in the index. Use an IndexRequest here if the
      // document must be replaced in full.
      UpdateRequest updateRequest = new UpdateRequest(index, type, recordId);
      updateRequest.doc(jsonObject.toString(), XContentType.JSON);
      UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT);
      System.out.println("Content successfully updated: " + updateResponse);
      break;
    }
    case REMOVE: {
      DeleteRequest deleteRequest = new DeleteRequest(index, type, recordId);
      DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
      System.out.println("Successfully removed: " + deleteResponse);
      break;
    }
    default:
      throw new UnsupportedOperationException("Operation type " + operationType + " not supported");
  }
}

toJson 方法定义在以下类中:https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java

源码转载于此:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonReader;
import javax.json.JsonString;
import javax.json.JsonStructure;
import javax.json.JsonValue;
import javax.json.JsonWriter;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/**
 * Utility for converting DynamoDB {@link AttributeValue}s (AWS SDK v2) to and from Java JSON-P
 * objects. It can also store a JSON structure as a deflate-compressed binary ({@code B}) attribute
 * and read it back.
 *
 * <p>Binary attributes ({@code B}, {@code BS}) are not converted; they become JSON {@code null}.
 */
public class DynamoDBUtil {

    /**
     * Adds {@code items} to {@code objectBuilder} as a JSON array under {@code key}. Nothing is
     * added when {@code items} is empty.
     */
    public static void addList(String key, JsonObjectBuilder objectBuilder, List<JsonObject> items) {
        if (!items.isEmpty()) {
            JsonArrayBuilder builder = Json.createArrayBuilder();
            items.forEach(builder::add);
            objectBuilder.add(key, builder.build());
        }
    }

    /**
     * Converts a DynamoDB list ({@code L}) to a JSON array.
     *
     * @return the JSON array, or {@code null} if {@code attributeValues} is {@code null}
     */
    public static JsonArray toJson(List<AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonArrayBuilder valueBuilder = Json.createArrayBuilder();
        for (AttributeValue a : attributeValues) {
            add(toJsonElement(a), valueBuilder);
        }
        return valueBuilder.build();
    }

    /**
     * Converts a DynamoDB item or map ({@code M}) to a JSON object.
     *
     * @return the JSON object, or {@code null} if {@code attributeValues} is {@code null}
     */
    public static JsonObject toJson(Map<String, AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonObjectBuilder valueBuilder = Json.createObjectBuilder();
        for (Map.Entry<String, AttributeValue> a : attributeValues.entrySet()) {
            add(a.getKey(), toJsonElement(a.getValue()), valueBuilder);
        }
        return valueBuilder.build();
    }

    /**
     * Adds {@code value} to {@code object} under {@code key}.
     *
     * <p>Supported values are {@link JsonValue}, {@link String}, {@link BigDecimal}, {@link Boolean}
     * and {@code null} (stored as JSON null). Values of any other type are ignored.
     */
    public static void add(String key, Object value, JsonObjectBuilder object) {
        if (value instanceof JsonValue) {
            object.add(key, (JsonValue) value);
            // with json-p 1.0 can't create JsonString or JsonNumber so simply setting JsonValue not an option.
        } else if (value instanceof String) {
            object.add(key, (String) value);
        } else if (value instanceof BigDecimal) {
            object.add(key, (BigDecimal) value);
        } else if (value instanceof Boolean) {
            object.add(key, (Boolean) value);
        } else if (value == null || value.equals(JsonValue.NULL)) {
            object.addNull(key);
        }
    }

    /**
     * Appends {@code value} to {@code array}. This is the array counterpart of
     * {@link #add(String, Object, JsonObjectBuilder)}, with the same supported types.
     */
    public static void add(Object value, JsonArrayBuilder array) {
        if (value instanceof JsonValue) {
            array.add((JsonValue) value);
        } else if (value instanceof String) {
            array.add((String) value);
        } else if (value instanceof BigDecimal) {
            array.add((BigDecimal) value);
        } else if (value instanceof Boolean) {
            array.add((Boolean) value);
        } else if (value == null || value.equals(JsonValue.NULL)) {
            // null check first: binary elements convert to null and previously caused an NPE here.
            array.addNull();
        }
    }

    /**
     * Converts a single attribute value to a JSON-compatible Java object.
     *
     * <p>The result depends on the attribute type:
     * <ul>
     *   <li>{@code S} becomes a {@code String}.</li>
     *   <li>{@code N} becomes a {@code BigDecimal}.</li>
     *   <li>{@code BOOL} becomes a {@code Boolean}.</li>
     *   <li>{@code NULL} becomes {@link JsonValue#NULL}.</li>
     *   <li>{@code M} becomes a {@link JsonObject}.</li>
     *   <li>{@code L} becomes a {@link JsonArray}.</li>
     *   <li>{@code SS} and {@code NS} are returned as the raw {@code List<String>}.</li>
     *   <li>Binary types become {@code null}.</li>
     * </ul>
     */
    public static Object toJson(AttributeValue attributeValue) {
        // with json-p 1.1 Json.createValue() can be used.

        if (attributeValue == null) {
            return null;
        }
        if (attributeValue.s() != null) {
            return attributeValue.s();
        }
        if (attributeValue.n() != null) {
            return new BigDecimal(attributeValue.n());
        }
        if (attributeValue.bool() != null) {
            return attributeValue.bool();
        }

        if (attributeValue.b() != null) {
            // Binary is not converted (could be Base64-encoded via attributeValue.b().asByteArray()).
            return null;
        }

        if (attributeValue.nul() != null && attributeValue.nul()) {
            return JsonValue.NULL;
        }

        // hasM()/hasL() distinguish an explicitly empty map/list from "not set", so {} and [] are
        // preserved instead of collapsing to null.
        if (attributeValue.hasM()) {
            return toJson(attributeValue.m());
        }
        if (attributeValue.hasL()) {
            return toJson(attributeValue.l());
        }

        if (!attributeValue.ss().isEmpty()) {
            return attributeValue.ss();
        }

        if (!attributeValue.ns().isEmpty()) {
            return attributeValue.ns();
        }

        // Binary sets (BS) and unrecognised values are not converted.
        return null;
    }

    /**
     * Like {@link #toJson(AttributeValue)}, but returns string sets and number sets as JSON arrays,
     * with number-set elements as JSON numbers. Without this step the raw {@code List} returned for
     * SS/NS would be silently ignored by the {@code add} helpers.
     */
    private static Object toJsonElement(AttributeValue attributeValue) {
        if (attributeValue != null && !attributeValue.ss().isEmpty()) {
            JsonArrayBuilder strings = Json.createArrayBuilder();
            attributeValue.ss().forEach(s -> strings.add(s));
            return strings.build();
        }
        if (attributeValue != null && !attributeValue.ns().isEmpty()) {
            JsonArrayBuilder numbers = Json.createArrayBuilder();
            attributeValue.ns().forEach(n -> numbers.add(new BigDecimal(n)));
            return numbers.build();
        }
        return toJson(attributeValue);
    }

    /** Converts a JSON object to a DynamoDB item/map, converting each member with {@link #toAttribute(JsonValue)}. */
    public static Map<String, AttributeValue> toAttribute(JsonObject jsonObject) {
        Map<String, AttributeValue> attribute = new HashMap<>();
        jsonObject.forEach((key, value) -> attribute.put(key, toAttribute(value)));
        return attribute;
    }

    /** Converts a JSON array to a DynamoDB list, converting each element with {@link #toAttribute(JsonValue)}. */
    public static List<AttributeValue> toAttribute(JsonArray jsonArray) {
        List<AttributeValue> attributes = new ArrayList<>(jsonArray.size());
        jsonArray.forEach(e -> attributes.add(toAttribute(e)));
        return attributes;
    }

    /**
     * Converts a JSON value to a DynamoDB attribute value.
     *
     * <p>The mapping is:
     * <ul>
     *   <li>string becomes {@code S}.</li>
     *   <li>object becomes {@code M}.</li>
     *   <li>array becomes {@code L}.</li>
     *   <li>number becomes {@code N}.</li>
     *   <li>true/false becomes {@code BOOL}.</li>
     *   <li>null becomes {@code NULL}.</li>
     * </ul>
     *
     * @return the attribute value, or {@code null} if {@code jsonValue} is {@code null}
     */
    public static AttributeValue toAttribute(JsonValue jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        switch (jsonValue.getValueType()) {
        case STRING:
            return AttributeValue.builder().s(((JsonString) jsonValue).getString()).build();
        case OBJECT:
            return AttributeValue.builder().m(toAttribute((JsonObject) jsonValue)).build();
        case ARRAY:
            return AttributeValue.builder().l(toAttribute((JsonArray) jsonValue)).build();
        case NUMBER:
            return AttributeValue.builder().n(((JsonNumber) jsonValue).toString()).build();
        case TRUE:
            return AttributeValue.builder().bool(true).build();
        case FALSE:
            return AttributeValue.builder().bool(false).build();
        case NULL:
            return AttributeValue.builder().nul(true).build();
        default:
            return null;
        }
    }

    /** Converts {@code attributeValues} to JSON and stores it as a deflate-compressed binary attribute. */
    public static AttributeValue compress(Map<String, AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    /** Converts {@code attributeValues} to JSON and stores it as a deflate-compressed binary attribute. */
    public static AttributeValue compress(List<AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    /**
     * Serializes {@code jsonStructure}, deflate-compresses the bytes and wraps them in a binary
     * ({@code B}) attribute value. Use {@link #decompress(AttributeValue)} to reverse this.
     */
    public static AttributeValue compress(JsonStructure jsonStructure) throws IOException {
        ByteArrayOutputStream jsonOut = new ByteArrayOutputStream();
        // Closing the writer guarantees all buffered output has reached jsonOut.
        try (JsonWriter writer = Json.createWriter(jsonOut)) {
            writer.write(jsonStructure);
        }
        byte[] jsonBinary = jsonOut.toByteArray();

        ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(jsonBinary);
            deflater.finish();
            byte[] buffer = new byte[1024];
            while (!deflater.finished()) {
                int count = deflater.deflate(buffer); // number of compressed bytes written to buffer
                compressedOut.write(buffer, 0, count);
            }
        } finally {
            // Release native zlib resources immediately rather than waiting for GC.
            deflater.end();
        }

        return AttributeValue.builder().b(SdkBytes.fromByteArray(compressedOut.toByteArray())).build();
    }

    /**
     * Inflates the binary ({@code B}) payload written by {@link #compress(JsonStructure)} and parses it
     * back into a JSON structure.
     *
     * @throws DataFormatException if the payload is not valid deflate data, is truncated, or needs a
     *     preset dictionary
     */
    public static JsonStructure decompress(AttributeValue attributeValue) throws IOException, DataFormatException {
        byte[] jsonBinary = attributeValue.b().asByteArray();
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(jsonBinary.length);
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(jsonBinary);
            byte[] buffer = new byte[1024];
            while (!inflater.finished()) {
                int count = inflater.inflate(buffer);
                // Without this guard a truncated stream makes inflate() return 0 forever.
                if (count == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new DataFormatException(
                            "Compressed JSON payload is truncated or requires a preset dictionary");
                }
                outputStream.write(buffer, 0, count);
            }
        } finally {
            // Release native zlib resources immediately rather than waiting for GC.
            inflater.end();
        }
        try (JsonReader reader = Json.createReader(new ByteArrayInputStream(outputStream.toByteArray()))) {
            return reader.read();
        }
    }

}

此类是最初发布在一个 gist 中的版本的更新版。

如果您更喜欢使用 Jackson 进行 JSON 序列化,这篇帖子还提供了一个基于 Jackson 的 AttributeValue 序列化器的链接。