
java - AWS SqsListener: deserializing a custom object with Jackson

Reposted. Author: 行者123. Updated: 2023-12-02 09:34:54

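I can only listen for String payloads. When I try to receive a custom object, the listener throws the error below. It seems I need to teach Spring how to handle my custom object (B2BOrder).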

org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.b2breservas.api.model.B2BOrder] for GenericMessage [payload={"comments":"95d29059-8552-42fa-8fd9-a1d776416269"},

My SqsConfig

@Configuration
@EnableSqs
public class SqsConfig {

    private static final String DEFAULT_THREAD_NAME_PREFIX =
            ClassUtils.getShortName(SimpleMessageListenerContainer.class) + "-";

    @Value("${b2b.b2b.accesstoken}")
    public String accesstoken;

    @Value("${b2b.b2b.secretkey}")
    public String secretkey;

    // Template used for SENDING messages; its converter is not applied to @SqsListener methods.
    @Bean
    public QueueMessagingTemplate myMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resolver) {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new ParameterNamesModule())
                .registerModule(new Jdk8Module())
                .registerModule(new JodaModule())
                .registerModule(new JavaTimeModule());
        // configure the Jackson mapper as needed
        // maybe I need to do something here!

        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setSerializedPayloadClass(String.class);
        converter.setStrictContentTypeMatch(false);
        converter.setObjectMapper(mapper);

        return new QueueMessagingTemplate(amazonSqs, resolver, converter);
    }

    @Bean
    public ClientConfiguration sqsClientConfiguration() {
        return new ClientConfiguration()
                .withConnectionTimeout(30000)
                .withRequestTimeout(30000)
                .withClientExecutionTimeout(30000);
    }

    @Bean
    public ExecutorFactory sqsExecutorFactory() {
        return new ExecutorFactory() {
            @Override
            public ExecutorService newExecutor() {
                return new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            }
        };
    }

    @Bean
    public AmazonSQSAsync amazonSqs(ClientConfiguration sqsClientConfiguration, ExecutorFactory sqsExecutorFactory) {
        BasicAWSCredentials credential = new BasicAWSCredentials(accesstoken, secretkey);
        return AmazonSQSAsyncClientBuilder.standard()
                .withClientConfiguration(sqsClientConfiguration)
                .withExecutorFactory(sqsExecutorFactory)
                // .withEndpointConfiguration(sqsEndpointConfiguration)
                // .withCredentials(credentialsProvider)
                .withCredentials(new AWSStaticCredentialsProvider(credential))
                .build();
    }

    @Bean
    public AsyncTaskExecutor queueContainerTaskEecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(DEFAULT_THREAD_NAME_PREFIX);
        threadPoolTaskExecutor.setCorePoolSize(2);
        threadPoolTaskExecutor.setMaxPoolSize(2);
        // No thread pool executor queue, to avoid retaining messages too long in memory
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    // Container factory for RECEIVING messages; no message converter is configured here.
    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setAutoStartup(true);
        // factory.setQueueMessageHandler();
        factory.setMaxNumberOfMessages(1);
        factory.setWaitTimeOut(20);
        factory.setTaskExecutor(queueContainerTaskEecutor);
        return factory;
    }

}

The listener

@Component
public class SqsHub {

    @SqsListener(
            "https://sqs.us-west-2.amazonaws.com/3234/32443-checkout.fifo"
    )
    public void listen(B2BOrder message) {
    // public void listen(String message) { THIS WORKS!!
        System.out.println("!!!! received message: " + message);
    }
}

Sending

    ....
    @Autowired
    AmazonSQSAsync amazonSqs;

    @GetMapping("/yay")
    public String yay() {
        try {
            B2BOrder pendingOrder = new B2BOrder();
            pendingOrder.setComments(UUID.randomUUID().toString());
            String pendingOrderJson = objectMapper.writeValueAsString(pendingOrder);
            QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);
            Map<String, Object> headers = new HashMap<>();
            headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "my-application");
            headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
            queueMessagingTemplate.convertAndSend("booking-checkout.fifo", pendingOrderJson, headers);
        } catch (final AmazonClientException | JsonProcessingException ase) {
            System.out.println("Error Message: " + ase.getMessage());
        }
        return "sdkjfn";
    }
....

The simple custom object


public class B2BOrder implements Serializable {

    @JsonProperty
    private String comments;

}

Update

@Michiel's answer got me this far, but I still get the same error.

    @Autowired
    public ObjectMapper objectMapper;

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setAutoStartup(true);
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSqs);
        MappingJackson2MessageConverter jsonMessageConverter = new MappingJackson2MessageConverter();
        jsonMessageConverter.setObjectMapper(objectMapper);
        queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(jsonMessageConverter));
        factory.setQueueMessageHandler(queueMessageHandlerFactory.createQueueMessageHandler());
        // factory.setMaxNumberOfMessages(1);
        factory.setWaitTimeOut(20);
        factory.setTaskExecutor(queueContainerTaskEecutor);
        return factory;
    }

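Best answer

Although you have registered a MessageConverter, it is only configured for outgoing requests (those sent through the QueueMessagingTemplate). Your message listener has no MessageConverter configured, so incoming messages can only be retrieved as a "raw" type such as String.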
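To illustrate the outgoing side: the sending code in the question builds a fresh QueueMessagingTemplate and serializes the order by hand, so the converter registered on the myMessagingTemplate bean is never involved. A minimal sketch of sending through the configured bean instead is shown here. The class name OrderPublisher and the method publish are hypothetical, and the package names assume Spring Cloud AWS 2.x (org.springframework.cloud.aws). It is untested and only shows where the registered converter takes effect.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import br.com.b2breservas.api.model.B2BOrder;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.core.SqsMessageHeaders;
import org.springframework.stereotype.Service;

// Hypothetical helper: sends a B2BOrder through the template bean from SqsConfig,
// so its MappingJackson2MessageConverter performs the JSON serialization.
@Service
public class OrderPublisher {

    private final QueueMessagingTemplate myMessagingTemplate;

    public OrderPublisher(QueueMessagingTemplate myMessagingTemplate) {
        this.myMessagingTemplate = myMessagingTemplate;
    }

    public void publish(B2BOrder order) {
        Map<String, Object> headers = new HashMap<>();
        // FIFO queues need a message group id and a deduplication id.
        headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "my-application");
        headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
        // The object itself is passed; the template's converter turns it into JSON.
        myMessagingTemplate.convertAndSend("booking-checkout.fifo", order, headers);
    }
}
```

This only changes how messages are produced. The listener still needs its own converter, as described next.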

In your code snippet, you commented out the following line:

//        factory.setQueueMessageHandler();

That is where you can set a QueueMessageHandler, which in turn has one or more MessageConverters attached.

[Edit] Sure:

QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
handlerFactory.setMessageConverters(yourJacksonConfig);
QueueMessageHandler messageHandler = handlerFactory.createQueueMessageHandler();
factory.setQueueMessageHandler(messageHandler);
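As a rough sketch of what yourJacksonConfig could stand for, the following builds a one-element converter list and wires it into the container factory. The class name SqsListenerConverterConfig is hypothetical, the package names assume Spring Cloud AWS 2.x, and the converter settings simply mirror those the question already uses for the outgoing template. It has not been run against the queue in the question, whose update reports the same error with a similar wiring, so treat it as a starting point rather than a confirmed fix.

```java
import java.util.Collections;
import java.util.List;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;

// Hypothetical configuration class illustrating the answer's snippet.
@Configuration
public class SqsListenerConverterConfig {

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
            AmazonSQSAsync amazonSqs, ObjectMapper objectMapper) {
        // Same settings as the converter on the outgoing template in the question.
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(objectMapper);
        converter.setSerializedPayloadClass(String.class);
        converter.setStrictContentTypeMatch(false);

        // This list plays the role of "yourJacksonConfig" in the answer.
        List<MessageConverter> yourJacksonConfig = Collections.singletonList(converter);

        QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
        handlerFactory.setAmazonSqs(amazonSqs);
        handlerFactory.setMessageConverters(yourJacksonConfig);

        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setQueueMessageHandler(handlerFactory.createQueueMessageHandler());
        return factory;
    }
}
```

The ObjectMapper parameter assumes a mapper bean is available in the context, as the question's update does with its autowired objectMapper field.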

This Spring documentation might help.

Regarding "java - AWS SqsListener: deserializing a custom object with Jackson", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57613779/
