
apache-spark - Spark Streaming with HBase

Reposted. Author: 行者123. Updated: 2023-12-04 21:07:42

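I am trying to read data from HBase. Every tutorial I have found says that to get HBase data I have to go through Kafka. Is it possible to integrate Spark Streaming and HBase directly, without including Kafka in the chain?
Thanks.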

Best answer

"Is it possible to integrate Spark Streaming and HBase directly, without including Kafka?"



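Yes, it is possible; we have done the same thing without using Kafka.
See the example below, JavaHBaseStreamingBulkPutExample, which reads lines from a TCP socket and writes them to an HBase table: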
package org.apache.hadoop.hbase.spark.example.hbasecontext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * This is a simple example of BulkPut with Spark Streaming.
 */
public final class JavaHBaseStreamingBulkPutExample {

  private JavaHBaseStreamingBulkPutExample() {}

  public static void main(String[] args) throws Exception {
    // Three arguments are required: socket host, socket port, HBase table name.
    if (args.length < 3) {
      System.out.println("JavaHBaseStreamingBulkPutExample " +
          "{host} {port} {tableName}");
      return;
    }

    String host = args[0];
    String port = args[1];
    String tableName = args[2];

    SparkConf sparkConf =
        new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
            host + ":" + port + ":" + tableName);

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      // Process the stream in one-second micro-batches.
      JavaStreamingContext jssc =
          new JavaStreamingContext(jsc, new Duration(1000));

      // Read text lines from a plain TCP socket; Kafka is not involved.
      JavaReceiverInputDStream<String> javaDstream =
          jssc.socketTextStream(host, Integer.parseInt(port));

      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      // Convert each line to a Put and write every batch to the table.
      hbaseContext.streamBulkPut(javaDstream,
          TableName.valueOf(tableName),
          new PutFunction());

      // Nothing is processed until the streaming context is started.
      jssc.start();
      jssc.awaitTermination();
    } finally {
      jsc.stop();
    }
  }

  /**
   * Converts one input line of the form "rowKey,columnFamily,qualifier,value"
   * into an HBase Put.
   */
  public static class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;

    @Override
    public Put call(String v) throws Exception {
      String[] part = v.split(",");
      Put put = new Put(Bytes.toBytes(part[0]));

      put.addColumn(Bytes.toBytes(part[1]),
          Bytes.toBytes(part[2]),
          Bytes.toBytes(part[3]));
      return put;
    }

  }
}
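
PutFunction in the example indexes part[0] through part[3] without checking, so a line with fewer than four comma-separated fields would fail with an ArrayIndexOutOfBoundsException inside the streaming job. The following is a minimal sketch of a validating parser that could run before the Put is built. The class name PutLineParser and the rule that the row key and column family must be non-empty are assumptions made for illustration; they are not part of the original answer or of the hbase-spark API. It uses only the Java standard library.

```java
import java.util.Optional;

// Hypothetical helper (not part of hbase-spark): validates one input line
// of the form "rowKey,columnFamily,qualifier,value".
final class PutLineParser {

    private PutLineParser() {}

    /**
     * Returns the four fields in order, or Optional.empty() when the line
     * is null, does not have exactly four fields, or has an empty row key
     * or column family (an assumption of this sketch).
     */
    static Optional<String[]> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields, so "r,cf,q," still
        // splits into four parts with an empty value.
        String[] parts = line.split(",", -1);
        if (parts.length != 4) {
            return Optional.empty();
        }
        if (parts[0].isEmpty() || parts[1].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(parts);
    }

    public static void main(String[] args) {
        // Print whether each sample line is accepted by the parser.
        String[] samples = {"row1,cf,q,v", "row1,cf,q", ",cf,q,v"};
        for (String s : samples) {
            System.out.println(s + " -> " + parse(s).isPresent());
        }
    }
}
```

One way to use it would be to filter the DStream on PutLineParser.parse(line).isPresent() before passing it to streamBulkPut, so that malformed lines are dropped rather than failing the batch. Note that this sketch also rejects values that themselves contain a comma, because such lines split into more than four fields.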

Regarding apache-spark - Spark Streaming with HBase, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41593265/
