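1. Background
Springboot4RabbitMQ is the message producer.
RabbitProducer is the message consumer (despite its name).
The message queue is order-queue.
2. Problem
After the producer sends a message to RabbitMQ, the consumer fails to deserialize it when reading it.
Message class: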
import java.io.Serializable;

public class Order implements Serializable {

    private static final long serialVersionUID = -7128203829971899888L;

    private String id;
    private String name;
    private String messageId;

    public String getId() {
        return id;
    }

    // Setters trim surrounding whitespace and pass null through unchanged.
    public void setId(String id) {
        this.id = id == null ? null : id.trim();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name == null ? null : name.trim();
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId == null ? null : messageId.trim();
    }

    @Override
    public String toString() {
        return "Order [id=" + id + ", name=" + name + ", messageId=" + messageId + "]";
    }
}
After the producer sends a message, RabbitMQ shows one message awaiting acknowledgement in the order-queue queue.
The consumer's exception is:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.rabbit.producer.RabbitProducer.receiver.OrderRecevier.onOrderMessage(com.rabbit.producer.RabbitProducer.entity.Order,com.rabbitmq.client.Channel,java.util.Map) throws java.lang.Exception]
Bean [com.rabbit.producer.RabbitProducer.receiver.OrderRecevier@600b7b3d]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:185) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.rabbit.Springboot4RabbitMQ.entity.Order] to [com.rabbit.producer.RabbitProducer.entity.Order] for GenericMessage [payload=Order [id=RabbitMQTestId0002, name=HelloWorld, messageId=1538919928275$0836e0e7-4976-457e-92fb-44b937255855], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=order.ABC, amqp_receivedExchange=order-exchange, amqp_deliveryTag=1, amqp_consumerQueue=order-queue, amqp_redelivered=false, id=0ffe4dcd-048f-f274-bca9-5550f9ecebb1, amqp_consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, contentType=application/x-java-serialized-object, timestamp=1538919929083}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    ... 10 common frames omitted

2018-10-07 21:45:29.111 WARN 5264 --- [cTaskExecutor-2] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@75b3a922(byte[206])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order-exchange, receivedRoutingKey=order.ABC, deliveryTag=1, consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, consumerQueue=order-queue])
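The exception shows that the consumer failed while converting the deserialized message. The Order classes in the two projects have identical fields, but they live in different packages (com.rabbit.Springboot4RabbitMQ.entity.Order on the producer side, com.rabbit.producer.RabbitProducer.entity.Order on the consumer side). The message was sent with contentType=application/x-java-serialized-object, and Java serialization identifies a class by its fully qualified name, so the payload comes back as the producer's Order type and cannot be converted to the consumer's Order type.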
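The following is a minimal sketch of that behaviour using only the JDK, not the article's project code. ProducerOrder and ConsumerOrder are hypothetical stand-ins for the two Order classes; they are nested in one file for brevity, whereas in the article the classes differ by package. The point it illustrates is that a serialized object always comes back as the class it was written as, regardless of how similar another class looks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; none of these names come from the article's projects.
final class SerializedClassNameDemo {

    // Stand-in for the producer project's Order class.
    static class ProducerOrder implements Serializable {
        private static final long serialVersionUID = 1L;
        String id = "RabbitMQTestId0002";
        String name = "HelloWorld";
    }

    // Stand-in for the consumer project's Order class: same fields, different class name.
    static class ConsumerOrder implements Serializable {
        private static final long serialVersionUID = 1L;
        String id;
        String name;
    }

    // Writes a ProducerOrder with Java serialization and reads it back.
    static Object roundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ProducerOrder());
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // True only if the deserialized object could be passed where a ConsumerOrder is expected.
    static boolean usableAsConsumerOrder() {
        return roundTrip() instanceof ConsumerOrder;
    }

    public static void main(String[] args) {
        // Prints the class recorded in the byte stream, which is the producer's class.
        System.out.println(roundTrip().getClass().getName());
        // Prints whether the object can be treated as the consumer's class.
        System.out.println(usableAsConsumerOrder());
    }
}
```

Running main prints the producer-side class name, and the second line reports that the object is not usable as a ConsumerOrder, which mirrors the "Cannot convert from ... to ..." message in the stack trace.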
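3. Solutions
Solution 1: have the consumer reference the message class from the producer project, i.e. Order.java.
In Eclipse, right-click the consumer project -> [Build Path] -> [Configure Build Path] -> [Projects] -> [Add] and select the producer project. The consumer project can then use the classes from the producer project. Both sides now share the same Order class, with the same fully qualified name, which resolves the deserialization failure.
Solution 2: before sending, the producer converts the message body to a JSONObject; the consumer receives the message as a JSONObject and then converts it to the corresponding JavaBean.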
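The idea behind Solution 2 can be sketched with the JDK alone: put the data into a type that both sides already share, and let each side map it to its own class. In the sketch below a HashMap stands in for JSONObject, and ProducerOrder, ConsumerOrder and all method names are hypothetical. It is an illustration under those assumptions, not the project's FastJsonConvertUtil.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; a HashMap plays the role that JSONObject plays in Solution 2.
final class NeutralPayloadDemo {

    // Producer-side bean (stand-in for the producer project's Order).
    static class ProducerOrder {
        final String id;
        final String name;
        final String messageId;

        ProducerOrder(String id, String name, String messageId) {
            this.id = id;
            this.name = name;
            this.messageId = messageId;
        }
    }

    // Consumer-side bean (stand-in for the consumer project's Order).
    static class ConsumerOrder {
        String id;
        String name;
        String messageId;
    }

    // Producer side: copy the bean into a class-neutral map.
    static HashMap<String, String> toPayload(ProducerOrder order) {
        HashMap<String, String> payload = new HashMap<>();
        payload.put("id", order.id);
        payload.put("name", order.name);
        payload.put("messageId", order.messageId);
        return payload;
    }

    // Simulates the wire: Java-serialize the payload and read it back.
    static Object overTheWire(Serializable payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Consumer side: build the consumer's own bean from the neutral map.
    static ConsumerOrder fromPayload(Map<?, ?> payload) {
        ConsumerOrder order = new ConsumerOrder();
        order.id = (String) payload.get("id");
        order.name = (String) payload.get("name");
        order.messageId = (String) payload.get("messageId");
        return order;
    }

    // End-to-end: producer bean -> map -> bytes -> map -> consumer bean.
    static ConsumerOrder sendAndReceive(ProducerOrder order) {
        Object received = overTheWire(toPayload(order));
        return fromPayload((Map<?, ?>) received);
    }

    public static void main(String[] args) {
        ConsumerOrder order = sendAndReceive(
                new ProducerOrder("RabbitMQTestId0002", "HelloWorld", "demo-message-id"));
        System.out.println(order.id + " " + order.name + " " + order.messageId);
    }
}
```

The consumer never needs the producer's class on its classpath here, because only the shared map type crosses the wire. The trade-off is that field names become an informal contract between the two projects.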
4. Test results
1) The producer converts the message to a JSONObject
public void sendOrder(Order order) throws Exception {
    rabbitTemplate.setConfirmCallback(confirmCallback);
    // Unique message ID, used to correlate publisher confirms
    CorrelationData correlationData = new CorrelationData(order.getMessageId());
    // Send the order as a JSONObject instead of the Order bean itself
    rabbitTemplate.convertAndSend("order-exchange", "order.ABC",
            FastJsonConvertUtil.toJsonObject(order), correlationData);
}
2) The consumer receives the message as a JSONObject and converts it to the corresponding JavaBean
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                durable = "${spring.rabbitmq.listener.order.queue.durable}"),
        exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                type = "${spring.rabbitmq.listener.order.exchange.type}",
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationeExceptions}"),
        key = "${spring.rabbitmq.listener.order.key}"))
public void onOrderMessage(@Payload JSONObject object, Channel channel,
        @Headers Map<String, Object> headers) throws Exception {
    System.err.println("----------------------------------");
    // Convert the received JSONObject into the consumer's own Order bean
    Order order = JsonConvertUtils.convertJSONToObject(object);
    System.err.println("Consumer-side Order: " + order.toString());
    // Acknowledge this single message manually
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
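3) The message body sent from Postman for the producer to publish:
4) The consumer console prints the message body successfully
Project source code: https://github.com/KillMonkey/Springboot4RabbitMQ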