gpt4 book ai didi

elasticsearch - 在 ES : index name is only evaluated on starting execution 中使用 Alpakka 索引的 Akka Streams

转载 作者:行者123 更新时间:2023-11-29 02:53:25 28 4
gpt4 key购买 nike

我使用 Akka Streams 和 Alpakka 编写了一些代码,这些代码从 Amazon SQS 读取数据并在 Elasticsearch 中为事件编制索引。一切顺利,性能也很棒,但索引名称有问题。我有这段代码:

class ElasticSearchIndexFlow(restClient: RestClient) {

private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)

def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
restClient,
DomainEventMarshaller.domainEventWrites
)

private def index = {
val now = DateTime.now()
s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
}
}

问题是在运行流程几天后,索引名称没有改变。我想象 Akka Streams 在幕后创建了一个融合的 actor,并且仅在执行开始时评估用于获取索引名称的函数 index

知道如何根据当前日期使用索引名称对 ES 中的事件进行索引吗?

最佳答案

问题的解决方案是在上一步中使用 IncomingMessage.withIndexName 设置索引名称

所以:

def flow: Flow[(DomainEvent, Message), IncomingMessage[DomainEvent, Message], NotUsed] =
Flow[(DomainEvent, Message)].map {
case (domainEvent, message) =>
IncomingMessage(Some(domainEvent.eventId), domainEvent, message)
.withIndexName(indexName(domainEvent.ocurredOn))
}

并且:

def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
ElasticsearchFlow.create[DomainEvent]("this-index-name-is-not-used", "domain-event", elasticSettings)(
restClient,
DomainEventMarshaller.domainEventWrites
)

关于elasticsearch - 在 ES : index name is only evaluated on starting execution 中使用 Alpakka 索引的 Akka Streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49900221/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com