一、問題背景
美團(tuán)CRM系統(tǒng)中每天有大量的后臺任務(wù)需要調(diào)度執(zhí)行,如構(gòu)建索引、統(tǒng)計報表、周期同步數(shù)據(jù)等等,要求任務(wù)調(diào)度系統(tǒng)具備高可用性、負(fù)載均衡特性,可以管理并監(jiān)控任務(wù)的執(zhí)行流程,以保證任務(wù)的正確執(zhí)行。
二、歷史方案
美團(tuán)CRM系統(tǒng)的任務(wù)調(diào)度模塊經(jīng)歷了以下歷史方案。
1. Crontab+SQL
每天晚上運(yùn)行定時任務(wù),通過SQL腳本+crontab方式執(zhí)行,例如,
#crm
0 2 * * * /xxx/mtcrm/shell/mtcrm_daily_stat.sql //每天凌晨2:00執(zhí)行統(tǒng)計
30 7 * * * /xxx/mtcrm/shell/mtcrm_data_fix.sql //每天早上7:30執(zhí)行數(shù)據(jù)修復(fù)
該方案存在以下問題:
- 直接訪問數(shù)據(jù)庫,各系統(tǒng)業(yè)務(wù)接口沒有重用。
- 完成復(fù)雜業(yè)務(wù)需求時,會引入過多中間表。
- 業(yè)務(wù)邏輯計算完全依賴SQL,增大數(shù)據(jù)庫壓力。
- 任務(wù)失敗無法自動恢復(fù)。
2. Python+SQL
采用python腳本(多數(shù)據(jù)源)+SQL方式執(zhí)行,例如,
def connectCRM():
return MySQLdb.Connection("host1", "uname", "xxx", "crm", 3306, charset="utf8")
def connectTemp():
return MySQLdb.Connection("host1", "uname", "xxx", "temp", 3306, charset="utf8")
該方案存在問題:
- 直接訪問數(shù)據(jù),需要理解各系統(tǒng)的數(shù)據(jù)結(jié)構(gòu),無法滿足動態(tài)任務(wù)問題,各系統(tǒng)業(yè)務(wù)接口沒有重用。
- 無負(fù)載均衡。
- 任務(wù)失敗無法恢復(fù)。
- 在JAVA語言開發(fā)中出現(xiàn)異構(gòu),且很難統(tǒng)一到自動部署系統(tǒng)中。
3. Spring+JDK Timer
該方案使用spring+JDK Timer方式,調(diào)用接口完成定時任務(wù),在分布式部署環(huán)境下,防止多個節(jié)點(diǎn)同時運(yùn)行任務(wù),需要寫死host,控制在一臺服務(wù)器上執(zhí)行task。
<bean id="accountStatusTaskScanner" class="xxx.crm.service.impl.AccountStatusTaskScanner" />
<task:scheduler id="taskScheduler" pool-size="5" />
<task:scheduled-tasks scheduler="taskScheduler">
<task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" />
</task:scheduled-tasks>
該方案較方案1,2有很大改進(jìn),但仍存在以下問題:
- 步驟復(fù)雜、分散,任務(wù)量增大的情況下,很難擴(kuò)展
- 使用寫死服務(wù)器Host的方式執(zhí)行task,存在單點(diǎn)風(fēng)險,負(fù)載均衡手動完成。
- 應(yīng)用重啟,任務(wù)無法自動恢復(fù)。
CRM系統(tǒng)定時任務(wù)走過了很多彎路:定時任務(wù)多種實現(xiàn)方式,使配置和代碼分散在多處,難以維護(hù)和監(jiān)控;任務(wù)執(zhí)行過程沒有保證,沒有錯誤恢復(fù);任務(wù)執(zhí)行異常沒有反饋(郵件);沒有集群支持、負(fù)載均衡。CRM系統(tǒng)需要分布式的任務(wù)調(diào)度框架,統(tǒng)一解決問題,Java可以使用的任務(wù)調(diào)度框架有Quartz,Jcrontab,cron4j,我們選擇了Quartz。
三、為什么選擇Quartz
Quartz是Java領(lǐng)域最著名的開源任務(wù)調(diào)度工具。Quartz提供了極為廣泛的特性如持久化任務(wù),集群和分布式任務(wù)等,其特點(diǎn)如下:
- 完全由Java寫成,方便集成(Spring)
- 伸縮性
- 負(fù)載均衡
- 高可用性
四、Quartz集群部署實踐
CRM中Quartz與Spring結(jié)合使用,Spring通過提供org.springframework.scheduling.quartz下的封裝類對Quartz支持。
Quartz集群部署:
Quartz集群中的每個節(jié)點(diǎn)是一個獨(dú)立的Quartz應(yīng)用,它又管理著其他的節(jié)點(diǎn)。該集群需要分別對每個節(jié)點(diǎn)分別啟動或停止,不像應(yīng)用服務(wù)器的集群,獨(dú)立的Quartz節(jié)點(diǎn)并不與另一個節(jié)點(diǎn)或是管理節(jié)點(diǎn)通信。Quartz應(yīng)用是通過數(shù)據(jù)庫表來感知到另一應(yīng)用。只有使用持久的JobStore才能完成Quqrtz集群。
基于Spring的集群配置:
<!-- 調(diào)度工廠 -->
<bean id="quartzScheduler"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="quartzProperties">
<props>
<prop key="org.quartz.scheduler.instanceName">CRMscheduler</prop>
<prop key="org.quartz.scheduler.instanceId">AUTO</prop>
<!-- 線程池配置 -->
<prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
<prop key="org.quartz.threadPool.threadCount">20</prop>
<prop key="org.quartz.threadPool.threadPriority">5</prop>
<!-- JobStore 配置 -->
<prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
<!-- 集群配置 -->
<prop key="org.quartz.jobStore.isClustered">true</prop>
<prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>
<prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>
<prop key="org.quartz.jobStore.misfireThreshold">120000</prop>
<prop key="org.quartz.jobStore.tablePrefix">QRTZ_</prop>
</props>
</property>
<property name="schedulerName" value="CRMscheduler" />
<!--必須的,QuartzScheduler 延時啟動,應(yīng)用啟動完后 QuartzScheduler 再啟動 -->
<property name="startupDelay" value="30" />
<property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
<!--可選,QuartzScheduler 啟動時更新己存在的Job,這樣就不用每次修改targetObject后刪除qrtz_job_details表對應(yīng)記錄了 -->
<property name="overwriteExistingJobs" value="true" />
<!-- 設(shè)置自動啟動 -->
<property name="autoStartup" value="true" />
<!-- 注冊觸發(fā)器 -->
<property name="triggers">
<list>
<ref bean="userSyncScannerTrigger" />
......
</list>
</property>
<!-- 注冊jobDetail -->
<property name="jobDetails">
<list>
</list>
</property>
<property name="schedulerListeners">
<list>
<ref bean="quartzExceptionSchedulerListener" />
</list>
</property>
</bean>
org.quartz.jobStore.class屬性為JobStoreTX,將任務(wù)持久化到數(shù)據(jù)中。因為集群中節(jié)點(diǎn)依賴于數(shù)據(jù)庫來傳播Scheduler實例的狀態(tài),你只能在使用JDBC JobStore時應(yīng)用Quartz集群。
org.quartz.jobStore.isClustered屬性為true,通知Scheduler實例要它參與到一個集群當(dāng)中。
org.quartz.jobStore.clusterCheckinInterval屬性定義了Scheduler實例檢入到數(shù)據(jù)庫中的頻率(單位:毫秒)。Scheduler檢查是否其他的實例到了它們應(yīng)當(dāng)檢入的時候未檢入;這能指出一個失敗的Scheduler實例,且當(dāng)前 Scheduler會以此來接管任何執(zhí)行失敗并可恢復(fù)的Job。通過檢入操作,Scheduler 也會更新自身的狀態(tài)記錄。clusterChedkinInterval越小,Scheduler節(jié)點(diǎn)檢查失敗的Scheduler實例就越頻繁。默認(rèn)值是 15000 (即15 秒)。
其余參數(shù)在后文將會詳細(xì)介紹。
Quartz監(jiān)控
CRM后臺目前可以做到對Quartz實例的監(jiān)控、操作以及動態(tài)部署Trigger.
Triggers監(jiān)控:
JobDetails監(jiān)控:
五、Quartz集群原理分析
1. Quartz集群數(shù)據(jù)庫表
Quartz的集群部署方案在架構(gòu)上是分布式的,沒有負(fù)責(zé)集中管理的節(jié)點(diǎn),而是利用數(shù)據(jù)庫鎖的方式來實現(xiàn)集群環(huán)境下進(jìn)行并發(fā)控制。BTW,分布式部署時需要保證各個節(jié)點(diǎn)的系統(tǒng)時間一致。
Quartz數(shù)據(jù)庫核心表如下:
| Table Name |
Description |
| QRTZ_CALENDARS |
存儲Quartz的Calendar信息 |
| QRTZ_CRON_TRIGGERS |
存儲CronTrigger,包括Cron表達(dá)式和時區(qū)信息 |
| QRTZ_FIRED_TRIGGERS |
存儲與已觸發(fā)的Trigger相關(guān)的狀態(tài)信息,以及相聯(lián)Job的執(zhí)行信息 |
| QRTZ_PAUSED_TRIGGER_GRPS |
存儲已暫停的Trigger組的信息 |
| QRTZ_SCHEDULER_STATE |
存儲少量的有關(guān)Scheduler的狀態(tài)信息,和別的Scheduler實例 |
| QRTZ_LOCKS |
存儲程序的悲觀鎖的信息 |
| QRTZ_JOB_DETAILS |
存儲每一個已配置的Job的詳細(xì)信息 |
| QRTZ_JOB_LISTENERS |
存儲有關(guān)已配置的JobListener的信息 |
| QRTZ_SIMPLE_TRIGGERS |
存儲簡單的Trigger,包括重復(fù)次數(shù)、間隔、以及已觸的次數(shù) |
| QRTZ_BLOG_TRIGGERS |
Trigger作為Blob類型存儲 |
| QRTZ_TRIGGER_LISTENERS |
存儲已配置的TriggerListener的信息 |
| QRTZ_TRIGGERS |
存儲已配置的Trigger的信息 |
其中,QRTZ_LOCKS就是Quartz集群實現(xiàn)同步機(jī)制的行鎖表,其表結(jié)構(gòu)如下:
--QRTZ_LOCKS表結(jié)構(gòu)
CREATE TABLE `QRTZ_LOCKS` (
`LOCK_NAME` varchar(40) NOT NULL,
PRIMARY KEY (`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--QRTZ_LOCKS記錄
+-----------------+
| LOCK_NAME |
+-----------------+
| CALENDAR_ACCESS |
| JOB_ACCESS |
| MISFIRE_ACCESS |
| STATE_ACCESS |
| TRIGGER_ACCESS |
+-----------------+
可以看出QRTZ_LOCKS中有5條記錄,代表5把鎖,分別用于實現(xiàn)多個Quartz Node對Job、Trigger、Calendar訪問的同步控制。
2. Quartz線程模型
在Quartz中有兩類線程:Scheduler調(diào)度線程和任務(wù)執(zhí)行線程。 任務(wù)執(zhí)行線程 :Quartz不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz把線程管理的職責(zé)委托給ThreadPool,一般的設(shè)置使用SimpleThreadPool。SimpleThreadPool創(chuàng)建了一定數(shù)量的WorkerThread實例來使得Job能夠在線程中進(jìn)行處理。WorkerThread是定義在SimpleThreadPool類中的內(nèi)部類,它實質(zhì)上就是一個線程。例如,CRM中配置如下:
<!-- 線程池配置 -->
<prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
<prop key="org.quartz.threadPool.threadCount">20</prop>
<prop key="org.quartz.threadPool.threadPriority">5</prop>
QuartzSchedulerThread調(diào)度主線程 :QuartzScheduler被創(chuàng)建時創(chuàng)建一個QuartzSchedulerThread實例。
3. 集群源碼分析
Quartz究竟是如何保證集群情況下trgger處理的信息同步?
下面跟著源碼一步一步分析,QuartzSchedulerThread包含有決定何時下一個Job將被觸發(fā)的處理循環(huán),主要邏輯在其run()方法中:
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
......
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) {
......
//調(diào)度器在trigger隊列中尋找30秒內(nèi)一定數(shù)目的trigger(需要保證集群節(jié)點(diǎn)的系統(tǒng)時間一致)
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
......
//觸發(fā)trigger
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
......
//釋放trigger
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
}
}
由此可知,QuartzScheduler調(diào)度線程不斷獲取trigger,觸發(fā)trigger,釋放trigger。下面分析trigger的獲取過程,qsRsrcs.getJobStore()返回對象是JobStore,集群環(huán)境配置如下:
<!-- JobStore 配置 -->
<prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
JobStoreTX繼承自JobStoreSupport,而JobStoreSupport的acquireNextTriggers、triggersFired、releaseAcquiredTrigger方法負(fù)責(zé)具體trigger相關(guān)操作,都必須獲得TRIGGER_ACCESS鎖。核心邏輯在executeInNonManagedTXLock方法中:
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
//獲取鎖
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
final T result = txCallback.execute(conn);
try {
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
@Override
public Boolean execute(Connection conn) throws JobPersistenceException {
return txValidator.validate(conn, result);
}
})) {
throw e;
}
}
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if(sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner); //釋放鎖
} finally {
cleanupConnection(conn);
}
}
}
由上代碼可知Quartz集群基于數(shù)據(jù)庫鎖的同步操作流程如下圖所示:
一個調(diào)度器實例在執(zhí)行涉及到分布式問題的數(shù)據(jù)庫操作前,首先要獲取QUARTZ_LOCKS表中對應(yīng)的行級鎖,獲取鎖后即可執(zhí)行其他表中的數(shù)據(jù)庫操作,隨著操作事務(wù)的提交,行級鎖被釋放,供其他調(diào)度實例獲取。集群中的每一個調(diào)度器實例都遵循這樣一種嚴(yán)格的操作規(guī)程。
getLockHandler()方法返回的對象類型是Semaphore,獲取鎖和釋放鎖的具體邏輯由該對象維護(hù)
public interface Semaphore {
boolean obtainLock(Connection conn, String lockName) throws LockException;
void releaseLock(String lockName) throws LockException;
boolean requiresConnection();
}
該接口的實現(xiàn)類完成具體操作鎖的邏輯,在JobStoreSupport的初始化方法中注入的Semaphore具體類型是StdRowLockSemaphore
setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
StdRowLockSemaphore的源碼如下所示:
public class StdRowLockSemaphore extends DBSemaphore {
//鎖定SQL語句
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME
+ " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST
+ TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", "
+ COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";
//指定鎖定SQL
protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(expandedSQL);
ps.setString(1, lockName);
......
rs = ps.executeQuery();
if (!rs.next()) {
throw new SQLException(Util.rtp(
"No row exists in table " + TABLE_PREFIX_SUBST +
TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix()));
}
} catch (SQLException sqle) {
} finally {
...... //release resources
}
}
}
//獲取QRTZ_LOCKS行級鎖
public boolean obtainLock(Connection conn, String lockName) throws LockException {
lockName = lockName.intern();
if (!isLockOwner(conn, lockName)) {
executeSQL(conn, lockName, expandedSQL);
getThreadLocks().add(lockName);
}
return true;
}
//釋放QRTZ_LOCKS行級鎖
public void releaseLock(Connection conn, String lockName) {
lockName = lockName.intern();
if (isLockOwner(conn, lockName)) {
getThreadLocks().remove(lockName);
}
......
}
至此,總結(jié)一下Quartz集群同步機(jī)制:每當(dāng)要進(jìn)行與某種業(yè)務(wù)相關(guān)的數(shù)據(jù)庫操作時,先去QRTZ_LOCKS表中查詢操作相關(guān)的業(yè)務(wù)對象所需要的鎖,在select語句之后加for update來實現(xiàn)。例如,TRIGGER_ACCESS表示對任務(wù)觸發(fā)器相關(guān)的信息進(jìn)行修改、刪除操作時所需要獲得的鎖。這時,執(zhí)行查詢這個表數(shù)據(jù)的SQL形如:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
當(dāng)一個線程使用上述的SQL對表中的數(shù)據(jù)執(zhí)行查詢操作時,若查詢結(jié)果中包含相關(guān)的行,數(shù)據(jù)庫就對該行進(jìn)行ROW LOCK;若此時,另外一個線程使用相同的SQL對表的數(shù)據(jù)進(jìn)行查詢,由于查詢出的數(shù)據(jù)行已經(jīng)被數(shù)據(jù)庫鎖住了,此時這個線程就只能等待,直到擁有該行鎖的線程完成了相關(guān)的業(yè)務(wù)操作,執(zhí)行了commit動作后,數(shù)據(jù)庫才會釋放了相關(guān)行的鎖,這個線程才能繼續(xù)執(zhí)行。
通過這樣的機(jī)制,在集群環(huán)境下,結(jié)合悲觀鎖的機(jī)制就可以防止一個線程對數(shù)據(jù)庫數(shù)據(jù)的操作的結(jié)果被另外一個線程所覆蓋,從而可以避免一些難以覺察的錯誤發(fā)生。當(dāng)然,達(dá)到這種效果的前提是需要把Connection設(shè)置為手動提交,即autoCommit為false。
六、參考資料
|