메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 날짜 조회 수
741 [Ranger]RangerAdminRESTClient Error gertting pplicies; Received NULL response!!, secureMode=true, user=rangerkms/node01.gooper.com@ GOOPER.COM (auth:KERBEROS), serviceName=cm_kms 2023.06.27 73
740 [vue storefrontui]외부 API통합하기 참고 문서 2022.02.09 80
739 [Encryption Zone]Encryption Zone에 생성된 table을 select할때 HDFS /tmp/zone1에 대한 권한이 없는 경우 2023.06.29 83
738 ./gradlew :composeDown 및 ./gradlew :composeUp 를 성공했을때의 메세지 2023.02.20 84
737 [EncryptionZone]User:testuser not allowed to do "DECRYPT_EEK" on 'testkey' 2023.06.29 89
736 [vi] test.nq파일에서 특정문자열(예, <>)을 찾아서 포함되는 라인을 삭제한 동일한 이름의 파일을 만드는 방법 2017.01.25 98
735 CM의 Impala->Query tab에서 FINISHED query가 보이지 않는 현상 2021.08.31 98
734 [Impala] alter table구문수행시 "WARNINGS: Impala does not have READ_WRITE access to path 'hdfs://nameservice1/DATA/Temp/DB/source/table01_ccd'" 발생시 조치 2024.04.26 98
733 restaurant-controller,에서 등록 예시 2022.04.30 99
732 주문히스토리 조회 2022.04.30 99
731 [Hue metadata]Oracle에 있는 Hue 메타정보 테이블을 이용하여 coordinator와 workflow관계 목록을 추출하는 방법 2023.08.22 99
730 [Cloudera Agent] Metadata-Plugin throttling_logger INFO (713 skipped) Unable to send data to nav server. Will try again. 2022.05.16 103
729 oozie의 sqoop action수행시 ooize:launcher의 applicationId를 이용하여 oozie:action의 applicationId및 관련 로그를 찾는 방법 2023.07.26 103
728 [CDP7.1.6,HDFS]HDFS파일을 삭제하고 Trash비움이 완료된후에도 HDFS 공간을 차지하고 있는 경우 확인/조치 방법 2023.07.17 107
727 [CDP7.1.7, Replication]Encryption Zone내 HDFS파일을 비Encryption Zone으로 HDFS Replication시 User hdfs가 아닌 hadoop으로 수행하는 방법 2024.01.15 110
726 주문 생성 데이터 예시 2022.04.30 112
725 호출 url현황 2023.02.21 112
724 [CDP7.1.7, Hive Replication]Hive Replication진행중 "The following columns have types incompatible with the existing columns in their respective positions " 오류 2023.12.27 113
723 eclipse 3.1 단축키 정리파일 2017.01.02 117
722 Cloudera Manager 5.x설치시 embedded postgresql를 사용하는 경우의 관리정보 2018.04.13 118
위로