如何使用Java API操作HDFS系统?
1.搭建项目环境
打开Eclipse选择FileàNewàMaven Project创建Maven工程,选择“Create a simple project ”选项,点击【Next】按钮,会进入“New Maven Project”界面,如图1所示。
图1 创建Maven工程
在图1中,勾选“Create a simple project(skip archetype selection)”表示创建一个简单的项目(跳过对原型模板的选择),然后勾选“User default Workspace location”表示使用本地默认的工作空间之后,点击【Next】按钮,如图2所示。
图2 创建Maven工程配置
在图2中,GroupID也就是项目组织唯一的标识符,实际对应Java的包结构,这里输入com.itcast。ArtifactID就是项目的唯一标识符,实际对应项目的名称,就是项目根目录的名称,这里输入HadoopDemo,打包方式这里选择Jar包方式即可,后续创建Web工程选择War包。
此时Maven工程已经被创建好了,会发现在Maven项目中,有一个pom.xml的配置文件,这个配置文件就是对项目进行管理的核心配置文件。
使用Java API操作HDFS需要用到hadoop-common、hadoop-hdfs、hadoop-client三种依赖,同时为了进行单元测试,还要引入junit的测试包,具体代码如文件所示。
文件 pom.xml
在上述代码中,@Before是一个用于在Junit单元测试框架中控制程序最先执行的注解,这里可以保证init()方法在程序中最先执行。
小提示:
FileSystem.get()方法从conf中的设置的参数 fs.defaultFS的配置值,用来设置文件系统类型。如果代码中没有指定为fs.defaultFS,并且工程classpath下也没有给定相应的配置,则conf中的默认值就来自于hadoop-common-2.7.4.jar包中的core-default.xml,默认值为:“file:/// ”,这样获取的不是DistributedFileSystem实例,而是一个本地文件系统的客户端对象。
3.上传文件到HDFS
初始化客户端对象后,接下来实现上传文件到HDFS的功能。由于采用Java测试类来实现JavaApi对HDFS的操作,因此可以在HDFS_CRUD.java文件中添加一个testAddFileToHdfs()方法来演示本地文件上传到HDFS的示例,具体代码如下:
从上述代码可以看出,可以通过FileSystem对象的copyFromLocalFile()方法,将本地数据上传至HDFS中。copyFromLocalFile()方法接收两个参数,第一个参数是要上传的文件所在的本地路径(需要提前创建),第二个参数是要上传到HDFS的目标路径。
4.从HDFS下载文件到本地
在HDFS_CRUD.java文件中添加一个testDownloadFileToLocal()方法,来实现从HDFS中下载文件到本地系统的功能,具体代码如下:
从上述代码可以看出,可以通过FileSystem对象的copyToLocalFile()方法从HDFS上下载文件到本地。copyToLocalFile()方法接收两个参数,第一个参数是HDFS上的文件路径,第二个参数是下载到本地的目标路径。注意:在Windows平台开发HDFS项目时,若不设置Hadoop开发环境,则会报以下的错误:
解决方式:
(1)根据教材提示,安装配置windows平台hadoop(注意,配置后必须重启电脑),运行没有问题。
(2)直接使用下载的hadoop linux平台下的压缩包进行解压,然后在解压包bin目录下额外添加windows相关依赖文件(winutils.exe、winutils.pdb、hadoop.dll),然后进行hadoop环境变量配置(注意,配置后必须重启电脑),运行同样没有问题。
(3)使用FileSystem自带的方法即使不配置windows平台hadoop也可以正常运行(这种方法下载后就是没有附带一个类似.testFile.crc的校验文件):
fs.copyToLocalFile(false,new Path(\”/testFile\”), new Path(\”D:/\”),true);
5.目录操作
在HDFS_CRUD.java文件中添加一个testMkdirAndDeleteAndRename()方法,实现目录的创建、删除、重命名的功能,具体代码如下:
从上述代码可以看出,可以通过调用FileSystem的mkdirs()方法创建新的目录;调用delete()方法可以删除文件夹,delete()方法接收两个参数,第一个参数表示要删除的文件夹路径,第二个参数用于设置是否递归删除目录,其值为true或false,true表示递归删除,false表示非递归删除;调用rename()方法可以对文件或文件夹重命名,rename()接收两个参数,第一个参数代表需要修改的目标路径,第二个参数代表新的命名。
6.查看目录中的文件信息
在HDFS_CRUD.java文件中添加一个testListFiles()方法,实现查看目录中所有文件的详细信息的功能,代码如下:
在上述代码中,可以调用FileSystem的listFiles()方法获取文件列表,其中第一个参数表示需要获取的目录路径,第二个参数表示是否递归查询,这里传入参数为true,表示需要递归查询。
Java8中的Stream API详细分析
昨天粗略的介绍了下java8中的一些特性,对Stream API也做了简单的介绍。这次单独来介绍下Stream API中的一些常用的方法。因为这个Stream流在平时的开发中使用的频率还是蛮高的。
介绍:
Java 8 API 中的Stream(流)是一种用于处理集合数据的抽象概念。它提供了一种声明式的方式来处理数据,类似于使用 SQL 语句从数据库中查询数据。Stream使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。Stream API 可以极大提高 Java 程序员的生产力,让程序员写出高效率、干净、简洁的代码。
Stream(流)是一个来自数据源的元素队列并支持聚合操作:
- 元素:是特定类型的对象,形成一个队列。
- 数据源:流的来源,可以是集合、数组、I/O channel、产生器 generator 等。
- 聚合操作:类似 SQL 语句一样的操作,比如filter、map、reduce、find、match、sorted等。
和以前的Collection操作不同,Stream操作还有两个基础的特征:
- Pipelining:中间操作都会返回流对象本身,这样多个操作可以串联成一个管道,如同流式风格(fluent style),这样做可以对操作进行优化,比如延迟执行(laziness)和短路(short-circuiting)。
- 内部迭代:以前对集合遍历都是通过Iterator或者For-Each的方式,显式地在集合外部进行迭代,这叫做外部迭代。Stream提供了内部迭代的方式,通过访问者模式(Visitor)实现。
在 Java 8 中,集合接口有两个方法来生成流:
- stream():为集合创建串行流。元素按顺序依次处理;适用于大多数常规的顺序处理逻辑
- parallelStream():为集合创建并行流。以并行的方式处理元素,利用多核 CPU 的优势来提高处理效率。
- 根据Collection获取
错误使用,流是一次性的,不能重复使用
正确使用方式
- 通过Stream的of方法
操作到数组中的数据,由于数组对象不可能添加默认方法,Stream接口中提供了静态方法of
- filter(Predicate<? super T> predicate):根据指定条件进行过滤。
filter方法的作用是用来过滤数据的。返回符合条件的数据
可以通过filter方法将一个流转换成另一个子集流:
Stream<T> filter(Predicate<? super T> predicate);
- map(Function<? super T,? extends R> mapper):对元素进行映射转换。
当前流中的T类型数据转换为另一种R类型的数据
将流中的元素映射到另一个流中,可以使用map方法:
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
- flatMap(Function<? super T,? extends Stream<? extends R>> mapper):将每个元素映射为一个流并展平。
flatMap方法主要用于将流中的每个元素转换为一个新的流,然后将这些新生成的流中的元素全部“展平”合并到一个新的流中。它特别适用于处理元素本身又包含多个子元素的情况。比如,一个元素是一个包含多个值的集合,通过flatMap可以将这些值都提取出来组成一个新的流。
- distinct():去除重复元素。
要去掉重复数据,可以使用distinct
Stream流中的distinct方法对于基本数据类型是可以直接出重的,但是对于自定义类型,我们是需要重写hashCode和equals方法来移除重复元素。
- sorted(Comparator<? super T> comparator):排序。
- limit(long maxSize):限制流的长度。
limit方法可以对流进行截取处理,只取前n个数据
- skip(long n):跳过前 n 个元素。
跳过前面几个元素
连用 注意limit 和 skip的 数据
- forEach(Consumer<? super T> action):对每个元素执行操作。
void forEach(Consumer<? super T> action); // 用来变量数据
- collect(Collector<? super T, A, R> collector):收集流的结果到特定结构。
collect方法主要用于根据指定的收集器(Collector)来对Stream流进行最终的收集操作,以得到一个特定的结果
- reduce(BinaryOperator<T> accumulator):进行归作。
- count():计算元素数量。
统计元素的个数
- anyMatch(Predicate<? super T> predicate):是否存在至少一个匹配元素。
- allMatch(Predicate<? super T> predicate):是否所有元素都匹配。
- noneMatch(Predicate<? super T> predicate):是否没有元素匹配。
以上就是总结的Stream的经常使用的一些使用方法,在开发中也比较常见。具体的使用需要结合实际情况选择合适的方法进行使用。
本文作者及来源:Renderbus瑞云渲染农场https://www.renderbus.com
文章为作者独立观点不代本网立场,未经允许不得转载。