如何将reducer的输出写入数据库?

How to write the output of reducer to a database?

我将从一个例子开始。假设输入数据类似于

User1,product1,time1
User1,product2,time2
User1,product3,time3
User2,product2,time2
User2,product4,time6

现在预期的输出是我必须将数据插入数据库(在我的例子中是 Aerospike(Key Value Store)),其中数据的格式应为

User1, [ [product1,time1],[product2,time2],[product3,time3] ]
User2, [ [product2,time2],[product4,time6] ]

所以在 Mapper 中我输出以下内容

UserID, [productid,timestamp]

Please do not assume that [x,y] means i am outputting list i may send data from mappper in any way may be write the data in a custom object

所以在接收端我有格式为

的数据
User1, [ [product1,time1],[product2,time2],[product3,time3] ]
User2, [ [product2,time2],[product4,time6] ]

现在我可以做两件事

a) 我可以编写逻辑来仅在 reducer 中将此数据推送到数据库中 (我不想这样做)

b) 我想做的是,当我们执行 Context.write() 时,我希望将数据写入数据库。

请帮助如何做到这一点,如果可能的话附上代码片段或伪代码

PS :Context.write() 是做什么的?它写到哪里?它经历了哪些步骤和阶段?

据我了解,调用 context.write 涉及一定数量的步骤

在驱动程序中我们必须指定输出格式。现在让我们看看如果我们想写入文件会发生什么

为了写入文本文件,我们指定类似

的内容
job.setOutputFormatClass(TextOutputFormat.class);

现在,如果我们看到扩展 FileOutputFormat(abstract class) 的 TextOutputFormat class 的实现,它实现了 OutputFormat 接口并且 OutputFormat 接口提供了两个方法

1) getRecordWriter
2) checkOutputSpecs

现在会发生什么,OutputFormatClass 只是告诉你想要写什么样的记录以及记录者是如何给出的,对于记录者来说它只是 Object Key, Object Value 其中值可以是单个或列表,并且在记录编写器的实现中,我们指定了实际的逻辑,比如应该如何编写这条记录。

现在回到最初的问题,在我的案例 Aerospike 中,应该如何将记录写入数据库

我创建了一个自定义的 OutputFormat 说

public class AerospikeOutputFormat extends OutputFormat {
    //Return a new instance of record writer
    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new AerospikeRecordWriter(context.getConfiguration(), new Progressable() {
        @Override
        public void progress() {

        }
    });
    }

}

现在我们必须定义一个自定义记录编写器,它将获取一个键和一个值并将数据写入数据库

public class RSRVRecordWriter<KK,VV> extends RecordWriter<KK, VV> {

    @Override
    public void write(KK key, VV value) throws IOException {
        //Now here we can have an instance of aerospikeclient from a singleton class and then we could do client.put()

    }

以上代码只是一个片段,一定要采取适当的设计策略。

PS:Aerospike 提供了一个记录器,可以在 this link

进行扩展以满足您的需求