메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
  
System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
  jssc.checkpoint("/tmp");
  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
     
  // error가 있으면 출력
  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String line) {
  return line.contains("error");
  }
  });
  errorLines.print();
  
  // 문자카운트
  JavaPairDStream<String, Integer> rst = lines.mapToPair(
  new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String line) {
  return new Tuple2(line, 1);
  }
  }).updateStateByKey(new UpdateRunningSum());

  // 람다식으로 처리할 경우
  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

  rst.print();
  
  jssc.start();
  try { 
  jssc.awaitTermination();
  } catch (Exception e) {
  System.out.println("exception 2: "+e.getMessage());
  }
}

}

class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
int newSum = current.orElse(0);
for(int value : values) {
newSum += value;
}
return Optional.of(newSum);
}
};


번호 제목 날짜 조회 수
396 Ubuntu 16.04 LTS에 Hive 2.1.1설치하면서 "Version information not found in metastore"발생하는 오류원인및 조치사항 2017.05.03 3224
395 우분투에서 패키지 설치시 E: Sub-process /usr/bin/dpkg returned an error code 발생시 조치 2017.05.02 4657
394 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적(?)으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 3630
393 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 3703
392 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 3521
391 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 3438
390 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 2017.05.02 3929
389 test333444 2017.05.01 2344
388 test333 2017.05.01 1933
387 Ubuntu 16.04 LTS에 MariaDB 10.1설치 및 포트변경 및 원격접속 허용 2017.05.01 4983
386 Ubuntu 16.04 LTS에 4대에 Hadoop 2.8.0설치 2017.05.01 4334
385 fuseki webUI를 통해서 전체 카운트를 하면 급격하게 메모리를 소모해 버리는 문제가 있음 file 2017.04.28 4660
384 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 4335
383 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 2906
382 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 4407
381 linux에서 특정 포트를 사용하는 프로세스 확인하기 2017.04.26 3829
380 Spark에서 Serializable관련 오류및 조치사항 2017.04.21 8171
379 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 2017.04.19 5082
378 [Jsoup]특정페이지를 jsoup을 이용하여 파싱하는 샘플소스 2017.04.18 4156
377 [jsoup]Jsoup Tutorial 2017.04.11 3314
위로