基于Redisson实现任务的多实例竞争控制与重试机制
|字数总计:1.1k|阅读时长:4分钟|阅读量:|
在分布式系统中,多个实例可能会同时尝试执行某些任务,例如监听数据库变更、执行定时任务等。为了避免任务的重复执行,我们通常需要一种竞争控制机制,以确保只有一个实例能够执行任务,同时在失败时支持自动重试。本文介绍一种基于 Redis 分布式锁的解决方案,结合 Redisson 和 MongoDB Change Streams,实现任务的多实例竞争控制与自动重试。
方案背景
在 矿小圈 中,需要从 MongoDB 同步数据到 Elasticsearch,以实现高效的搜索功能。由于该项目是 微服务架构,但 MongoDB Change Streams 监听只能由一个实例执行,因此需要一种机制来确保在多个实例部署的情况下,只有一个实例 监听 MongoDB 的变更,同时为了 避免单实例负载过高,需要 定期释放锁,让其他实例有机会获取锁并执行任务。这就形成了 多实例竞争唯一任务的机制。
方案概述
本方案基于 Redis 的分布式锁来确保 多个实例竞争一个任务,即 MongoDB Change Streams 监听。
- 单实例监听:同一时间只能有一个实例在监听 MongoDB。
- 定期释放锁:避免某个实例长期持有任务,导致压力过大。
- 自动重试:监听失败或实例崩溃时,其他实例会重新竞争获取锁。
- 任务高可用:防止某个实例崩溃导致任务丢失。
代码实现
抽象监听器类
我们定义一个 AbstractMongoChangeStreamsListener
抽象类,封装 Redis 锁的获取、释放及 MongoDB 监听的核心逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| @Slf4j public abstract class AbstractMongoChangeStreamsListener { protected MongoTemplate mongoTemplate; protected RedisTemplate<String, String> redisTemplate; protected RedissonClient redissonClient; private RLock lock; private final ScheduledExecutorService listeningScheduler = Executors.newSingleThreadScheduledExecutor(); private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor(); protected volatile boolean locked = false; private final AtomicBoolean isRestarting = new AtomicBoolean(false); private volatile long requestId;
protected final String COLLECTION; private final String LOCK_KEY; private static final long TRY_LOCK_TIME = 5; private static final long LOCK_EXPIRE_TIME = 30; private static final long LOCK_RENEW_INTERVAL = LOCK_EXPIRE_TIME / 3;
protected AbstractMongoChangeStreamsListener( MongoTemplate mongoTemplate, RedisTemplate<String, String> redisTemplate, RedissonClient redissonClient, String collection ) { this.mongoTemplate = mongoTemplate; this.redisTemplate = redisTemplate; this.redissonClient = redissonClient; this.COLLECTION = collection; this.LOCK_KEY = "mongo:change_streams_lock:" + collection; }
@PostConstruct private void startListening() { this.lock = redissonClient.getLock(LOCK_KEY); tryToAcquireLockAndListen(); }
private void tryToAcquireLockAndListen() { listeningScheduler.scheduleWithFixedDelay(() -> { requestId = Thread.currentThread().threadId(); try { if (lock.tryLock(TRY_LOCK_TIME, LOCK_EXPIRE_TIME, TimeUnit.SECONDS)) { log.info("[{}] 🔓 获取到 Redis 锁,开始监听 MongoDB Change Streams", COLLECTION); locked = true; watchMongoChangeStreams(); lock.unlock(); locked = false; } else { log.info("[{}] 🔒 未获取到 Redis 锁,等待下次尝试", COLLECTION); } } catch (Exception e) { log.error("[{}] Redis 锁获取 & MongoDB Change Streams 监听异常,准备重启 | ", COLLECTION, e); stopListeningAndRestart(); } }, TRY_LOCK_TIME, TRY_LOCK_TIME, TimeUnit.SECONDS); }
@Scheduled(cron = "*/30 * * * * *") private void releaseLock() { if (lock.isLocked()) { log.info("[{}] 🔓 定时任务释放 Redis 锁", COLLECTION); lock.forceUnlock(); } }
private void stopListeningAndRestart() { restartExecutor.submit(() -> { if (!isRestarting.compareAndSet(false, true)) { return; } locked = false; try { log.info("[{}] 🔄 重新启动监听", COLLECTION); tryToAcquireLockAndListen(); } finally { isRestarting.set(false); } }); }
public abstract void watchMongoChangeStreams(); }
|
具体监听器实现
TagChangeStreamsListener
继承 AbstractMongoChangeStreamsListener
,监听 tag
集合的变更。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j @Component public class TagChangeStreamsListener extends AbstractMongoChangeStreamsListener { @Autowired TagChangeStreamsListener( MongoTemplate mongoTemplate, RedisTemplate<String, String> redisTemplate, RedissonClient redissonClient ) { super(mongoTemplate, redisTemplate, redissonClient, "tag"); }
@Override public void watchMongoChangeStreams() { var cursor = mongoTemplate.getCollection(COLLECTION) .watch() .cursor(); while (locked && cursor.hasNext()) { ChangeStreamDocument<Document> next = cursor.next(); log.info("[{}] 🔍 ResumeToken: {}", COLLECTION, next.getResumeToken()); log.info("[{}] 🔍 MongoDB Change Streams 数据: {}", COLLECTION, next.getDocumentKey()); } } }
|
完整代码可在FlyingForum获取:
search模块 com.atcumt.search.listener.mongo.template包 AbstractMongoChangeStreamsListener抽象类
方案优势
- 单实例监听,避免重复消费:确保 MongoDB Change Streams 只有一个实例在监听。
- 多实例竞争,均衡负载:定期释放锁,使不同实例都有机会监听,防止单实例压力过大。
- 自动重试,提升可用性:任务异常时,其他实例可以接管任务,防止数据丢失。
适用场景
- MongoDB 数据同步到 Elasticsearch:确保搜索数据的实时性。
- 定时任务调度:多实例情况下避免任务重复执行。
结论
本文详细介绍了 矿小圈 如何基于 Redis 分布式锁,实现 MongoDB Change Streams 监听的多实例竞争,并通过定期释放锁的方式,实现 任务均衡分配,确保任务高可用、高可靠性。