HDFS 架构

HDFS 架构,第1张

HDFS 涉及两个重要进程:NameNode、DataNode。

他们一般都部署单独部署在不同服务器上,运行 NameNode 的服务器是主服务器,运行 DataNode 的服务器是从服务器。主服务器只有一个,从服务器有多个。

这种一主多从的架构基本适用于所有分布式系统或框架。可重复使用的架构方案叫作架构模式,一主多从可谓是大数据领域的最主要的架构模式。主服务器只有一台,掌控全局。从服务器有很多台,负责具体的事情。这样很多台服务器可以有效组织起来,对外表现出一个统一又强大的存储计算能力。

DataNode 负责文件数据的存储和读写 *** 作,HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得 HDFS 可以在服务器集群规模上实现数据并行访问,极大地提高了访问速度。

在实践中,HDFS 集群的 DataNode 服务器会有很多台,一般在几百台到几千台这样的规模,每台服务器配有数块磁盘,整个集群的存储容量大概在几 PB 到数百 PB。

NameNode 负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名、数据块的 ID 以及存储位置等信息,相当于 *** 作系统中文件分配表(FAT)的角色。HDFS 为了保证数据的高可用,会将一个数据块复制为多份(默认3份),并将多份相同的数据块存储在不同的机架的服务器上。这样当有磁盘损坏,或者某个 DataNode 服务器宕机,甚至某个交换机宕机时,系统能通过其备份的数据块进行查找。

处理客户端的请求。

客户端向 HDFS 上传文件。

客户端向 HDFS 读取文件。

像 NameNode 这样主从服务器管理同一份数据的场景,如果从服务器错误地以为主服务器宕机而接管集群管理,会出现主从服务器一起对 DataNode 发送指令,进而导致集群混乱,也就是所谓的“脑裂”。这也是这类场景选举主服务器时,引入 ZooKeeper 的原因。

Java API读写HDFS

public class FSOptr {

/

@param args

/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();

makeDir(conf);

rename(conf);

delete(conf);

}

// 创建文件目录

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path dir = new Path("/user/hadoop/data/20140318");

boolean result = fsmkdirs(dir);// 创建文件夹

Systemoutprintln("make dir :" + result);

// 创建文件,并写入内容

Path dst = new Path("/user/hadoop/data/20140318/tmp");

byte[] buff = "hello,hadoop!"getBytes();

FSDataOutputStream outputStream = fscreate(dst);

outputStreamwrite(buff, 0, bufflength);

outputStreamclose();

FileStatus files[] = fslistStatus(dst);

for (FileStatus file : files) {

Systemoutprintln(filegetPath());

}

fsclose();

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path oldName = new Path("/user/hadoop/data/20140318/1txt");

Path newName = new Path("/user/hadoop/data/20140318/2txt");

fsrename(oldName, newName);

FileStatus files[] = fslistStatus(new Path(

"/user/hadoop/data/20140318"));

for (FileStatus file : files) {

Systemoutprintln(filegetPath());

}

fsclose();

}

// 删除文件

@SuppressWarnings("deprecation")

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path path = new Path("/user/hadoop/data/20140318");

if (fsisDirectory(path)) {

FileStatus files[] = fslistStatus(path);

for (FileStatus file : files) {

fsdelete(filegetPath());

}

} else {

fsdelete(path);

}

// 或者

fsdelete(path, true);

fsclose();

}

/

下载,将hdfs文件下载到本地磁盘

@param localSrc1

本地的文件地址,即文件的路径

@param hdfsSrc1

存放在hdfs的文件地址

/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration();

FileSystem fs = null;

try {

fs = FileSystemget(URIcreate(hdfsSrc1), conf);

Path hdfs_path = new Path(hdfsSrc1);

Path local_path = new Path(localSrc1);

fscopyToLocalFile(hdfs_path, local_path);

return true;

} catch (IOException e) {

eprintStackTrace();

}

return false;

}

/

上传,将本地文件copy到hdfs系统中

@param localSrc

本地的文件地址,即文件的路径

@param hdfsSrc

存放在hdfs的文件地址

/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in;

