메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


Hadoop 2.7.x에서 사용할 수 있는 파일/디렉토리 관련 utils

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtils {
/**
   * create a existing file from local filesystem to hdfs
   * @param source
   * @param dest
   * @param conf
   * @throws IOException
   */
  public void addFile(String source, String dest, Configuration conf) throws IOException {

    FileSystem fileSystem = FileSystem.get(conf);

    // Get the filename out of the file path
    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

    // Create the destination path including the filename.
    if (dest.charAt(dest.length() - 1) != '/') {
      dest = dest + "/" + filename;
    } else {
      dest = dest + filename;
    }

    // System.out.println("Adding file to " + destination);

    // Check if the file already exists
    Path path = new Path(dest);
    if (fileSystem.exists(path)) {
      System.out.println("File " + dest + " already exists");
      return;
    }

    // Create a new file and write data to it.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(new File(
        source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * read a file from hdfs
   * @param file
   * @param conf
   * @throws IOException
   */
  public void readFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exists");
      return;
    }

    FSDataInputStream in = fileSystem.open(path);

    String filename = file.substring(file.lastIndexOf('/') + 1,
        file.length());

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * delete a directory in hdfs
   * @param file
   * @throws IOException
   */
  public void deleteFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exists");
      return;
    }

    fileSystem.delete(new Path(file), true);

    fileSystem.close();
  }

  /**
   * create directory in hdfs
   * @param dir
   * @throws IOException
   */
  public void mkdir(String dir, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(dir);
    if (fileSystem.exists(path)) {
      System.out.println("Dir " + dir + " already exists");
      return;
    } else {
    fileSystem.mkdirs(path);
    fileSystem.close();
    }
  }
  
  /**
   * delete directory in hdfs
   * @param dir
   * @throws IOException
   */
  public void rmdir(String dir, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(dir);
    if (fileSystem.exists(path)) {
    fileSystem.delete(path, true);
    fileSystem.close();
    } else {
    System.out.println("Dir " + dir + " not exists");
    }
  }


  /*
  public static void main(String[] args) throws IOException {

    if (args.length < 1) {
      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
          + " [<local_path> <hdfs_path>]");
      System.exit(1);
    }

    FileSystemOperations client = new FileSystemOperations();
    String hdfsPath = "hdfs://" + args[0] + ":" + args[1];

    Configuration conf = new Configuration();
    // Providing conf files
    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/core-site.xml").getFile()));
    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/hdfs-site.xml").getFile()));
    // (or) using relative paths
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

    //(or)
    // alternatively provide namenode host and port info
    conf.set("fs.default.name", hdfsPath);

    if (args[0].equals("add")) {
      if (args.length < 3) {
        System.out.println("Usage: hdfsclient add <local_path> "
            + "<hdfs_path>");
        System.exit(1);
      }

      client.addFile(args[1], args[2], conf);

    } else if (args[0].equals("read")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient read <hdfs_path>");
        System.exit(1);
      }

      client.readFile(args[1], conf);

    } else if (args[0].equals("delete")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient delete <hdfs_path>");
        System.exit(1);
      }

      client.deleteFile(args[1], conf);

    } else if (args[0].equals("mkdir")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
        System.exit(1);
      }

      client.mkdir(args[1], conf);

    } else {
      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
          + " [<local_path> <hdfs_path>]");
      System.exit(1);
    }

    System.out.println("Done!");
  }
  */
}


번호 제목 날짜 조회 수
456 lagom의 online-auction-java프로젝트 실행시 "Could not find Cassandra contact points, due to: ServiceLocator is not bound" 경고 발생시 조치사항 2017.10.12 2793
» Hadoop 2.7.x에서 사용할 수 있는 파일/디렉토리 관련 util성 클래스 파일 2017.09.28 2433
454 python3.5에서 numpy버젼에 따른 문제점을 조치하는 방법및 pymysql import할때 오류 발생시 조치사항 2017.09.28 3879
453 fuseki에서 제공하는 script중 s-post를 사용하는 예문 2017.09.15 4349
452 core 'gc_shard3_replica2' is already locked라는 오류가 발생할때 조치사항 2017.09.14 4849
451 editLog의 문제로 발생하는 journalnode 기동 오류 발생시 조치사항 2017.09.14 3994
450 halyard 1.3의 console을 이용하여 100억건의 데이타에 대한 쿼리수행시 ScannerTimeoutException 발생시 조치사항 2017.09.06 4136
449 hadoop cluster구성된 노드를 확인시 Capacity를 보면 색이 붉은색으로 표시되어 있는 경우나 Unhealthy인 경우 처리방법 2017.08.30 3211
448 파일은 남겨두고 파일 내용만 지우고자 할 때. 2017.08.30 2996
447 RDF4J의 RESTFul API처리 클래스 소스 파악(web module위주) 2017.08.30 3757
446 RDF4J의 rdf4j-server.war가 제공하는 RESTFul API를 이용한 CRUD테스트(트랜잭션처리) 2017.08.30 4187
445 RDF4J의 rdf4j-server.war가 제공하는 RESTFul API를 이용하여 repository에 CRUD테스트 2017.08.30 4094
444 DeviceType이 o:motion-sensor_33 이거나 o:motion-sensor_32 경우의 sparql문장은 다음과 같다. 2017.08.16 4002
443 [oneM2M]Ontologies used for oneM2M 2017.08.02 4001
442 Windows7 64bit 환경에서 Apache Spark 2.2.0 설치하기 2017.07.26 9621
441 Windows7 64bit 환경에서 Apache Hadoop 2.7.1설치하기 2017.07.26 5398
440 jena/fuseki 3.4.0 설치 2017.07.25 4315
439 LUBM 데이타 생성구문 2017.07.24 7652
438 Core with name 'xx_shard4_replica1' already exists. 발생시 조치사항 2017.07.22 3148
437 9대가 hbase cluster로 구성된 서버에서 테스트 data를 halyard에 적재하고 테스트 하는 방법및 절차 2017.07.21 4964
위로