在 HIVE 中从 REST API 访问数据

Access Data from REST API in HIVE

有没有办法创建配置单元 table,其中配置单元 table 的位置将是 http JSON REST API?不想每次都在HDFS中导入数据

几年前我在一个项目中遇到过类似的情况。这是一种低调的方式,从Restful提取数据到HDFS,然后你使用Hive分析来实现业务logic.I希望你熟悉核心Java,Map Reduce(如果不是,您可以查看 Hortonworks Data Flow,HDF,它是 Hortonworks 的产品)。

第 1 步:您的数据摄取工作流不应绑定到包含业务逻辑的 Hive 工作流。这应该根据您的要求(数据流的数量和速度)及时独立执行并定期监控。我正在文本编辑器上编写这段代码。警告:它未经编译或测试!!

下面的代码使用了一个 Mapper,它将接受 url 或调整它以接受来自 FS 的 urls 列表。负载或请求的数据以文本文件的形式存储在指定的作业输出目录中(这次忘记数据的结构)。

映射器Class:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class HadoopHttpClientMap extends Mapper<LongWritable, Text, Text, Text> {
    private int file = 0;
    private String jobOutDir;
    private String taskId;

    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        super.setup(context);

        jobOutDir = context.getOutputValueClass().getName();
        taskId = context.getJobID().toString();

    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

        Path httpDest = new Path(jobOutDir, taskId + "_http_" + (file++));

        InputStream is = null;
        OutputStream os = null;
        URLConnection connection;
        try {
            connection = new URL(value.toString()).openConnection();
            //implement connection timeout logics
            //authenticate.. etc
            is = connection.getInputStream();

            os = FileSystem.getLocal(context.getConfiguration()).create(httpDest,true);

            IOUtils.copyBytes(is, os, context.getConfiguration(), true);

        } catch(Throwable t){
            t.printStackTrace();
        }finally {
            IOUtils.closeStream(is);
            IOUtils.closeStream(os);
        }

        context.write(value, null);
        //context.write(new Text (httpDest.getName()), new Text (os.toString()));
    }

}  

仅映射器作业:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class HadoopHttpClientJob {
    private static final String data_input_directory  =  “YOUR_INPUT_DIR”;
    private static final String data_output_directory  =  “YOUR_OUTPUT_DIR”;

    public HadoopHttpClientJob() {
    }

    public static void main(String... args) {
        try {
            Configuration conf = new Configuration();

            Path test_data_in = new Path(data_input_directory, "urls.txt");
            Path test_data_out = new Path(data_output_directory);

            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "HadoopHttpClientMap" + System.currentTimeMillis());
            job.setJarByClass(HadoopHttpClientJob.class);

            FileSystem fs = FileSystem.get(conf);

            fs.delete(test_data_out, true);
            job.setMapperClass(HadoopHttpClientMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, test_data_in);
            FileOutputFormat.setOutputPath(job, test_data_out);

            job.waitForCompletion(true);

        }catch (Throwable t){
            t.printStackTrace();
        }
    }
}

第二步:根据HDFS目录在Hive中创建外部table。请记住将 Hive SerDe 用于 JSON 数据(在您的情况下),然后您可以将数据从外部 table 复制到托管主机 table 中。这是您实现增量逻辑、压缩的步骤..

第 3 步:将您的配置单元查询(您可能已经创建)指向主 table 以实现您的业务需求。

注意:如果您指的是实时分析或流式处理 api,您可能需要更改应用程序的架构。既然你问了建筑问题,我就用我最好的有根据的猜测来支持你。请通过一次。如果你觉得你可以在你的应用程序中实现这一点,那么你可以提出具体问题,我会尽力解决它们。