快速使用

1.pom添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2.yml添加配置

注意:rocketmq-spring-boot-starter 2.2.0 及以上版本才使用下面这种配置方式。

rocketmq:
  name-server: ${serverip}:9876
  producer:
    group: message-producer-group

3.编写Producer

下面的测试类依次演示同步消息、异步消息、单向消息和延迟消息的发送:
/**
 * Integration-style test class that demonstrates four ways of sending a message
 * through {@code RocketMQTemplate}: synchronous, asynchronous, one-way and delayed.
 *
 * <p>Each test only sends a message; nothing is asserted about the broker state.
 * A reachable name server (configured in the application yml) is required.
 */
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class commonProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Sends a message synchronously to {@code commonSyncTopic}. */
    @Test
    public void sendSyncMessage() {
        String msg = "我是一个同步消息";
        rocketMQTemplate.syncSend("commonSyncTopic", msg);
    }

    /**
     * Sends a message asynchronously to {@code commonAsyncTopic} and prints the outcome
     * from the callback.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the callback
     */
    @Test
    public void sendAsyncMessage() throws InterruptedException {
        String msg = "我是一个异步消息";
        // asyncSend returns immediately; without waiting, the test method (and with it the
        // Spring context) can finish before either callback has had a chance to run.
        final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
        rocketMQTemplate.asyncSend("commonAsyncTopic", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("成功");
                done.countDown();
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("失败" + throwable.getMessage());
                done.countDown();
            }
        });
        // Bounded, best-effort wait: a missing callback does not fail the test,
        // it just stops the test from hanging forever.
        done.await(5, java.util.concurrent.TimeUnit.SECONDS);
    }

    /**
     * Sends a one-way (fire-and-forget) message to {@code commonOneWayTopic}.
     *
     * <p>Uses {@code sendOneWay} rather than {@code syncSend}, so no send result is awaited.
     */
    @Test
    public void sendOneWayMessage() {
        String msg = "我是一个单向消息";
        rocketMQTemplate.sendOneWay("commonOneWayTopic", msg);
    }

    /**
     * Sends a delayed message to {@code commonDelayTopic} with a 3000 ms send timeout
     * and delay level 3, printing the current time first so the delay can be observed.
     */
    @Test
    public void sendDelayMessage() {
        Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();
        System.out.println(new Date());
        // Available delay levels, in order:
        // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        // NOTE(review): assuming levels are 1-based, level 3 is the 10s entry — confirm.
        rocketMQTemplate.syncSend("commonDelayTopic", message, 3000, 3);
    }
}

4.编写Consumer

下面给出消费者的两种写法,任选其一即可(两个示例的类名相同,不能同时放在同一个包中):
// Variant 1: receive MessageExt, which carries both the message headers and the body.
/**
 * Listener on topic {@code commonAsyncTopic} that prints the body of every message it receives.
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "commonAsyncMessage-consumer-group",
        topic = "commonAsyncTopic")
public class CommonAsyncConsumer implements RocketMQListener<MessageExt> {

    /**
     * Prints the message body as text.
     *
     * @param messageExt the received message, including headers and raw body bytes
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode with an explicit charset: new String(byte[]) uses the platform default,
        // which can garble non-ASCII payloads on hosts whose default is not UTF-8.
        // NOTE(review): assumes the producer side encodes the payload as UTF-8 — confirm.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收到消息" + body);
    }
}

// Variant 2: receive only the message body, already converted to a String.
/**
 * Listener on topic {@code commonAsyncTopic} that prints each received message body.
 */
@Component
@RocketMQMessageListener(topic = "commonAsyncTopic",
        consumerGroup = "commonAsyncMessage-consumer-group")
public class CommonAsyncConsumer implements RocketMQListener<String> {

    /**
     * Prints the received payload prefixed with a fixed label.
     *
     * @param message the message body as text
     */
    @Override
    public void onMessage(String message) {
        final String line = "接收到消息" + message;
        System.out.println(line);
    }
}

5.发送消息

运行 Producer 中的测试方法发送消息,并启动项目的启动类,使消费者能够接收并打印消息。