pom
<dependency>
    <!-- HDFS client -->
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.2</version>
</dependency>
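Before wiring up the utility class below, it can help to verify that the dependency resolves and that a NameNode is reachable. A minimal sanity-check sketch; the address and user are placeholders matching the example further down, not part of the utility class:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal connectivity check; the user and address are placeholders.
public class HdfsPing {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop"); // placeholder HDFS user
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.100:9000"), conf)) {
            System.out.println("Root exists: " + fs.exists(new Path("/")));
        }
    }
}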
Java code
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

@Slf4j
public class IOHDFSUtil {

    public static String hdfsUser; // user name used to access HDFS
    public static String hdfsUrl;  // HDFS address
    public static Configuration confDefault = new Configuration();

    /**
     * Initialize the HDFS configuration. By default, core-site.xml and hdfs-site.xml
     * from the resource directory are read as the connection configuration.
     * If they are not there, fall back to core-site.xml and hdfs-site.xml in the
     * directory the jar was started from.
     * If neither can be found, use the hdfsUrl field instead.
     */
    public static void init() throws IOException {
        System.setProperty("HADOOP_USER_NAME", hdfsUser);
        // on Windows the Hadoop client needs winutils; point hadoop.home.dir at the bundled copy
        System.setProperty("hadoop.home.dir", ParamUtil.Convert.getProjectFilePath("winutils", true));
        confDefault.set("dfs.client.datanode-restart.timeout", "60s");
        confDefault.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        confDefault.set("dfs.client.use.datanode.hostname", "true");
        String sourceConfHost = confDefault.get("fs.defaultFS");
        if (!"file:///".equals(sourceConfHost)) {
            hdfsUrl = sourceConfHost;
            log.info("Using the HDFS address from the default environment configuration: {}", hdfsUrl);
            return;
        }
        String coreConfigFilePath = ParamUtil.Convert.getProjectFilePath("core-site.xml");
        String hdfsConfigFilePath = ParamUtil.Convert.getProjectFilePath("hdfs-site.xml");
        if (new File(hdfsConfigFilePath).exists() && new File(coreConfigFilePath).exists()) {
            confDefault.addResource(Files.newInputStream(Paths.get(hdfsConfigFilePath)));
            confDefault.addResource(Files.newInputStream(Paths.get(coreConfigFilePath)));
            hdfsUrl = confDefault.get("fs.defaultFS");
            log.info("Using [core-site.xml/hdfs-site.xml] from the current directory, address: {}", hdfsUrl);
            return;
        }
        log.info("Using the address configured in config.xml in the current directory: {}", hdfsUrl);
    }

    /**
     * Get an HDFS client connection.
     *
     * @return the connection object
     * @throws Exception if the connection cannot be created
     */
    public static FileSystem getHDFSConn() throws Exception {
        FileSystem conn;
        try {
            conn = FileSystem.get(new URI(hdfsUrl), confDefault);
        } catch (Exception e) {
            throw new Exception("Failed to initialize the HDFS connection", e);
        }
        return conn;
    }

    /**
     * Upload a file.
     *
     * @param localFilePath   local file path
     * @param absHDFSFilePath absolute path of the target file on HDFS
     * @return whether the upload succeeded
     * @throws Exception on error
     */
    public static boolean uploadToHDFS(String localFilePath, String absHDFSFilePath) throws Exception {
        FileSystem hdfs = null;
        try {
            hdfs = IOHDFSUtil.getHDFSConn();
            return uploadToHDFS(hdfs, localFilePath, absHDFSFilePath);
        } finally {
            ResourceUtil.close(hdfs);
        }
    }

    /**
     * Upload a file to HDFS.
     *
     * @param hdfs            HDFS connection object
     * @param localFilePath   absolute path of the local file
     * @param absHDFSFilePath absolute path of the target file on HDFS
     * @return whether the upload succeeded
     * @throws Exception on error
     */
    public static boolean uploadToHDFS(FileSystem hdfs, String localFilePath, String absHDFSFilePath) throws Exception {
        try {
            // derive the parent directory of the target path (java.io.File is used
            // here only for path-string manipulation)
            File uploadFile = new File(absHDFSFilePath);
            String parentPath = uploadFile.getParent();
            Path localPath = new Path(localFilePath);
            Path hdfsDir = new Path(parentPath);
            Path hdfsFile = new Path(absHDFSFilePath);
            if (hdfs.exists(hdfsFile)) {
                log.info("File already exists on HDFS; skipping duplicate upload: {}", absHDFSFilePath);
                return true;
            }
            if (!new File(localFilePath).exists()) {
                log.error("Local file [{}] does not exist; check the failed task record and re-collect the data", localFilePath);
                return false;
            }
            if (!hdfs.exists(hdfsDir)) hdfs.mkdirs(hdfsDir); // create the target directory if missing
            hdfs.copyFromLocalFile(localPath, hdfsFile);
            log.info("Finished uploading file [{}] to HDFS", hdfsFile.getName());
            return true;
        } catch (Exception e) {
            log.error(ParamUtil.StringDeal.placeholder("Failed to upload local file to HDFS: {}", localFilePath), e);
            return false;
        }
    }

    /**
     * Download an HDFS file to the local file system.
     *
     * @param absHDFSFilePath absolute path of the file on HDFS
     * @param localFilePath   local file path to download to
     * @throws Exception on error
     */
    public static void downloadToLocal(String absHDFSFilePath, String localFilePath) throws Exception {
        FileSystem hdfs = null;
        try {
            hdfs = IOHDFSUtil.getHDFSConn();
            downloadToLocal(absHDFSFilePath, localFilePath, hdfs);
        } finally {
            ResourceUtil.close(hdfs);
        }
    }

    /**
     * Download an HDFS file to the local file system.
     *
     * @param absHDFSFilePath absolute path of the file on HDFS
     * @param localFilePath   local file path to download to
     * @param hdfs            HDFS client connection object
     * @throws Exception on error
     */
    public static void downloadToLocal(String absHDFSFilePath, String localFilePath, FileSystem hdfs) throws Exception {
        Path path = new Path(absHDFSFilePath);
        if (!hdfs.exists(path))
            throw new Exception(ParamUtil.StringDeal.placeholder("File [{}] does not exist on HDFS", absHDFSFilePath));
        File file = new File(localFilePath);
        if (file.exists())
            throw new Exception(ParamUtil.StringDeal.placeholder("Local file [{}] already exists; remove the conflicting file and try again", localFilePath));
        FSDataInputStream open = null;
        FileOutputStream fileOutputStream = null;
        try {
            open = hdfs.open(path);
            fileOutputStream = new FileOutputStream(localFilePath);
            IOUtil.readWriter(open, fileOutputStream);
        } finally {
            ResourceUtil.close(fileOutputStream);
            ResourceUtil.close(open);
        }
    }

    /**
     * Delete a file on HDFS.
     *
     * @param absHDFSFilePath absolute path of the file on HDFS
     * @return whether the deletion succeeded
     * @throws Exception on error
     */
    public static boolean deleteHdfs(String absHDFSFilePath) throws Exception {
        FileSystem hdfs = null;
        try {
            hdfs = IOHDFSUtil.getHDFSConn();
            return deleteHdfs(new Path(absHDFSFilePath), hdfs);
        } finally {
            ResourceUtil.close(hdfs);
        }
    }

    /**
     * Delete a file on HDFS.
     *
     * @param path HDFS file path
     * @param hdfs client connection object
     * @return whether the deletion succeeded
     * @throws IOException on error
     */
    public static boolean deleteHdfs(Path path, FileSystem hdfs) throws IOException {
        if (hdfs.exists(path)) return hdfs.delete(path, false); // delete this file only; false means non-recursive
        return false;
    }

    public static void main(String[] args) throws Exception {
        hdfsUser = "hadoop";
        hdfsUrl = "hdfs://192.168.1.100:9000";
        init(); // initialize the configuration
        FileSystem conn = getHDFSConn();
        uploadToHDFS(conn, "/1.txt", "/hdfs/file.txt");
        Path path = new Path("/");
        FileStatus[] fileStatuses = conn.listStatus(path); // list the root directory
        Arrays.stream(fileStatuses).forEach(fileStatus -> {
            System.out.println("Path: " + fileStatus.getPath().toString());
            System.out.println("Is directory: " + fileStatus.isDirectory());
        });
        conn.close();
    }
}
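Since FileSystem implements Closeable, callers can also manage the connection with try-with-resources instead of the explicit close() in the main method above. A usage sketch assuming the IOHDFSUtil class as written; the address, user, and paths are placeholders:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class IOHDFSUtilDemo {
    public static void main(String[] args) throws Exception {
        IOHDFSUtil.hdfsUser = "hadoop";                   // placeholder user
        IOHDFSUtil.hdfsUrl = "hdfs://192.168.1.100:9000"; // placeholder address
        IOHDFSUtil.init();
        // try-with-resources closes the FileSystem even if an upload fails
        try (FileSystem hdfs = IOHDFSUtil.getHDFSConn()) {
            IOHDFSUtil.uploadToHDFS(hdfs, "/1.txt", "/hdfs/file.txt");
            // listFiles(path, true) iterates all files under the tree recursively
            RemoteIterator<LocatedFileStatus> it = hdfs.listFiles(new Path("/"), true);
            while (it.hasNext()) {
                System.out.println(it.next().getPath());
            }
        }
    }
}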