try {

in = new BufferedInputStream(new FileInputStream(localSrc));

Configuration conf = new Configuration();// 得到配置对象

FileSystem fs; // 文件系统

try {

fs = FileSystemget(URIcreate(hdfsSrc), conf);

// 输出流,创建一个输出流

OutputStream out = fscreate(new Path(hdfsSrc),

new Progressable() {

// 重写progress方法

public void progress() {

// Systemoutprintln("上传完一个设定缓存区大小容量的文件!");

}

});

// 连接两个流,形成通道,使输入流向输出流传输数据,

IOUtilscopyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流

return true;

} catch (IOException e) {

eprintStackTrace();

}

} catch (FileNotFoundException e) {

eprintStackTrace();

}

return false;

}

/

移动

@param old_st原来存放的路径

@param new_st移动到的路径

/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下载到服务器本地

boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");

Configuration conf = new Configuration();

FileSystem fs = null;

// 删除源文件

try {

fs = FileSystemget(URIcreate(old_st), conf);

Path hdfs_path = new Path(old_st);

fsdelete(hdfs_path);

} catch (IOException e) {

eprintStackTrace();

}

// 从服务器本地传到新路径

new_st = new_st + old_stsubstring(old_stlastIndexOf("/"));

boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);

if (down_flag && uplod_flag) {

return true;

}

} catch (Exception e) {

eprintStackTrace();

}

return false;

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path src = new Path("/home/hadoop/wordtxt");

Path dst = new Path("/user/hadoop/data/");

fscopyFromLocalFile(src, dst);

fsclose();

}

// 获取给定目录下的所有子目录以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path path = new Path("/user/hadoop");

getFile(path, fs);

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fslistStatus(path);

for (int i = 0; i < fileStatuslength; i++) {

if (fileStatus[i]isDir()) {

Path p = new Path(fileStatus[i]getPath()toString());

getFile(p, fs);

} else {

Systemoutprintln(fileStatus[i]getPath()toString());

}

}

}

//判断文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystemget(conf);

return fileSystemexists(new Path(path));

}

//获取hdfs集群所有主机结点数据

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystemget(conf);

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

DatanodeInfo[] dataNodeStats = hdfsgetDataNodeStats();

String[] names = new String[dataNodeStatslength];

Systemoutprintln("list of all the nodes in HDFS cluster:"); //print info

for(int i=0; i < dataNodeStatslength; i++){

names[i] = dataNodeStats[i]getHostName();

Systemoutprintln(names[i]); //print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystemget(conf);

Path f = new Path("/user/cluster/dfstxt");

FileStatus filestatus = fsgetFileStatus(f);

BlockLocation[] blkLocations = fsgetFileBlockLocations(filestatus,0,filestatusgetLen());

int blkCount = blkLocationslength;

for(int i=0; i < blkCount; i++){

String[] hosts = blkLocations[i]getHosts();

//Do sth with the block hosts

Systemoutprintln(hosts);

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystemget(conf);

Path f = new Path("/user/cluster/dfstxt");

FileStatus filestatus = fsgetFileStatus(f);

long modificationTime = filestatusgetModificationTime(); // measured in milliseconds since the epoch

Date d = new Date(modificationTime);

Systemoutprintln(d);

}

}

你好,实体类定义代码:

<pre name="code" class="java">package comqinoperadb;

import javaioDataInput;

import javaioDataOutput;

import javaioIOException;

import javasqlPreparedStatement;

import javasqlResultSet;

import javasqlSQLException;

import orgapachehadoopioText;

import orgapachehadoopioWritable;

import orgapachehadoopmapreducelibdbDBWritable;

/

封装数据库实体信息

的记录

搜索大数据技术交流群:376932160

/

public class PersonRecoder implements Writable,DBWritable {

public int id;//对应数据库中id字段

public String name;//对应数据库中的name字段

public int age;//对应数据库中的age字段

@Override

public void readFields(ResultSet result) throws SQLException {

thisid=resultgetInt(1);

thisname=resultgetString(2);

thisage=resultgetInt(3);

}

@Override

public void write(PreparedStatement stmt) throws SQLException {

stmtsetInt(1, id);

stmtsetString(2, name);

stmtsetInt(3, age);

}

@Override

public void readFields(DataInput arg0) throws IOException {

// TODO Auto-generated method stub

thisid=arg0readInt();

thisname=TextreadString(arg0);

thisage=arg0readInt();

}

@Override

public void write(DataOutput out) throws IOException {

// TODO Auto-generated method stub

outwriteInt(id);

TextwriteString(out, thisname);

outwriteInt(thisage);

}

@Override

public String toString() {

// TODO Auto-generated method stub

return "id: "+id+" 年龄: "+age+" 名字:"+name;

}

}

