|
該模塊兒是整個hadoop平臺命令類協(xié)議通訊的基礎,Hadoop平臺中所有的協(xié)議調用都是通過該套機制進行實現(xiàn)。
術語解釋:
遠程進程調用
client調用遠程Server中某實例的方法。
。
具體實現(xiàn):
遠程過程調用,一定通過網絡進行方法參數(shù)以及返回值信息傳輸,該模塊兒主要采用通用的網絡Server設計實現(xiàn)方式,利用Socket構建Server服務器,并在其上構造一個具體業(yè)務實例,client將方法調用參數(shù)按照一定格式進行序列化,通過網絡傳輸?shù)絊erver端,Server解析用戶請求調用相應的實例方法,將返回值或者捕獲的異常通過網絡反饋給client。
整體涉及到的主要類為:
Org.apache.hadoop.ipc.Server以及其內部實現(xiàn)類
org.apache.hadoop.ipc.Client以及其內部實現(xiàn)類
org.apache.hadoop.ipc.Rpc$Server
客戶端(client)具體實現(xiàn):
客戶端主要通過proxy模式,利用java自身動態(tài)代理技術,Proxy.newProxyInstance
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
該函數(shù)主要的兩個參數(shù)Class<?>[] interfaces, InvocationHandler h
用戶調用該函數(shù)生成一個Object實例,該實例是一個實現(xiàn)了interfaces接口的具體實例,用戶通常將返回的Object轉化為具體的interfaces進行使用,當用戶調用該實例(實現(xiàn)了interfaces接口)某個具體方法時,系統(tǒng)就將請求轉發(fā)給InvocationHandler h這個參數(shù)對應的實例。
public interface InvocationHandler
{
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}
最終用戶的請求被push到InvocationHandler h這個參數(shù)的invoke方法,系統(tǒng)將method以及args這兩個參數(shù)進行序列化,通過網絡傳輸?shù)絪erver端。
主要流程如下:其中Invoker是一個實現(xiàn)了InvocationHandler 接口的類
1.生成代理
生成遠程代理的主要關鍵步驟就是java的動態(tài)代理技術 Proxy.newProxyInstance.
2.遠程實例方法調用:
最終用戶的請求被push到InvocationHandler h這個參數(shù)的invoke方法,系統(tǒng)將method以及args這兩個參數(shù)進行序列化,通過網絡傳輸?shù)絪erver端(實例化Invoker的同時會實例化一個Client類實例,這個client主要負責將調用方法以及參數(shù)序列化以后,通過網絡傳輸?shù)絊erver端)。
遠程調用的主要關鍵就是Invocation實現(xiàn)了Writable接口,Invocation在write(DataOutput out)函數(shù)中將調用的methodName寫入到out,將調用方法的參數(shù)個數(shù)寫入out ,同時逐個將參數(shù)的className寫入out,最后將所有參數(shù)逐個寫入out,這也就決定了通過RPC實現(xiàn)調用的方法中的參數(shù)要么是簡單類型,要么是String,要么是實現(xiàn)了Writable接口的類(參數(shù)自己知道如何序列化到stream),要么是數(shù)組(數(shù)組的元素也必須為簡單類型,String,實現(xiàn)了Writable接口的類)。
Invocation序列化參數(shù)的實現(xiàn)是通過如下函數(shù)實現(xiàn)的:
Org.apache.hadoop.io.ObjectWritable.writeObject
(DataOutput out, Object instance,
Class declaredClass, Configuration conf);
{
if (instance == null)
{ // null
instance = new NullInstance(declaredClass, conf);
declaredClass = Writable.class;
}
//首先寫入參數(shù)名稱寫入out
UTF8.writeString(out, declaredClass.getName()); // always write declared
//如果是數(shù)組
if (declaredClass.isArray())
{ // array
int length = Array.getLength(instance);
//首先寫入數(shù)組元素個數(shù)
out.writeInt(length);
//逐步序列化數(shù)組中的元素
for (int i = 0; i < length; i++)
{
writeObject(out, Array.get(instance, i), declaredClass
.getComponentType(), conf);
}
}
//
else if (declaredClass == String.class)
{ // String
UTF8.writeString(out, (String) instance);
}
else if (declaredClass.isPrimitive())
{ // primitive type
if (declaredClass == Boolean.TYPE)
{ // boolean
out.writeBoolean(((Boolean) instance).booleanValue());
}
else if (declaredClass == Character.TYPE)
{ // char
out.writeChar(((Character) instance).charValue());
}
else if (declaredClass == Byte.TYPE)
{ // byte
out.writeByte(((Byte) instance).byteValue());
}
else if (declaredClass == Short.TYPE)
{ // short
out.writeShort(((Short) instance).shortValue());
}
else if (declaredClass == Integer.TYPE)
{ // int
out.writeInt(((Integer) instance).intValue());
}
else if (declaredClass == Long.TYPE)
{ // long
out.writeLong(((Long) instance).longValue());
}
else if (declaredClass == Float.TYPE)
{ // float
out.writeFloat(((Float) instance).floatValue());
}
else if (declaredClass == Double.TYPE)
{ // double
out.writeDouble(((Double) instance).doubleValue());
}
else if (declaredClass == Void.TYPE)
{ // void
}
else
{
throw new IllegalArgumentException("Not a primitive: "
+ declaredClass);
}
}
else if (declaredClass.isEnum())
{ // enum
UTF8.writeString(out, ((Enum) instance).name());
}
//其他元素通過自身實現(xiàn)的writable接口功能實現(xiàn)序列化
else if (Writable.class.isAssignableFrom(declaredClass))
{ // Writable
UTF8.writeString(out, instance.getClass().getName());
((Writable) instance).write(out);
}
else
{
throw new IOException("Can't write: " + instance + " as "
+ declaredClass);
}
}
服務端(Server,具體調用實例所在)具體實現(xiàn):
1.具體相關類
最底層的抽象類Server是整個網絡通訊的核心,完成Listener,Responser以及Handler實例的初始化,并控制所有功能模塊兒服務的啟動。
Listener主要負責Socket的監(jiān)聽以及Connection的建立,同時監(jiān)控ClientSocket的數(shù)據(jù)可讀事件,通知Connection進行processData,收到完成請求包以后,封裝為一個Call對象(包含Connection對象,從網絡流中讀取的參數(shù)信息,調用方法信息),將其放入隊列。
Handler主要負責從請求隊列中取出數(shù)據(jù)逐個進行處理,最終調用Server抽獎類的call方法,目前抽象類Server實現(xiàn)了一個具體的實現(xiàn)類,名稱也是Server。Server的call方法中根據(jù)方法名以及參數(shù)構建method對象,并在實際服務的實例對象上進行方法調用。方法調用返回值或異常對象被序列化到一個ByteBuffer,由Responser.doRespond方法進行發(fā)送數(shù)據(jù)。
Responser的主要作用是發(fā)送數(shù)據(jù),如果數(shù)據(jù)量比較大一次發(fā)送不出去,就監(jiān)聽Write事件,一直Select是否可寫,保證數(shù)據(jù)發(fā)送完整.
2.Server端相關調用流程圖如下:
本文來自CSDN博客,轉載請標明出處:http://blog.csdn.net/sxf_824/archive/2009/11/20/4842153.aspx
本文來自CSDN博客,轉載請標明出處:http://blog.csdn.net/sxf_824/archive/2009/11/20/4842153.aspx
|
|
|