본문 바로가기
BigData 기술/Spark

histogram in spark (scala) using zeppelin

by 잇서니 2020. 6. 23.
반응형

 

스파크로 히스토그램을 만들고 그래프까지 그려본다.

실행 환경은 zeppelin 이다.

 


 

1. 샘플데이터 처리

%spark 

// collect : 단일 배열으로 바꿈
val lines = sc.textFile("/spark-in-action/client-ids.log")
val idsStr = lines.map(line => line.split(","))
idsStr.collect

// flatMap : 모든 배열 요소를 단일 컬렉션으로 만든다. (엔터 없애기)
val ids = lines.flatMap(_.split(",")) 
ids.collect
ids.collect.mkString(";")

val intIds = ids.map(_.toInt)
intIds.collect

val uniqueIds = intIds.distinct
uniqueIds.collect

val finalCount = uniqueIds.count
val transactionCount = ids.count
client-ids.log 파일

15,16,20,20
77,80,94,94,98
16,31,31,15,20

 

collect

Array(Array(15, 16, 20, 20), Array(77, 80, 94), Array(94, 98, 16, 31), Array(31, 15, 20))

 

flatMap , collect

Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

 

 

2. 히스토그램 만들기

%spark 

val a = intIds.histogram(3)

// 구간 경계
val startValues = a._1
// 각 구간에 속한 요소 개
val counts = a._2

historam 연산자를 사용하면 Array 객체가 반환된다. 첫번째 값은 구간 경계의 배열이다 (Double 타입). 두번째 값은 각 구간에 속한 요소 개수가 저장된 배열이다 (Int 타입).

 

3. 히스토그램 시각화

%spark

// zip : 두 개 배열을 합친다.
val zippedValues = startValues.zip(counts)
case class HistRow(startPoint:Double,count:Long)
val rowRDD = zippedValues.map( value => HistRow(value._1,value._2))

// RDD -> DataFrame (스파크 세션 사용)
val histDf = spark.createDataFrame(rowRDD)
histDf.createOrReplaceTempView("histogramTable")

zeppelin에서 그래프를 생성하기 위해 DataFrame을 사용하는 게 편하다. 이를 위해 Array(RDD)를 DataFrame으로 변환하는 과정을 거친다.

 

4. 그래프 출력

%sql
select * from histogramTable

반응형

댓글