如何将reducer的输出写入数据库?
How to write the output of reducer to a database?
我将从一个例子开始。假设输入数据类似于
User1,product1,time1
User1,product2,time2
User1,product3,time3
User2,product2,time2
User2,product4,time6
现在预期的输出是我必须将数据插入数据库(在我的例子中是 Aerospike(Key Value Store)),其中数据的格式应为
User1, [ [product1,time1],[product2,time2],[product3,time3] ]
User2, [ [product2,time2],[product4,time6] ]
所以在 Mapper 中我输出以下内容
UserID, [productid,timestamp]
Please do not assume that [x,y] means i am outputting list i may send data from mappper in any way may be write the data in a custom object
所以在接收端我有格式为
的数据
User1, [ [product1,time1],[product2,time2],[product3,time3] ]
User2, [ [product2,time2],[product4,time6] ]
现在我可以做两件事
a) 我可以编写逻辑来仅在 reducer 中将此数据推送到数据库中
(我不想这样做)
b) 我想做的是,当我们执行 Context.write() 时,我希望将数据写入数据库。
请帮助如何做到这一点,如果可能的话附上代码片段或伪代码
PS :Context.write() 是做什么的?它写到哪里?它经历了哪些步骤和阶段?
据我了解,调用 context.write 涉及一定数量的步骤
在驱动程序中我们必须指定输出格式。现在让我们看看如果我们想写入文件会发生什么
为了写入文本文件,我们指定类似
的内容
job.setOutputFormatClass(TextOutputFormat.class);
现在,如果我们看到扩展 FileOutputFormat(abstract class) 的 TextOutputFormat class 的实现,它实现了 OutputFormat 接口并且 OutputFormat 接口提供了两个方法
1) getRecordWriter
2) checkOutputSpecs
现在会发生什么,OutputFormatClass 只是告诉你想要写什么样的记录以及记录者是如何给出的,对于记录者来说它只是 Object Key, Object Value
其中值可以是单个或列表,并且在记录编写器的实现中,我们指定了实际的逻辑,比如应该如何编写这条记录。
现在回到最初的问题,在我的案例 Aerospike 中,应该如何将记录写入数据库
我创建了一个自定义的 OutputFormat 说
public class AerospikeOutputFormat extends OutputFormat {
//Return a new instance of record writer
@Override
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new AerospikeRecordWriter(context.getConfiguration(), new Progressable() {
@Override
public void progress() {
}
});
}
}
现在我们必须定义一个自定义记录编写器,它将获取一个键和一个值并将数据写入数据库
public class RSRVRecordWriter<KK,VV> extends RecordWriter<KK, VV> {
@Override
public void write(KK key, VV value) throws IOException {
//Now here we can have an instance of aerospikeclient from a singleton class and then we could do client.put()
}
以上代码只是一个片段,一定要采取适当的设计策略。
PS:Aerospike 提供了一个记录器,可以在 this link
进行扩展以满足您的需求
我将从一个例子开始。假设输入数据类似于
User1,product1,time1
User1,product2,time2
User1,product3,time3
User2,product2,time2
User2,product4,time6
现在预期的输出是我必须将数据插入数据库(在我的例子中是 Aerospike(Key Value Store)),其中数据的格式应为
User1, [ [product1,time1],[product2,time2],[product3,time3] ]
User2, [ [product2,time2],[product4,time6] ]
所以在 Mapper 中我输出以下内容
UserID, [productid,timestamp]
Please do not assume that [x,y] means i am outputting list i may send data from mappper in any way may be write the data in a custom object
所以在接收端我有格式为
的数据User1, [ [product1,time1],[product2,time2],[product3,time3] ]
User2, [ [product2,time2],[product4,time6] ]
现在我可以做两件事
a) 我可以编写逻辑来仅在 reducer 中将此数据推送到数据库中 (我不想这样做)
b) 我想做的是,当我们执行 Context.write() 时,我希望将数据写入数据库。
请帮助如何做到这一点,如果可能的话附上代码片段或伪代码
PS :Context.write() 是做什么的?它写到哪里?它经历了哪些步骤和阶段?
据我了解,调用 context.write 涉及一定数量的步骤
在驱动程序中我们必须指定输出格式。现在让我们看看如果我们想写入文件会发生什么
为了写入文本文件,我们指定类似
的内容job.setOutputFormatClass(TextOutputFormat.class);
现在,如果我们看到扩展 FileOutputFormat(abstract class) 的 TextOutputFormat class 的实现,它实现了 OutputFormat 接口并且 OutputFormat 接口提供了两个方法
1) getRecordWriter
2) checkOutputSpecs
现在会发生什么,OutputFormatClass 只是告诉你想要写什么样的记录以及记录者是如何给出的,对于记录者来说它只是 Object Key, Object Value
其中值可以是单个或列表,并且在记录编写器的实现中,我们指定了实际的逻辑,比如应该如何编写这条记录。
现在回到最初的问题,在我的案例 Aerospike 中,应该如何将记录写入数据库
我创建了一个自定义的 OutputFormat 说
public class AerospikeOutputFormat extends OutputFormat {
//Return a new instance of record writer
@Override
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new AerospikeRecordWriter(context.getConfiguration(), new Progressable() {
@Override
public void progress() {
}
});
}
}
现在我们必须定义一个自定义记录编写器,它将获取一个键和一个值并将数据写入数据库
public class RSRVRecordWriter<KK,VV> extends RecordWriter<KK, VV> {
@Override
public void write(KK key, VV value) throws IOException {
//Now here we can have an instance of aerospikeclient from a singleton class and then we could do client.put()
}
以上代码只是一个片段,一定要采取适当的设计策略。
PS:Aerospike 提供了一个记录器,可以在 this link
进行扩展以满足您的需求