
java - Unable to write to remote HDFS using Flume

Reposted. Author: 行者123. Updated: 2023-12-02 21:41:20

I am trying to write text to HDFS on a remote machine using Flume, but my attempts have failed.

I am using the Cloudera QuickStart VM as the remote machine.
Here are my steps:

  • I started Flume:
    sudo init.d/flume-ng-agent start
  • I edited the Flume configuration in Cloudera Manager:
    # Please paste flume.conf here. Example:

    # Sources, channels, and sinks are defined per
    # agent name, in this case 'tier1'.
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    # For each source, channel, and sink, set
    # standard properties.
    tier1.sources.source1.type = avro
    # address of remote machine (cloudera quickstart VM)
    tier1.sources.source1.bind = 172.24.***.***
    tier1.sources.source1.port = 41414
    tier1.sources.source1.channels = channel1

    tier1.channels.channel1.type = memory

    tier1.sinks.sink1.type = hdfs
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.hdfs.path = /tmp/%y-%m-%d/%H%M/%S
    tier1.sinks.sink1.hdfs.fileType = DataStream

    #Format to be written
    tier1.sinks.sink1.hdfs.writeFormat = Text

    tier1.sinks.sink1.hdfs.maxOpenFiles = 10
    # rollover file based on maximum size of 10 MB
    tier1.sinks.sink1.hdfs.rollSize = 10485760

    # never rollover based on the number of events
    tier1.sinks.sink1.hdfs.rollCount = 0

    # rollover file based on max time of 1 minute
    tier1.sinks.sink1.hdfs.rollInterval = 60

    # Specify the channel the sink should use (must be one declared in tier1.channels)
    tier1.sinks.sink1.channel = channel1

    # Other properties are specific to each type of
    # source, channel, or sink. In this case, we
    # specify the capacity of the memory channel.
    tier1.channels.channel1.capacity = 100
  • Here is my code, which should send messages to the remote machine:
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;

    public class FlumeTransport {
        public static void main(String[] args) {
            MyRpcClientFacade client = new MyRpcClientFacade();
            // Initialize client with the remote Flume agent's host and port
            client.init("172.24.***.***", 41414);

            // Send 10 events to the remote Flume agent. That agent should be
            // configured to listen with an AvroSource.
            String sampleData = "Hello Flume!";
            for (int i = 0; i < 10; i++) {
                client.sendDataToFlume(sampleData);
            }

            client.cleanUp();
        }
    }

    class MyRpcClientFacade {
        private RpcClient client;
        private String hostname;
        private int port;

        public void init(String hostname, int port) {
            // Setup the RPC connection
            this.hostname = hostname;
            this.port = port;
            this.client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }

        public void sendDataToFlume(String data) {
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

            // Send the event
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                // clean up and recreate the client
                System.out.println(e.getMessage());
                client.close();
                client = null;
                client = RpcClientFactory.getDefaultInstance(hostname, port);
                // Use the following method to create a thrift client (instead of the above line):
                // this.client = RpcClientFactory.getThriftInstance(hostname, port);
            }
        }

        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }

    }

  • When I run the application, I get an exception:
    13/2-14:57:14,202 WARN : o.a.f.a.NettyAvroRpcClient - Invalid value for batchSize: 0; Using default value.
    13/2-14:57:14,209 WARN : o.a.f.a.NettyAvroRpcClient - Using default maxIOWorkers
    NettyAvroRpcClient { host: quickstart.cloudera.*******.com.ua, port: 41414 }: Failed to send event
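
One thing that may be worth ruling out in a configuration like the one in the steps above: Flume agent configuration files follow the Java properties format, and java.util.Properties keeps only the last value when a key is repeated. The sketch below is not part of Flume. The class name FlumeConfCheck and its method are hypothetical, and the trimmed-down sample configuration is made up for illustration. It only checks that every sink points at a channel listed in the agent's channels line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not part of Flume: flags sinks whose channel is not declared.
public class FlumeConfCheck {

    /** Returns one "sink -> channel" entry per sink whose channel is missing or undeclared. */
    public static List<String> undeclaredSinkChannels(Properties conf, String agent) {
        Set<String> channels = new HashSet<>(split(conf.getProperty(agent + ".channels")));
        List<String> problems = new ArrayList<>();
        for (String sink : split(conf.getProperty(agent + ".sinks"))) {
            String channel = conf.getProperty(agent + ".sinks." + sink + ".channel");
            if (channel == null || !channels.contains(channel.trim())) {
                problems.add(sink + " -> " + channel);
            }
        }
        return problems;
    }

    // Splits a whitespace-separated list of component names; null or blank gives an empty list.
    private static List<String> split(String value) {
        if (value == null || value.trim().isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(value.trim().split("\\s+"));
    }

    public static void main(String[] args) throws IOException {
        // Made-up sample: the second 'channel' line overrides the first one.
        String text = String.join("\n",
                "tier1.channels = channel1",
                "tier1.sinks = sink1",
                "tier1.sinks.sink1.channel = channel1",
                "tier1.sinks.sink1.channel = memoryChannel");
        Properties conf = new Properties();
        conf.load(new StringReader(text));
        System.out.println(undeclaredSinkChannels(conf, "tier1"));
    }
}
```

An empty list means every sink refers to a declared channel. This check says nothing about whether the Avro source itself is reachable.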

Best answer

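Your Avro RPC client cannot connect to your Flume agent. Check the log file at /var/log/flume-ng/flume.log to see what is happening. Your agent may be failing to bind to the interface. Consider replacing

    tier1.sources.source1.bind     = 172.24.***.***

with

    tier1.sources.source1.bind     = 0.0.0.0

which effectively binds to all interfaces. Try telnet against port 41414 locally to test whether the port actually responds.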
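
If telnet is not available on the client machine, the same reachability test can be sketched in plain Java. This is a minimal sketch that assumes a plain TCP connect is enough to tell whether the source port is listening. The class name PortProbe, the method canConnect, and the 3-second timeout are made up for this example, and the default host and port are placeholders to replace with your own.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all end up here.
            System.out.println("Cannot connect to " + host + ":" + port + " (" + e + ")");
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults; pass the agent's host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 41414;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Running it once on the VM itself and once from the client machine helps separate a binding problem on the agent from a network or firewall problem between the two hosts.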

Regarding "java - Unable to write to remote HDFS using Flume", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/28500169/
