如何将hdfs里某一目录下的所有文件的文件名读取出来

如何将hdfs里某一目录下的所有文件的文件名读取出来,第1张

最近刚刚接触到RobotFramework,发现这个工具倒是可以满足我的要求,而且可以结合seleniumLibrary,用来做web的自动化测试相当不错。之前我也接触过selenium,不过感觉那个工具更贴近开发人员使用,有了robotFramework之后,感觉这个工具相当强大,而且是贴近测试人员的。之所以说强大,主要是这些测试脚本都可以用文本格式保存(如txt/html等)

==安装篇==

如果有想学的朋友可以自己下载以下文件安装(Google-code里可以找到大部分的安装文件):

这篇文章的内容比较旧了,最新的安装指南请查看 更新篇

python-271msi(首先要有python,请选择将Python加入Path)

wxPython28-win32-unicode-28110-py27exe(wxPython,必须要的)

robotframework-260win32exe(然后装robot的Framework)

robotframework-ride-0381win32exe(robotFramework的IDE,很不错)

robotframework-seleniumlibrary-28win32exe(seleniumLibrary)

安装成功后

执行[PythonDir]\Scripts\ridepy

看到界面就是安装成功了。

如果需要AutoIt支持就下载下面2个东东。

AutoItLibrary-11

pywin32-216win32-py27exe

==入门篇==

安装完成了,这个框架可以说是基于keyword的 *** 作,按F5可以看到所有加载的keyword。

首先新增一个project

然后新增suite

然后新增test case,接着在suite层级add library,把selenium library加进来,添加后按F5检验是否添加成功,如图

OK,继续在suite的setting里设置suite启动和结束的keyword,即Start Selenium Server和Stop Selenium Server,他会在运行时帮助我们自动启动seleniumserver。

接下来在test case里添加一个步骤,open browser(一般用selenium做web测试都要用这个方法来打开浏览器),添加后关键字变成蓝色表示找到关键字了,否则可能是拼写错误或者没有加载相应的library。红色表示有一个必选参数要给定输入值,具体参数可以看F5里的keyword说明。

输入参数,第二个参数默认是firefox,不过我没装,就用ie吧。

以上只是一个简单的例子,没有详细说明每个步骤的 *** 作,只是初步介绍。后续再详细介绍。

refer: >

Java API读写HDFS

public class FSOptr {

/

@param args

/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();

makeDir(conf);

rename(conf);

delete(conf);

}

// 创建文件目录

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path dir = new Path("/user/hadoop/data/20140318");

boolean result = fsmkdirs(dir);// 创建文件夹

Systemoutprintln("make dir :" + result);

// 创建文件,并写入内容

Path dst = new Path("/user/hadoop/data/20140318/tmp");

byte[] buff = "hello,hadoop!"getBytes();

FSDataOutputStream outputStream = fscreate(dst);

outputStreamwrite(buff, 0, bufflength);

outputStreamclose();

FileStatus files[] = fslistStatus(dst);

for (FileStatus file : files) {

Systemoutprintln(filegetPath());

}

fsclose();

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path oldName = new Path("/user/hadoop/data/20140318/1txt");

Path newName = new Path("/user/hadoop/data/20140318/2txt");

fsrename(oldName, newName);

FileStatus files[] = fslistStatus(new Path(

"/user/hadoop/data/20140318"));

for (FileStatus file : files) {

Systemoutprintln(filegetPath());

}

fsclose();

}

// 删除文件

@SuppressWarnings("deprecation")

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path path = new Path("/user/hadoop/data/20140318");

if (fsisDirectory(path)) {

FileStatus files[] = fslistStatus(path);

for (FileStatus file : files) {

fsdelete(filegetPath());

}

} else {

fsdelete(path);

}

// 或者

fsdelete(path, true);

fsclose();

}

/

下载,将hdfs文件下载到本地磁盘

@param localSrc1

本地的文件地址,即文件的路径

