- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
大家都知道Pig支持了DBStorage ,但它们只支持从 Pig 到 mysql 的加载结果
STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');
但请告诉我如何从 mysql 中读取表
data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');
这是我的代码
public class DBLoader extends LoadFunc {
private final Log log = LogFactory.getLog(getClass());
private ArrayList mProtoTuple = null;
private Connection con;
private String jdbcURL;
private String user;
private String pass;
private int batchSize;
private int count = 0;
private String query;
ResultSet result;
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
public DBLoader() {
}
public DBLoader(String driver, String jdbcURL, String user, String pass,
String query) {
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
log.error("can't load DB driver:" + driver, e);
throw new RuntimeException("Can't load DB Driver", e);
}
this.jdbcURL = jdbcURL;
this.user = user;
this.pass = pass;
this.query = query;
}
@Override
public InputFormat getInputFormat() throws IOException {
// TODO Auto-generated method stub
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// TODO Auto-generated method stub
boolean next = false;
try {
next = result.next();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (!next)
return null;
int numColumns = 0;
// Get result set meta data
ResultSetMetaData rsmd;
try {
rsmd = result.getMetaData();
numColumns = rsmd.getColumnCount();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for (int i = 0; i < numColumns; i++) {
try {
Object field = result.getObject(i);
switch (DataType.findType(field)) {
case DataType.NULL:
mProtoTuple.add(null);
break;
case DataType.BOOLEAN:
mProtoTuple.add((Boolean) field);
break;
case DataType.INTEGER:
mProtoTuple.add((Integer) field);
break;
case DataType.LONG:
mProtoTuple.add((Long) field);
break;
case DataType.FLOAT:
mProtoTuple.add((Float) field);
break;
case DataType.DOUBLE:
mProtoTuple.add((Double) field);
break;
case DataType.BYTEARRAY:
byte[] b = ((DataByteArray) field).get();
mProtoTuple.add(b);
break;
case DataType.CHARARRAY:
mProtoTuple.add((String) field);
break;
case DataType.BYTE:
mProtoTuple.add((Byte) field);
break;
case DataType.MAP:
case DataType.TUPLE:
case DataType.BAG:
throw new RuntimeException("Cannot store a non-flat tuple "
+ "using DbStorage");
default:
throw new RuntimeException("Unknown datatype "
+ DataType.findType(field));
}
} catch (Exception ee) {
throw new RuntimeException(ee);
}
}
Tuple t = mTupleFactory.newTuple(mProtoTuple);
mProtoTuple.clear();
return t;
}
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1)
throws IOException {
con = null;
if (query == null) {
throw new IOException("SQL Insert command not specified");
}
try {
if (user == null || pass == null) {
con = DriverManager.getConnection(jdbcURL);
} else {
con = DriverManager.getConnection(jdbcURL, user, pass);
}
con.setAutoCommit(false);
result = con.createStatement().executeQuery(query);
} catch (SQLException e) {
log.error("Unable to connect to JDBC @" + jdbcURL);
throw new IOException("JDBC Error", e);
}
count = 0;
}
@Override
public void setLocation(String location, Job job) throws IOException {
// TODO Auto-generated method stub
//TextInputFormat.setInputPaths(job, location);
}
class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{
@Override
public RecordReader<NullWritable, NullWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
}
}
我尝试了很多次写UDF但都没有成功.....
最佳答案
如您所说,DBStorage
仅支持将结果保存到数据库。
要从 MySQL 加载数据,您可以查看名为 sqoop 的项目(将数据从数据库复制到 HDFS),或者您可以执行 mysql 转储,然后将文件复制到 HDFS。这两种方式都需要一些交互,不能直接从 Pig 内部使用。
第三种选择是考虑编写一个 Pig LoadFunc(您说您尝试编写一个 UDF)。它应该不会太难,您需要传递与 DBStorage 相同的选项(驱动程序、连接凭据和要执行的 SQL 查询),并且您可能也可以使用一些结果集元数据检查来自动生成模式。
关于mysql - 一种从Mysql读取表数据到Pig的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10942739/
我想在嵌入式 pig 程序中执行一个 pig 脚本文件,如下所示 ----testPig.pig----- A = load '/user/biadmin/student' using PigStor
我正在使用 CurrentTime(),它是一种日期时间数据类型。但是,我需要它作为字符数组。我有以下几点: A = LOAD ... B = FOREACH A GENERATE CurrentTi
我有一个 hadoop 集群。 安装了 pig : 但是在 Hue (3.7) 中看不到 Pig 编辑器: 我该如何解决? 最佳答案 你能检查一下你的hue.ini文件吗 在解释器部分,如下图 # O
我在源文本文件中有一些日期值,如下面的第 3 列 123|text|2000-02-05 01:00:00-0500|true 如何将它们转换为 Pig 拉丁语中相应的 long 值?谢谢。 a =
看来我做不到 dump (limit A 10); 尽管 B = limit A 10; dump B; 似乎工作。 为什么?这似乎违反直觉。 最佳答案 确实是你不能这样做。 我相信为什么的问题不在范
A = load 'a.txt' as (id, a1); B = load 'b.txt as (id, b1); C = join A by id, B by id; D = foreach C
假设我有一个输入文件作为 map 。 sample.txt [1#"anything",2#"something",3#"anotherthing"] [2#"kish"] [3#"mad"] [4#
我正在尝试用 PIG 减去两个日期。 我有这样的数据: key_one, activation_date , deactivation_date (1456,2010-06-14 00:00:00,2
我正在与 pig 一起加载以逗号分隔的文件/文件夹 hadoop 范围。( this question on how to load multiple files in pig 问题是每个文件夹都有不
我一直认为 '' 和 "" 在 pig 中是一样的,但今天我得到了 Unexpected character '"' 出错 register datafu-pig-1.2.1.jar define C
我有一个运行 Hadoop 0.20.2 和 Pig 0.10 的集群。我有兴趣向 Pig 的源代码添加一些日志,并在集群上运行我自己的 Pig 版本。 我做了什么: 使用'ant'命令构建项目 得到
我无能为力地试图解决这个问题。我的脚本和 UDF 可以在 Pig 0.8.1 上完美运行,但是当我尝试在 Pig 0.10.0 上运行时,我得到: ERROR org.apache.pig.tools
目前我正在执行我的脚本: /usr/bin/pig /somepath/myscript.pig 出于某种原因,pig 总是卡在这个阶段。 2014-01-28 16:49:31,328 [main]
我有一个要加载到 Pig Engine 上的文本文件, 文本文件在单独的行中有名称,数据但有错误......特殊字符......像这样: Ja@@$s000on J@@a%^ke T!!ina M
我有一个用例,我需要计算两个字段的不同数量。 sample : x = LOAD 'testdata' using PigStorage('^A') as (a,b,c,d); y = GROUP x
我是 Pig 的新手,在解析我的输入并将其转换为我可以使用的格式时遇到了问题。输入文件包含具有固定字段和 KV 对的行,如下所示: FF1|FF2|FF3|FF4|KVP1|KVP2|...|KVPn
我有一个每天创建的文件文件夹,所有文件都存储相同类型的信息。我想制作一个脚本,加载最新的 10 个,将它们联合起来,然后在它们上运行一些其他代码。由于 pig 已经有一个 ls 方法,我想知道是否有一
我正在使用 Pig 0.11.0 排名函数并为我的数据中的每个 id 生成排名。 我需要以特定方式对我的数据进行排名。我希望每个新 ID 的排名都重置并从 1 开始。 是否可以直接使用 rank 函数
我有一个 (t,a,b) 形式的元组集合,我想在 Pig 中按 b 对它们进行分组。一旦分组,我想从每组中的元组中过滤出 b 并为每组生成一袋过滤后的元组。 例如,假设我们有 (1,2,1) (2,0
-- do something store result into '$RESULT.tmp'; rmf $RESULT mv $RESULT.tmp $RESULT 如果在 rmf $RESULT
我是一名优秀的程序员,十分优秀!