메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 날짜 조회 수
201 Hive MetaStore Server기동시 Could not create "increment"/"table" value-generation container SEQUENCE_TABLE since autoCreate flags do not allow it. 오류발생시 조치사항 2017.05.03 678
200 Ubuntu 16.04 LTS에 Hive 2.1.1설치하면서 "Version information not found in metastore"발생하는 오류원인및 조치사항 2017.05.03 658
199 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적(?)으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 217
198 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 227
197 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 158
196 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 298
195 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 2017.05.02 451
194 Ubuntu 16.04 LTS에 MariaDB 10.1설치 및 포트변경 및 원격접속 허용 2017.05.01 1694
193 Ubuntu 16.04 LTS에 4대에 Hadoop 2.8.0설치 2017.05.01 1131
192 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 492
191 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 459
190 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 912
189 Spark에서 Serializable관련 오류및 조치사항 2017.04.21 5107
188 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 2017.04.19 851
187 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 2017.04.06 1015
186 streaming작업시 입력된 값에 대한 사본을 만들게 되는데 이것이 실패했을때 발생하는 경고메세지 2017.04.03 824
» JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 2017.03.30 287
184 kafka-manager 1.3.3.4 설정및 실행하기 2017.03.20 1918
183 spark 2.0.0의 api를 이용하는 예제 프로그램 2017.03.15 349
182 It is indirectly referenced from required .class files 오류 발생시 조치방법 2017.03.09 873
위로