gpt4 book ai didi

symfony - 使用 Messenger 读取未使用 Messenger 发送的排队消息

转载 作者:行者123 更新时间:2023-12-04 15:38:49 29 4
gpt4 key购买 nike

我正在尝试读取未通过 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。似乎 Messenger 添加了一些标题,例如

headers: 
type: App\Message\Transaction

但是在读取外部消息时,此 header 不存在。

那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型 Transaction ?

我今天所拥有的是:
framework:
messenger:
transports:
# Uncomment the following line to enable a transport named "amqp"
amqp:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: messages
type: direct
queue:
name: queue_messages

routing:
# Route your messages to the transports
'App\Message\Transaction': amqp

我想补充的是:
        routing:
# Route your messages to the transports
amqp: 'App\Message\Transaction'

最佳答案

Ryan Weaver 在 symfony 的 slack 上回答了一个类似的问题 :

You will need a custom serializer for messenger if the messages do not originate from messenger :)

1) You create a custom serialize (implements SerializerInterface from Messenger) and configure it under the messenger config

2) Somehow in that serializer, you take JSON and turn it into some "message" object you have in your code. How you do that is up to you - you need to somehow be able to look at your JSON and figure out which message class it should be mapped to. You could then create that object manually and populate the data, or use Symfony's serializer. Wrap this in an Envelope before returning it

3) Because your serializer is now returning a "message" object if some sort, Messenger uses its normal logic to find the handler(s) for that Message and execute them



我根据自己的需要做了一个快速的实现,由你决定是否适合你的业务逻辑 :

1 - 创建一个 Serializer至实现 SerializerInterface :


// I keeped the default serializer, and just override his decode method.

/**
* {@inheritdoc}
*/
public function decode(array $encodedEnvelope): Envelope
{
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
}

if (empty($encodedEnvelope['headers']['action'])) {
throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
}

// Call a factory to return the Message Class associate with the action
if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
}

// ... keep the default Serializer logic

return new Envelope($message, ...$stamps);
}

2 - 检索权利 Message使用工厂:

class MessageFactory
{
/**
* @param string $action
* @return string|null
*/
public function getMessageClass(string $action)
{
switch($action){
case ActionConstants::POST_MESSAGE :
return PostMessage::class ;
default:
return null;
}
}
}

3) 为 messenger 配置新的自定义序列化程序:
framework:
messenger:
serializer: 'app.my_custom_serializer'

我会尝试更进一步,找到一种直接“连接”队列的方法,会让你知道。

关于symfony - 使用 Messenger 读取未使用 Messenger 发送的排队消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55577408/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com