gpt4 book ai didi

hadoop - 从 HIVE 中的 REST API 访问数据

转载 作者:可可西里 更新时间:2023-11-01 16:42:19 26 4
gpt4 key购买 nike

有没有办法创建一个配置单元表,其中该配置单元表的位置将是一个 http JSON REST API?我不想每次都在 HDFS 中导入数据。

最佳答案

几年前我在一个项目中遇到过类似的情况。这是一种将数据从 Restful 摄取到 HDFS 的低调方式,然后您使用 Hive 分析来实现业务逻辑。我希望您熟悉核心 Java,Map Reduce(如果不是,您可以查看 Hortonworks Data Flow, HDF 是 Hortonworks 的产品)。

第 1 步:您的数据摄取工作流不应绑定(bind)到包含业务逻辑的 Hive 工作流。这应该根据您的要求(数据流的数量和速度)及时独立执行并定期监控。我正在文本编辑器上编写这段代码。警告:它未经编译或测试!!

下面的代码使用了一个 Mapper,它将接受 url 或调整它以接受来自 FS 的 url 列表。有效负载或请求的数据以文本文件的形式存储在指定的作业输出目录中(这次忘记数据的结构)。

映射器类:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class HadoopHttpClientMap extends Mapper<LongWritable, Text, Text, Text> {
private int file = 0;
private String jobOutDir;
private String taskId;

@Override
protected void setup(Context context) throws IOException,InterruptedException {
super.setup(context);

jobOutDir = context.getOutputValueClass().getName();
taskId = context.getJobID().toString();

}

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

Path httpDest = new Path(jobOutDir, taskId + "_http_" + (file++));

InputStream is = null;
OutputStream os = null;
URLConnection connection;
try {
connection = new URL(value.toString()).openConnection();
//implement connection timeout logics
//authenticate.. etc
is = connection.getInputStream();

os = FileSystem.getLocal(context.getConfiguration()).create(httpDest,true);

IOUtils.copyBytes(is, os, context.getConfiguration(), true);

} catch(Throwable t){
t.printStackTrace();
}finally {
IOUtils.closeStream(is);
IOUtils.closeStream(os);
}

context.write(value, null);
//context.write(new Text (httpDest.getName()), new Text (os.toString()));
}

}

仅映射器作业:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class HadoopHttpClientJob {
private static final String data_input_directory = “YOUR_INPUT_DIR”;
private static final String data_output_directory = “YOUR_OUTPUT_DIR”;

public HadoopHttpClientJob() {
}

public static void main(String... args) {
try {
Configuration conf = new Configuration();

Path test_data_in = new Path(data_input_directory, "urls.txt");
Path test_data_out = new Path(data_output_directory);

@SuppressWarnings("deprecation")
Job job = new Job(conf, "HadoopHttpClientMap" + System.currentTimeMillis());
job.setJarByClass(HadoopHttpClientJob.class);

FileSystem fs = FileSystem.get(conf);

fs.delete(test_data_out, true);
job.setMapperClass(HadoopHttpClientMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);

FileInputFormat.addInputPath(job, test_data_in);
FileOutputFormat.setOutputPath(job, test_data_out);

job.waitForCompletion(true);

}catch (Throwable t){
t.printStackTrace();
}
}
}

第二步:根据HDFS目录在Hive中创建外部表。请记住对 JSON 数据使用 Hive SerDe(在您的情况下),然后您可以将数据从外部表复制到托管主表中。这是您实现增量逻辑、压缩的步骤。

第 3 步:将您的配置单元查询(您可能已经创建)指向主表以实现您的业务需求。

注意:如果您指的是实时分析或流式 api,您可能需要更改应用程序的架构。既然你问了建筑问题,我就用我最好的有根据的猜测来支持你。请通过一次。如果您觉得可以在您的应用程序中实现这一点,那么您可以提出具体问题,我会尽力解决。

关于hadoop - 从 HIVE 中的 REST API 访问数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39735425/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com