gpt4 book ai didi

java - cassandra java驱动程序连接抛出noHostAvailableException

转载 作者:行者123 更新时间:2023-12-02 10:47:30 25 4
gpt4 key购买 nike

我有一个带有两个节点的 cassandra 集群..我已经设置了 Spark 作业来从这个 cassandra 集群进行查询,该集群有 3651568 个键。

import com.datastax.spark.connector.rdd.ReadConf
import org.apache.spark.sql.cassandra
import org.apache.spark.sql.SparkSession

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "hostname)
val sc = new SparkContext(conf)

val spark = SparkSession.builder().master("local").appName("Spark_Cassandra").config("spark.cassandra.connection.host", "hostname").getOrCreate()
val studentsDF = spark.read.cassandraFormat("keyspacename", "tablename").options(ReadConf.SplitSizeInMBParam.option(32)).load()
studentsDF.show(1000)

我能够查询前 1000 行,但无法找到从第 1001 行读取到第 2000 行的方法,以便我可以使用 Spark 作业从 Cassandra 表中批量读取数据。

根据建议我开始使用java驱动程序

这是完整的解释

我必须使用 datastax java 驱动程序从 cassandra 数据库查询。我正在使用 datastax java 驱动程序版本 cassandra-java-driver-3.5.1 和 apache-cassandra 版本 apache-cassandra -3.0.9并且我尝试通过安装jar来解决依赖关系我还检查了yaml文件种子,listen_address,rpc_address都指向我的主机并且start_native_transport设置为true这是我的java代码,用于建立与cassandra数据库的连接 `

import java.net.InetAddress;
import com.datastax.driver.core.Metadata;
import java.net.UnknownHostException;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
public class Started {
public void connect()
{
try
{
Cluster cluster;
Session session;
cluster = Cluster.builder().addContactPoints("***.***.*.*").build();
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(2000);
System.out.println("Connected to cluster:");
session= cluster.connect("demo");
Row row = session.execute("SELECT ename FROM demo.emp").one();
System.out.println(row.getString("ename"));
cluster.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
Started st = new Started();
st.connect();
}
}

`

我的 cassandra 集群中只有一个节点,并且它已启动并正在运行。我也可以在 9042 端口上对其进行 cqlsh 操作..到目前为止一切顺利,但是当我运行我的 java 程序时,我收到此错误或异常消息...

Connected to cluster:
`

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
at com.datastax.driver.core.Cluster.init(Cluster.java:160)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
at Started.connect(Started.java:22)
at Started.main(Started.java:34)

`

谁能帮忙!!

最佳答案

这可能不适合 Spark。例如,显示仅显示 1000 条记录,但不保证记录的顺序。多次调用可能会产生不同的结果。

如果您想对 Spark 进行分页,那么您在 Spark 中最好的选择可能是将结果作为本地迭代器获取,但这可能不是最好的方法。 Spark 是一个用于处理远程集群上的数据的系统。这意味着在数据帧 API 中进行处理。

如果您确实只想缓慢地翻阅记录,则可以使用 toLocalIterator 将批处理抓取回驱动程序计算机(不推荐)。但是您可以通过使用 Java 驱动程序执行 Select (*) 来完成类似的操作。当您浏览结果时,返回给您的结果集迭代器将自动对结果进行分页。

使用 Java 驱动程序分页的示例

https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/

ResultSet rs = session.execute("your query");
for (Row row : rs) {
// Process the row ...
// By default this will only pull a new "page" of data from cassandra
// when the previous page has been fully iterated through. See the
// docs for more details
}

使用 Spark 远程处理数据的示例

RDD Docs for Cassandra Dataframe Docs for Cassandra //RDD接口(interface) SparkContext.cassandraTable("ks","tab").foreach(row =>//processRow)

//Dataframe API - although similar foreach is available here as well
spark.read.format("org.apache.spark.sql.cassandra")
.load()
.select(//do some transforms)
.write(//pickoutput of request)

使用 localIterator 的示例,可能是最不相关的方法

Why you might want to do this with an example

// This reads all data in large blocks to executors, those blocks are then pulled one at a time back to the Spark Driver.
sparkContext.cassandraTable("ks","tab").toLocalIterator

关于java - cassandra java驱动程序连接抛出noHostAvailableException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52443918/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com