반응형
스파크로 히스토그램을 만들고 그래프까지 그려본다.
실행 환경은 zeppelin 이다.
1. 샘플데이터 처리
%spark
// collect : 단일 배열으로 바꿈
val lines = sc.textFile("/spark-in-action/client-ids.log")
val idsStr = lines.map(line => line.split(","))
idsStr.collect
// flatMap : 모든 배열 요소를 단일 컬렉션으로 만든다. (엔터 없애기)
val ids = lines.flatMap(_.split(","))
ids.collect
ids.collect.mkString(";")
val intIds = ids.map(_.toInt)
intIds.collect
val uniqueIds = intIds.distinct
uniqueIds.collect
val finalCount = uniqueIds.count
val transactionCount = ids.count
client-ids.log 파일
15,16,20,20
77,80,94,94,98
16,31,31,15,20
collect
Array(Array(15, 16, 20, 20), Array(77, 80, 94), Array(94, 98, 16, 31), Array(31, 15, 20))
flatMap , collect
Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
2. 히스토그램 만들기
%spark
val a = intIds.histogram(3)
// 구간 경계
val startValues = a._1
// 각 구간에 속한 요소 개
val counts = a._2
historam 연산자를 사용하면 Array 객체가 반환된다. 첫번째 값은 구간 경계의 배열이다 (Double 타입). 두번째 값은 각 구간에 속한 요소 개수가 저장된 배열이다 (Int 타입).
3. 히스토그램 시각화
%spark
// zip : 두 개 배열을 합친다.
val zippedValues = startValues.zip(counts)
case class HistRow(startPoint:Double,count:Long)
val rowRDD = zippedValues.map( value => HistRow(value._1,value._2))
// RDD -> DataFrame (스파크 세션 사용)
val histDf = spark.createDataFrame(rowRDD)
histDf.createOrReplaceTempView("histogramTable")
zeppelin에서 그래프를 생성하기 위해 DataFrame을 사용하는 게 편하다. 이를 위해 Array(RDD)를 DataFrame으로 변환하는 과정을 거친다.
4. 그래프 출력
%sql
select * from histogramTable
반응형
'BigData 기술 > Spark' 카테고리의 다른 글
[Spark] Spark 예제 - json 데이터에서 원하는 데이터만 추출 (2) | 2020.06.25 |
---|---|
[spark] 파일 한 줄씩 읽기 (scala fromFile) (4) | 2020.06.24 |
jupyter 노트북에서 pyspark 사용하기 (2) | 2020.04.28 |
docker로 spark-hadoop-cluster 구축하기 (4) | 2020.04.24 |
spark history 서버 설정 및 구동 (4) | 2020.04.22 |
댓글