</pre>

MR类的定义代码,注意是一个Map Only作业:

<pre name="code" class="java">package comqinoperadb;

import javaioIOException;

import orgapachehadoopconfConfiguration;

import orgapachehadoopfsFileSystem;

import orgapachehadoopfsPath;

import orgapachehadoopioLongWritable;

import orgapachehadoopioText;

import orgapachehadoopmapredJobConf;

import orgapachehadoopmapredlibIdentityReducer;

import orgapachehadoopmapreduceJob;

import orgapachehadoopmapreduceMapper;

import orgapachehadoopmapreducelibdbDBConfiguration;

import orgapachehadoopmapreducelibdbDBInputFormat;

import orgapachehadoopmapreduceliboutputFileOutputFormat;

public class ReadMapDB {

/

Map作业读取数据记录数

/

private static class DBMap extends Mapper<LongWritable, PersonRecoder , LongWritable, Text>{

@Override

protected void map(LongWritable key, PersonRecoder value,Context context)

throws IOException, InterruptedException {

contextwrite(new LongWritable(valueid), new Text(valuetoString()));

}

}

public static void main(String[] args)throws Exception {

JobConf conf=new JobConf(ReadMapDBclass);

//Configuration conf=new Configuration();

// confset("mapredjobtracker","19216875130:9001");

//读取person中的数据字段

// confsetJar("ttjar");

//注意这行代码放在最前面,进行初始化,否则会报

DBConfigurationconfigureDB(conf, "commysqljdbcDriver", "jdbc:mysql://19216821136:3306/test", "root", "qin");

/要读取的字段信息/

String fileds[]=new String[]{"id","name","age"};

/Job任务/

Job job=new Job(conf, "readDB");

Systemoutprintln("模式: "+confget("mapredjobtracker"));

/设置数据库输入格式的一些信息/

DBInputFormatsetInput(job, PersonRecoderclass, "person", null, "id", fileds);

/设置输入格式/

jobsetInputFormatClass(DBInputFormatclass);

jobsetOutputKeyClass(LongWritableclass);

jobsetOutputValueClass(Textclass);

jobsetMapperClass(DBMapclass);

String path="hdfs://19216875130:9000/root/outputdb";

FileSystem fs=FileSystemget(conf);

Path p=new Path(path);

if(fsexists(p)){

fsdelete(p, true);

Systemoutprintln("输出路径存在,已删除!");

}

FileOutputFormatsetOutputPath(job,p );

Systemexit(jobwaitForCompletion(true) 0 : 1);

}

}

1、简单介绍下hadoop吧?

    广义上hadoop是指与hadoop相关的大数据生态圈。包含hive、spark、hbase等。

    狭义上hadoop指的是apache的开源框架。有三个核心组件:

----hdfs:分布式文件存储系统

----yarn:分布式资源管理调度平台

----mr:分布式计算引擎

2、介绍下hdfs

全称为Hadoop Distributed File System。有三个核心组件:

namenode:有三个作用,第一是负责保存集群的元数据信息,第二是负责维护整个集群节点的正常运行。

第三是负责处理客户端的请求。

datanode:负责实际保存数据。实际执行数据块的读写 *** 作。

secondarynamenode:辅助namenode进行元数据的管理。不是namenode的备份。

3、namenode的工作机制?

    namenode在内存中保存着整个内存系统的名称空间和文件数据块的地址映射。整个hdfs可存储的文件数受限于namenode的内存大小。所以hdfs不适合大量小文件的存储。

---namenode有三种元数据存储方式来管理元数据:

    》内存元数据:内存中保存了完整的元数据

    》保存在磁盘上的元数据镜像文件(fsimage):该文件时hdfs存在磁盘中的元数据检查点,里面保存的是最后一次检查点之前的hdfs文件系统中所有目录和文件的序列化信息。

    》数据 *** 作日志文件(edits):用于衔接内存meta data和持久化元数据镜像fsimage之间的 *** 作日志文件。保存了自最后一次检查点之后所有针对hdfs文件系统的 *** 作。如对文件的增删改查。

