林幽网
当前位置:首页»财经

RocketMQ------快速体验(单机版)

日期:2019-09-06 来源: 评论:

[摘要]==有彩蛋哦!!!==(或者公众号内点击网赚获取彩蛋)程序员探索之路引言为什么使用消息队列呢?消峰,解耦,异步这些都是使用消息队列的好处;但是项目中引入任务一门中间件时都需要考虑其利弊(维护成本是否大,性能是否稳定,社区是否活跃....)。...……

==有彩蛋哦!!!==(或者公众号内点击网赚获取彩蛋)

程序员探索之路引言

为什么使用消息队列呢?消峰,解耦,异步这些都是使用消息队列的好处;但是项目中引入任务一门中间件时都需要考虑其利弊(维护成本是否大,性能是否稳定,社区是否活跃....)。嘿嘿,说这些都是扯淡,在我公司中RocketMQ起着解耦的作用,由于项目中数据服务覆盖放放面面,有许多公司每天都要从我们项目中拿到相关数据。

单机版部署流程

下载zip

在这里插入图片描述

编辑着两个cmd,主要时修改启动堆大小参数

runserver.cmdset "JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn250m -XX:MetaspaceSize=50m -XX:MaxMetaspaceSize=60m"在其中还可以看到垃圾回收器等信息runboker.cmdset "JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn250m"

启动mqnameser.cmd

在这里插入图片描述

启动 mqbroker.cmd -n localhost:9876

在这里插入图片描述

main版本生产者import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import java.util.Scanner;public class RocketProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("my-group"); producer.setNamesrvAddr("localhost:9876"); producer.setInstanceName("rmq-instance"); producer.start(); try { Message message = new Message("demo-topic", "demo-tag", "这是一条测试消息".getBytes()); producer.send(message); while (true) { String text = new Scanner(System.in).next(); Message msg = new Message("demo-topic",// topic "demo-tag",// tag text.getBytes() // body ); SendResult sendResult = producer.send(msg); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }}main版本消费者import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class RocketConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName("rmq-instance"); consumer.subscribe("demo-topic", "demo-tag"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }}

生产

在这里插入图片描述

消费

在这里插入图片描述基本概念

过完手瘾之后,还是从头开始,RocketMQ中有许多概念需要了解一下

非常友好的中文官方文档

RocketMQ主要由Producer Broker Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,

Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个

Topic的消息也可以分片存储于不同的Broker。Message Queue用于存储消息的物理地址,每个Topic中的消息地址

存储于多个Message Queue中。ConsumerGroup由多个Consumer实例构成。

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。

RockerMQ提供多种发送方式,同步发送,异步发送,顺序发送,单向发送。同步和异步方式均需要Broker返回取人信息,

单向发送不需要。

负责消费消息,一般都是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息,并将其提供给应用程序。

从用户应用的角度而言提供了两种消费形式:拉取式消费,推动式消费。

表示一类消息的集合,每个主题包含若干条消息,每个消息只能属于一个主题,是RockerMQ进行消息订阅的基本单位。

消息中转角色,负责存储消息,转发消息。代理服务器在RockerMQ系统中负责接收从生产者发送来的消息并存储,

同时为消费者的拉去请求做准备。代理服务器也存储消息相关的元数据,包括消费者组,消费进度偏移,主题,队列消息等

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题对应的Broker IP列表。多个Namesrv

实例组成继承,但相互独立,没有信息交换。

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息,主动权由应用控制。

一旦获取了批量消息,应用就会启动消费过程。

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费者端,该消费模式一般实时向比较高

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者

在发送之后崩溃,则Broker服务器会联系同一生产着组的其他生产者实例以提交或回溯消费。

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,

实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。

RockerMQ支持两种消费模式:集群消费(Clustering)和广播消费(Broadcasting)。

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息

普通顺序消费模式下,消费者通过同意给消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息

拥有唯一的Message ID,且可以携带具有业务标识的key。系统提供了通过Message ID和Key查询消息的功能。

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,

可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,

并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性

总结

RocketMQ是阿里巴巴贡献给apache项目,已经成为顶级项目,经受过双十一考验,底层更是以java编写,值得学习。其实单机版(main)只是入门入门级上手,俗话说不高集群的消息队列就是在搞流氓

您至少需要输入5个字

相关内容

编辑精选

copyright © 2017 https://www.lyqzlaw.com 林幽网 版权所有