
mysql - A way to read table data from MySQL into Pig


Everyone knows that Pig provides DBStorage, but it only supports storing results from Pig into MySQL:

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');

But can someone tell me how to read a table from MySQL? Something like:

data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');

Here is my code:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    // must be initialized here, otherwise getNext() throws a NullPointerException
    private ArrayList<Object> mProtoTuple = new ArrayList<Object>();
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private String query;
    private ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver: " + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // Returning TextInputFormat is the core problem: it expects an HDFS
        // input path and knows nothing about the database, so the job fails
        // before prepareToRead() is ever called. A JDBC-backed InputFormat
        // (the MyDBInputFormat stub below, fully implemented) is needed here.
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!result.next()) {
                return null; // end of the result set
            }
            ResultSetMetaData rsmd = result.getMetaData();
            int numColumns = rsmd.getColumnCount();

            for (int i = 0; i < numColumns; i++) {
                // JDBC columns are 1-indexed; getObject(i) would throw for i == 0
                Object field = result.getObject(i + 1);

                switch (DataType.findType(field)) {
                case DataType.NULL:
                    mProtoTuple.add(null);
                    break;
                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);
                    break;
                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);
                    break;
                case DataType.LONG:
                    mProtoTuple.add((Long) field);
                    break;
                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);
                    break;
                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);
                    break;
                case DataType.BYTEARRAY:
                    byte[] b = ((DataByteArray) field).get();
                    mProtoTuple.add(b);
                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);
                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);
                    break;
                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException(
                            "Cannot load a non-flat tuple using DBLoader");
                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));
                }
            }

            Tuple t = mTupleFactory.newTuple(mProtoTuple);
            mProtoTuple.clear();
            return t;
        } catch (SQLException e) {
            // surface the error instead of swallowing it with printStackTrace()
            throw new IOException("Error reading from result set", e);
        }
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        con = null;
        if (query == null) {
            throw new IOException("SQL query not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Deliberately a no-op: there is no HDFS path to read from. But with
        // TextInputFormat above and no input path set here, the job cannot
        // even start.
        //TextInputFormat.setInputPaths(job, location);
    }

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable> {

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // unimplemented stub: a real reader would iterate the JDBC result set
            return null;
        }

        @Override
        public List<InputSplit> getSplits(JobContext context)
                throws IOException, InterruptedException {
            // unimplemented stub: must return at least one split or no task runs
            return null;
        }
    }

}

I have tried writing the UDF many times, but never got it to work...

Best Answer

As you say, DBStorage only supports saving results to the database.

To load data from MySQL, you could look at a project named sqoop (which copies data from a database into HDFS), or you could perform a mysql dump and then copy the file into HDFS. Both approaches require some interaction outside Pig and cannot be driven directly from within a Pig script. A sketch of the Sqoop route follows.
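For example, the Sqoop route might look like this (the table name, target directory, credentials, and the column schema are all placeholders; Sqoop writes comma-delimited text files by default):

sqoop import --connect jdbc:mysql://host/db --username myuser --password mypass \
    --table my_table --target-dir /data/my_table

Afterwards the exported files can be read from Pig like any other HDFS data:

data = LOAD '/data/my_table' USING PigStorage(',') AS (id:int, name:chararray);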

The third option is to look at writing a Pig LoadFunc (you say you tried to write a UDF). It shouldn't be too difficult: you would need to pass in the same options as DBStorage (driver, connection credentials, and the SQL query to execute), and you could probably also use some result set metadata inspection to auto-generate the schema.
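Here is a minimal, untested sketch of that approach. Instead of a hand-rolled InputFormat it leans on Hadoop's org.apache.hadoop.mapreduce.lib.db.DBInputFormat, which carries the connection details in the job configuration so no HDFS input path is needed. The class names MySQLLoader and GenericDBWritable are made up for illustration, and the COUNT(*) wrapper assumes MySQL derived-table syntax:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MySQLLoader extends LoadFunc {
    // One row of the result set, with every column kept as a plain Object.
    public static class GenericDBWritable implements DBWritable, Writable {
        List<Object> values = new ArrayList<Object>();

        public void readFields(ResultSet rs) throws SQLException {
            ResultSetMetaData meta = rs.getMetaData();
            values.clear();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                values.add(rs.getObject(i)); // JDBC columns are 1-indexed
            }
        }

        public void write(PreparedStatement ps) { } // load-only, never called
        public void write(DataOutput out) { }       // not needed for map input
        public void readFields(DataInput in) { }
    }

    private final String driver, url, user, pass, query;
    private RecordReader reader;

    public MySQLLoader(String driver, String url, String user, String pass,
            String query) {
        this.driver = driver;
        this.url = url;
        this.user = user;
        this.pass = pass;
        this.query = query;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new DBInputFormat<GenericDBWritable>();
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // The location string is ignored; the connection details live in the
        // job configuration instead of an HDFS path.
        DBConfiguration.configureDB(job.getConfiguration(), driver, url, user, pass);
        DBInputFormat.setInput(job, GenericDBWritable.class, query,
                "SELECT COUNT(*) FROM (" + query + ") c"); // count query for splits
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            GenericDBWritable row = (GenericDBWritable) reader.getCurrentValue();
            return TupleFactory.getInstance().newTuple(row.values);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

Registered and invoked from Pig it would look something like this (jar name, credentials, and the dummy location are placeholders; the location is ignored by setLocation above):

REGISTER mysql-loader.jar;
data = LOAD 'ignored' USING MySQLLoader('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'user', 'pass', 'SELECT * FROM my_table');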

This question and answer were found on Stack Overflow: https://stackoverflow.com/questions/10942739/