@param hdfsSrc1

放在hdfs的文件地址

/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration();

FileSystem fs = null;

try {

fs = FileSystemget(URIcreate(hdfsSrc1), conf);

Path hdfs_path = new Path(hdfsSrc1);

Path local_path = new Path(localSrc1);

fscopyToLocalFile(hdfs_path, local_path);

return true;

} catch (IOException e) {

eprintStackTrace();

}

return false;

}

/

上传,将本地文件copy到hdfs系统中

@param localSrc

本地的文件地址,即文件的路径

@param hdfsSrc

存放在hdfs的文件地址

/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in;

try {

in = new BufferedInputStream(new FileInputStream(localSrc));

Configuration conf = new Configuration();// 得到配置对象

FileSystem fs; // 文件系统

try {

fs = FileSystemget(URIcreate(hdfsSrc), conf);

// 输出流,创建一个输出流

OutputStream out = fscreate(new Path(hdfsSrc),

new Progressable() {

// 重写progress方法

public void progress() {

// Systemoutprintln("上传完一个设定缓存区大小容量的文件!");

}

});

// 连接两个流,形成通道,使输入流向输出流传输数据,

IOUtilscopyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流

return true;

} catch (IOException e) {

eprintStackTrace();

}

} catch (FileNotFoundException e) {

eprintStackTrace();

}

return false;

}

/

移动

@param old_st原来存放的路径

@param new_st移动到的路径

/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下载到服务器本地

boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");

Configuration conf = new Configuration();

FileSystem fs = null;

// 删除源文件

try {

fs = FileSystemget(URIcreate(old_st), conf);

Path hdfs_path = new Path(old_st);

fsdelete(hdfs_path);

} catch (IOException e) {

eprintStackTrace();

}

// 从服务器本地传到新路径

new_st = new_st + old_stsubstring(old_stlastIndexOf("/"));

boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);

if (down_flag && uplod_flag) {

return true;

}

} catch (Exception e) {

eprintStackTrace();

}

return false;

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path src = new Path("/home/hadoop/wordtxt");

Path dst = new Path("/user/hadoop/data/");

fscopyFromLocalFile(src, dst);

fsclose();

}

// 获取给定目录下的所有子目录以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystemget(conf);

Path path = new Path("/user/hadoop");

getFile(path, fs);

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fslistStatus(path);

for (int i = 0; i < fileStatuslength; i++) {

if (fileStatus[i]isDir()) {

Path p = new Path(fileStatus[i]getPath()toString());

getFile(p, fs);

} else {

Systemoutprintln(fileStatus[i]getPath()toString());

}

}

}

//判断文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystemget(conf);

return fileSystemexists(new Path(path));

}

//获取hdfs集群所有主机结点数据

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystemget(conf);

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

DatanodeInfo[] dataNodeStats = hdfsgetDataNodeStats();

String[] names = new String[dataNodeStatslength];

Systemoutprintln("list of all the nodes in HDFS cluster:"); //print info

for(int i=0; i < dataNodeStatslength; i++){

names[i] = dataNodeStats[i]getHostName();

Systemoutprintln(names[i]); //print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystemget(conf);

Path f = new Path("/user/cluster/dfstxt");

FileStatus filestatus = fsgetFileStatus(f);

BlockLocation[] blkLocations = fsgetFileBlockLocations(filestatus,0,filestatusgetLen());

int blkCount = blkLocationslength;

for(int i=0; i < blkCount; i++){

String[] hosts = blkLocations[i]getHosts();

//Do sth with the block hosts

Systemoutprintln(hosts);

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystemget(conf);

Path f = new Path("/user/cluster/dfstxt");

FileStatus filestatus = fsgetFileStatus(f);

long modificationTime = filestatusgetModificationTime(); // measured in milliseconds since the epoch

Date d = new Date(modificationTime);

Systemoutprintln(d);

}

}

你不要用hdfs啊,fsdefaultname配置设为file:///,运用本地文件系统试试

