在 HIVE 中从 REST API 访问数据
Access Data from REST API in HIVE
有没有办法创建配置单元 table,其中配置单元 table 的位置将是 http JSON REST API?不想每次都在HDFS中导入数据
几年前我在一个项目中遇到过类似的情况。这是一种低调的方式,从Restful提取数据到HDFS,然后你使用Hive分析来实现业务logic.I希望你熟悉核心Java,Map Reduce(如果不是,您可以查看 Hortonworks Data Flow,HDF,它是 Hortonworks 的产品)。
第 1 步:您的数据摄取工作流不应绑定到包含业务逻辑的 Hive 工作流。这应该根据您的要求(数据流的数量和速度)及时独立执行并定期监控。我正在文本编辑器上编写这段代码。警告:它未经编译或测试!!
下面的代码使用了一个 Mapper,它将接受 url 或调整它以接受来自 FS 的 urls 列表。负载或请求的数据以文本文件的形式存储在指定的作业输出目录中(这次忘记数据的结构)。
映射器Class:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class HadoopHttpClientMap extends Mapper<LongWritable, Text, Text, Text> {
private int file = 0;
private String jobOutDir;
private String taskId;
@Override
protected void setup(Context context) throws IOException,InterruptedException {
super.setup(context);
jobOutDir = context.getOutputValueClass().getName();
taskId = context.getJobID().toString();
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
Path httpDest = new Path(jobOutDir, taskId + "_http_" + (file++));
InputStream is = null;
OutputStream os = null;
URLConnection connection;
try {
connection = new URL(value.toString()).openConnection();
//implement connection timeout logics
//authenticate.. etc
is = connection.getInputStream();
os = FileSystem.getLocal(context.getConfiguration()).create(httpDest,true);
IOUtils.copyBytes(is, os, context.getConfiguration(), true);
} catch(Throwable t){
t.printStackTrace();
}finally {
IOUtils.closeStream(is);
IOUtils.closeStream(os);
}
context.write(value, null);
//context.write(new Text (httpDest.getName()), new Text (os.toString()));
}
}
仅映射器作业:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HadoopHttpClientJob {
private static final String data_input_directory = “YOUR_INPUT_DIR”;
private static final String data_output_directory = “YOUR_OUTPUT_DIR”;
public HadoopHttpClientJob() {
}
public static void main(String... args) {
try {
Configuration conf = new Configuration();
Path test_data_in = new Path(data_input_directory, "urls.txt");
Path test_data_out = new Path(data_output_directory);
@SuppressWarnings("deprecation")
Job job = new Job(conf, "HadoopHttpClientMap" + System.currentTimeMillis());
job.setJarByClass(HadoopHttpClientJob.class);
FileSystem fs = FileSystem.get(conf);
fs.delete(test_data_out, true);
job.setMapperClass(HadoopHttpClientMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, test_data_in);
FileOutputFormat.setOutputPath(job, test_data_out);
job.waitForCompletion(true);
}catch (Throwable t){
t.printStackTrace();
}
}
}
第二步:根据HDFS目录在Hive中创建外部table。请记住将 Hive SerDe 用于 JSON 数据(在您的情况下),然后您可以将数据从外部 table 复制到托管主机 table 中。这是您实现增量逻辑、压缩的步骤..
第 3 步:将您的配置单元查询(您可能已经创建)指向主 table 以实现您的业务需求。
注意:如果您指的是实时分析或流式处理 api,您可能需要更改应用程序的架构。既然你问了建筑问题,我就用我最好的有根据的猜测来支持你。请通过一次。如果你觉得你可以在你的应用程序中实现这一点,那么你可以提出具体问题,我会尽力解决它们。
有没有办法创建配置单元 table,其中配置单元 table 的位置将是 http JSON REST API?不想每次都在HDFS中导入数据
几年前我在一个项目中遇到过类似的情况。这是一种低调的方式,从Restful提取数据到HDFS,然后你使用Hive分析来实现业务logic.I希望你熟悉核心Java,Map Reduce(如果不是,您可以查看 Hortonworks Data Flow,HDF,它是 Hortonworks 的产品)。
第 1 步:您的数据摄取工作流不应绑定到包含业务逻辑的 Hive 工作流。这应该根据您的要求(数据流的数量和速度)及时独立执行并定期监控。我正在文本编辑器上编写这段代码。警告:它未经编译或测试!!
下面的代码使用了一个 Mapper,它将接受 url 或调整它以接受来自 FS 的 urls 列表。负载或请求的数据以文本文件的形式存储在指定的作业输出目录中(这次忘记数据的结构)。
映射器Class:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class HadoopHttpClientMap extends Mapper<LongWritable, Text, Text, Text> {
private int file = 0;
private String jobOutDir;
private String taskId;
@Override
protected void setup(Context context) throws IOException,InterruptedException {
super.setup(context);
jobOutDir = context.getOutputValueClass().getName();
taskId = context.getJobID().toString();
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
Path httpDest = new Path(jobOutDir, taskId + "_http_" + (file++));
InputStream is = null;
OutputStream os = null;
URLConnection connection;
try {
connection = new URL(value.toString()).openConnection();
//implement connection timeout logics
//authenticate.. etc
is = connection.getInputStream();
os = FileSystem.getLocal(context.getConfiguration()).create(httpDest,true);
IOUtils.copyBytes(is, os, context.getConfiguration(), true);
} catch(Throwable t){
t.printStackTrace();
}finally {
IOUtils.closeStream(is);
IOUtils.closeStream(os);
}
context.write(value, null);
//context.write(new Text (httpDest.getName()), new Text (os.toString()));
}
}
仅映射器作业:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HadoopHttpClientJob {
private static final String data_input_directory = “YOUR_INPUT_DIR”;
private static final String data_output_directory = “YOUR_OUTPUT_DIR”;
public HadoopHttpClientJob() {
}
public static void main(String... args) {
try {
Configuration conf = new Configuration();
Path test_data_in = new Path(data_input_directory, "urls.txt");
Path test_data_out = new Path(data_output_directory);
@SuppressWarnings("deprecation")
Job job = new Job(conf, "HadoopHttpClientMap" + System.currentTimeMillis());
job.setJarByClass(HadoopHttpClientJob.class);
FileSystem fs = FileSystem.get(conf);
fs.delete(test_data_out, true);
job.setMapperClass(HadoopHttpClientMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, test_data_in);
FileOutputFormat.setOutputPath(job, test_data_out);
job.waitForCompletion(true);
}catch (Throwable t){
t.printStackTrace();
}
}
}
第二步:根据HDFS目录在Hive中创建外部table。请记住将 Hive SerDe 用于 JSON 数据(在您的情况下),然后您可以将数据从外部 table 复制到托管主机 table 中。这是您实现增量逻辑、压缩的步骤..
第 3 步:将您的配置单元查询(您可能已经创建)指向主 table 以实现您的业务需求。
注意:如果您指的是实时分析或流式处理 api,您可能需要更改应用程序的架构。既然你问了建筑问题,我就用我最好的有根据的猜测来支持你。请通过一次。如果你觉得你可以在你的应用程序中实现这一点,那么你可以提出具体问题,我会尽力解决它们。