gpt4 book ai didi

elasticsearch - 将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录

转载 作者:行者123 更新时间:2023-11-29 02:50:35 26 4
gpt4 key购买 nike

我们有一个 firehose 可以将记录发送到 Elasticsearch 服务集群。我们的集群已满,一些记录已故障转移到 S3。 https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry 处的文档表示失败的记录可用于回填:“跳过的文档已传送到 elasticsearch_failed/文件夹中的 S3 存储桶,您可以将其用于手动回填”,但我无法找到有关如何完成此操作的任何文档.

查看记录,它们似乎是包含 JSON blob 的文本文件的 gzip 文件,“rawData”字段包含我们发送到 firehose 的原始记录的 base64 编码字符串。

是否有现成的工具来处理这些来自 S3 的 gzip 文件、分解它们并重新提交记录?该文档暗示您可以“仅手动回填”,这是一个非常标准化的流程,所以我假设以前有人这样做过,但我一直没能找到如何做的。

最佳答案

我想手动回填意味着使用其中一个 AWS SDK 将文档再次发送到 Elasticsearch。 python 中的一个示例(使用 boto3),从 S3 读取失败文件并将其中的文档发送到 Elasticsearch:

es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))

for case in failure_cases:
try:
data = base64.b64decode(case['rawData'])
es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
logger.debug("Successfully sent {}".format(case['esDocumentId']))
except RequestError:
logger.info("Retry failed for Document ID {}\nReason: {}"
.format(case['esDocumentId'], case['errorMessage']))

关于elasticsearch - 将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49822554/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com