I 'm testing rabbitmq and spring-ampq for a prototype. I have to use rabbitmq in an "rpc style" for some communications. I find out and do the rpc rabbit-mq tutorial (here) and finally does the same thing with spring amqp.
In this work the only point was to implement an equivalent of
QueueingConsumer consumer = new QueueingConsumer(channel);
I replace this object by my own implementation as i don't fonnd anything in spring-rabbit abstraction.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
public class ProducerMessageListener implements ChannelAwareMessageListener {
ConnectionFactory connectionFactory;
RemoteService callback;
public BlockingQueue _queue = new LinkedBlockingQueue();
public ProducerMessageListener(RemoteService callback, String corrid) {
this.callback = callback;
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("message receive on blocking process");
try {
Address adrs = message.getMessageProperties().getReplyTo();
byte[] corrid = message.getMessageProperties().getCorrelationId();
synchronized (_queue) {
_queue.add(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
To use it
ProducerMessageListener cml = new ProducerMessageListener(this, corrid);
cml.setConnectionFactory(rabbitConnectionFactory);
SimpleMessageListenerContainer asmlc = new SimpleMessageListenerContainer();
asmlc.setConnectionFactory(rabbitConnectionFactory);
asmlc.setQueueNames(replyto);
asmlc.setQueues(q);
asmlc.setMessageListener(cml);
asmlc.initialize();
asmlc.start();
rabbitTemplateConsumer.send(msg);
//wait on blocking queue
Message msgresponse=null;
try {
msgresponse = cml._queue.take();
System.out.println("got "+new String(msgresponse.getBody()));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//CARREFULL
// destroy asmlc when finish (gc..)
// destruction is time consumming so we does in thread
new Thread(new Runnable() {
@Override
public void run() {
asmlc.stop();
asmlc.destroy();
}
}).start();
return new String(msgresponse.getBody());
}