gpt4 book ai didi

hbase - 从 Apache Storm bolt 在 HBase 中插入和删除值的方法

转载 作者:行者123 更新时间:2023-12-04 12:40:23 25 4
gpt4 key购买 nike

我有一个在 Hadoop 上运行的 Storm 拓扑配置为伪分布式模式。拓扑包含一个 bolt ,它必须将数据写入 Hbase。
我用于测试目的的第一种方法是创建(并关闭)连接并在我的 bolt execute 中写入数据。方法。然而,我的本地机器上似乎没有太多资源来处理所有传入 HBase 的请求。在成功处理了大约 30 个请求后,我在 Storm 工作人员的日志中看到了以下内容:

o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid

我的想法是通过为每个 bolt 实例创建单个连接来减少到 HBase 的连接数量 - 在 prepare 中打开连接方法并在 cleanup 关闭它.但是根据文档 cleanup不保证在分布式模式下被调用。

在此之后,我发现了 Storm 的 Hbase 工作框架 - Storm -hbase .不幸的是,几乎没有关于它的信息,只有它的 github repo 中的自述文件。
  • 所以我的第一个问题是否使用storm-hbase for Storm-Hbase
    整合是好的解决方案吗?什么是最好的方法呢?

  • 此外,我需要能够从 HBase 表中删除单元格。但是我在storm-hbase doc中没有找到任何关于它的信息。
  • 使用storm-hbase可以做到这一点吗?或者回到
    上一个问题,有没有另一种方法可以做到这一切?

  • 提前致谢!

    最佳答案

    哦,男孩,我发光的时候了!我不得不从 Storm 对 HBase 进行大量优化写入,所以希望这会对您有所帮助。

    如果您刚刚开始 storm-hbase是开始将数据流式传输到 hbase 的好方法。您可以克隆项目,进行 maven 安装,然后在拓扑中引用它。

    但是,如果您开始获得更复杂的逻辑,那么创建您自己的类来与 HBase 对话可能是要走的路。这就是我将在此处的答案中展示的内容。

    项目设置

    我假设您正在使用 maven 和 maven-shade 插件。您需要引用 hbase-client:

    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    </dependency>

    还要确保打包 hbase-site.xml在您的拓扑 jar 中。您可以从集群下载此文件并将其放入 src/main/resources .我还有一个用于在开发中测试的名为 hbase-site.dev.xml .然后只需使用 shade 插件将其移动到 jar 的根目录。
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4</version>
    <configuration>
    <createDependencyReducedPom>true</createDependencyReducedPom>
    <artifactSet>
    <excludes>
    <exclude>classworlds:classworlds</exclude>
    <exclude>junit:junit</exclude>
    <exclude>jmock:*</exclude>
    <exclude>*:xml-apis</exclude>
    <exclude>org.apache.maven:lib:tests</exclude>
    <exclude>log4j:log4j:jar:</exclude>
    <exclude>org.testng:testng</exclude>
    </excludes>
    </artifactSet>
    </configuration>
    <executions>
    <execution>
    <phase>package</phase>
    <goals>
    <goal>shade</goal>
    </goals>
    <configuration>
    <transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
    <resource>core-site.xml</resource>
    <file>src/main/resources/core-site.xml</file>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
    <resource>hbase-site.xml</resource>
    <file>src/main/resources/hbase-site.xml</file>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
    <resource>hdfs-site.xml</resource>
    <file>src/main/resources/hdfs-site.xml</file>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass></mainClass>
    </transformer>
    </transformers>
    <filters>
    <filter>
    <artifact>*:*</artifact>
    <excludes>
    <exclude>META-INF/*.SF</exclude>
    <exclude>META-INF/*.DSA</exclude>
    <exclude>META-INF/*.RSA</exclude>
    <exclude>junit/*</exclude>
    <exclude>webapps/</exclude>
    <exclude>testng*</exclude>
    <exclude>*.js</exclude>
    <exclude>*.png</exclude>
    <exclude>*.css</exclude>
    <exclude>*.json</exclude>
    <exclude>*.csv</exclude>
    </excludes>
    </filter>
    </filters>
    </configuration>
    </execution>
    </executions>
    </plugin>

    注意:我在那里有用于我使用的其他配置的行,因此如果您不需要它们,请删除它们。顺便说一句,我真的不喜欢像这样打包配置但是...它使设置 HBase 连接变得更加容易并解决了一堆奇怪的连接错误。

    在 Storm 中管理 HBase 连接

    2018 年 3 月 19 日更新:自从我写下这个答案以来,HBase 的 API 发生了重大变化,但概念是相同的。

    最重要的是创建 每个 bolt 实例都有一个 HConnection prepare方法,然后 重用该连接 在 bolt 的整个生命周期内!
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);

    首先,您可以在 HBase 中执行单个 PUT。您可以通过这种方式打开/关闭每次调用的表。
    // single put method
    private HConnection connection;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
    // do stuff
    // call putFruit
    } catch (Exception e) {
    LOG.error("bolt error", e);
    collector.reportError(e);
    }
    }

    // example put method you'd call from within execute somewhere
    private void putFruit(String key, FruitResult data) throws IOException {
    HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
    try {
    Put p = new Put(key.getBytes());
    long ts = data.getTimestamp();
    p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
    table.put(p);
    } finally {
    try {
    table.close();
    } finally {
    // nothing
    }
    }
    }

    请注意,我在这里重新使用了连接。我建议从这里开始,因为这更容易工作和调试。最终这不会因为您尝试通过网络发送的请求数量而扩展,您需要开始将多个 PUT 一起批处理。

    为了批处理 PUT,您需要使用 HConnection 打开一个表并保持打开状态。您还需要将 Auto Flush 设置为 false。这意味着该表将自动缓冲请求,直到达到“hbase.client.write.buffer”大小(默认为 2097152)。
    // batch put method
    private static boolean AUTO_FLUSH = false;
    private static boolean CLEAR_BUFFER_ON_FAIL = false;
    private HConnection connection;
    private HTableInterface fruitTable;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);
    fruitTable = connection.getTable(Constants.TABLE_FRUIT);
    fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
    // do stuff
    // call putFruit
    } catch (Exception e) {
    LOG.error("bolt error", e);
    collector.reportError(e);
    }
    }

    // example put method you'd call from within execute somewhere
    private void putFruit(String key, FruitResult data) throws IOException {
    Put p = new Put(key.getBytes());
    long ts = data.getTimestamp();
    p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
    fruitTable.put(p);
    }

    在任一方法中,最好仍然尝试在 cleanup 中关闭 HBase 连接。 .请注意,在您的 worker 被杀之前,它可能不会被调用。

    其他的东西
  • 要删除,只需执行 new Delete(key);而不是放置。

  • 如果您有更多问题,请告诉我。

    关于hbase - 从 Apache Storm bolt 在 HBase 中插入和删除值的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32128158/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com