快速使用
1.pom添加依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
|
2.yml添加配置
version 2.2.0以上才是这么配置
1 2 3 4
| rocketmq: name-server: ${serverip}:9876 producer: group: message-producer-group
|
3.编写Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @SpringBootTest(classes = RocketMQApplication.class) @RunWith(SpringRunner.class) public class commonProducer {
@Autowired private RocketMQTemplate rocketMQTemplate;
@Test public void sendSyncMessage() { String msg = "我是一个同步消息"; rocketMQTemplate.syncSend("commonSyncTopic", msg); }
@Test public void sendAsyncMessage() { String msg = "我是一个异步消息"; rocketMQTemplate.asyncSend("commonAsyncTopic", msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("成功"); }
@Override public void onException(Throwable throwable) { System.out.println("失败" + throwable.getMessage()); } }); }
@Test public void sendOneWayMessage() { String msg = "我是一个单向消息"; rocketMQTemplate.syncSend("commonOneWayTopic", msg); }
@Test public void sendDelayMessage() { Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build(); System.out.println(new Date()); rocketMQTemplate.syncSend("commonDelayTopic", message,3000,3); } }
|
4.编写Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Component @RocketMQMessageListener( consumerGroup = "commonAsyncMessage-consumer-group", topic = "commonAsyncTopic") public class CommonAsyncConsumer implements RocketMQListener<MessageExt> {
@Override public void onMessage(MessageExt messageExt) { System.out.println("接收到消息" + new String(messageExt.getBody())); } }
@Component @RocketMQMessageListener( consumerGroup = "commonAsyncMessage-consumer-group", topic = "commonAsyncTopic") public class CommonAsyncConsumer implements RocketMQListener<String> {
@Override public void onMessage(String message) { System.out.println("接收到消息" + message); } }
|
5.发送消息
启动producer内的测试方法,以及启动项目的启动类