小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Quartz應(yīng)用與集群原理分析

 liuguichuan 2015-05-24

一、問題背景

美團(tuán)CRM系統(tǒng)中每天有大量的后臺任務(wù)需要調(diào)度執(zhí)行,如構(gòu)建索引、統(tǒng)計報表、周期同步數(shù)據(jù)等等,要求任務(wù)調(diào)度系統(tǒng)具備高可用性、負(fù)載均衡特性,可以管理并監(jiān)控任務(wù)的執(zhí)行流程,以保證任務(wù)的正確執(zhí)行。

二、歷史方案

美團(tuán)CRM系統(tǒng)的任務(wù)調(diào)度模塊經(jīng)歷了以下歷史方案。

1. Crontab+SQL

每天晚上運(yùn)行定時任務(wù),通過SQL腳本+crontab方式執(zhí)行,例如,

#crm
0 2 * * * /xxx/mtcrm/shell/mtcrm_daily_stat.sql    //每天凌晨2:00執(zhí)行統(tǒng)計
30 7 * * * /xxx/mtcrm/shell/mtcrm_data_fix.sql     //每天早上7:30執(zhí)行數(shù)據(jù)修復(fù)

該方案存在以下問題:

  • 直接訪問數(shù)據(jù)庫,各系統(tǒng)業(yè)務(wù)接口沒有重用。
  • 完成復(fù)雜業(yè)務(wù)需求時,會引入過多中間表。
  • 業(yè)務(wù)邏輯計算完全依賴SQL,增大數(shù)據(jù)庫壓力。
  • 任務(wù)失敗無法自動恢復(fù)。

2. Python+SQL

采用python腳本(多數(shù)據(jù)源)+SQL方式執(zhí)行,例如,

 def connectCRM():
  return MySQLdb.Connection("host1", "uname", "xxx", "crm", 3306, charset="utf8")

 def connectTemp():
  return MySQLdb.Connection("host1", "uname", "xxx", "temp", 3306, charset="utf8")

該方案存在問題:

  • 直接訪問數(shù)據(jù),需要理解各系統(tǒng)的數(shù)據(jù)結(jié)構(gòu),無法滿足動態(tài)任務(wù)問題,各系統(tǒng)業(yè)務(wù)接口沒有重用。
  • 無負(fù)載均衡。
  • 任務(wù)失敗無法恢復(fù)。
  • 在JAVA語言開發(fā)中出現(xiàn)異構(gòu),且很難統(tǒng)一到自動部署系統(tǒng)中。

3. Spring+JDK Timer

該方案使用spring+JDK Timer方式,調(diào)用接口完成定時任務(wù),在分布式部署環(huán)境下,防止多個節(jié)點(diǎn)同時運(yùn)行任務(wù),需要寫死host,控制在一臺服務(wù)器上執(zhí)行task。

<bean id="accountStatusTaskScanner"  class="xxx.crm.service.impl.AccountStatusTaskScanner" />
    <task:scheduler id="taskScheduler" pool-size="5" />
    <task:scheduled-tasks scheduler="taskScheduler">
    <task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" />
</task:scheduled-tasks>

該方案較方案1,2有很大改進(jìn),但仍存在以下問題:

  • 步驟復(fù)雜、分散,任務(wù)量增大的情況下,很難擴(kuò)展
  • 使用寫死服務(wù)器Host的方式執(zhí)行task,存在單點(diǎn)風(fēng)險,負(fù)載均衡手動完成。
  • 應(yīng)用重啟,任務(wù)無法自動恢復(fù)。

CRM系統(tǒng)定時任務(wù)走過了很多彎路:定時任務(wù)多種實現(xiàn)方式,使配置和代碼分散在多處,難以維護(hù)和監(jiān)控;任務(wù)執(zhí)行過程沒有保證,沒有錯誤恢復(fù);任務(wù)執(zhí)行異常沒有反饋(郵件);沒有集群支持、負(fù)載均衡。CRM系統(tǒng)需要分布式的任務(wù)調(diào)度框架,統(tǒng)一解決問題,Java可以使用的任務(wù)調(diào)度框架有Quartz,Jcrontab,cron4j,我們選擇了Quartz。

