gpt4 book ai didi

java - java中配置Logstash从socket接收数据并插入到Elasticsearch中

转载 作者:行者123 更新时间:2023-12-01 19:20:45 34 4
gpt4 key购买 nike

我想将数据直接注入(inject)ElasticSearch来执行一些性能测试。我的第一个想法是为每个文档创建 JSON 文件并将文件导入 ElasticSearch,但这会花费太长时间。我测试了 110K 文件,创建文件只花了 18 分钟,我需要 55M 文档 - 这是我测试的 500 倍。快速计算一下:需要150个小时,也就是6.25天,太长了。第二个选项是当我将 JSON 放入搅拌中并使用 Logstash 将字符串注入(inject) ElasticSearch 时停止。但是,我遇到了一个异常(exception):

2019-12-16 13:49:27,240 | Timer-0 | ERROR | search-injector | c.n.es.injector.output.SocketOutput | SocketOutput::output: 
java.net.SocketException: Software caused connection abort: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:134)
at java.io.DataOutputStream.writeBytes(DataOutputStream.java:276)
at com.beniregev.es.injector.output.SocketOutput.output(SocketOutput.java:39)
at com.beniregev.es.injector.policies.UpdateOutputHandlers.run(UpdateOutputHandlers.java:60)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

ElasticSearchlocalhost 端口 9200 上运行,Logstashlocalhost 端口 9600 上运行。我的SocketOutput.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class SocketOutput implements OutputHandler {

private static final Logger log = LoggerFactory.getLogger(SocketOutput.class);

public static final String CLI_OPTION = "socket";

@Value("${socket.hostname}")
private String hostname;
@Value("${socket.port}")
private int port;
Socket clientSocket;

public boolean open() {
try {
clientSocket = new Socket(hostname, port);
} catch (IOException ioe) {
log.error("", ioe);
return false;
}
return true;
}

@Override
public void output(String data) {

DataOutputStream outToServer = null;
try {
outToServer = new DataOutputStream(clientSocket.getOutputStream());
outToServer.writeBytes( data );
} catch (IOException ioe) {
log.error("", ioe);
}
}

}

logstash-simple.conf

# Simple Logstash configuration for creating a simple
# Stdin -> Logstash -> Elasticsearch pipeline.
input { stdin { } }

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}

我使用以下命令运行 Logstash:bin/logstash.bat -f config/logstash-simple.conf

JSON 字符串已创建且有效,Socket 获取正确的参数值(主机名=“localhost”和端口=9600)。我将不胜感激任何帮助。

最佳答案

该解决方案分为几个层次,首先是使用 Kibana Manager 在 Elastic Search 中创建和使用正确的索引,然后正确配置 Logstash,最后是使用JSON 字符串而不是文件。

创建索引不是本题的问题,所以我不会讨论它。

Logstash.conf(配置文件):

#################################################
# Stdin -> Logstash -> Elastic Search pipeline.
#################################################
input {
stdin{}
tcp{
host => "localhost"
port => 9600
codec => json
}
}

filter
{
mutate
{
remove_field => ["host", "@version", "@timestamp", "port", "tags", "level", "logger_name", "themessage", "mensage", "spring.application.name", "level_value", "thread_name"]
}
}

output {
stdout{ codec => rubydebug }

elasticsearch{
hosts => ["localhost:9200"]
index => ["my-index"]
}
}

注意:index中输入您创建并将使用的索引的名称。

使用配置文件运行Logstash:bin/logstash.bat -f config/logstash-simple.conf

在Java中将字符串输出到Logstash:

package com.beniregev.injector.output;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class SocketOutput implements OutputHandler {

private static final Logger log = LoggerFactory.getLogger(SocketOutput.class);

public static final String CLI_OPTION = "socket";
private int outputIndex = 0;

@Value("${socket.hostname}")
private String hostname;
@Value("${socket.port}")
private int port;
Socket clientSocket;

public boolean open() {
try {
clientSocket = new Socket(hostname, port);
} catch (IOException ioe) {
log.error("", ioe);
return false;
}
return true;
}

@Override
public void output(String data) {
DataOutputStream outToServer = null;
try {
outToServer = new DataOutputStream(clientSocket.getOutputStream());
outToServer.writeBytes( data );
outputIndex++;
} catch (IOException ioe) {
log.error("", ioe);
}
System.out.println("Wrote segment " + outputIndex + " to socket");

}

}

hostport 指向 Logstash,默认端口=9600。

这解决了我的问题。

关于java - java中配置Logstash从socket接收数据并插入到Elasticsearch中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59357125/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com