
apache-spark - Connecting to MS SQL Server with Spark


I am trying to load data from a SQL Server database using Spark's JdbcRDD, with version 4.0 of the Microsoft JDBC driver. Here is the code:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import scala.reflect.ClassManifestFactory$;

public static JdbcRDD<Object[]> load() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("myapp");
    JavaSparkContext context = new JavaSparkContext(conf);
    // DbConnection is a custom Function0<Connection> helper (a sketch is given after JobMapper below)
    DbConnection connection = new DbConnection("com.microsoft.sqlserver.jdbc.SQLServerDriver",
            "my-connection-string", "test", "test");
    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(context.sc(), connection,
            "select * from <table>", 1, 1000, 1, new JobMapper(),
            ClassManifestFactory$.MODULE$.fromClass(Object[].class));
    return jdbcRDD;
}

public static void main(String[] args) {
    JdbcRDD<Object[]> jdbcRDD = load();
    // Wrap the Scala RDD so it can be used through the Java API
    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
            ClassManifestFactory$.MODULE$.fromClass(Object[].class));
    List<String> ids = javaRDD.map(new Function<Object[], String>() {
        public String call(final Object[] record) {
            return (String) record[0];
        }
    }).collect();
    System.out.println(ids);
}

I get the following exception:
java.lang.AbstractMethodError: com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.isClosed()Z
at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:109)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:74)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:74)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)

JobMapper is defined as follows:
import java.io.Serializable;
import java.sql.ResultSet;
import org.apache.log4j.Logger;
import org.apache.spark.rdd.JdbcRDD;
import scala.runtime.AbstractFunction1;

public class JobMapper extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

    private static final Logger logger = Logger.getLogger(JobMapper.class);

    // Convert each JDBC row into an Object[] using Spark's built-in helper
    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
}
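
The post never shows DbConnection. JdbcRDD's second constructor argument must be a Scala Function0<Connection> that opens a fresh connection whenever a partition needs one, so DbConnection presumably extends AbstractFunction0<Connection>. Below is a minimal sketch of such a helper, assuming it simply wraps DriverManager; the field names are illustrative, not the original code:

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import scala.runtime.AbstractFunction0;

// Hypothetical reconstruction of the DbConnection helper used above.
public class DbConnection extends AbstractFunction0<Connection> implements Serializable {
    private final String driverClassName;
    private final String url;
    private final String user;
    private final String password;

    public DbConnection(String driverClassName, String url, String user, String password) {
        this.driverClassName = driverClassName;
        this.url = url;
        this.user = user;
        this.password = password;
    }

    // Called by JdbcRDD on each partition to obtain a connection
    public Connection apply() {
        try {
            Class.forName(driverClassName);  // register the JDBC driver
            return DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            throw new RuntimeException("Could not open connection to " + url, e);
        }
    }
}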

Best Answer

I found the problems with what I was doing. A couple of things:

  • It does not seem to work with version 4.0 of the driver, so I changed it to use version 3.0. (The AbstractMethodError in the stack trace means the loaded driver class supplies no implementation of Statement.isClosed(), which JdbcRDD calls when closing the statement.)
  • The JdbcRDD documentation states that the SQL string must contain two '?' placeholders marking the bounds of the query's range, so I had to change the query:

    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(context.sc(), connection,
            "SELECT * FROM <table> WHERE Id >= ? AND Id <= ?", 1, 20, 1,
            new JobMapper(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

    The parameters 1 and 20 are the lower and upper bounds of the query's range.
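
For reference, those bounds also drive partitioning: JdbcRDD divides the range [lowerBound, upperBound] into numPartitions contiguous sub-ranges and binds each sub-range into the two '?' placeholders, running one query per partition. A sketch of the same call with two partitions (the split in the comment follows JdbcRDD's arithmetic; the bounds are the ones from the answer):

// With lowerBound = 1, upperBound = 20 and numPartitions = 2, JdbcRDD runs
// one bounded query per partition:
//   partition 0: SELECT * FROM <table> WHERE Id >= 1  AND Id <= 10
//   partition 1: SELECT * FROM <table> WHERE Id >= 11 AND Id <= 20
JdbcRDD<Object[]> partitioned = new JdbcRDD<Object[]>(context.sc(), connection,
        "SELECT * FROM <table> WHERE Id >= ? AND Id <= ?", 1, 20, 2,
        new JobMapper(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));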

Regarding apache-spark - Connecting to MS SQL Server with Spark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/28244278/
