使用 hadoop reducer 在 BulkWriteOperation 进入 mongo 时检查重复记录

Check for duplicate records while BulkWriteOperation into mongo using hadoop reducer

我正在使用 hadoop map-reduce 处理 XML 文件。我直接把JSON的数据存入mongodb。
怎样才能做到在执行BulkWriteOperation前只把不重复的记录存入数据库?

重复记录标准将基于产品图像和产品名称,我不想使用吗啡层,其中我们可以为 class 成员分配索引。

这是我的减速器 class:

public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{

private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);    

protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
    LOGGER.info("reduce()------Start for key>"+key);
    Map<String,String> insertProductInfo = new HashMap<String,String>();
    try{
        MongoClient mongoClient = new MongoClient("localhost", 27017);
        DB db = mongoClient.getDB("test");
        BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
        for (MapWritable entry : values) {
             for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
                    insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
                }
             if(!insertProductInfo.isEmpty()){
                 BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
                 operation.insert(basicDBObject);
             }          
        }
        //How can I check for duplicates before executing bulk operation
        operation.execute();
        LOGGER.info("reduce------end for key"+key);
    }catch(Exception e){
        LOGGER.error("General Exception in XMLReducer",e);
    }
  } 
}

编辑: 在我添加的建议答案之后:

 BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
                 .append("product_name", basicDBObject.get("product_name"));
                 operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
 operation.insert(basicDBObject);

我收到如下错误:com.mongodb.MongoInternalException: no mapping found for index 0

任何帮助都将是 useful.Thanks。

我想这完全取决于你真正想用 "duplicates" 做什么,以及你如何处理它。

你总是可以使用 .initializeUnOrderedBulkOperation() which won't "error" on a duplicate key from your index ( which you need to stop duplicates ) but will report any such errors in the returned BulkWriteResult 对象。这是从 .execute()

返回的
BulkWriteResult result = operation.execute();

另一方面,您可以只使用 "upserts" 并使用 $setOnInsert 等运算符来仅在不存在重复项的情况下进行更改:

BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));

operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));

因此,您基本上是查找包含 "key" 的字段的值以确定与查询的重复项,然后实际上只更改未找到 "key" 的任何数据,因此新文档和 "inserted".

在任何一种情况下,这里的默认行为都是 "insert" 第一个唯一的 "key" 值,然后忽略所有其他事件。如果您想在找到相同键的地方做其他事情,例如 "overwrite" 或 "increment" 值,那么 .update() "upsert" 方法就是您想要的方法,但您将使用其他方法update operators 对于这些操作。