在分布式系统中,多个实例可能会同时尝试执行某些任务,例如监听数据库变更、执行定时任务等。为了避免任务的重复执行,我们通常需要一种竞争控制机制,以确保只有一个实例能够执行任务,同时在失败时支持自动重试。本文介绍一种基于 Redis 分布式锁的解决方案,结合 Redisson 和 MongoDB Change Streams,实现任务的多实例竞争控制与自动重试。

方案背景

矿小圈 中,需要从 MongoDB 同步数据到 Elasticsearch,以实现高效的搜索功能。由于该项目是 微服务架构,但 MongoDB Change Streams 监听只能由一个实例执行,因此需要一种机制来确保在多个实例部署的情况下,只有一个实例 监听 MongoDB 的变更,同时为了 避免单实例负载过高,需要 定期释放锁,让其他实例有机会获取锁并执行任务。这就形成了 多实例竞争唯一任务的机制

方案概述

本方案基于 Redis 的分布式锁来确保 多个实例竞争一个任务,即 MongoDB Change Streams 监听

  • 单实例监听:同一时间只能有一个实例在监听 MongoDB。
  • 定期释放锁:避免某个实例长期持有任务,导致压力过大。
  • 自动重试:监听失败或实例崩溃时,其他实例会重新竞争获取锁。
  • 任务高可用:防止某个实例崩溃导致任务丢失。

代码实现

抽象监听器类

我们定义一个 AbstractMongoChangeStreamsListener 抽象类,封装 Redis 锁的获取、释放及 MongoDB 监听的核心逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Slf4j
public abstract class AbstractMongoChangeStreamsListener {
protected MongoTemplate mongoTemplate;
protected RedisTemplate<String, String> redisTemplate;
protected RedissonClient redissonClient;
private RLock lock;
private final ScheduledExecutorService listeningScheduler = Executors.newSingleThreadScheduledExecutor();
private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor();
protected volatile boolean locked = false;
private final AtomicBoolean isRestarting = new AtomicBoolean(false);
private volatile long requestId;

protected final String COLLECTION;
private final String LOCK_KEY;
private static final long TRY_LOCK_TIME = 5;
private static final long LOCK_EXPIRE_TIME = 30;
private static final long LOCK_RENEW_INTERVAL = LOCK_EXPIRE_TIME / 3;

protected AbstractMongoChangeStreamsListener(
MongoTemplate mongoTemplate,
RedisTemplate<String, String> redisTemplate,
RedissonClient redissonClient,
String collection
) {
this.mongoTemplate = mongoTemplate;
this.redisTemplate = redisTemplate;
this.redissonClient = redissonClient;
this.COLLECTION = collection;
this.LOCK_KEY = "mongo:change_streams_lock:" + collection;
}

@PostConstruct
private void startListening() {
this.lock = redissonClient.getLock(LOCK_KEY);
tryToAcquireLockAndListen();
}

private void tryToAcquireLockAndListen() {
listeningScheduler.scheduleWithFixedDelay(() -> {
requestId = Thread.currentThread().threadId();
try {
if (lock.tryLock(TRY_LOCK_TIME, LOCK_EXPIRE_TIME, TimeUnit.SECONDS)) {
log.info("[{}] 🔓 获取到 Redis 锁,开始监听 MongoDB Change Streams", COLLECTION);
locked = true;
watchMongoChangeStreams();
lock.unlock();
locked = false;
} else {
log.info("[{}] 🔒 未获取到 Redis 锁,等待下次尝试", COLLECTION);
}
} catch (Exception e) {
log.error("[{}] Redis 锁获取 & MongoDB Change Streams 监听异常,准备重启 | ", COLLECTION, e);
stopListeningAndRestart();
}
}, TRY_LOCK_TIME, TRY_LOCK_TIME, TimeUnit.SECONDS);
}

/**
* 定期释放锁,保证多个实例有机会获取锁,避免某个实例长期占用任务。
*/
@Scheduled(cron = "*/30 * * * * *")
private void releaseLock() {
if (lock.isLocked()) {
log.info("[{}] 🔓 定时任务释放 Redis 锁", COLLECTION);
lock.forceUnlock();
}
}

private void stopListeningAndRestart() {
restartExecutor.submit(() -> {
if (!isRestarting.compareAndSet(false, true)) {
return;
}
locked = false;
try {
log.info("[{}] 🔄 重新启动监听", COLLECTION);
tryToAcquireLockAndListen();
} finally {
isRestarting.set(false);
}
});
}

public abstract void watchMongoChangeStreams();
}

具体监听器实现

TagChangeStreamsListener 继承 AbstractMongoChangeStreamsListener,监听 tag 集合的变更。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Component
public class TagChangeStreamsListener extends AbstractMongoChangeStreamsListener {
@Autowired
TagChangeStreamsListener(
MongoTemplate mongoTemplate,
RedisTemplate<String, String> redisTemplate,
RedissonClient redissonClient
) {
super(mongoTemplate, redisTemplate, redissonClient, "tag");
}

@Override
public void watchMongoChangeStreams() {
var cursor = mongoTemplate.getCollection(COLLECTION)
.watch()
.cursor();
while (locked && cursor.hasNext()) {
ChangeStreamDocument<Document> next = cursor.next();
log.info("[{}] 🔍 ResumeToken: {}", COLLECTION, next.getResumeToken());
log.info("[{}] 🔍 MongoDB Change Streams 数据: {}", COLLECTION, next.getDocumentKey());
}
}
}

完整代码可在FlyingForum获取:

search模块 com.atcumt.search.listener.mongo.template包 AbstractMongoChangeStreamsListener抽象类

方案优势

  • 单实例监听,避免重复消费:确保 MongoDB Change Streams 只有一个实例在监听。
  • 多实例竞争,均衡负载:定期释放锁,使不同实例都有机会监听,防止单实例压力过大。
  • 自动重试,提升可用性:任务异常时,其他实例可以接管任务,防止数据丢失。

适用场景

  • MongoDB 数据同步到 Elasticsearch:确保搜索数据的实时性。
  • 定时任务调度:多实例情况下避免任务重复执行。

结论

本文详细介绍了 矿小圈 如何基于 Redis 分布式锁,实现 MongoDB Change Streams 监听的多实例竞争,并通过定期释放锁的方式,实现 任务均衡分配,确保任务高可用、高可靠性。