背景:項目中一個場景需要用java端的處理代碼獲取php端放到rabbitmq內(nèi)的消息,然后做相應(yīng)業(yè)務(wù)的處理。
前提:rabbitmq服務(wù)器已經(jīng)搭建好,php端的消息發(fā)布正常運行。
首先:下載rabbitmq-client對應(yīng)的java版jar包(spring好像有相應(yīng)的支持)
開始代碼coding的工作,上代碼
package com.eelly.imagesearch.common;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
public class RabbitMqControll {
/**
* 讀取RabbitMq中的存儲信息
*
* @param queue_name 隊列名
* @param exchange_name 交換機名
* @param route_key 綁定用到的route_key
* @param durable 是否持久化
*/
public void readRabbitMqInfo (String queue_name,
String exchange_name, String route_key, boolean durable)
{
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置服務(wù)器ip
factory.setHost("172.18.107.66");
// 設(shè)置rabbitmq服務(wù)器運行的端口
factory.setPort(5672);
// 設(shè)置rabbitmq服務(wù)器連接用戶
factory.setUsername("guest");
// 設(shè)置rabbitmq服務(wù)器連接用戶密碼
factory.setPassword("guest");
// 設(shè)置rabbitmq服務(wù)器節(jié)點目錄(個人理解)
factory.setVirtualHost("/");
try {
// 創(chuàng)建工廠連接
Connection connection = factory.newConnection();
// 創(chuàng)建通道
Channel channel = connection.createChannel();
// 聲明交換機(設(shè)置相關(guān)屬性時需要和php端的一致)
channel.exchangeDeclare(exchange_name, "direct", durable);
// 聲明消息隊列(設(shè)置相關(guān)屬性時需要和php端的一致)
channel.queueDeclare(queue_name, durable, false, true, null);
// 綁定消息隊列(設(shè)置相關(guān)屬性時需要和php端的一致)
channel.queueBind(queue_name, exchange_name, route_key);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// basicConsume消費模式
/*channel.basicQos(1);//消息分發(fā)處理
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue_name, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 提交消息處理完成回復(fù)
channel.basicAck(delivery.getEnvelope()。getDeliveryTag(), false);
}*/
// basicGet消費模式
while (true)
{
// get方式主動消費
GetResponse res=channel.basicGet(queue_name, false);
if (res != null && res.getMessageCount() >= 0)
{
System.out.println(res.getMessageCount());
String message = "";
message = new String(res.getBody());
channel.basicAck(res.getEnvelope()。getDeliveryTag(), false);
System.out.println(" [x] Received '" + message + "'");
}
else
{
System.out.println("消息隊列中沒有可消費的信息!");
break;
}
}
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
在開發(fā)的過程中,主要報的異常是:
1.創(chuàng)建交換機和消息隊列時,設(shè)置的屬性和消息產(chǎn)生端的php代碼設(shè)置的不一樣,導(dǎo)致不匹配和一直重寫屬性
2.在調(diào)用時一直沒有確定到底是用basicConsume的消費模式還是basicGet消費模式(前者帶有監(jiān)控效果,后者沒有,不知道是不是因為一者有跳出while循環(huán),一者沒有的原因)托福改分