1.创建项目(此处忽略)

2.引入依赖

在pom.xml文件中引入如下依赖

com.rabbitmq

amqp-client

5.6.0

org.apache.maven.plugins

maven-compiler-plugin

3.8.0

1.8

1.8

3.编写生产者

package com.tjrac.rabbitmq.TestDemo;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.Map;

public class Producer {

public static void main(String[] args) throws Exception {

//创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

//主机地址

connectionFactory.setHost("192.168.100.110");

//连接端口;默认为 5672

connectionFactory.setPort(5672);

//虚拟主机名称;默认为 /

connectionFactory.setVirtualHost("/");

//连接用户名;默认为guest

connectionFactory.setUsername("admin");

//连接密码;默认为guest

connectionFactory.setPassword("123456");

//创建连接

Connection connection = connectionFactory.newConnection();

//创建频道

Channel channel = connection.createChannel();

// 声明(创建)队列

/**

* queue 参数1:队列名称

* durable 参数2:是否定义持久化队列,当mq重启之后,还在

* exclusive 参数3:是否独占本次连接

* ① 是否独占,只能有一个消费者监听这个队列

* ② 当connection关闭时,是否删除队列

* autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除

* arguments 参数5:队列其它参数

*/

channel.queueDeclare("simple_queue", true, false, false, null);

// 要发送的信息

String message = "Hello RabbitMQ";

/**

* 参数1:交换机名称,如果没有指定则使用默认Default Exchage

* 参数2:路由key,简单模式可以传递队列名称

* 参数3:配置信息

* 参数4:消息内容

*/

channel.basicPublish("", "simple_queue", null, message.getBytes());

System.out.println("已发送消息:" + message);

// 关闭资源

channel.close();

connection.close();

}

}

4.编写消费者

package com.tjrac.rabbitmq.TestDemo;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Consumer {

public static void main(String[] args) throws Exception {

//1.创建连接工厂

ConnectionFactory factory = new ConnectionFactory();

//2. 设置参数

factory.setHost("192.168.110.110");//ip

factory.setPort(5672); //端口 默认值 5672

factory.setVirtualHost("/");//虚拟机 默认值/

factory.setUsername("admin");//用户名

factory.setPassword("123456");//密码

//3. 创建连接 Connection

Connection connection = factory.newConnection();

//4. 创建Channel

Channel channel = connection.createChannel();

//5. 创建队列Queue

/*

queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)

参数:

1. queue:队列名称

2. durable:是否持久化,当mq重启之后,还在

3. exclusive:

* 是否独占。只能有一个消费者监听这队列

* 当Connection关闭时,是否删除队列

4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉

5. arguments:参数。

*/

//如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建

channel.queueDeclare("simple_queue",true,false,false,null);

// 接收消息

DefaultConsumer consumer = new DefaultConsumer(channel){

/*

回调方法,当收到消息后,会自动执行该方法

1. consumerTag:标识

2. envelope:获取一些信息,交换机,路由key...

3. properties:配置信息

4. body:数据

*/

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("consumerTag:"+consumerTag);

System.out.println("Exchange:"+envelope.getExchange());

System.out.println("RoutingKey:"+envelope.getRoutingKey());

System.out.println("properties:"+properties);

System.out.println("body:"+new String(body));

}

};

/*

basicConsume(String queue, boolean autoAck, Consumer callback)

参数:

1. queue:队列名称

2. autoAck:是否自动确认 ,类似咱们发短信,发送成功会收到一个确认消息

3. callback:回调对象

*/

// 消费者类似一个监听程序,主要是用来监听消息

channel.basicConsume("simple_queue",true,consumer);

}

}

相关阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: