使用 hadoop reducer 在 BulkWriteOperation 进入 mongo 时检查重复记录
Check for duplicate records while BulkWriteOperation into mongo using hadoop reducer
我正在使用 hadoop map-reduce 处理 XML 文件。我直接把JSON的数据存入mongodb。
怎样才能做到在执行BulkWriteOperation
前只把不重复的记录存入数据库?
重复记录标准将基于产品图像和产品名称,我不想使用吗啡层,其中我们可以为 class 成员分配索引。
这是我的减速器 class:
public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{
private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);
protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
LOGGER.info("reduce()------Start for key>"+key);
Map<String,String> insertProductInfo = new HashMap<String,String>();
try{
MongoClient mongoClient = new MongoClient("localhost", 27017);
DB db = mongoClient.getDB("test");
BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
for (MapWritable entry : values) {
for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
}
if(!insertProductInfo.isEmpty()){
BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
operation.insert(basicDBObject);
}
}
//How can I check for duplicates before executing bulk operation
operation.execute();
LOGGER.info("reduce------end for key"+key);
}catch(Exception e){
LOGGER.error("General Exception in XMLReducer",e);
}
}
}
编辑: 在我添加的建议答案之后:
BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
.append("product_name", basicDBObject.get("product_name"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
operation.insert(basicDBObject);
我收到如下错误:com.mongodb.MongoInternalException: no mapping found for index 0
任何帮助都将是 useful.Thanks。
我想这完全取决于你真正想用 "duplicates" 做什么,以及你如何处理它。
你总是可以使用 .initializeUnOrderedBulkOperation()
which won't "error" on a duplicate key from your index ( which you need to stop duplicates ) but will report any such errors in the returned BulkWriteResult
对象。这是从 .execute()
返回的
BulkWriteResult result = operation.execute();
另一方面,您可以只使用 "upserts" 并使用 $setOnInsert
等运算符来仅在不存在重复项的情况下进行更改:
BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));
因此,您基本上是查找包含 "key" 的字段的值以确定与查询的重复项,然后实际上只更改未找到 "key" 的任何数据,因此新文档和 "inserted".
在任何一种情况下,这里的默认行为都是 "insert" 第一个唯一的 "key" 值,然后忽略所有其他事件。如果您想在找到相同键的地方做其他事情,例如 "overwrite" 或 "increment" 值,那么 .update()
"upsert" 方法就是您想要的方法,但您将使用其他方法update operators 对于这些操作。
我正在使用 hadoop map-reduce 处理 XML 文件。我直接把JSON的数据存入mongodb。
怎样才能做到在执行BulkWriteOperation
前只把不重复的记录存入数据库?
重复记录标准将基于产品图像和产品名称,我不想使用吗啡层,其中我们可以为 class 成员分配索引。
这是我的减速器 class:
public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{
private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);
protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
LOGGER.info("reduce()------Start for key>"+key);
Map<String,String> insertProductInfo = new HashMap<String,String>();
try{
MongoClient mongoClient = new MongoClient("localhost", 27017);
DB db = mongoClient.getDB("test");
BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
for (MapWritable entry : values) {
for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
}
if(!insertProductInfo.isEmpty()){
BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
operation.insert(basicDBObject);
}
}
//How can I check for duplicates before executing bulk operation
operation.execute();
LOGGER.info("reduce------end for key"+key);
}catch(Exception e){
LOGGER.error("General Exception in XMLReducer",e);
}
}
}
编辑: 在我添加的建议答案之后:
BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
.append("product_name", basicDBObject.get("product_name"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
operation.insert(basicDBObject);
我收到如下错误:com.mongodb.MongoInternalException: no mapping found for index 0
任何帮助都将是 useful.Thanks。
我想这完全取决于你真正想用 "duplicates" 做什么,以及你如何处理它。
你总是可以使用 .initializeUnOrderedBulkOperation()
which won't "error" on a duplicate key from your index ( which you need to stop duplicates ) but will report any such errors in the returned BulkWriteResult
对象。这是从 .execute()
BulkWriteResult result = operation.execute();
另一方面,您可以只使用 "upserts" 并使用 $setOnInsert
等运算符来仅在不存在重复项的情况下进行更改:
BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));
因此,您基本上是查找包含 "key" 的字段的值以确定与查询的重复项,然后实际上只更改未找到 "key" 的任何数据,因此新文档和 "inserted".
在任何一种情况下,这里的默认行为都是 "insert" 第一个唯一的 "key" 值,然后忽略所有其他事件。如果您想在找到相同键的地方做其他事情,例如 "overwrite" 或 "increment" 值,那么 .update()
"upsert" 方法就是您想要的方法,但您将使用其他方法update operators 对于这些操作。