一、quartz數(shù)據(jù)庫鎖
其中,QRTZ_LOCKS就是Quartz集群實現(xiàn)同步機制的行鎖表,其表結(jié)構(gòu)如下:
- --QRTZ_LOCKS表結(jié)構(gòu)
- CREATE TABLE `QRTZ_LOCKS` (
- `LOCK_NAME` varchar(40) NOT NULL,
- PRIMARY KEY (`LOCK_NAME`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- --QRTZ_LOCKS記錄
- +-----------------+
- | LOCK_NAME |
- +-----------------+
- | CALENDAR_ACCESS |
- | JOB_ACCESS |
- | MISFIRE_ACCESS |
- | STATE_ACCESS |
- | TRIGGER_ACCESS |
- +-----------------+
注:此表結(jié)構(gòu)在2.2版本有新增字段,這里暫時不考慮。
可以看出QRTZ_LOCKS中有5條記錄,代表5把鎖,分別用于實現(xiàn)多個Quartz Node對Job、Trigger、Calendar訪問的同步控制。
關(guān)于行鎖的機制:
1、mysql > set autocommit=0; //先把mysql設(shè)置為不自動提交。
2、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ; //線程一通過for update 可以把這行鎖住
3、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ; //線程二通過for update 無法獲得鎖,線程等待。
4、commit; //線程一通過commit 釋放鎖
5、 //線程二可以訪問到數(shù)據(jù),線程不再等待。
所以,通過這個機制,一次只能有一個線程來操作 加鎖 - 操作 - 釋放鎖。 如果 操作 的時間過長的話,會帶來集群間的主線程等待。
數(shù)據(jù)庫行鎖是一種悲觀鎖,鎖表時其它線程無法查詢。
源碼中關(guān)于數(shù)據(jù)庫集群加鎖的方法有如下幾種:
1、executeInNonManagedTXLock方法的含義是自己管理事務(wù),不讓容器管理事務(wù)的加鎖方法。
- executeInNonManagedTXLock(
- String lockName,
- TransactionCallback<T> txCallback , final TransactionValidator<T> txValidator )
三個參數(shù)lockName的值是上面所說的TRIGGER_ACCESS,表示要加鎖的類型。
txCallback是加鎖后再回調(diào)的方法。
txValidator是驗證方法,一般為null
函數(shù)先執(zhí)行加鎖,再回調(diào)要操作的方法,然后再解鎖。
看一下源碼:
- if (lockName != null) {
- // If we aren't using db locks, then delay getting DB connection
- // until after acquiring the lock since it isn't needed.
- if (getLockHandler().requiresConnection()) {
- conn = getNonManagedTXConnection();
- }
-
- transOwner = getLockHandler().obtainLock(conn, lockName);
- }
-
- if (conn == null) {
- conn = getNonManagedTXConnection();
- }
-
- final T result = txCallback.execute(conn);
- try {
- commitConnection(conn);
- } catch (JobPersistenceException e) {
- rollbackConnection(conn);
- if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
- @Override
- public Boolean execute(Connection conn) throws JobPersistenceException {
- return txValidator.validate(conn, result);
- }
- })) {
- throw e;
- }
- }
- Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
- if(sigTime != null && sigTime >= 0) {
- signalSchedulingChangeImmediately(sigTime);
- }
-
- return result;
- } catch (JobPersistenceException e) {
- rollbackConnection(conn);
- throw e;
- } catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
2、如果不是通過這種回調(diào)方法的加鎖,一般是:
getLockHandler().obtainLock
執(zhí)行
commitConnection(conn)
releaseLock
cleanupConnection
二、源碼分析鎖
目前代碼中行鎖只用到了STATE_ACCESS 和TRIGGER_ACCESS 這兩種。
1、TRIGGER_ACCESS
先了解一篇文章,通過源碼來分析quartz是如何通過加鎖來實現(xiàn)集群環(huán)境,觸發(fā)器狀態(tài)的一致性。
http://www.ahfyzs.com/content/14/0926/08/15077656_412418636.shtml
可以看到觸發(fā)器的操作主要用主線程StdScheduleThread來完成,不管是獲取需要觸發(fā)的30S內(nèi)的觸發(fā)器,還是觸發(fā)過程。select和update觸發(fā)器表時
都會先加鎖,后解鎖。如果數(shù)據(jù)庫資源競爭比較大的話,鎖會影響整個性能??梢钥紤]將任務(wù)信息放在分布式內(nèi)存,如redis上進行處理。數(shù)據(jù)庫只是定時從redis上load數(shù)據(jù)下來做統(tǒng)計。
參考:quartz詳解2:quartz由淺入深 查看第四章第1,2節(jié)
實現(xiàn)都在JobStoreSupport類
| 加鎖類型 |
加鎖方法 |
底層數(shù)據(jù)庫操作 |
備注 |
| executeInNonManagedTXLock |
acquireNextTrigger |
selectTriggerToAcquire
selectTrigger
selectJobDetail
insertFiredTrigger |
查詢需要點火的trigger
選擇需要執(zhí)行的trigger加入到fired_trigger表 |
| for執(zhí)行 triggerFired |
selectJobDetail
selectCalendar
updateFiredTrigger
triggerExists updateTrigger |
點火trigger
修改trigger狀態(tài)為可執(zhí)行狀態(tài)。 |
| recoverJobs |
updateTriggerStatesFromOtherStates
hasMisfiredTriggersInState doUpdateOfMisfiredTrigger
selectTriggersForRecoveringJobs
selectTriggersInState
deleteFiredTriggers |
非集群環(huán)境下重新執(zhí)行
failed與misfired的trigger |
| retryExecuteInNonManagedTXLock |
releaseAcquiredTrigger |
updateTriggerStateFromOtherState
deleteFiredTrigger |
異常情況下重新釋放trigger到初使狀態(tài)。 |
| triggeredJobComplete |
selectTriggerStatus
removeTrigger updateTriggerState
deleteFiredTrigger |
觸發(fā)JOB任務(wù)完成后的處理。 |
| obtainLock |
recoverMisfiredJobs |
hasMisfiredTriggersInState doUpdateOfMisfiredTrigger |
重新執(zhí)行misfired的trigger
可以在啟動時執(zhí)行,也可以由misfired線程定期執(zhí)行。 |
| clusterRecover |
selectInstancesFiredTriggerRecords
updateTriggerStatesForJobFromOtherState
storeTrigger
deleteFiredTriggers
selectFiredTriggerRecords
removeTrigger
deleteSchedulerState |
集群有結(jié)點faied,讓JOB能重新執(zhí)行。 |
executeInLock
數(shù)據(jù)庫集群里等同于
executeInNonManagedTXLock |
storeJobAndTrigger |
updateJobDetail insertJobDetail
triggerExists
selectJobDetail
updateTrigger insertTrigger |
保存JOB和TRIGGER配置 |
| storeJob |
|
保存JOB |
| removeJob |
|
刪除JOB |
| removeJobs |
|
批量刪除JOB |
| removeTriggers |
|
批量刪除triggers |
| storeJobsAndTriggers |
|
保存JOB和多個trigger配置 |
| removeTrigger |
|
刪除trigger |
| replaceTrigger |
|
替換trigger |
| storeCalendar |
|
保存定時日期 |
| removeCalendar |
|
刪除定時日期 |
| clearAllSchedulingData |
|
清除所有定時數(shù)據(jù) |
| pauseTrigger |
|
停止觸發(fā)器 |
| pauseJob |
|
停止任務(wù) |
| pauseJobs |
|
批量停止任務(wù) |
| resumeTrigger |
|
恢復(fù)觸發(fā)器 |
| resumeJob |
|
恢復(fù)任務(wù) |
| resumeJobs |
|
批量恢復(fù)任務(wù) |
| pauseTriggers |
|
批量停止觸發(fā)器 |
| resumeTriggers |
|
批量恢復(fù)觸發(fā)器 |
| pauseAll |
|
停止所有 |
| resumeAll |
|
恢復(fù)所有 |
---
2、STATE_TRIGGER
實現(xiàn)都在JobStoreSupport類
| 加鎖類型 |
加鎖方法 |
底層數(shù)據(jù)庫操作 |
備注 |
| obtainLock |
doCheckin |
clusterCheckIn |
判斷集群狀態(tài)
先用LOCK_STATE_ACCESS鎖集群狀態(tài)
再用LOCK_TRIGGER_ACCESS恢復(fù)集群運行 |
| |
|
|
---
|