gpt4 book ai didi

python - 您如何将Elasticsearch Ingest附件处理器插件与Python软件包elasticsearch-dsl结合使用

转载 作者:行者123 更新时间:2023-12-02 22:46:09 38 4
gpt4 key购买 nike

我在尝试将Ingest Attachment Processor Plugin与ElasticSearch一起使用时遇到麻烦(在AWS上为5.5,在本地为5.6)。我正在使用Python(3.6)进行开发,并且正在使用elasticsearch-dsl库。

我正在使用Persistence并进行如下的类设置:

import base64
from elasticsearch_dsl.field import Attachment, Text
from elasticsearch_dsl import DocType, analyzer

# Custom analyzer: the keyword tokenizer keeps the whole value as a single
# token and the lowercase filter makes matching case-insensitive.
# NOTE(review): the analyzer is registered under the name 'keyword', which is
# also the name of a built-in Elasticsearch analyzer - confirm this is intended.
lower_keyword = analyzer('keyword', tokenizer="keyword", filter=["lowercase"])

class ExampleIndex(DocType):
    """Persistence class for documents stored in the ``example`` index.

    Fields:
        id: free-text identifier.
        name: text analyzed with ``lower_keyword`` (single lowercase token).
        my_file: attachment field that receives the base64-encoded file.
    """

    class Meta:
        # Index name and mapping type used when documents are saved.
        index = 'example'
        doc_type = 'Example'

    id = Text()
    name = Text(analyzer=lower_keyword)
    my_file = Attachment()

然后,我有一个这样的函数,我可以调用该函数来创建索引并保存文档。
def index_doc(a_file):
    """Create or update the ``ExampleIndex`` document for ``a_file``.

    Makes sure the ``example`` index exists, looks for an existing document
    whose ``name`` matches ``a_file.name`` (a fresh document is used when
    none is found), stores the base64-encoded file contents in ``my_file``
    and saves the document, going through the ``attachment`` ingest pipeline
    whenever there is file content.

    :param a_file: object exposing ``name`` and ``path_to_file``
    :raises Exception: any error other than ``NotFoundError`` raised while
        searching is logged and re-raised
    """
    # Ensure that the Index is created before any documents are saved.
    # Checking for existence first (instead of attempting the create and
    # swallowing every exception) keeps real failures such as connection
    # errors or invalid mappings visible.
    index = Index('example')
    index.doc_type(ExampleIndex)
    if not index.exists():
        index.create()

    # todo - Pipeline creation needs to go here - But how do you do it?

    # Check for an existing document; fall back to a new, empty one.
    document = ExampleIndex()
    try:
        response = ExampleIndex.search().query('match', name=a_file.name).execute()
        if response.success():
            for hit in response:
                # Only the first hit is used.
                document = ExampleIndex.get(id=hit.meta.id)
                break
    except NotFoundError:
        # Nothing stored yet: keep the new document.
        pass
    except Exception:
        # NOTE(review): ``logger`` is not defined in this snippet; it is
        # assumed to be a module-level logger - confirm.
        logger.exception("Something went wrong")
        raise

    # Populate the document
    document.name = a_file.name
    with open(a_file.path_to_file, 'rb') as f:
        contents = f.read()
    # The attachment processor expects the raw bytes as base64 text.
    document.my_file = base64.b64encode(contents).decode("ascii")

    # Only route through the ingest pipeline when there is content to parse.
    if document.my_file:
        document.save(pipeline="attachment")
    else:
        document.save()

我有一个文本文件,其内容为 This is a test document。将其内容进行base64编码后,得到 VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK

如果我直接使用CURL,那么它将起作用:

创建管道:
# Create (or replace) the "attachment" ingest pipeline, which runs the
# attachment processor on the base64 content of the "my_file" field.
curl -XPUT 'localhost:9200/_ingest/pipeline/attachment?pretty' -H 'Content-Type: application/json' -d '
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "my_file"
      }
    }
  ]
}'

放入数据
# Index a document with an explicit id, routing it through the "attachment"
# pipeline; "my_file" carries the base64-encoded file content.
curl -XPUT 'localhost:9200/example/Example/AV9nkyJMZAQ2lQ3CtsLb?pipeline=attachment&pretty'\
-H 'Content-Type: application/json' \
-d '{"my_file": "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK"}'

获取数据
http://localhost:9200/example/Example/AV9nkyJMZAQ2lQ3CtsLb?pretty
{
"_index" : "example",
"_type" : "Example",
"_id" : "AV9nkyJMZAQ2lQ3CtsLb",
"_version" : 4,
"found" : true,
"_source" : {
"my_file" : "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK",
"attachment" : {
"content_type" : "text/plain; charset=ISO-8859-1",
"language" : "en",
"content" : "This is a test document",
"content_length" : 25
}
}
}

麻烦的是我看不到如何使用elasticsearch-dsl Python库重新创建它

更新:
除了最初创建管道之外,我现在可以使所有工作正常。如果我使用CURL创建管道,则可以通过将.save()方法调用更改为.save(pipeline="attachment")来使用它。我已经更新了之前的函数以体现这一点,并在需要创建管道的位置添加了注释。

这是创建管道的CURL实现的示例
# Create (or replace) the "attachment" ingest pipeline; the processor reads
# the "my_file" field, matching the field name used by the documents.
curl -XPUT 'localhost:9200/_ingest/pipeline/attachment?pretty' \
  -H 'Content-Type: application/json' \
  -d '{"description": "Extract attachment information","processors": [{"attachment": {"field": "my_file"}}]}'

