我有一个有两个节点的cassandra集群。。我已经设置了spark作业来查询这个cassandra集群,它有3651568个密钥。
import com.datastax.spark.connector.rdd.ReadConf
import org.apache.spark.sql.cassandra
import org.apache.spark.sql.SparkSession
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "hostname)
val sc = new SparkContext(conf)
val spark = SparkSession.builder().master("local").appName("Spark_Cassandra").config("spark.cassandra.connection.host", "hostname").getOrCreate()
val studentsDF = spark.read.cassandraFormat("keyspacename", "tablename").options(ReadConf.SplitSizeInMBParam.option(32)).load()
studentsDF.show(1000)
我可以查询前1000行,但我找不到一种方法来读取 1001th
行到第2000行,这样我就可以使用spark job从cassandra表中批量读取数据。
根据推荐,我开始使用java驱动程序
下面是完整的解释
我必须使用datastax java驱动程序从cassandra数据库进行查询。。我使用的是datastax java驱动程序版本 cassandra-java-driver-3.5.1
和apache cassandra版本 apache-cassandra-3.0.9
我试着通过安装jar来解决依赖性我还检查了yaml文件种子、listen\u地址、rpc\u地址都指向我的主机并且start\u native\u transport设置为true这里是我的java代码,用于建立到cassandra数据库的连接
import java.net.InetAddress;
import com.datastax.driver.core.Metadata;
import java.net.UnknownHostException;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
public class Started {
public void connect()
{
try
{
Cluster cluster;
Session session;
cluster = Cluster.builder().addContactPoints("***.***.*.*").build();
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(2000);
System.out.println("Connected to cluster:");
session= cluster.connect("demo");
Row row = session.execute("SELECT ename FROM demo.emp").one();
System.out.println(row.getString("ename"));
cluster.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
Started st = new Started();
st.connect();
}
}
`
我在cassandra集群中只有一个节点,它正在运行。我也可以在9042端口连接到它。。到目前为止还不错,但当我运行我的java程序,我得到这个错误或异常消息。。。
Connected to cluster:
`
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
at com.datastax.driver.core.Cluster.init(Cluster.java:160)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
at Started.connect(Started.java:22)
at Started.main(Started.java:34)
`
谁能帮帮我!!
2条答案
按热度按时间rkttyhzu1#
这就是驱动程序兼容性的问题。最初我使用的是cassandra-java-driver-3.5.1和apache-cassandra-3.0.9。切换到cassandra-java-driver-3.0.8和apache-cassandra-3.0.9,并安装一些jar文件:
slf4j-log4j12-1.7.7.jar
,log4j-1.2.17.jar
,netty-all-4.0.39.Final.jar
.. 适合我:)e3bfsja22#
这可能不适合spark。例如,show只显示1000条记录,但不能保证记录的顺序。多次调用可能会产生不同的结果。
在spark中,如果您想遍历结果,最好的方法可能是以本地迭代器的形式获取结果,但这可能不是最好的方法。spark是一个处理远程集群数据的系统。这意味着在dataframeapi中进行处理。
如果你真的只想慢慢翻阅记录,你可以使用
toLocalIterator
将批抓取回驱动程序计算机(不推荐)。但是您可以通过使用java驱动程序执行select(*)来完成类似的任务。返回给您的结果集迭代器将在您遍历结果时自动分页遍历结果。使用java驱动程序分页的示例
https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/
spark远程数据处理示例
用于cassandrata的rdd文档用于cassandra的框架文档//rdd api sparkcontext.cassandratable(“ks”,“tab”).foreach(row=>//processrow)
使用本地化器的示例,可能是最不相关的方法
为什么你想用一个例子来做这个