订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)

在本文中,我们将演示如何实现Mule Pub / Sub设计模式的应用程序使用rabbitmq作为消息队列代理。

每天‬分享‬最新‬软件‬开发‬,Devops,敏捷‬,测试‬以及‬项目‬管理‬最新‬,最热门‬的‬文章‬,每天‬花‬3分钟‬学习‬何乐而不为‬,希望‬大家‬点赞‬,评论,加‬关注‬,你的‬支持‬是我‬最大‬的‬动力‬。

在本文中,我们将演示如何实现Mule应用程序发布—订阅设计模式。 在这里我们将使用消息队列代理RabbitMQ。

  • 发布/订阅体系结构: 的 发布-订阅模式 ,也被称为Pub / Sub,是一种建筑设计模式,提供了一个框架之间交换消息的发布者和订阅者。 这种模式包括发布者和订阅者依靠继电器信息发布者的message broker用户。
  • RabbitMQ : 它是一个开源和独立于平台的消息代理软件,最初的实现 高级消息队列协议 。 它已经被扩展的插件架构支持流媒体面向文本的消息传递协议和MQ遥测传输。 在MuleSoft,我们将使用一个AMQP连接器的集成 。
先决条件

我们将使用下面的工具来实现这一集成场景。 这是局限于Windows 64位操作系统版本。 RabbitMQ之前安装Erlang是依赖软件。

  • Erlang 24.3.4
  • RabbitMQ 3.10.5
  • Mule Anypoint Studio 7.11或最新的
  • Mule Anypoint平台账户
安装/设置

我们将执行安装和配置RabbitMQ的从一开始,随着依赖。

Erlang安装

一旦Erlang软件下载,按照以下步骤安装它。

  • 双击otp_win64_24.3.4。 exe并单击“下一步”。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(1)

  • 选择安装路径,点击“安装”。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(2)

  • 点击“下一个&安装。” 那么你应该看到如下所示的窗口。 一旦它完成100%我们就可以点击关闭按钮。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(3)

  • 一旦安装成功,然后设置环境变量Erlang安装目录如下。 这就完成了安装Erlang。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(4)

RabbitMQ安装

一旦RabbitMQ软件下载,通过以下步骤安装它。

  • 双击rabbitmq-server-3.10.5。 exe并单击“下一步”。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(5)

  • 选择安装路径,点击“安装”。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(6)

  • 点击“下一步”,完成。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(7)

  • 一旦安装完成后,打开命令提示符,去“C: \ Program Files \ RabbitMQ服务器\ rabbitmq_server-3.10.5 \ sbin。” 执行以下命令。

rabbitmq-plugins.bat enable rabbitmq_management

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(8)

  • 以上后,执行命令 rabbitmq-plugins启用rabbitmq_shovel rabbitmq_shovel_management 。 然后你应该看到:

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(9)

我们完成安装和设置。 这可以通过点击验证http://localhost: 15672(默认凭证:客人/客人)。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(10)

登录成功后,集成的目的,创建一个新用户(admin / admin)与专用虚拟主机“MuleIntDev。”

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(11)

我们完成了对RabbitMQ的安装和配置。

Mule应用程序实现

在这个应用程序中,我们将使用一个AMQP连接器从RabbitMQ发布和订阅消息。 流将发布一个消息交换和另一个流将消耗从队列的消息。

AMQP操作

不同的操作,下面是一个简短的描述。

  • 发布: 这是用于发布消息。
  • 消费: 这是用于订阅消息,但它不会自动创建任何活跃的消费者应用程序部署。 这不能被用作源在一个流。 流只能使用流参考执行,等等。
  • 听众: 这是用于订阅消息。 它会自动创建一个活跃的消费者应用程序部署。
  • 发布消费: 发送消息到一个AMQP交换和等待响应所提供的replyTo目的地或动态创建的临时目的地。
  • :这是用于ACK AMQP消息交付。
  • 拒绝: 这是用来拒绝一个AMQP消息交付。

创建一个新的Mule应用程序后,拖拽 HTTP侦听器 “AMQP”,然后添加模块如下。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(12)

前面的列表的AMQP连接器业务,对于本文,我们将使用 P 出版 l istener 操作。

现在拖拽 发布 操作到消息流表面。 然后配置AMQP如下。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(13)

创建一个交易所RabbitMQ门户如下。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(14)

在这之后,创建一个队列的名称”员工。 MuleESB”和绑定此队列“骡子:员工。” 一次 出版商 发布消息的交换,那么同样的信息将被路由到此队列订阅的用户。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(15)

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(16)

现在输入交易名称为“骡子:员工” 发布 操作连接器配置如下。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(17)