三、為什么選擇Quartz

Quartz是Java領(lǐng)域最著名的開源任務(wù)調(diào)度工具。Quartz提供了極為廣泛的特性如持久化任務(wù),集群和分布式任務(wù)等,其特點(diǎn)如下:

  • 完全由Java寫成,方便集成(Spring)
  • 伸縮性
  • 負(fù)載均衡
  • 高可用性

四、Quartz集群部署實踐

CRM中Quartz與Spring結(jié)合使用,Spring通過提供org.springframework.scheduling.quartz下的封裝類對Quartz支持。

Quartz集群部署:

Quartz集群部署

Quartz集群中的每個節(jié)點(diǎn)是一個獨(dú)立的Quartz應(yīng)用,它又管理著其他的節(jié)點(diǎn)。該集群需要分別對每個節(jié)點(diǎn)分別啟動或停止,不像應(yīng)用服務(wù)器的集群,獨(dú)立的Quartz節(jié)點(diǎn)并不與另一個節(jié)點(diǎn)或是管理節(jié)點(diǎn)通信。Quartz應(yīng)用是通過數(shù)據(jù)庫表來感知到另一應(yīng)用。只有使用持久的JobStore才能完成Quqrtz集群。

基于Spring的集群配置:

<!-- 調(diào)度工廠 -->
<bean id="quartzScheduler"
  class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
  <property name="dataSource" ref="dataSource" />
  <property name="quartzProperties">
    <props>
      <prop key="org.quartz.scheduler.instanceName">CRMscheduler</prop>
      <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
      <!-- 線程池配置 -->
      <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
      <prop key="org.quartz.threadPool.threadCount">20</prop>
      <prop key="org.quartz.threadPool.threadPriority">5</prop>
      <!-- JobStore 配置 -->
      <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>

      <!-- 集群配置 -->
      <prop key="org.quartz.jobStore.isClustered">true</prop>
      <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>
      <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>

      <prop key="org.quartz.jobStore.misfireThreshold">120000</prop>

      <prop key="org.quartz.jobStore.tablePrefix">QRTZ_</prop>
    </props>

  </property>

  <property name="schedulerName" value="CRMscheduler" />

  <!--必須的,QuartzScheduler 延時啟動,應(yīng)用啟動完后 QuartzScheduler 再啟動 -->
  <property name="startupDelay" value="30" />

  <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />

  <!--可選,QuartzScheduler 啟動時更新己存在的Job,這樣就不用每次修改targetObject后刪除qrtz_job_details表對應(yīng)記錄了 -->
  <property name="overwriteExistingJobs" value="true" />

  <!-- 設(shè)置自動啟動 -->
  <property name="autoStartup" value="true" />

  <!-- 注冊觸發(fā)器 -->
  <property name="triggers">
    <list>
      <ref bean="userSyncScannerTrigger" />
           ......
    </list>
  </property>

  <!-- 注冊jobDetail -->
  <property name="jobDetails">
    <list>
    </list>
  </property>

  <property name="schedulerListeners">
    <list>
      <ref bean="quartzExceptionSchedulerListener" />
    </list>
  </property>
</bean>


org.quartz.jobStore.class屬性為JobStoreTX,將任務(wù)持久化到數(shù)據(jù)中。因為集群中節(jié)點(diǎn)依賴于數(shù)據(jù)庫來傳播Scheduler實例的狀態(tài),你只能在使用JDBC JobStore時應(yīng)用Quartz集群。

org.quartz.jobStore.isClustered屬性為true,通知Scheduler實例要它參與到一個集群當(dāng)中。

