gpt4 book ai didi

php - symfony2 FOSElasticaBundle:当Elasticsearch服务关闭时,如何推迟文档更新?

转载 作者:行者123 更新时间:2023-12-03 02:03:13 25 4
gpt4 key购买 nike

我正在使用symfony2和FOSElasticaBundle。

我的Elasticsearch服务经常由于未知原因而被杀死或失败。我已经使用restart always将systemctl放置在适当的位置,以作为临时解决方案。

尽管如此,如果失败了,当学说更新实体时执行索引更新的elasticsearch侦听器会给我一个错误52:

Couldn't connect to host, Elasticsearch down?



因此,如果还使用FOSUserBundle来更新上次用户连接日期,则会在记录时发生。在elasticsearch上如此依赖,真是太烦人了。我已针对此错误设置了异常侦听器,但我希望在服务再次可用时,让 bundle 包在以后保留更新。

查看 bundle 文件,我发现:

vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php


public function replaceMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$document = $this->transformToElasticaDocument($object);
$document->setDocAsUpsert(true);
$documents[] = $document;
}

try {
$this->type->updateDocuments($documents);
} catch (BulkException $e) {
$this->log($e);
}
}

这是一项服务,我跳入了以下内容,但该类是另一个继承的类,并且实例化了一个子类,而不是将其称为服务,因此我看不到如何覆盖它。我怎么能 ?
    try {
$this->type->updateDocuments($documents);
} catch (\Exception $e) {
if ($e instanceof BulkException)
{
$this->log($e);
}
elseif ($e->getMessage() != "Couldn't connect to host, Elasticsearch down?")
{
throw $e;
}
}

然后,如何确保下次服务可用时会更新文档?

编辑:

出现错误时的跟踪:
Stack Trace
in vendor/ruflin/elastica/lib/Elastica/Transport/Http.php at line 153 -
}
if ($errorNumber > 0) {
throw new HttpException($errorNumber, $request, $response);
}
return $response;
at Http ->exec (object(Request), array('connection' => array('config' => array('headers' => array()), 'host' => 'localhost', 'port' => '9200', 'logger' => 'fos_elastica.logger', 'enabled' => true)))
in vendor/ruflin/elastica/lib/Elastica/Request.php at line 167 +
at Request ->send ()
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 587 +
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array())
in vendor/friendsofsymfony/elastica-bundle/Elastica/Client.php at line 47 +
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array())
in vendor/ruflin/elastica/lib/Elastica/Bulk.php at line 342 +
at Bulk ->send ()
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 270 +
at Client ->updateDocuments (array(object(Document)))
in vendor/ruflin/elastica/lib/Elastica/Index.php at line 131 +
at Index ->updateDocuments (array(object(Document)))
in vendor/ruflin/elastica/lib/Elastica/Type.php at line 174 +
at Type ->updateDocuments (array(object(Document)))
in vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php at line 144 +
at ObjectPersister ->replaceMany (array(object(User)))
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 151 +
at Listener ->persistScheduled ()
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 182 +
at Listener ->postFlush (object(PostFlushEventArgs))
in vendor/symfony/symfony/src/Symfony/Bridge/Doctrine/ContainerAwareEventManager.php at line 63 +
at ContainerAwareEventManager ->dispatchEvent ('postFlush', object(PostFlushEventArgs))
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 3318 +
at UnitOfWork ->dispatchPostFlushEvent ()
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 428 +
at UnitOfWork ->commit (null)
in vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php at line 357 +
at EntityManager ->flush (null)
in src/AppBundle/Model/Classes/CustomBaseController.php at line 61 +
at CustomBaseController ->flush ()
in src/AppBundle/Controller/Core/VoteController.php at line 68 +
at VoteController ->voteAction (object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes')
at call_user_func_array (array(object(VoteController), 'voteAction'), array(object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes'))
in app/bootstrap.php.cache at line 3029 +
at HttpKernel ->handleRaw (object(Request), '1')
in app/bootstrap.php.cache at line 2991 +
at HttpKernel ->handle (object(Request), '1', true)
in app/bootstrap.php.cache at line 3140 +
at ContainerAwareHttpKernel ->handle (object(Request), '1', true)
in app/bootstrap.php.cache at line 2384 +
at Kernel ->handle (object(Request))
in web/app_dev.php at line 36 +

最佳答案

消息队列完全符合您的要求。每当您的模型更新时,您都会向MQ发送一条消息。 Web流程就是这样。然后,您有一个工作池,这些工作池使用来自MQ的消息并试图更新ES索引。如果ES立即关闭,则将出现异常,工作线程死亡,消息返回队列。因此,一旦ES成为在线工作人员,消息仍然在MQ中。

Same模式不仅可以用于ES,而且可以用于任何其他第三方服务。例如,您要发送非常重要的电子邮件,但邮件服务器已关闭,因此您迫不及待必须将响应发送给客户。因此,将其放到MQ上,然后让经纪人和工作人员完成工作。

这是使用enqueue MQ库如何完成代码的代码。安装和配置为pretty easy to do,因此我将跳过它。

标准侦听器必须替换为发送消息的侦听器:

<?php
use Enqueue\Client\ProducerInterface;

class ElasticaUpdateIndexListener
{
private $producer;

public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}

public function postPersist(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();

$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'insert'
]);
}

public function postUpdate(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();

$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'update'
]);
}

public function preRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();

$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'delete'
]);
}
}

此消息的处理器如下所示:
<?php

class ElasticaUpdateIndexProcessor implements PsrProcessor, CommandSubscriberInterface
{
private $doctrine;

protected $objectPersister;

protected $propertyAccessor;

private $indexable;

public function __construct(Registry $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable)
{
$this->indexable = $indexable;
$this->objectPersister = $objectPersister;
$this->propertyAccessor = PropertyAccess::createPropertyAccessor();
$this->doctrine = $doctrine;
}

public function process(PsrMessage $message, PsrContext $context)
{
$data = JSON::encode($message->getBody());

if ($data['type'] == 'delete') {
$this->objectPersister->deleteManyByIdentifiers([$data['entityId']]);

return self::ACK;
}

if (false == $entity = $this->doctrine->getManagerForClass($data['entityClass'])->find($data['entityId'])) {
return self::REJECT;
}

if (false == ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity))) {
return self::ACK;
}

if ($data['type'] == 'insert') {
$this->objectPersister->insertMany([$this->scheduledForInsertion]);

return self::ACK;
}

if ($data['type'] == 'update') {
$this->objectPersister->replaceMany([$this->scheduledForInsertion]);

return self::ACK;
}

return self::REJECT;
}

private function isObjectIndexable($object)
{
return $this->indexable->isObjectIndexable(
$this->config['indexName'],
$this->config['typeName'],
$object
);
}

public static function getSubscribedCommand()
{
return 'elastica_index_entity';
}
}

并运行一些 worker :
./bin/console enqueue:consume --setup-broker -vvv 

关于php - symfony2 FOSElasticaBundle:当Elasticsearch服务关闭时,如何推迟文档更新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29715125/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com