메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


Hadoop 2.7.x에서 사용할 수 있는 파일/디렉토리 관련 utils

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtils {
	/**
	   * create a existing file from local filesystem to hdfs
	   * @param source
	   * @param dest
	   * @param conf
	   * @throws IOException
	   */
	  public void addFile(String source, String dest, Configuration conf) throws IOException {

	    FileSystem fileSystem = FileSystem.get(conf);

	    // Get the filename out of the file path
	    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

	    // Create the destination path including the filename.
	    if (dest.charAt(dest.length() - 1) != '/') {
	      dest = dest + "/" + filename;
	    } else {
	      dest = dest + filename;
	    }

	    // System.out.println("Adding file to " + destination);

	    // Check if the file already exists
	    Path path = new Path(dest);
	    if (fileSystem.exists(path)) {
	      System.out.println("File " + dest + " already exists");
	      return;
	    }

	    // Create a new file and write data to it.
	    FSDataOutputStream out = fileSystem.create(path);
	    InputStream in = new BufferedInputStream(new FileInputStream(new File(
	        source)));

	    byte[] b = new byte[1024];
	    int numBytes = 0;
	    while ((numBytes = in.read(b)) > 0) {
	      out.write(b, 0, numBytes);
	    }

	    // Close all the file descriptors
	    in.close();
	    out.close();
	    fileSystem.close();
	  }

	  /**
	   * read a file from hdfs
	   * @param file
	   * @param conf
	   * @throws IOException
	   */
	  public void readFile(String file, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(file);
	    if (!fileSystem.exists(path)) {
	      System.out.println("File " + file + " does not exists");
	      return;
	    }

	    FSDataInputStream in = fileSystem.open(path);

	    String filename = file.substring(file.lastIndexOf('/') + 1,
	        file.length());

	    OutputStream out = new BufferedOutputStream(new FileOutputStream(
	        new File(filename)));

	    byte[] b = new byte[1024];
	    int numBytes = 0;
	    while ((numBytes = in.read(b)) > 0) {
	      out.write(b, 0, numBytes);
	    }

	    in.close();
	    out.close();
	    fileSystem.close();
	  }

	  /**
	   * delete a directory in hdfs
	   * @param file
	   * @throws IOException
	   */
	  public void deleteFile(String file, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(file);
	    if (!fileSystem.exists(path)) {
	      System.out.println("File " + file + " does not exists");
	      return;
	    }

	    fileSystem.delete(new Path(file), true);

	    fileSystem.close();
	  }

	  /**
	   * create directory in hdfs
	   * @param dir
	   * @throws IOException
	   */
	  public void mkdir(String dir, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(dir);
	    if (fileSystem.exists(path)) {
	      System.out.println("Dir " + dir + " already exists");
	      return;
	    } else {
		    fileSystem.mkdirs(path);
		    fileSystem.close();
	    }
	  }
	  
	  /**
	   * delete directory in hdfs
	   * @param dir
	   * @throws IOException
	   */
	  public void rmdir(String dir, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(dir);
	    if (fileSystem.exists(path)) {
		    fileSystem.delete(path, true);
		    fileSystem.close();
	    } else {
		    System.out.println("Dir " + dir + " not exists");
	    }
	  }


	  /*
	  public static void main(String[] args) throws IOException {

	    if (args.length < 1) {
	      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
	          + " [<local_path> <hdfs_path>]");
	      System.exit(1);
	    }

	    FileSystemOperations client = new FileSystemOperations();
	    String hdfsPath = "hdfs://" + args[0] + ":" + args[1];

	    Configuration conf = new Configuration();
	    // Providing conf files
	    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/core-site.xml").getFile()));
	    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/hdfs-site.xml").getFile()));
	    // (or) using relative paths
	    //    conf.addResource(new Path(
	    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
	    //    conf.addResource(new Path(
	    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

	    //(or)
	    // alternatively provide namenode host and port info
	    conf.set("fs.default.name", hdfsPath);

	    if (args[0].equals("add")) {
	      if (args.length < 3) {
	        System.out.println("Usage: hdfsclient add <local_path> "
	            + "<hdfs_path>");
	        System.exit(1);
	      }

	      client.addFile(args[1], args[2], conf);

	    } else if (args[0].equals("read")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient read <hdfs_path>");
	        System.exit(1);
	      }

	      client.readFile(args[1], conf);

	    } else if (args[0].equals("delete")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient delete <hdfs_path>");
	        System.exit(1);
	      }

	      client.deleteFile(args[1], conf);

	    } else if (args[0].equals("mkdir")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
	        System.exit(1);
	      }

	      client.mkdir(args[1], conf);

	    } else {
	      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
	          + " [<local_path> <hdfs_path>]");
	      System.exit(1);
	    }

	    System.out.println("Done!");
	  }
	  */
}


번호 제목 날짜 조회 수
461 HDFS에서 quota 설정 방법및 확인 방법 2022.03.30 399
460 Collections.sort를 이용한 List<User>형태의 데이타 정렬(숫자, 문자에 대해서 각각 asc/desc및 복합정렬) 2016.12.15 400
459 solr 데몬이 떠있는 동안 hadoop이 다운되는 경우 Index dir 'hdfs://mycluster/user/../core_node2/data/index/' of core 'gc_shard1_replica2' is already locked라논 오류가 발생하는데 이에 대한 조치사항 2018.01.04 400
458 [Oozie]Disk I/O error: Failed to open HDFS file dhfs://..../tb_aaa/....OPYING 2019.02.15 400
457 mysql sqoop작업을 위해서 mysql-connector-java.jar을 추가하는 경우 확실하게 인식시키는 방법 2020.05.11 404
456 hadoop nfs gateway설정 (Cloudera 6.3.4, CentOS 7.4 환경에서) 2022.01.07 404
455 [보안/인증]javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target발생 원인/조치내용 2023.10.24 404
454 kafka의 re-balance를 이용하여 consumer를 multi thread로 돌려서 topic의 partitions을 활용 2015.03.31 405
453 Cloudera설치중 실패로 여러번 설치하는 과정에 "Running in non-interactive mode, and data appears to exist in Storage Directory /dfs/nn. Not formatting." 오류가 발생시 조치하는 방법 2018.03.29 406
452 cloudera-scm-agent 설정파일 위치및 재시작 명령문 2018.03.29 407
451 hadoop클러스터를 구성하던 서버중 HA를 담당하는 서버의 hostname등이 변경되어 문제가 발생했을때 조치사항 2016.07.29 408
450 SCM서비스를 추가하는 동안 Unexpected error. Unable to verify database connection. 오류발생시 확인 사항 2018.06.08 408
449 hbase CustomFilter만들기 (0.98.X이상) 2015.05.08 409
448 [shell script]test.txt에 space로 분리된 내용을 일어들이는 예제 2017.02.21 409
447 spark submit용 jar파일을 만드는 sbt 용 build.sbt설정 파일(참고용) 2016.08.19 410
446 [shellscript] 함수에 배열을 인자로 주어서 처리하는 방법 2019.07.16 411
445 jena jar파일실행시 org.apache.jena.tdb.TDB.init에서 java.lang.NullPointerException발생시 조치사항 2016.08.19 412
444 Not enough replica available for query at consistency QUORUM가 발생하는 경우 2017.06.21 412
443 모두를 위한 머신러닝과 딥러닝의 강의 file 2016.09.27 418
442 [Jsoup]특정페이지를 jsoup을 이용하여 파싱하는 샘플소스 2017.04.18 418
위로