본문 바로가기

오픈소스프레임워크/RabbitMQ

[RabbitMQ] Java Client API Guide

RabbitMQ 튜토리얼 정리

 

http://www.rabbitmq.com/getstarted.html

 

P: Producer(메시지를 삽입하는 프로그램) 

    ex) 웹어플리케이션

C: Consumer(큐에서 메시지를 꺼내는 프로그램)

    ex) 백그라운드에서 동작하는 데몬

X: 메시지를 큐에 전달(Exchange)할 때  규칙

Key: 규칙과 큐를 binding하는 인자값(Routing Key)

 

자료출처: http://www.rabbitmq.com/getstarted.html

            http://ir.bagesoft.com/643

 

 

Hello World


 

C가 1개이므로 작업(메시지 처리)가 분산되지 않아, 실제로 사용할 일은 많지 않다.
다만 로직을 P와 C로 분리하여, 오래 걸리는 작업은 C가 담당하면 사용자 입장에서의 응답속도가 빨라질 것이다.

 


 

 

[Producer]
  
//필요한 클래스를 import한다.
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send { 
 // 사용할 큐의 이름을 지정한다.  
  private final static String QUEUE_NAME = "hello"; 
  public static void main(String[] argv) throws Exception {   

    //서버와의 연결을 설정한다.  	          
    ConnectionFactory factory = new ConnectionFactory();    
    factory.setHost("localhost");    
    Connection connection = factory.newConnection();    
    Channel channel = connection.createChannel();     

    // 전송에 사용할 큐를 선언한다.    
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
    String message = "Hello World!";    

    // 큐에 메시지를 게재한다.     
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());    
    System.out.println(" [x] Sent '" + message + "'");   

    // 채널과 연결을 닫는다.    
    channel.close();    
    connection.close();  
  }
}    

[Consumer]

  
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {    
     private final static String QUEUE_NAME = "hello";    
     public static void main(String[] argv) throws Exception {    
     
     //서버와의 연결을 설정한다.     
     ConnectionFactory factory = new ConnectionFactory();    
     factory.setHost("localhost");    
     Connection connection = factory.newConnection();    
     Channel channel = connection.createChannel();    

     // 전송에 사용할 큐를 선언한다.    
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        
  
     // 큐로부터 메시지를 배달해달라고 등록한다.
     QueueingConsumer consumer = new QueueingConsumer(channel);    
     channel.basicConsume(QUEUE_NAME, true, consumer);        
  
     while (true) {      

        // 큐에 게시된 메시지가 있으면 읽어온다.      
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();      
        String message = new String(delivery.getBody());      
        System.out.println(" [x] Received '" + message + "'");    
      }  
    }
}
 

Work Queues (Round-Robin)


큐에서 순서대로 메시지를 읽어서 C1, C2, ... 순서대로 번갈아 수행한다.  따라서 C1과 C2가 다른 메시지를 받지만 하는 일은 같다.

 


 

[Producer]
  
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) 
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}
[Consumer] 
  
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
      doWork(message); 
      System.out.println(" [x] Done" );

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}

 

Publish/Subscribe (FanOut)


X(fanout)에 의해, 여러 큐에 모두 동일한 메시지가 삽입된다. 따라서 C1, C2가 모두 동일한 메시지를 받지만 하는 일은 다르다. Hellow World 예제에서 C가 하던 일을 C1, C2로 분리했다고 생각할 수 도 있다.

 

exchange의 타입에는 direct, topic, headers, fanout이 있다.

fanout타입은 exchange가 알고 있는 모든 큐에 메시지를 전달하는 방식이다.

 

 

 

[Producer]
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

[Consumer]

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();

       // exchange와 queue을 연결한다.
       channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

 

Routing (Direct, Exact Matching)


선택적으로 메시지를 보낸다.

 

C가 X(direct)와 큐를 binding할 때, Key를 명시하고, 메시지의 Key와 비교해서 동일한(complete matching)경우만 각 큐에 삽입된다. 

 

Key가 error일 때는 두 큐에 모두 삽입되고,  
Key가 info, warning일 경우에는 두번 째 큐에만 삽입된다.
그 외의 Key는 큐에 삽입되지 않는다. 즉 그 외의 key를 가진 메시지는 버려진다.

 

C1은 중대한 오류(error)에 대해서만 처리하고, C2는 모든 오류(info, error, warning)에 대해서 처리한다.
따라서 C1, C2는 다른 메시지를 받으며, 하는 일도 다르다.

 


 

[Producer]

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

[Consumer]

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
        // EXCHANGE_NAME을 가지는 direct exchange에서 아규먼트로 넘어오는 serverity의 값을 
        // routingKey로 가지는 메시지만 queueName을 가지는 큐에 전송된다.
         channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

 

Topics (Pattern Matching)


Routing 예제와 비슷하지만,  Key를 비교할 때, complete matching이 아니라 pattern matching을 사용하여 유연성과 확장성이 제공된다.

 

Key는 .(마침표)로 구분되어 word(토큰)로 나누어지진다.
각 word에 대하여 *는 한개이상의 임의의 단어를, #는 0개이상의 임의의 단어를 의미한다.

 

<주의>마침표(.)은 단어 구분자의 역할을 하며 X가 비교할때는 포함되지 않는다.

 


[Producer]
 public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 큐의 이름과 타입을 지정
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 라우팅 키 설정
        String routingKey = getRouting(argv);
        String message = getMessage(argv);
 
        // MQ에 메시지 전송
       channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
} 

 

[Consumer]

 public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for(String bindingKey : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}


 

RPC (Remote Procedure Call)


 RPC(Remote Procedure Call)를 사용하는 경우이며, 이 소스는 Java로 컨버팅하지 않았다.


 


저작자표시 비영리 변경금지 (새창열림)

'오픈소스프레임워크 > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] 관리용 웹 설치  (0) 2013.05.20
[RabbitMQ] Aspiring Craftsmanpursuing well-crafted software 링크  (0) 2013.05.20
[RabbitMQ] Java Client API Guide ___ www.rabbitmq.com  (0) 2013.05.20
[RabbitMQ] JMS 먼저 정리하고...  (0) 2013.05.15
CentOS 6에서 RabbitMQ 3.1.0 설치 하기  (0) 2013.04.29