|
Spark是一個針對超大數(shù)據(jù)集合的集群分布式計算系統(tǒng),比MapReducer快。我們從三個方面來引出和介紹Spark。
1. How to find the most similar movies? 2. How to scale? 3. How to survive?
1. How to find the most similar movies? 我們的目標是算電影之間的相似度。這里我們講一個非?;镜南嗨贫人惴?,它的基本思想是電影的并發(fā)出現(xiàn)率越高,相似度越高。如圖所示,三個用戶分別有一個喜歡的電影的列表,為了算電影相似度,我們先找出電影的并發(fā)出現(xiàn),比如通過u1的列表,先找到6個電影的并發(fā)pair,表示m2和m5同時出現(xiàn),m2和m7同時出現(xiàn)……(例中并發(fā)pair和順序有關(guān),我們稍后會將),再把屬于不同用戶的相同pair求和,比如(m2,m5)出現(xiàn)一次,(m2,m7)出現(xiàn)三次。這樣可以得出m2相似度最高的是m7的結(jié)論。 1) What if the data is large? 仍然使用上述例子,數(shù)據(jù)很大時可以先將用戶喜歡電影的列表(data1) 拆解成并發(fā)pair,再把相同pair merge在一起。 運用數(shù)據(jù)的partition,將不同用戶的數(shù)據(jù)放在不同機器上運算,每一部分叫一個worker,比如worker1處理u1的數(shù)據(jù),worker2處理u2的數(shù)據(jù),worker3處理u3的數(shù)據(jù)。如果用戶很多,對每個用戶分配一個機器并不高效,我們需要改變parition方式。比如可以將u1到u100的數(shù)據(jù)放在worker1里處理。每個worker不僅要存data,還要存算法。比如用分解的算法,input為存電影的vector,通過n^2的復雜度得到并發(fā)pair。每個worker得到新的data2(并發(fā)pair)。
Merge過程也需要多個worker,每個worker存有data2(并發(fā)pair)和一個新的算法(算法2)。這個過程要輸出pair出現(xiàn)的次數(shù)。并發(fā)pair第一個值相同的數(shù)據(jù)放在相同的worker里,用算法2進行統(tǒng)計,就可以輸出并發(fā)pair出現(xiàn)次數(shù)并找到有最大相似度的電影。 這里有幾個問題需要思考: (1) merge過程的worker開始工作時,是否要求partition的worker結(jié)束工作? 不要求。因為即使partition的worker沒有處理完數(shù)據(jù),worker仍可以得到部分數(shù)據(jù),這就是流處理(streaming)。 (2) partition的數(shù)據(jù)是否要拷貝到merge的worker上? 不建議數(shù)據(jù)拷貝。因為數(shù)據(jù)拷貝非常耗時。如果worker在同一個機器上,可以傳遞引用/指針。 (3) 是否需要格外的worker從merge過程讀取數(shù)據(jù),輸出best one?(比如對于m2,輸出相似度最高的m7)? 不需要。我們可以在merge過程中增加一個算法2.5來選擇最高相似度的電影,并且不用在worker間傳數(shù)據(jù)。 (4) 分布式系統(tǒng)中,數(shù)據(jù)變成immutable性能變好還是壞? immutable的數(shù)據(jù)是指只讀的數(shù)據(jù),它雖然天然并發(fā),但使得寫操作變多。 (5)(高階)既然可以用拆解的方式做,為什么仍需要map reduce?(留作思考) 總結(jié)下來,程序 = 數(shù)據(jù) + 算法。計算復雜度 = 數(shù)據(jù) + 算法。數(shù)據(jù)部分指的數(shù)據(jù)的移動,它在分布式系統(tǒng)中很顯著。降低數(shù)據(jù)移動的復雜度可以少移動(傳引用或組合操作,比如算法組合)或者多用memory。 2) How to run faster? 可以通過組合算法(比如算法2和算法2.5進行組合)減少數(shù)據(jù)移動。這種組合就是DAG分析(DAG有向無環(huán)圖,反應(yīng)彈性分布式數(shù)據(jù)集之間的依賴關(guān)系)。 還可以用lazy evaluation。所有計算過程都是data + 算法 = new data,我們先找出所有的算法關(guān)系,再進行化簡,這就是lazy evaluation。雖然算法復雜度不一定變低,但是數(shù)據(jù)移動變少。 Spark的中間數(shù)據(jù)放到內(nèi)存中,對迭代運算效率更高。(根據(jù)spark官網(wǎng)給出的對比測試結(jié)果,當spark所有的計算都在內(nèi)存中進行時,spark要比hadoop快兩個數(shù)量級100多倍;當spark計算應(yīng)用到磁盤時,spark的計算速度也是hadoop的10x倍) (from: http://student-lp./blog/2158969) 說到hadoop,很多人不清楚spark和map reduce區(qū)別。我們用品牌名和功能名來解釋。 品牌名:spark, ec2, hadoop, apache, nginx, tomcat, kafka 功能名:mapreduce, filter, storage, handle HTTP, load balance, cache 一個品牌可以有很多功能,比如apache, nginx都可以handle HTTP, load balance, cache 2. How to scale? 1) What is the architecture? 首先client,server互相連接,client需要driver,server需要driver。同時有controller/master處理各種機器。在建立連接時,可以client和master連接,master負責分發(fā),也可以client直接和server連接。這兩種方法都可以。如果client全都找master,master會成為瓶頸。 master有很多種(比如yarn),master的功能有很多種,比如負責協(xié)調(diào),負責回滾 除了driver,client還需要scheduler,把數(shù)據(jù)拆解成可調(diào)度的序列。除了recevier,server還需要exectuer和data management。exectuer之間的數(shù)據(jù)盡量共享,這樣數(shù)據(jù)不用存儲多份。存儲的數(shù)據(jù)是immutable,這樣不用擔心數(shù)據(jù)被改動,增加數(shù)據(jù)共享率。 如果數(shù)據(jù)太多,可以使用MMAP(一種內(nèi)存映射文件的方法,即將一個文件或者其它對象映射到進程的地址空間,實現(xiàn)文件磁盤地址和進程虛擬地址空間中一段虛擬地址的一一對映關(guān)系。)或者調(diào)整算法。 此外還有shuffle/transfer,它使得從用戶角度看,只有一個server。 (from:http://www.cnblogs.com/huxiao-tee/p/4660352.html) 3. How to survive? 1) What if a worker is broken? spark使用重算而不是replica,因為replica會使數(shù)據(jù)移動,增加時間復雜度。重算的前提是系統(tǒng)還存有故障部分前面的數(shù)據(jù),這需要有選擇性的做cache/snapshot,從而可以用log來進行數(shù)據(jù)恢復。 這個過程在spark里叫l(wèi)ineage:spark里數(shù)據(jù)恢復原狀。RDD可以通過不斷尋找父類,找到最原始的那個RDD。這條繼承路徑就認為是RDD的血統(tǒng)。 這里有一個tricky的問題:如果幾個worker在同一臺機器,一個worker失敗,會增加很多復雜度。如果在不同機器,運算又會很慢,因為要傳遞數(shù)據(jù)。至此,我們已經(jīng)大致了解Spark的架構(gòu)了。 One more thing 回到相似度的問題上。之前我們討論的是item之間的similarity。如何計算user之間的similarity? (1) 首先得到各個user的所有喜愛電影的列表。 (2) 然后filter數(shù)據(jù),比如選擇喜歡電影數(shù)量在10~100的用戶。因為如果用戶小于10,歷史記錄太少,大于100說明數(shù)據(jù)太廣泛,也不適于進行相似度分析。 (3) 進行倒排索引,得到(user,movie)的pair (4) 針對每個電影,得到針對電影的倒排索引(可以partition)。比如m1 = {u1, u3, u7}。 (5) 通過filter得到用戶最新的10個電影列表,比如u1 = {m1, m5, m9…...}。 (6) 兩種數(shù)據(jù)進行組合(4和5中)。對于u1,找m1, m5, m9, …...的用戶列表,找到出現(xiàn)次數(shù)最多的用戶。 ↓↓↓ 長按識別圖中二維碼 【論碼農(nóng)的自我修養(yǎng)】 |
|
|