I'm testing RabbitMQ and spring-amqp for a prototype. I have to use RabbitMQ in an "RPC style" for some communications. I found and worked through the RabbitMQ RPC tutorial (here), and then did the same thing with Spring AMQP.
In this work, the only difficulty was implementing an equivalent of
    QueueingConsumer consumer = new QueueingConsumer(channel);
I replaced this object with my own implementation, as I didn't find anything equivalent in the spring-rabbit abstraction.
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.springframework.amqp.core.Address;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

    import com.rabbitmq.client.Channel;

    public class ProducerMessageListener implements ChannelAwareMessageListener {

        ConnectionFactory connectionFactory;
        RemoteService callback;

        // Replies are handed over to the waiting caller through this queue.
        public BlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();

        // Note: corrid is not used yet.
        public ProducerMessageListener(RemoteService callback, String corrid) {
            this.callback = callback;
        }

        public ConnectionFactory getConnectionFactory() {
            return connectionFactory;
        }

        public void setConnectionFactory(ConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }

        public void onMessage(Message message, Channel channel) throws Exception {
            System.out.println("message received on blocking process");
            try {
                // Read but not used yet.
                Address adrs = message.getMessageProperties().getReplyTo();
                byte[] corrid = message.getMessageProperties().getCorrelationId();
                // BlockingQueue is thread-safe, so no synchronized block is needed.
                _queue.add(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
To use it:
    ProducerMessageListener cml = new ProducerMessageListener(this, corrid);
    cml.setConnectionFactory(rabbitConnectionFactory);

    // final so that the cleanup thread below can reference it
    final SimpleMessageListenerContainer asmlc = new SimpleMessageListenerContainer();
    asmlc.setConnectionFactory(rabbitConnectionFactory);
    asmlc.setQueueNames(replyto);
    asmlc.setQueues(q);
    asmlc.setMessageListener(cml);
    asmlc.initialize();
    asmlc.start();

    rabbitTemplateConsumer.send(msg);

    // wait on the blocking queue
    Message msgresponse = null;
    try {
        msgresponse = cml._queue.take();
        System.out.println("got " + new String(msgresponse.getBody()));
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    // CAREFUL:
    // destroy asmlc when finished (gc..)
    // destruction is time-consuming, so we do it in a separate thread
    new Thread(new Runnable() {
        @Override
        public void run() {
            asmlc.stop();
            asmlc.destroy();
        }
    }).start();

    return new String(msgresponse.getBody());
    } // closes the enclosing method (its signature is not shown here)
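
For illustration, the blocking hand-off above can be reduced to a small JDK-only sketch that also uses the correlation id (which the listener reads but never checks) and waits with a timeout instead of blocking forever on take(). Everything here is an assumption made for the example: ReplyCorrelator, its methods and the String payload are hypothetical names, not part of Spring AMQP or the RabbitMQ client, and the listener thread is simulated with a plain Thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: matches replies to waiting callers by correlation id.
class ReplyCorrelator {

    // One single-slot hand-off queue per outstanding request.
    private final ConcurrentMap<String, BlockingQueue<String>> pending =
            new ConcurrentHashMap<String, BlockingQueue<String>>();

    // Call before sending the request so that a fast reply cannot be missed.
    void register(String correlationId) {
        pending.put(correlationId, new LinkedBlockingQueue<String>(1));
    }

    // Call from the listener thread. Returns false when no caller is waiting
    // for this id or when a reply for it was already delivered.
    boolean onReply(String correlationId, String body) {
        BlockingQueue<String> slot = pending.get(correlationId);
        return slot != null && slot.offer(body);
    }

    // Blocks for at most timeoutMillis and returns null on timeout.
    // The id is unregistered in both cases.
    String await(String correlationId, long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> slot = pending.get(correlationId);
        if (slot == null) {
            throw new IllegalStateException("unknown correlation id: " + correlationId);
        }
        try {
            return slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(correlationId);
        }
    }
}

class ReplyCorrelatorDemo {
    public static void main(String[] args) throws Exception {
        final ReplyCorrelator correlator = new ReplyCorrelator();
        correlator.register("req-1");

        // Stand-in for the listener container thread delivering the reply.
        Thread listener = new Thread(new Runnable() {
            @Override
            public void run() {
                correlator.onReply("req-1", "pong");
            }
        });
        listener.start();

        System.out.println("got " + correlator.await("req-1", 1000));
        listener.join();
    }
}
```

In the code above, onMessage would call something like onReply with the message's correlation id, and the sending side would call await instead of _queue.take(), so a lost reply ends in a timeout rather than a caller that hangs forever.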