Thursday, July 21, 2011

Rabbit MQ - spring-amqp : waiting response (RPC like style)

I 'm testing rabbitmq and spring-ampq for a prototype. I have to use rabbitmq in an "rpc style" for some communications. I find out and do the rpc rabbit-mq tutorial (here) and finally does the same thing with spring amqp.

In this work the only point was to implement an equivalent of  
QueueingConsumer consumer = new QueueingConsumer(channel);


I replace this object by my own implementation as i don't fonnd anything in spring-rabbit abstraction.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;


import com.rabbitmq.client.Channel;

public class ProducerMessageListener implements ChannelAwareMessageListener {
    ConnectionFactory connectionFactory;
    RemoteService callback;
   

    public BlockingQueue _queue = new LinkedBlockingQueue();
    

    public ProducerMessageListener(RemoteService callback, String corrid) {
        this.callback = callback;
    }



    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    

    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("message receive on blocking process");
        try {
            Address adrs = message.getMessageProperties().getReplyTo();
            byte[] corrid = message.getMessageProperties().getCorrelationId();
            synchronized (_queue) {
                _queue.add(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


To use it 
ProducerMessageListener cml = new ProducerMessageListener(this, corrid);
cml.setConnectionFactory(rabbitConnectionFactory);
SimpleMessageListenerContainer asmlc = new SimpleMessageListenerContainer();
asmlc.setConnectionFactory(rabbitConnectionFactory);
asmlc.setQueueNames(replyto);
asmlc.setQueues(q);
asmlc.setMessageListener(cml);
asmlc.initialize();
asmlc.start();
       
rabbitTemplateConsumer.send(msg);
//wait on blocking queue
Message msgresponse=null;
try {
            msgresponse = cml._queue.take();
            System.out.println("got "+new String(msgresponse.getBody()));
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
       
//CARREFULL
// destroy asmlc when finish (gc..)
// destruction is time consumming so we does in thread
        new Thread(new Runnable() {
          
            @Override
            public void run() {
                    asmlc.stop();
                    asmlc.destroy();
              
            }
        }).start();
 
return new String(msgresponse.getBody());
}

No comments:

Post a Comment