你的采纳是我前进的动力,还有不懂的地方,请继续“追问”。

如你还有别的问题,可另外向我求助;答题不易,互相理解,互相帮助。

1Hadoop 分布式 文件系统。特点:性能高、效率高、速度快

2可以在廉价的机器上运行的 可容错 文件系统。

当集群中有机器挂掉时,HDFS会自动将挂掉的机器上的任务分配给正常的机器,使任务继续保持正常工作。

2HDFS处理更加容易。当对一个大型文件进行写 *** 作时,如果将该文件整个写入一个节点,那么该节点的负载便会急剧增加,这样就丧失了分布式文件系统的意义。所以,应该利用HDFS将文件拆分成不同的块,然后将不同的块分配到不同的节点上去,此时,DFS就需要管理者确定文件如何进行拆分,以及每一个块应该分配到哪一个节点。对文件进行 *** 作时,在单机情况下,首先需要知道文件被拆分成多少块,每一个块被放在了哪一个节点上,以及块之间的顺序(文件的粘连)。而HDFS的出现,使得分布式文件集群不再需要人进行管理,利用HDFS读取文件时,我们不需要关心文件如何拆分,分配,粘连。只用告诉HDFS文件的路径即可。

HDFS的指令类似于linux下的指令。

查看文件:hdfs dfs -ls /查询的文件目录

删除文件:hdfs dfs -rm r /删除的文件

创建文件夹:hdfs dfs -mkdir /文件夹名称

上传文件至HDFS:hdfs dfs -put 需要上传的文件 /上传的文件路径

为什么需要学习HDFS结构?

1面试中,能够运用于所有分布式文件系统设计。

既然分布式系统下是多节点运行,那么节点之间是否通信?slave节点只接受来自master节点的命令,向master节点发送心跳指令,slave节点之间不会主动通信。

aMaster slaver 模式:

1High consistency:一致性。当文件中的一个数据块写入slave节点时,当且仅当数据块被成功写入到所有备份的slave节点,slave节点向client反馈写入 *** 作成功,否则,重传写入;

2Simple design:易设计:不需要考虑子节点如何通信。只需要考虑主节点的工作;

3单master节点不具有鲁棒性。

bPeer peer 模式:

1所有的读写 *** 作均匀分布在每一个节点上,每一个节点的负载不会很高;

2任意一个节点挂掉不会影响其他节点;

3低一致性。没有数据的复制步骤。

2更好的理解hadoop生态系统

amaster节点会传输数据吗?

不会,master节点只接收client的请求,决定哪一个slave节点进行读写 *** 作,然后,client直接与slave节点进行通信。如果数据从master节点传输,那么master节点就会成为影响数据传输的瓶颈。

bslave节点如何存储数据?

整个大文件?小的文件块?。HDFS借鉴GFS的设计理念,以block为传输单位,将大文件拆分成一个一个小文件,而一个小文件就是block。block的大小可以由Configuration定义,默认大小是128M。

c谁来决定将文件拆分成块?

masterslave。两者都不是,由HDFS client决定将大文件拆分成block(块)。HDFS的目的是将所有的节点包装起来,可以理解成将所有的节点放在一个黑箱里,我们不需要知道黑箱里到底发生了什么,只需要告诉黑箱需要做什么工作,这里的HDFS client相当于HDFS与user通信的中间媒介。HDFS client相当于一个软件包(api),可以存放在master或者slave或者额外的一个新节点上。

写入in memory失败(ACK出现问题)时,master会重新选择3个新的slave节点。

以上就是关于如何将hdfs里某一目录下的所有文件的文件名读取出来全部的内容,包括:如何将hdfs里某一目录下的所有文件的文件名读取出来、Spark读取HDFS数据分区参考、如何使用Java API读写HDFS等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/9288358.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-26
下一篇2023-04-26

发表评论

登录后才能评论

评论列表(0条)

    保存