Bigdata, Hadoop ecosystem, Semantic IoT 등의 프로젝트를 진행 중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다.
문의사항은 gooper@gooper.com으로 메일을
보내주세요.
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
 * Spark Streaming application that counts the log lines received over a
 * TCP socket, using a sliding window: a 5-second window re-evaluated
 * every 2 seconds, with results printed to stdout for each slide.
 *
 * Usage: stubs.StreamingLogsMB <hostname> <port>
 */
object StreamingLogsMB {
  def main(args: Array[String]): Unit = {
    // Require both the hostname and the port of the data source.
    if (args.length < 2) {
      System.err.println("Usage: stubs.StreamingLogsMB <hostname> <port>")
      System.exit(1)
    }

    // Hostname and port of the socket data source, from application arguments.
    val hostname = args(0)
    // NOTE(review): throws NumberFormatException on a non-numeric port argument.
    val port = args(1).toInt

    // Create a Spark context; configuration comes from spark-submit / defaults.
    val sc = new SparkContext()
    // Set log level to ERROR to avoid distracting extra output.
    sc.setLogLevel("ERROR")

    // Configure the streaming context with a 1-second batch duration.
    val ssc = new StreamingContext(sc, Seconds(1))

    // DStream of log lines read from the server and port specified.
    val logs = ssc.socketTextStream(hostname, port)

    // Window operations require checkpointing to be enabled; "logcheckpt"
    // is resolved against the default filesystem (e.g. HDFS on a cluster).
    ssc.checkpoint("logcheckpt")

    // Count lines over a 5-second window, sliding every 2 seconds.
    // Both durations must be multiples of the 1-second batch interval.
    logs.countByWindow(Seconds(5), Seconds(2)).print()

    // Start the streaming job and block until it is terminated.
    ssc.start()
    ssc.awaitTermination()
  }
}