最佳答案

这个问题的答案是:在使用管道之前,先用底层elasticsearch-py库中的IngestClient创建该管道。

from elasticsearch.client.ingest import IngestClient
# ``es_connection`` stands for an already established Elasticsearch client.
p = IngestClient(es_connection)
# PUT the pipeline under the id later passed to ``save(pipeline=...)``.
p.put_pipeline(id='attachment', body={
    'description': "Extract attachment information",
    'processors': [
        # The processor must read the field that holds the base64 content,
        # i.e. the document's ``my_file`` attachment field.
        {"attachment": {"field": "my_file"}}
    ]
})

使用elasticsearch-dsl持久流(DocType)在ElasticSearch中创建管道,索引和文档的完整工作示例是:
import base64
from uuid import uuid4
from elasticsearch.client.ingest import IngestClient
from elasticsearch.exceptions import NotFoundError
from elasticsearch_dsl import analyzer, DocType, Index
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.field import Attachment, Text


# Establish a connection
# The returned client ``es`` is reused below to build the IngestClient.
host = '127.0.0.1'
port = 9200
es = connections.create_connection(host=host, port=port)

# Some custom analyzers
# html_strip: removes markup with the html_strip char filter, tokenizes with
# the standard tokenizer, then applies the standard, lowercase, stop and
# snowball token filters.
html_strip = analyzer('html_strip', tokenizer="standard", filter=["standard", "lowercase", "stop", "snowball"],
                      char_filter=["html_strip"])
# lower_keyword: keeps the whole value as one token and lowercases it.
# NOTE(review): it is registered under the name 'keyword', which is also the
# name of a built-in Elasticsearch analyzer - confirm this is intended.
lower_keyword = analyzer('keyword', tokenizer="keyword", filter=["lowercase"])


class ExampleIndex(DocType):
    """Persistence class for documents stored in the ``example`` index.

    Fields:
        id: free-text identifier.
        uuid: identifier used to find an existing document before saving.
        name: free-text name.
        town: text analyzed with ``lower_keyword`` (single lowercase token).
        my_file: attachment field analyzed with ``html_strip``; it receives
            the base64-encoded file content.
    """

    class Meta:
        # Index name and mapping type used when documents are saved.
        index = 'example'
        doc_type = 'Example'

    id = Text()
    uuid = Text()
    name = Text()
    town = Text(analyzer=lower_keyword)
    my_file = Attachment(analyzer=html_strip)


def save_document(doc):
    """Create or update the ``ExampleIndex`` document matching ``doc.uuid``.

    The ``myattachment`` ingest pipeline and the ``example`` index are set up
    first. An existing document with the same ``uuid`` is updated, otherwise
    a new one is created. When ``doc.my_file`` names a file, its contents
    are base64-encoded into the ``my_file`` field.

    :param obj doc: Example object containing values to save; its ``uuid``,
        ``name``, ``town`` and ``my_file`` (a file path, may be empty)
        attributes are read
    :return: the result of ``ExampleIndex.save``
    :raises Exception: any error other than ``NotFoundError`` raised while
        searching is reported and re-raised
    """
    # Create the Pipeline BEFORE creating the index. This is no longer
    # wrapped in a blanket ``except Exception: pass``, which used to hide
    # pipeline failures and silently skip index creation as well.
    IngestClient(es).put_pipeline(id='myattachment', body={
        'description': "Extract attachment information",
        'processors': [
            {
                "attachment": {
                    "field": "my_file"
                }
            }
        ]
    })

    # Create the index only when it is missing, rather than attempting the
    # create and swallowing the "already exists" error with everything else.
    index = Index('example')
    index.doc_type(ExampleIndex)
    if not index.exists():
        index.create()

    # Look for an existing record; fall back to a new, empty one.
    record = ExampleIndex()
    try:
        response = ExampleIndex.search().query('match', uuid=doc.uuid).execute()
        if response.success():
            for hit in response:
                # Only the first hit is used.
                record = ExampleIndex.get(id=hit.meta.id)
                break
    except NotFoundError:
        # New record
        pass
    except Exception:
        print("Unexpected error")
        raise

    # Now set the doc properties
    record.uuid = doc.uuid
    record.name = doc.name
    record.town = doc.town
    if doc.my_file:
        with open(doc.my_file, 'rb') as f:
            contents = f.read()
        # The attachment processor expects the raw bytes as base64 text.
        record.my_file = base64.b64encode(contents).decode("ascii")

    # Save the document, using the Attachment pipeline if a file was attached
    if record.my_file:
        return record.save(pipeline="myattachment")
    return record.save()


class MyObj(object):
    """Plain value object describing a document to be indexed.

    :param name: value for the ``name`` field
    :param town: value for the ``town`` field
    :param file: path of the file to attach, or an empty string for none
    """

    # Class-level defaults; ``__init__`` overrides all of them per instance.
    uuid = None
    name = ''
    town = ''
    my_file = ''

    def __init__(self, name, town, file):
        # Generate the identifier per instance. A class-level ``uuid4()``
        # call runs once at class creation and is shared by every object.
        self.uuid = uuid4()
        self.name = name
        self.town = town
        self.my_file = file


# Example usage: describe a document whose attachment is a local text file.
me = MyObj("Steve", "London", '/home/steve/Documents/test.txt')

# Sets up the pipeline and index, then saves the document.
res = save_document(me)

关于python - 您如何将Elasticsearch Ingest附件处理器插件与Python软件包elasticsearch-dsl结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46988307/

38 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com