I've been busy lately with my own MapReduce tracing system. Today I finally finished the real-time tracker for MapReduce's task-scheduling policy and got it through debugging. Honestly, it was hard work. My thanks to my girlfriend for keeping me company through this stretch, even if she spent much of that time dozing beside my desk.
When I find time I may post some screenshots of the system, so more Hadoop enthusiasts can discuss it together.
To push on with my dissection of the Hadoop source code, I'm writing a little more today, on the RPC mechanism in Hadoop.
RPC has already been implemented by many libraries: as I recall, Sun's NFS is built on an RPC mechanism, Apache has XML-RPC, and Java has RMI; all of them realize the same idea, the remote procedure call. Introducing RPC as a layer was a great stroke of software design, because it makes distributed programs as easy to write as single-node ones.
One example of such a call:
Object1 funcName(Object2, Object3, Object4, ...)
In Hadoop, every one of these objects must be transmittable over the network, and not every object satisfies that condition.
So today I will start with Hadoop's object transmission mechanism.
In Hadoop, every object that can be transmitted implements the Writable interface.
Let's look at that interface first.
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

/** A simple, efficient, serialization protocol, based on {@link DataInput} and
 * {@link DataOutput}.
 *
 * Implementations typically implement a static read(DataInput)
 * method which constructs a new instance, calls {@link
 * #readFields(DataInput)}, and returns the instance.
 *
 * @author Doug Cutting
 */
/* write/read for network transfer class */
public interface Writable {
    /** Writes the fields of this object to out. */
    void write(DataOutput out) throws IOException;

    /**
     * Reads the fields of this object from in. For efficiency,
     * implementations should attempt to re-use storage in the existing object
     * where possible.
     */
    void readFields(DataInput in) throws IOException;
}
This interface is very simple: just two methods.
If a class implements this interface, that is, if it provides implementations of these two methods, then its instances can be transmitted over the network.
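To make the pattern concrete before we read real Hadoop code, here is a minimal sketch of my own; HeartbeatInfo is a made-up class, not something from Hadoop:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// A made-up example type: implementing write() and readFields() is all
// it takes to make instances transmittable.
public class HeartbeatInfo implements Writable {
    private long timestamp;
    private int load;

    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp); // the field order chosen here defines the wire format
        out.writeInt(load);
    }

    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong(); // must read back in exactly the same order
        load = in.readInt();
    }
}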
Now let's take a real class from the Hadoop source as our example.
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.hadoop.io.*;

import java.io.*;
import java.util.*;

/**************************************************
 * A Block is a Hadoop FS primitive, identified by a long.
 *
 * @author Mike Cafarella
 **************************************************/
public class Block implements Writable, Comparable {
    static { // register a ctor
        WritableFactories.setFactory(Block.class, new WritableFactory() {
            public Writable newInstance() {
                return new Block();
            }
        });
    }

    static Random r = new Random();

    /**
     */
    public static boolean isBlockFilename(File f) {
        if (f.getName().startsWith("blk_")) {
            return true;
        } else {
            return false;
        }
    }

    long blkid;
    long len;

    /**
     */
    public Block() {
        this.blkid = r.nextLong();
        this.len = 0;
    }

    /**
     */
    public Block(long blkid, long len) {
        this.blkid = blkid;
        this.len = len;
    }

    /**
     * Find the blockid from the given filename
     */
    public Block(File f, long len) {
        String name = f.getName();
        name = name.substring("blk_".length());
        this.blkid = Long.parseLong(name);
        this.len = len;
    }

    /**
     */
    public long getBlockId() {
        return blkid;
    }

    /**
     */
    public String getBlockName() {
        return "blk_" + String.valueOf(blkid);
    }

    /**
     */
    public long getNumBytes() {
        return len;
    }

    public void setNumBytes(long len) {
        this.len = len;
    }

    /**
     */
    public String toString() {
        return getBlockName();
    }

    /////////////////////////////////////
    // Writable
    /////////////////////////////////////
    public void write(DataOutput out) throws IOException {
        out.writeLong(blkid);
        out.writeLong(len);
    }

    public void readFields(DataInput in) throws IOException {
        this.blkid = in.readLong();
        this.len = in.readLong();
    }
    /////////////////////////////////////
    // Comparable
    /////////////////////////////////////
    public int compareTo(Object o) {
        Block b = (Block) o;
        if (getBlockId() < b.getBlockId()) {
            return -1;
        } else if (getBlockId() == b.getBlockId()) {
            return 0;
        } else {
            return 1;
        }
    }

    public boolean equals(Object o) {
        Block b = (Block) o;
        return (this.compareTo(b) == 0);
    }
}
This class plainly carries the two methods, write and readFields.
In Hadoop, a Block is a block of data: a file stored in HDFS gets split into many such blocks.
It is easy to see what the pair does: write pushes the object's fields out through the DataOutput interface, and readFields reads them back from the DataInput interface to initialize the object's own fields.
The cleverness of the design lies exactly here: when we want to ship an object, we open a Socket, take that Socket's DataInputStream and DataOutputStream, and hand them to these two methods. Object transmission then comes almost for free, with very low coupling.
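A minimal sketch of that round trip; the class name WritableRoundTrip is mine, and byte-array streams stand in for a socket's getOutputStream()/getInputStream() so the example is self-contained:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.dfs.Block;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        Block original = new Block(42L, 1024L); // blkid = 42, len = 1024

        // Sender side: push the fields into any DataOutput.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Receiver side: make an empty instance and let readFields
        // repopulate it from the matching DataInput.
        Block copy = new Block();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy.getBlockName() + " len=" + copy.getNumBytes());
    }
}

Over a real connection, the same write/readFields calls would simply be handed the socket's streams instead of the byte-array ones.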
Writable is one of the most important foundation types in Hadoop. The whole edifice of Hadoop is built on its RPC mechanism, and the Writable interface deserves a large share of the credit for that.
沒(méi)什么時(shí)間就寫(xiě)這么多,該做點(diǎn)別的事情了,最近看python源碼剖析看得非常激情澎湃。有志同道合的網(wǎng)友歡迎來(lái)交流。
下次有時(shí)間再來(lái)寫(xiě)RPC機(jī)制的另外一部分。代理機(jī)制。
This post comes from a ChinaUnix blog; the original is at http://blog.chinaunix.net/u3/105041/showart_2139430.html