
HDFS 涉及两个重要进程:NameNode、DataNode。
他们一般都部署单独部署在不同服务器上,运行 NameNode 的服务器是主服务器,运行 DataNode 的服务器是从服务器。主服务器只有一个,从服务器有多个。
这种一主多从的架构基本适用于所有分布式系统或框架。可重复使用的架构方案叫作架构模式,一主多从可谓是大数据领域的最主要的架构模式。主服务器只有一台,掌控全局。从服务器有很多台,负责具体的事情。这样很多台服务器可以有效组织起来,对外表现出一个统一又强大的存储计算能力。
DataNode 负责文件数据的存储和读写 *** 作,HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得 HDFS 可以在服务器集群规模上实现数据并行访问,极大地提高了访问速度。
在实践中,HDFS 集群的 DataNode 服务器会有很多台,一般在几百台到几千台这样的规模,每台服务器配有数块磁盘,整个集群的存储容量大概在几 PB 到数百 PB。
NameNode 负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名、数据块的 ID 以及存储位置等信息,相当于 *** 作系统中文件分配表(FAT)的角色。HDFS 为了保证数据的高可用,会将一个数据块复制为多份(默认3份),并将多份相同的数据块存储在不同的机架的服务器上。这样当有磁盘损坏,或者某个 DataNode 服务器宕机,甚至某个交换机宕机时,系统能通过其备份的数据块进行查找。
处理客户端的请求。
客户端向 HDFS 上传文件。
客户端向 HDFS 读取文件。
像 NameNode 这样主从服务器管理同一份数据的场景,如果从服务器错误地以为主服务器宕机而接管集群管理,会出现主从服务器一起对 DataNode 发送指令,进而导致集群混乱,也就是所谓的“脑裂”。这也是这类场景选举主服务器时,引入 ZooKeeper 的原因。
Java API读写HDFS
public class FSOptr {
/
@param args
/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystemget(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fsmkdirs(dir);// 创建文件夹
Systemoutprintln("make dir :" + result);
// 创建文件,并写入内容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!"getBytes();
FSDataOutputStream outputStream = fscreate(dst);
outputStreamwrite(buff, 0, bufflength);
outputStreamclose();
FileStatus files[] = fslistStatus(dst);
for (FileStatus file : files) {
Systemoutprintln(filegetPath());
}
fsclose();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystemget(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1txt");
Path newName = new Path("/user/hadoop/data/20140318/2txt");
fsrename(oldName, newName);
FileStatus files[] = fslistStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
Systemoutprintln(filegetPath());
}
fsclose();
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystemget(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fsisDirectory(path)) {
FileStatus files[] = fslistStatus(path);
for (FileStatus file : files) {
fsdelete(filegetPath());
}
} else {
fsdelete(path);
}
// 或者
fsdelete(path, true);
fsclose();
}
/
下载,将hdfs文件下载到本地磁盘
@param localSrc1
本地的文件地址,即文件的路径
@param hdfsSrc1
存放在hdfs的文件地址
/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystemget(URIcreate(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fscopyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
eprintStackTrace();
}
return false;
}
/
上传,将本地文件copy到hdfs系统中
@param localSrc
本地的文件地址,即文件的路径
@param hdfsSrc
存放在hdfs的文件地址
/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try {
fs = FileSystemget(URIcreate(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fscreate(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// Systemoutprintln("上传完一个设定缓存区大小容量的文件!");
}
});
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtilscopyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
} catch (IOException e) {
eprintStackTrace();
}
} catch (FileNotFoundException e) {
eprintStackTrace();
}
return false;
}
/
移动
@param old_st原来存放的路径
@param new_st移动到的路径
/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try {
fs = FileSystemget(URIcreate(old_st), conf);
Path hdfs_path = new Path(old_st);
fsdelete(hdfs_path);
} catch (IOException e) {
eprintStackTrace();
}
// 从服务器本地传到新路径
new_st = new_st + old_stsubstring(old_stlastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
eprintStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystemget(conf);
Path src = new Path("/home/hadoop/wordtxt");
Path dst = new Path("/user/hadoop/data/");
fscopyFromLocalFile(src, dst);
fsclose();
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystemget(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fslistStatus(path);
for (int i = 0; i < fileStatuslength; i++) {
if (fileStatus[i]isDir()) {
Path p = new Path(fileStatus[i]getPath()toString());
getFile(p, fs);
} else {
Systemoutprintln(fileStatus[i]getPath()toString());
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystemget(conf);
return fileSystemexists(new Path(path));
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystemget(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfsgetDataNodeStats();
String[] names = new String[dataNodeStatslength];
Systemoutprintln("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStatslength; i++){
names[i] = dataNodeStats[i]getHostName();
Systemoutprintln(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystemget(conf);
Path f = new Path("/user/cluster/dfstxt");
FileStatus filestatus = fsgetFileStatus(f);
BlockLocation[] blkLocations = fsgetFileBlockLocations(filestatus,0,filestatusgetLen());
int blkCount = blkLocationslength;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i]getHosts();
//Do sth with the block hosts
Systemoutprintln(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystemget(conf);
Path f = new Path("/user/cluster/dfstxt");
FileStatus filestatus = fsgetFileStatus(f);
long modificationTime = filestatusgetModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
Systemoutprintln(d);
}
}
你好,实体类定义代码:
<pre name="code" class="java">package comqinoperadb;
import javaioDataInput;
import javaioDataOutput;
import javaioIOException;
import javasqlPreparedStatement;
import javasqlResultSet;
import javasqlSQLException;
import orgapachehadoopioText;
import orgapachehadoopioWritable;
import orgapachehadoopmapreducelibdbDBWritable;
/
封装数据库实体信息
的记录
搜索大数据技术交流群:376932160
/
public class PersonRecoder implements Writable,DBWritable {
public int id;//对应数据库中id字段
public String name;//对应数据库中的name字段
public int age;//对应数据库中的age字段
@Override
public void readFields(ResultSet result) throws SQLException {
thisid=resultgetInt(1);
thisname=resultgetString(2);
thisage=resultgetInt(3);
}
@Override
public void write(PreparedStatement stmt) throws SQLException {
stmtsetInt(1, id);
stmtsetString(2, name);
stmtsetInt(3, age);
}
@Override
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub
thisid=arg0readInt();
thisname=TextreadString(arg0);
thisage=arg0readInt();
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
outwriteInt(id);
TextwriteString(out, thisname);
outwriteInt(thisage);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "id: "+id+" 年龄: "+age+" 名字:"+name;
}
}
</pre>
MR类的定义代码,注意是一个Map Only作业:
<pre name="code" class="java">package comqinoperadb;
import javaioIOException;
import orgapachehadoopconfConfiguration;
import orgapachehadoopfsFileSystem;
import orgapachehadoopfsPath;
import orgapachehadoopioLongWritable;
import orgapachehadoopioText;
import orgapachehadoopmapredJobConf;
import orgapachehadoopmapredlibIdentityReducer;
import orgapachehadoopmapreduceJob;
import orgapachehadoopmapreduceMapper;
import orgapachehadoopmapreducelibdbDBConfiguration;
import orgapachehadoopmapreducelibdbDBInputFormat;
import orgapachehadoopmapreduceliboutputFileOutputFormat;
public class ReadMapDB {
/
Map作业读取数据记录数
/
private static class DBMap extends Mapper<LongWritable, PersonRecoder , LongWritable, Text>{
@Override
protected void map(LongWritable key, PersonRecoder value,Context context)
throws IOException, InterruptedException {
contextwrite(new LongWritable(valueid), new Text(valuetoString()));
}
}
public static void main(String[] args)throws Exception {
JobConf conf=new JobConf(ReadMapDBclass);
//Configuration conf=new Configuration();
// confset("mapredjobtracker","19216875130:9001");
//读取person中的数据字段
// confsetJar("ttjar");
//注意这行代码放在最前面,进行初始化,否则会报
DBConfigurationconfigureDB(conf, "commysqljdbcDriver", "jdbc:mysql://19216821136:3306/test", "root", "qin");
/要读取的字段信息/
String fileds[]=new String[]{"id","name","age"};
/Job任务/
Job job=new Job(conf, "readDB");
Systemoutprintln("模式: "+confget("mapredjobtracker"));
/设置数据库输入格式的一些信息/
DBInputFormatsetInput(job, PersonRecoderclass, "person", null, "id", fileds);
/设置输入格式/
jobsetInputFormatClass(DBInputFormatclass);
jobsetOutputKeyClass(LongWritableclass);
jobsetOutputValueClass(Textclass);
jobsetMapperClass(DBMapclass);
String path="hdfs://19216875130:9000/root/outputdb";
FileSystem fs=FileSystemget(conf);
Path p=new Path(path);
if(fsexists(p)){
fsdelete(p, true);
Systemoutprintln("输出路径存在,已删除!");
}
FileOutputFormatsetOutputPath(job,p );
Systemexit(jobwaitForCompletion(true) 0 : 1);
}
}
1、简单介绍下hadoop吧?
广义上hadoop是指与hadoop相关的大数据生态圈。包含hive、spark、hbase等。
狭义上hadoop指的是apache的开源框架。有三个核心组件:
----hdfs:分布式文件存储系统
----yarn:分布式资源管理调度平台
----mr:分布式计算引擎
2、介绍下hdfs
全称为Hadoop Distributed File System。有三个核心组件:
namenode:有三个作用,第一是负责保存集群的元数据信息,第二是负责维护整个集群节点的正常运行。
第三是负责处理客户端的请求。
datanode:负责实际保存数据。实际执行数据块的读写 *** 作。
secondarynamenode:辅助namenode进行元数据的管理。不是namenode的备份。
3、namenode的工作机制?
namenode在内存中保存着整个内存系统的名称空间和文件数据块的地址映射。整个hdfs可存储的文件数受限于namenode的内存大小。所以hdfs不适合大量小文件的存储。
---namenode有三种元数据存储方式来管理元数据:
》内存元数据:内存中保存了完整的元数据
》保存在磁盘上的元数据镜像文件(fsimage):该文件时hdfs存在磁盘中的元数据检查点,里面保存的是最后一次检查点之前的hdfs文件系统中所有目录和文件的序列化信息。
》数据 *** 作日志文件(edits):用于衔接内存meta data和持久化元数据镜像fsimage之间的 *** 作日志文件。保存了自最后一次检查点之后所有针对hdfs文件系统的 *** 作。如对文件的增删改查。
4、如何查看元数据信息?
因为edits和fsimage文件是经过序列化的,所以不能直接查看。hadoop20以上提供了查看两种文件的工具。
----命令:hdfs oiv 可以将fsimage文件转换成其他格式,如xml和文本文件。-i 表示输入fsimage文件。-o 输出文件路径,-p 指定输出文件
hdfs oev可以查看edits文件。同理需要指定相关参数。
详情查看: >
一、涉及的问题
1 冗余数据保存
2 数据保存策略
3 数据恢复
二、冗余数据保存问题
1 冗余因子
出于成本考虑(也是HDFS优势),HDFS常架构在廉价机器上——经常出故障。所以必须有冗余机制。一般每个块都保存3份,即冗余因子默认是3
注意:伪分布式配置,即名称节点和数据节点都放在同一个机器上,则冗余因子显然只能是1,因为只有一个机器
2 冗余机制的好处
(1) 加快数据传输速度——当多个客户端同时想访问相同数据块时,可以同时并行而不需要排队
(2) 很容易检查数据错误——互相对照发现错误
(3) 保证数据可靠性——HDFS有这样的机制:一旦探测到一个副本故障,会自动复制正确副本,使冗余因子恢复默认值
三、数据保存与读取
1 第一副本存放策略:
(1) 如果保存请求来自集群内部,第一个副本放在发起者(应用)所在节点。比如一个在DataNode1上的应用发起存数据请求,那就把它第一份副本也存在DataNode1
(2) 如果保存请求来自集群外部,HDFS会随机挑选一台磁盘不太忙且CPU不太忙的节点来放置第一个副本
2 第二个副本存放策略:
放在和第一副本不同的机架的节点上 。如下图中DataNode4,它和DataNode1在不同机架上
3 第三副本放置策略:
放在和第一副本相同的机架的其他节点。如图中的DataNode2或DataNode3
4 更多副本存放策略:
全部随机放置(依靠随机算法)
5、数据读取
原则:就近读取
——HDFS提供一个API可以告诉数据节点的机架ID,客户端也可以用API诊断自己所在机架ID。ID相同说明在同一机架。而相同机架数据间通信很快,它们就是“相近”的数据。
——而前面也说了,每个数据块都有多个不同的副本,如果找到某个副本和客户端在同一个机架上,就优先选此副本。如果没有就随机找一个副本读取
四、数据的错误与恢复
1 名称节点出错
只有一个名称节点,而且它保存了核心数据结构FsImage和EditLog。恢复方法:
(1) 在HDFS10里只能暂停服务,从第二名称节点(冷备份)恢复
(2) 在HDFS20里可以直接用热备份恢复而不用暂停服务
2 数据节点出错
(1) 如何发现数据节点出问题:
在整个运行期间,DataNode都会定期通过远程调用向NameNode发送心跳信息。一旦隔了一个周期收不到心跳信息,则NameNode就知道这个DataNode发生了故障
(2) 如何恢复数据节点:
NameNode会在状态列表里把出错的DataNode标记为不可用(宕机),然后把它里面的数据块对应的备份(在其他DataNode上)复制到另一个DataNode上去
——HDFS和其他分布式文件系统最大的区别就是 可以调整冗余数据位置 。这种调整不仅发生在故障时,也可以在在机器负载不均衡时把一个DataNode的数据迁移到另一个上面以平衡负载
3 数据出错
(1) 如何发现数据出错:
“校验码机制”——客户端每写入一个数据块,都会为其生成一个校验码并保存在同一文件目录下。读取数据块同时会核对其校验码,如果不对说明数据出问题了。
(2) 如何恢复数据:
从备份复制过来
Reference:
>
以上就是关于HDFS 架构全部的内容,包括:HDFS 架构、如何使用Java API读写HDFS、如何通过网络访问hdfs的数据等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)