org.quartz.jobStore.clusterCheckinInterval屬性定義了Scheduler實例檢入到數(shù)據(jù)庫中的頻率(單位:毫秒)。Scheduler檢查是否其他的實例到了它們應(yīng)當(dāng)檢入的時候未檢入;這能指出一個失敗的Scheduler實例,且當(dāng)前 Scheduler會以此來接管任何執(zhí)行失敗并可恢復(fù)的Job。通過檢入操作,Scheduler 也會更新自身的狀態(tài)記錄。clusterChedkinInterval越小,Scheduler節(jié)點(diǎn)檢查失敗的Scheduler實例就越頻繁。默認(rèn)值是 15000 (即15 秒)。

其余參數(shù)在后文將會詳細(xì)介紹。

Quartz監(jiān)控

CRM后臺目前可以做到對Quartz實例的監(jiān)控、操作以及動態(tài)部署Trigger.

Triggers監(jiān)控:

Quartz Triggers

JobDetails監(jiān)控:

Quartz Jobs

五、Quartz集群原理分析

1. Quartz集群數(shù)據(jù)庫表

Quartz的集群部署方案在架構(gòu)上是分布式的,沒有負(fù)責(zé)集中管理的節(jié)點(diǎn),而是利用數(shù)據(jù)庫鎖的方式來實現(xiàn)集群環(huán)境下進(jìn)行并發(fā)控制。BTW,分布式部署時需要保證各個節(jié)點(diǎn)的系統(tǒng)時間一致。

Quartz數(shù)據(jù)庫核心表如下:

Table Name Description
QRTZ_CALENDARS 存儲Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存儲CronTrigger,包括Cron表達(dá)式和時區(qū)信息
QRTZ_FIRED_TRIGGERS 存儲與已觸發(fā)的Trigger相關(guān)的狀態(tài)信息,以及相聯(lián)Job的執(zhí)行信息
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的Trigger組的信息
QRTZ_SCHEDULER_STATE 存儲少量的有關(guān)Scheduler的狀態(tài)信息,和別的Scheduler實例
QRTZ_LOCKS 存儲程序的悲觀鎖的信息
QRTZ_JOB_DETAILS 存儲每一個已配置的Job的詳細(xì)信息
QRTZ_JOB_LISTENERS 存儲有關(guān)已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存儲簡單的Trigger,包括重復(fù)次數(shù)、間隔、以及已觸的次數(shù)
QRTZ_BLOG_TRIGGERS Trigger作為Blob類型存儲
QRTZ_TRIGGER_LISTENERS 存儲已配置的TriggerListener的信息
QRTZ_TRIGGERS 存儲已配置的Trigger的信息

其中,QRTZ_LOCKS就是Quartz集群實現(xiàn)同步機(jī)制的行鎖表,其表結(jié)構(gòu)如下:

--QRTZ_LOCKS表結(jié)構(gòu)
CREATE TABLE `QRTZ_LOCKS` (
  `LOCK_NAME` varchar(40) NOT NULL,
   PRIMARY KEY (`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--QRTZ_LOCKS記錄
+-----------------+ 
| LOCK_NAME       |
+-----------------+ 
| CALENDAR_ACCESS |
| JOB_ACCESS      |
| MISFIRE_ACCESS  |
| STATE_ACCESS    |
| TRIGGER_ACCESS  |
+-----------------+

可以看出QRTZ_LOCKS中有5條記錄,代表5把鎖,分別用于實現(xiàn)多個Quartz Node對Job、Trigger、Calendar訪問的同步控制。

2. Quartz線程模型

在Quartz中有兩類線程:Scheduler調(diào)度線程和任務(wù)執(zhí)行線程。 任務(wù)執(zhí)行線程 :Quartz不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz把線程管理的職責(zé)委托給ThreadPool,一般的設(shè)置使用SimpleThreadPool。SimpleThreadPool創(chuàng)建了一定數(shù)量的WorkerThread實例來使得Job能夠在線程中進(jìn)行處理。WorkerThread是定義在SimpleThreadPool類中的內(nèi)部類,它實質(zhì)上就是一個線程。例如,CRM中配置如下:

 <!-- 線程池配置 -->
 <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
 <prop key="org.quartz.threadPool.threadCount">20</prop>
 <prop key="org.quartz.threadPool.threadPriority">5</prop>

QuartzSchedulerThread調(diào)度主線程 :QuartzScheduler被創(chuàng)建時創(chuàng)建一個QuartzSchedulerThread實例。

3. 集群源碼分析

Quartz究竟是如何保證集群情況下trgger處理的信息同步?

下面跟著源碼一步一步分析,QuartzSchedulerThread包含有決定何時下一個Job將被觸發(fā)的處理循環(huán),主要邏輯在其run()方法中:

public void run() {
   boolean lastAcquireFailed = false;
   while (!halted.get()) {
  ......

  int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  if(availThreadCount > 0) { 

  ......

  //調(diào)度器在trigger隊列中尋找30秒內(nèi)一定數(shù)目的trigger(需要保證集群節(jié)點(diǎn)的系統(tǒng)時間一致)
  triggers = qsRsrcs.getJobStore().acquireNextTriggers(
             now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

  ......

  //觸發(fā)trigger
  List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

  ......

  //釋放trigger
  for (int i = 0; i < triggers.size(); i++) {
      qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
  }

   }			 
}

由此可知,QuartzScheduler調(diào)度線程不斷獲取trigger,觸發(fā)trigger,釋放trigger。下面分析trigger的獲取過程,qsRsrcs.getJobStore()返回對象是JobStore,集群環(huán)境配置如下:

<!-- JobStore 配置 -->
<prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>

JobStoreTX繼承自JobStoreSupport,而JobStoreSupport的acquireNextTriggers、triggersFired、releaseAcquiredTrigger方法負(fù)責(zé)具體trigger相關(guān)操作,都必須獲得TRIGGER_ACCESS鎖。核心邏輯在executeInNonManagedTXLock方法中:

protected <T> T executeInNonManagedTXLock(
    String lockName, 
    TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
  boolean transOwner = false;
  Connection conn = null;
  try {
    if (lockName != null) {
      if (getLockHandler().requiresConnection()) {
        conn = getNonManagedTXConnection();
      }

      //獲取鎖
      transOwner = getLockHandler().obtainLock(conn, lockName);
    }

    if (conn == null) {
      conn = getNonManagedTXConnection();
    }

    final T result = txCallback.execute(conn);
    try {
      commitConnection(conn);
    } catch (JobPersistenceException e) {
      rollbackConnection(conn);
      if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
        @Override
        public Boolean execute(Connection conn) throws JobPersistenceException {
          return txValidator.validate(conn, result);
        }
      })) {
        throw e;
      }
    }

    Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
    if(sigTime != null && sigTime >= 0) {
      signalSchedulingChangeImmediately(sigTime);
    }

    return result;
  } catch (JobPersistenceException e) {
    rollbackConnection(conn);
    throw e;
  } catch (RuntimeException e) {
    rollbackConnection(conn);
    throw new JobPersistenceException("Unexpected runtime exception: "
        + e.getMessage(), e);
  } finally {
    try {
      releaseLock(lockName, transOwner);	  //釋放鎖
    } finally {
      cleanupConnection(conn);
    }
  }
}

由上代碼可知Quartz集群基于數(shù)據(jù)庫鎖的同步操作流程如下圖所示:

Quartz集群基于鎖的同步方案

一個調(diào)度器實例在執(zhí)行涉及到分布式問題的數(shù)據(jù)庫操作前,首先要獲取QUARTZ_LOCKS表中對應(yīng)的行級鎖,獲取鎖后即可執(zhí)行其他表中的數(shù)據(jù)庫操作,隨著操作事務(wù)的提交,行級鎖被釋放,供其他調(diào)度實例獲取。集群中的每一個調(diào)度器實例都遵循這樣一種嚴(yán)格的操作規(guī)程。

getLockHandler()方法返回的對象類型是Semaphore,獲取鎖和釋放鎖的具體邏輯由該對象維護(hù)

public interface Semaphore {

     boolean obtainLock(Connection conn, String lockName) throws LockException;

     void releaseLock(String lockName) throws LockException;

     boolean requiresConnection();
}

該接口的實現(xiàn)類完成具體操作鎖的邏輯,在JobStoreSupport的初始化方法中注入的Semaphore具體類型是StdRowLockSemaphore

setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));

StdRowLockSemaphore的源碼如下所示:

public class StdRowLockSemaphore extends DBSemaphore {
//鎖定SQL語句
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
    + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME
    + " = ? FOR UPDATE";

public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST 
    + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " 
    + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)"; 

//指定鎖定SQL
protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
  PreparedStatement ps = null;
  ResultSet rs = null;
  try {
    ps = conn.prepareStatement(expandedSQL);
    ps.setString(1, lockName);
    ......
    rs = ps.executeQuery();
    if (!rs.next()) {
      throw new SQLException(Util.rtp(
        "No row exists in table " + TABLE_PREFIX_SUBST +
        TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix()));
    }
  } catch (SQLException sqle) {

  } finally {
    ...... //release resources
  }
  }
}

//獲取QRTZ_LOCKS行級鎖
public boolean obtainLock(Connection conn, String lockName) throws LockException {
  lockName = lockName.intern();

  if (!isLockOwner(conn, lockName)) {
    executeSQL(conn, lockName, expandedSQL);

    getThreadLocks().add(lockName);
  }
  return true;
}

//釋放QRTZ_LOCKS行級鎖
public void releaseLock(Connection conn, String lockName) {
  lockName = lockName.intern();

  if (isLockOwner(conn, lockName)) {
    getThreadLocks().remove(lockName);
  }
  ......
}

至此,總結(jié)一下Quartz集群同步機(jī)制:每當(dāng)要進(jìn)行與某種業(yè)務(wù)相關(guān)的數(shù)據(jù)庫操作時,先去QRTZ_LOCKS表中查詢操作相關(guān)的業(yè)務(wù)對象所需要的鎖,在select語句之后加for update來實現(xiàn)。例如,TRIGGER_ACCESS表示對任務(wù)觸發(fā)器相關(guān)的信息進(jìn)行修改、刪除操作時所需要獲得的鎖。這時,執(zhí)行查詢這個表數(shù)據(jù)的SQL形如:

select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update

當(dāng)一個線程使用上述的SQL對表中的數(shù)據(jù)執(zhí)行查詢操作時,若查詢結(jié)果中包含相關(guān)的行,數(shù)據(jù)庫就對該行進(jìn)行ROW LOCK;若此時,另外一個線程使用相同的SQL對表的數(shù)據(jù)進(jìn)行查詢,由于查詢出的數(shù)據(jù)行已經(jīng)被數(shù)據(jù)庫鎖住了,此時這個線程就只能等待,直到擁有該行鎖的線程完成了相關(guān)的業(yè)務(wù)操作,執(zhí)行了commit動作后,數(shù)據(jù)庫才會釋放了相關(guān)行的鎖,這個線程才能繼續(xù)執(zhí)行。

通過這樣的機(jī)制,在集群環(huán)境下,結(jié)合悲觀鎖的機(jī)制就可以防止一個線程對數(shù)據(jù)庫數(shù)據(jù)的操作的結(jié)果被另外一個線程所覆蓋,從而可以避免一些難以覺察的錯誤發(fā)生。當(dāng)然,達(dá)到這種效果的前提是需要把Connection設(shè)置為手動提交,即autoCommit為false。

六、參考資料

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多