4、如何查看元数据信息?

    因为edits和fsimage文件是经过序列化的,所以不能直接查看。hadoop20以上提供了查看两种文件的工具。

----命令:hdfs oiv 可以将fsimage文件转换成其他格式,如xml和文本文件。-i 表示输入fsimage文件。-o 输出文件路径,-p 指定输出文件

                hdfs oev可以查看edits文件。同理需要指定相关参数。

详情查看: >

一、涉及的问题

1 冗余数据保存

2 数据保存策略

3 数据恢复

二、冗余数据保存问题

1 冗余因子

出于成本考虑(也是HDFS优势),HDFS常架构在廉价机器上——经常出故障。所以必须有冗余机制。一般每个块都保存3份,即冗余因子默认是3

注意:伪分布式配置,即名称节点和数据节点都放在同一个机器上,则冗余因子显然只能是1,因为只有一个机器

2 冗余机制的好处

(1) 加快数据传输速度——当多个客户端同时想访问相同数据块时,可以同时并行而不需要排队

(2) 很容易检查数据错误——互相对照发现错误

(3) 保证数据可靠性——HDFS有这样的机制:一旦探测到一个副本故障,会自动复制正确副本,使冗余因子恢复默认值

三、数据保存与读取

1 第一副本存放策略:

(1) 如果保存请求来自集群内部,第一个副本放在发起者(应用)所在节点。比如一个在DataNode1上的应用发起存数据请求,那就把它第一份副本也存在DataNode1

(2) 如果保存请求来自集群外部,HDFS会随机挑选一台磁盘不太忙且CPU不太忙的节点来放置第一个副本

2 第二个副本存放策略:

放在和第一副本不同的机架的节点上 。如下图中DataNode4,它和DataNode1在不同机架上

3 第三副本放置策略:

放在和第一副本相同的机架的其他节点。如图中的DataNode2或DataNode3

4 更多副本存放策略:

全部随机放置(依靠随机算法)

5、数据读取

原则:就近读取

——HDFS提供一个API可以告诉数据节点的机架ID,客户端也可以用API诊断自己所在机架ID。ID相同说明在同一机架。而相同机架数据间通信很快,它们就是“相近”的数据。

——而前面也说了,每个数据块都有多个不同的副本,如果找到某个副本和客户端在同一个机架上,就优先选此副本。如果没有就随机找一个副本读取

四、数据的错误与恢复

1 名称节点出错

只有一个名称节点,而且它保存了核心数据结构FsImage和EditLog。恢复方法:

(1) 在HDFS10里只能暂停服务,从第二名称节点(冷备份)恢复

(2) 在HDFS20里可以直接用热备份恢复而不用暂停服务

2 数据节点出错

(1) 如何发现数据节点出问题:

在整个运行期间,DataNode都会定期通过远程调用向NameNode发送心跳信息。一旦隔了一个周期收不到心跳信息,则NameNode就知道这个DataNode发生了故障

(2) 如何恢复数据节点:

NameNode会在状态列表里把出错的DataNode标记为不可用(宕机),然后把它里面的数据块对应的备份(在其他DataNode上)复制到另一个DataNode上去

——HDFS和其他分布式文件系统最大的区别就是 可以调整冗余数据位置 。这种调整不仅发生在故障时,也可以在在机器负载不均衡时把一个DataNode的数据迁移到另一个上面以平衡负载

3 数据出错

(1) 如何发现数据出错:

“校验码机制”——客户端每写入一个数据块,都会为其生成一个校验码并保存在同一文件目录下。读取数据块同时会核对其校验码,如果不对说明数据出问题了。

(2) 如何恢复数据:

从备份复制过来

Reference:

>

以上就是关于HDFS 架构全部的内容,包括:HDFS 架构、如何使用Java API读写HDFS、如何通过网络访问hdfs的数据等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/10179199.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-06
下一篇2023-05-06

发表评论

登录后才能评论

评论列表(0条)

    保存