消息队列
生产者和消费者它相当于消息缓冲区,最多能存储多少数据只受限于机器的内存和磁盘。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列接收消息。Windows安装RabbitMQ参考mall商城学习教程的RabbitMQ部分内容:http://www./#/architect/mall_arch_09?id=rabbitmq原文中rabbitmq-server-3.7.14.exe 下载地址失效了,改从这里下载:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14在地址栏输入cmd并回车启动命令行,然后输入以下命令启动管理功能:rabbitmq-plugins enable rabbitmq_management
Java客户端amqp-client-5.7.1.jar 是RabbitMQ官方提供的Java客户端:https://www./tutorials/tutorial-one-java.html既可以直接下载jar包,也可以在Maven中添加依赖:<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency>
生产消息import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ... } }
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
}
声明消息队列,并发送Hello World! 消息到队列中:channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.javaimport com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
消费消息消费消息的代码跟生产消息的代码类似,也需要导包,建立连接:import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
} }
导包中有个DeliverCallback ,通过它就能消费消息:DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
因为发送消息和接收消息都是异步的,所以它叫做,callback,回调。https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.javaimport com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
运行代码运行Send.java 生产消息后,能看到RabbitMQ后台已经有1条消息:运行Recv.java 消费消息后,能看到队列中已经没有消息了:而消费者仍然保持着连接,持续监控新消息。如果把消费者停掉,连接就会断开。https://www./tutorials/tutorial-one-java.htmlhttp://www./#/architect/mall_arch_09?id=rabbitmq
|