发布一条消息为RabbitMQ的最终实现使用AQMP协议如下。

现在部署应用程序和测试下面的负载作为一个员工样本有效载荷。

要求:

{ "Employees": [ { "userId": "Test rirani", "jobTitleName": "Developer", "firstName": "Romin", "lastName": "Irani", "preferredFullName": "Test Romin Irani", "employeeCode": "E1", "region": "CA", "phoneNumber": "408-1299967", "emailAddress": "romin.k.test@gmail.com" } ] }

现在,我们可以监视RabbitMQ交换,消息发布到交换,消息的队列状态到达自动进入队列。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(18)

现在,拖拽 侦听器 操作到消息流面积来实现 消费 操作。 的 侦听器 会听源配置。 在这种情况下,活跃的消费者将自动创建RabbitMQ一旦应用程序部署。 下面是实现的高级概述。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(19)

一旦应用程序部署,消费者会自动创建,如果任何消息到达队列,那么同样的将认购。

订阅体系结构和RabbitMQ使用Mule应用程序设计模式(订阅体系结构和RabbitMQ使用Mule应用程序设计模式)(20)

正如前面有一个信息,即存在的,同样是选择和加工。 因此,队列变空。

这是全面实施的Mule配置文件。

<?xml version="1.0" encoding="UTF-8"?> <mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:amqp="http://www.mulesoft.org/schema/mule/amqp" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/amqp http://www.mulesoft.org/schema/mule/amqp/current/mule-amqp.xsd http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd"> <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="5f227be9-f9f0-4b70-9177-c065634a9a5d" > <http:listener-connection host="0.0.0.0" port="8081" /> </http:listener-config> <amqp:config name="AMQP_Config" doc:name="AMQP Config" doc:id="a65f4585-2300-42a3-9aac-0e8cdebdc8a6" > <amqp:connection host="localhost" virtualHost="MuleIntDev" username="admin" password="admin" /> </amqp:config> <flow name="employee-rabbitmq-sys-apiFlow" doc:id="6e50b2b8-69a2-47db-9cd9-965076a8a927" > <http:listener doc:name="Listener" doc:id="309df0b7-e8c1-4fe0-a2f4-764f1ba87eef" config-ref="HTTP_Listener_config" path="/api/v1/rabbitmq-demo"/> <logger level="INFO" doc:name="BeforePublish" doc:id="b2316c6f-a0e6-473b-8939-bc3af693e73c" message="**Before Publish Into RabbitMQ**" category="**RabbitMQ**"/> <amqp:publish doc:name="Publish" doc:id="00f1b4c1-1177-4dc9-a843-bbc37a0ab85f" config-ref="AMQP_Config" exchangeName="Mule:Employee"/> <logger level="INFO" doc:name="AfterPublish" doc:id="706ce978-557c-4ba3-ad47-745c24537392" message="**After Publish Into RabbitMQ**" category="**RabbitMQ**"/> <ee:transform doc:name="final_transform" doc:id="52ae715d-ab9c-4cd2-9bde-eb17175e8620" > <ee:message > <ee:set-payload ><![CDATA[%dw 2.0 output application/json --- { "status": "Message published successfully" }]]></ee:set-payload> </ee:message> </ee:transform> <error-handler > <on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="567ef1e9-37ec-4215-8640-19b6a51d809f" > <ee:transform doc:name="Transform Message" doc:id="e65e9317-e315-46dd-b4ab-36367a300be8" > <ee:message > <ee:set-payload ><![CDATA[%dw 2.0 output application/json --- error]]></ee:set-payload> </ee:message> </ee:transform> </on-error-propagate> </error-handler> </flow> <flow name="employee-rabbitmq-sys-apiFlow1" doc:id="52121737-50f5-4def-8a04-3611c6a5f3f1" initialState="stopped"> <amqp:listener doc:name="Listener" doc:id="29358f30-ac13-4c2a-8998-cb0c10ac7c49" config-ref="AMQP_Config" queueName="Employee.MuleESB"/> <logger level="INFO" doc:name="Consumed_Message" doc:id="1c123130-b666-4519-a68a-1c53bfd40ac2" message="#[payload]" category="**RabbitMQ**"/> </flow> <flow name="employee-rabbitmq-sys-apiFlow2" doc:id="c350d2aa-cb56-4252-b1db-8597a846874a" > <amqp:consume doc:name="Consume" doc:id="a8745777-d895-4b7f-98d3-574737d8fb44" config-ref="AMQP_Config" queueName="Employee:MuleESB"/> </flow> </mule>

我们完成了 Pub / Sub设计模式 实现。 学习快乐!

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页