每个Web开发者的目标都是创建一个能够吸引广泛用户的产品。然而,这伴随着一些问题,其中最主要的是可扩展性问题,以应对用户需求的激增。
如果不加以解决,这可能导致服务之间的通信混乱,破坏确保数据库事务有序进行的措施。但幸运的是,我们有消息代理来解决这个问题。
在本文中,我们将重点介绍消息队列作为后端开发最佳实践的重要性,相关的使用案例和流行的消息队列工具,以及如何在后端应用程序中实现消息队列。
为了能够跟随本文进行实践,您需要具备以下知识:
在分布式系统中,同时发送多个请求和队列。消息队列的概念使得消息能够有序地存储,允许消息和请求的接收者相应地处理它们。
它以异步方式运行,允许分布式系统的不同组件独立运行。有了这些机制,无论系统是否停机,发送给接收者的消息最终都会得到处理。消息会安全地存储,直到被确认。
以下是一些消息代理的实际用例:
在下一节中,我们将讨论提供消息队列功能的工具。
许多应用程序和服务提供消息队列功能。其中一些服务嵌入在商业云基础设施提供商中。以下是一些常用的消息队列服务:
我们将使用 RabbitMQ Cloud-as-a-service 应用程序来处理我们的消息,因为它受欢迎且易于使用。以下是文档链接。您还可以查看上面提供的其他消息队列应用程序。
接下来,我们将开发一个演示项目,利用消息队列功能。
在这个项目中,我们将使用 RabbitMQ 作为服务云平台,构建一个简单的消息代理系统,允许两个 Node.js 服务器之间的无缝、有序通信。
在本教程中,我们将创建一个消息发布者作为发送者,以及一个消息消费者来接收消息。
首先,我们需要创建两个将相互通信的 Node.js 服务器。
您可以创建两个不同的文件,并使用 npm init
初始化一个 Node 项目。
之后,您可以安装相关的包,以帮助实现这些功能。我们将使用 amqplib
库,这是一个用于 RabbitMQ 的 Node 库实现。
这个包允许我们通过 Node.js 应用程序快速与 RabbitMQ 通信。由于其内置的创建队列、发布消息和消费消息的函数,它能够无缝地实现这一点。稍后将详细讨论其用法。
要在我们的项目中安装此包,请执行:
npm i amqplib
接下来,我们将起草发布函数。之后,我们需要在项目中初始化 amqplib
。
const amp = require(“amqplib”)
此外,我们需要设置我们的 RabbitMQ 代理来管理我们的消息。
创建 RabbitMQ 服务器有几种方法,最流行的是在家庭计算机上安装它们,然后设置它们与后端服务器交互。您可以在此处下载软件。然而,为了便于使用,我们将使用基于云的 RabbitMQ 代理即服务应用程序来生成我们的服务器。
要完成此操作,请导航到 https://www.cloudamqp.com/ 并创建一个帐户。在本教程中,创建并配置了一个实例到离我最近的区域。成功创建实例后,将提供 RabbitMQ 的详细信息。
接下来,我们将创建一个消息队列,双方可以使用它作为连接管道。我们将首先创建一个发送消息的函数。
async function sendMessage(msg) {
try {
const connection = await amqp.connect(url);
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.sendToQueue(queue, Buffer.from(msg));
} catch (err) {
console.error("Failed to send message:", err);
}
}
在上面的代码中,确保并维护了连接。之后,还创建了一个通信通道。执行 assertQueue
函数时,确保现有队列得到维护,如果不存在则创建队列。
附加到函数的消息被缓冲,然后发送到创建的队列。
async function receiveMessage() {
try {
const connection = await amqp.connect(url);
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.consume(queue, (msg) => {
console.log(`Received message: ${msg.content.toString()}`);
channel.ack(msg);
});
} catch (err) {
console.error("Failed to receive message:", err);
}
}
接收函数通过在确切的队列上执行 consume
方法来接收进入队列的任何消息。在我们的例子中,消息作为日志消息输出。
然后执行 ack
函数以确认从队列接收到的消息。
以下是完整的项目代码:
消息消费者代码:
const amqp = require("amqplib");
const url = "amqp://localhost"; // 替换为您的 RabbitMQ 服务器 URL
const queue = "queue";
async function receiveMessage() {
try {
const connection = await amqp.connect(url);
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.consume(queue, (msg) => {
if (msg !== null) {
console.log(`Received message: ${msg.content.toString()}`);
channel.ack(msg);
}
});
} catch (err) {
console.error("Failed to receive message:", err);
}
}
receiveMessage();
消息发布者代码:
const amqp = require("amqplib");
const url = "amqp://localhost"; // 替换为您的 RabbitMQ 服务器 URL
const queue = "queue";
async function sendMessage(msg) {
try {
const connection = await amqp.connect(url);
const channel = await connection.createChannel();
await channel.assertQueue(queue);
await channel.sendToQueue(queue, Buffer.from(msg));
console.log(`Message sent to ${queue}: ${msg}`);
await channel.close();
await connection.close();
} catch (err) {
console.error("Failed to send message:", err);
}
}
sendMessage("Hello, world!");
以下是代码的输出:
到目前为止,我们已经完成了关于消息队列及其在促进各种系统之间无缝通信中的作用的教程。为了进一步提高您的技能,以下是构建复杂服务时应实施的一些额外最佳实践:
我们强调了消息代理的重要性以及如何在后端应用程序中实现消息队列。
欢迎查看我的其他文章。下次见,继续编码!