0 引言context是Go中廣泛使用的程序包,由Google官方開發(fā),在1.7版本引入。它用來簡化在多個go routine傳遞上下文數(shù)據(jù)、(手動/超時)中止routine樹等操作,比如,官方http包使用context傳遞請求的上下文數(shù)據(jù),gRpc使用context來終止某個請求產(chǎn)生的routine樹。由于它使用簡單,現(xiàn)在基本成了編寫go基礎(chǔ)庫的通用規(guī)范。筆者在使用context上有一些經(jīng)驗(yàn),遂分享下。
本文主要談?wù)勔韵聨讉€方面的內(nèi)容: - context的使用。
- context實(shí)現(xiàn)原理,哪些是需要注意的地方。
- 在實(shí)踐中遇到的問題,分析問題產(chǎn)生的原因。
1 使用1.1 核心接口Contexttype Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed.
Err() error
// Value returns the value associated with this context for key.
Value(key interface{}) interface{}
}
簡單介紹一下其中的方法: - Done會返回一個channel,當(dāng)該context被取消的時候,該channel會被關(guān)閉,同時對應(yīng)的使用該context的routine也應(yīng)該結(jié)束并返回。 - Context中的方法是協(xié)程安全的,這也就代表了在父routine中創(chuàng)建的context,可以傳遞給任意數(shù)量的routine并讓他們同時訪問。 - Deadline會返回一個超時時間,routine獲得了超時時間后,可以對某些io操作設(shè)定超時時間。 - Value可以讓routine共享一些數(shù)據(jù),當(dāng)然獲得數(shù)據(jù)是協(xié)程安全的。 在請求處理的過程中,會調(diào)用各層的函數(shù),每層的函數(shù)會創(chuàng)建自己的routine,是一個routine樹。所以,context也應(yīng)該反映并實(shí)現(xiàn)成一棵樹。 要創(chuàng)建context樹,第一步是要有一個根結(jié)點(diǎn)。context.Background函數(shù)的返回值是一個空的context,經(jīng)常作為樹的根結(jié)點(diǎn),它一般由接收請求的第一個routine創(chuàng)建,不能被取消、沒有值、也沒有過期時間。 func Background() Context
之后該怎么創(chuàng)建其它的子孫節(jié)點(diǎn)呢?context包為我們提供了以下函數(shù): func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key interface{}, val interface{}) Context
這四個函數(shù)的第一個參數(shù)都是父context,返回一個Context類型的值,這樣就層層創(chuàng)建出不同的節(jié)點(diǎn)。子節(jié)點(diǎn)是從復(fù)制父節(jié)點(diǎn)得到的,并且根據(jù)接收的函數(shù)參數(shù)保存子節(jié)點(diǎn)的一些狀態(tài)值,然后就可以將它傳遞給下層的routine了。 WithCancel函數(shù),返回一個額外的CancelFunc函數(shù)類型變量,該函數(shù)類型的定義為:
調(diào)用CancelFunc對象將撤銷對應(yīng)的Context對象,這樣父結(jié)點(diǎn)的所在的環(huán)境中,獲得了撤銷子節(jié)點(diǎn)context的權(quán)利,當(dāng)觸發(fā)某些條件時,可以調(diào)用CancelFunc對象來終止子結(jié)點(diǎn)樹的所有routine。在子節(jié)點(diǎn)的routine中,需要用類似下面的代碼來判斷何時退出routine: select {
case <-cxt.Done():
// do some cleaning and return
}
根據(jù)cxt.Done()判斷是否結(jié)束。當(dāng)頂層的Request請求處理結(jié)束,或者外部取消了這次請求,就可以cancel掉頂層context,從而使整個請求的routine樹得以退出。 WithDeadline和WithTimeout比WithCancel多了一個時間參數(shù),它指示context存活的最長時間。如果超過了過期時間,會自動撤銷它的子context。所以context的生命期是由父context的routine和deadline共同決定的。
WithValue返回parent的一個副本,該副本保存了傳入的key/value,而調(diào)用Context接口的Value(key)方法就可以得到val。注意在同一個context中設(shè)置key/value,若key相同,值會被覆蓋。
關(guān)于更多的使用示例,可參考官方博客。 2 原理2.1 上下文數(shù)據(jù)的存儲與查詢type valueCtx struct {
Context
key, val interface{}
}
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
......
return &valueCtx{parent, key, val}
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
context上下文數(shù)據(jù)的存儲就像一個樹,每個結(jié)點(diǎn)只存儲一個key/value對。WithValue()保存一個key/value對,它將父context嵌入到新的子context,并在節(jié)點(diǎn)中保存了key/value數(shù)據(jù)。Value()查詢key對應(yīng)的value數(shù)據(jù),會從當(dāng)前context中查詢,如果查不到,會遞歸查詢父context中的數(shù)據(jù)。 值得注意的是,context中的上下文數(shù)據(jù)并不是全局的,它只查詢本節(jié)點(diǎn)及父節(jié)點(diǎn)們的數(shù)據(jù),不能查詢兄弟節(jié)點(diǎn)的數(shù)據(jù)。 2.2 手動cancel和超時cancelcancelCtx中嵌入了父Context,實(shí)現(xiàn)了canceler接口:
type cancelCtx struct {
Context // 保存parent Context
done chan struct{}
mu sync.Mutex
children map[canceler]struct{}
err error
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
cancelCtx結(jié)構(gòu)體中children保存它的所有子canceler, 當(dāng)外部觸發(fā)cancel時,會調(diào)用children中的所有cancel()來終止所有的cancelCtx。done用來標(biāo)識是否已被cancel。當(dāng)外部觸發(fā)cancel、或者父Context的channel關(guān)閉時,此done也會關(guān)閉。
type timerCtx struct {
cancelCtx //cancelCtx.Done()關(guān)閉的時機(jī):1)用戶調(diào)用cancel 2)deadline到了 3)父Context的done關(guān)閉了
timer *time.Timer
deadline time.Time
}
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
......
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := time.Until(deadline)
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
timerCtx結(jié)構(gòu)體中deadline保存了超時的時間,當(dāng)超過這個時間,會觸發(fā)cancel。
可以看出,cancelCtx也是一棵樹,當(dāng)觸發(fā)cancel時,會cancel本結(jié)點(diǎn)和其子樹的所有cancelCtx。 3 遇到的問題3.1 背景某天,為了給我們的系統(tǒng)接入etrace(內(nèi)部的鏈路跟蹤系統(tǒng)),需要在gRpc/Mysql/Redis/MQ操作過程中傳遞requestId、rpcId,我們的解決方案是Context。 所有Mysql、MQ、Redis的操作接口的第一個參數(shù)都是context,如果這個context(或其父context)被cancel了,則操作會失敗。 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
func(process func(context.Context, redis.Cmder) error) func(context.Context, redis.Cmder) error
func (ch *Channel) Consume(ctx context.Context, handler Handler, queue string, dc <-chan amqp.Delivery) error
func (ch *Channel) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (err error)
上線后,遇到一系列的坑...... 3.2 Case 1現(xiàn)象:上線后,5分鐘后所有用戶登錄失敗,不斷收到報警。 原因:程序中使用localCache,會每5分鐘Refresh(調(diào)用注冊的回調(diào)函數(shù))一次所緩存的變量。localCache中保存了一個context,在調(diào)用回調(diào)函數(shù)時會傳進(jìn)去。如果回調(diào)函數(shù)依賴context,可能會產(chǎn)生意外的結(jié)果。 程序中,回調(diào)函數(shù)getAppIDAndAlias的功能是從mysql中讀取相關(guān)數(shù)據(jù)。如果ctx被cancel了,會直接返回失敗。 func getAppIDAndAlias(ctx context.Context, appKey, appSecret string) (string, string, error)
第一次localCache.Get(ctx, appKey, appSeret)傳的ctx是gRpc call傳進(jìn)來的context,而gRpc在請求結(jié)束或失敗時會cancel掉context,導(dǎo)致之后cache Refresh()時,執(zhí)行失敗。 解決方法:在Refresh時不使用localCache的context,使用一個不會cancel的context。 3.3 Case 2現(xiàn)象:上線后,不斷收到報警(sys err過多)??磍og/etrace產(chǎn)生2種sys err: - context canceled
- sql: Transaction has already been committed or rolled back
3.3.1 背景及原因
Ticket是處理Http請求的服務(wù),它使用Restful風(fēng)格的協(xié)議。由于程序內(nèi)部使用的是gRpc協(xié)議,需要某個組件進(jìn)行協(xié)議轉(zhuǎn)換,我們引入了grpc-gateway,用它來實(shí)現(xiàn)Restful轉(zhuǎn)成gRpc的互轉(zhuǎn)。
復(fù)現(xiàn)context canceled的流程如下: - 客戶端發(fā)送http restful請求。
- grpc-gateway與客戶端建立連接,接收請求,轉(zhuǎn)換參數(shù),調(diào)用后面的grpc-server。
- grpc-server處理請求。其中,grpc-server會對每個請求啟一個stream,由這個stream創(chuàng)建context。
- 客戶端連接斷開。
- grpc-gateway收到連接斷開的信號,導(dǎo)致context cancel。grpc client在發(fā)送rpc請求后由于外部異常使它的請求終止了(即它的context被cancel),會發(fā)一個RST_STREAM。
- grpc server收到后,馬上終止請求(即grpc server的stream context被cancel)。
可以看出,是因?yàn)間Rpc handler在處理過程中連接被斷開。 sql: Transaction has already been committed or rolled back產(chǎn)生的原因:
程序中使用了官方database包來執(zhí)行db transaction。其中,在db.BeginTx時,會啟一個協(xié)程awaitDone: func (tx *Tx) awaitDone() {
// Wait for either the transaction to be committed or rolled
// back, or for the associated context to be closed.
<-tx.ctx.Done()
// Discard and close the connection used to ensure the
// transaction is closed and the resources are released. This
// rollback does nothing if the transaction has already been
// committed or rolled back.
tx.rollback(true)
}
在context被cancel時,會進(jìn)行rollback(),而rollback時,會操作原子變量。之后,在另一個協(xié)程中tx.Commit()時,會判斷原子變量,如果變了,會拋出錯誤。 3.3.2 解決方法這兩個error都是由連接斷開導(dǎo)致的,是正常的??珊雎赃@兩個error。 3.4 Case 3上線后,每兩天左右有1~2次的mysql事務(wù)阻塞,導(dǎo)致請求耗時達(dá)到120秒。在盤古(內(nèi)部的mysql運(yùn)維平臺)中查詢到所有阻塞的事務(wù)在處理同一條記錄。
3.4.1 處理過程1. 初步懷疑是跨機(jī)房的多個事務(wù)操作同一條記錄導(dǎo)致的。由于跨機(jī)房操作,耗時會增加,導(dǎo)致阻塞了其他機(jī)房執(zhí)行的db事務(wù)。 2. 出現(xiàn)此現(xiàn)象時,暫時將某個接口降級。降低多個事務(wù)操作同一記錄的概率。 3. 減少事務(wù)的個數(shù)。 - 將單條sql的事務(wù)去掉
- 通過業(yè)務(wù)邏輯的轉(zhuǎn)移減少不必要的事務(wù)
4. 調(diào)整db參數(shù)innodb_lock_wait_timeout(120s->50s)。這個參數(shù)指示mysql在執(zhí)行事務(wù)時阻塞的最大時間,將這個時間減少,來減少整個操作的耗時??紤]過在程序中指定事務(wù)的超時時間,但是innodb_lock_wait_timeout要么是全局,要么是session的。擔(dān)心影響到session上的其它sql,所以沒設(shè)置。 5. 考慮使用分布式鎖來減少操作同一條記錄的事務(wù)的并發(fā)量。但由于時間關(guān)系,沒做這塊的改進(jìn)。 6. DAL同事發(fā)現(xiàn)有事務(wù)沒提交,查看代碼,找到root cause。 原因是golang官方包database/sql會在某種競態(tài)條件下,導(dǎo)致事務(wù)既沒有commit,也沒有rollback。 3.4.2 源碼描述開始事務(wù)BeginTxx()時會啟一個協(xié)程: // awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
// Wait for either the transaction to be committed or rolled
// back, or for the associated context to be closed.
<-tx.ctx.Done()
// Discard and close the connection used to ensure the
// transaction is closed and the resources are released. This
// rollback does nothing if the transaction has already been
// committed or rolled back.
tx.rollback(true)
}
tx.rollback(true)中,會先判斷原子變量tx.done是否為1,如果1,則返回;如果是0,則加1,并進(jìn)行rollback操作。
在提交事務(wù)Commit()時,會先操作原子變量tx.done,然后判斷context是否被cancel了,如果被cancel,則返回;如果沒有,則進(jìn)行commit操作。 // Commit commits the transaction.
func (tx *Tx) Commit() error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
select {
default:
case <-tx.ctx.Done():
return tx.ctx.Err()
}
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit()
})
if err != driver.ErrBadConn {
tx.closePrepared()
}
tx.close(err)
return err
}
如果先進(jìn)行commit()過程中,先操作原子變量,然后context被cancel,之后另一個協(xié)程在進(jìn)行rollback()會因?yàn)樵幼兞恐脼?而返回。導(dǎo)致commit()沒有執(zhí)行,rollback()也沒有執(zhí)行。 3.4.3 解決方法解決方法可以是如下任一個: - 在執(zhí)行事務(wù)時傳進(jìn)去一個
不會cancel的context - 修正
database/sql源碼,然后在編譯時指定新的go編譯鏡像
我們之后給Golang提交了patch,修正了此問題(已合入go 1.9.3)。 4 經(jīng)驗(yàn)教訓(xùn)由于go大量的官方庫、第三方庫使用了context,所以調(diào)用接收context的函數(shù)時要小心,要清楚context在什么時候cancel,什么行為會觸發(fā)cancel。筆者在程序經(jīng)常使用gRpc傳出來的context,產(chǎn)生了一些非預(yù)期的結(jié)果,之后花時間總結(jié)了gRpc、內(nèi)部基礎(chǔ)庫中context的生命期及行為,以避免出現(xiàn)同樣的問題。
|