반응형
샘플데이터
data_products (상품데이터)
- 1#ROBITUSSIN PEAK COLD NIGHTTIME COLD PLUS FLU#9721.89#10
- 상품ID#상품이름#가격#수량?
data_transaction (구매데이터)
- 2015-03-30#6:55 AM#51#68#1#9506.21
- 일자#시간#고객id#상품id#구매수량#가격
데이터 로딩 및 준비
//구매데이터 로딩
val tranFile = sc.textFile("/spark-in-action/ch04_data_transactions.txt")
val tranData = tranFile.map(_.split("#"))
//상품목록 로딩
val products = sc.textFile("/spark-in-action/ch04_data_products.txt").
map(line => line.split("#")).
map(p => (p(0).toInt, p))
//상품ID, 구매데이터 배열
val transByProd = tranData.map(tran => (tran(3).toInt, tran))
//상품ID별 매출액 합계
val totalsByProd = transByProd.mapValues(t => t(5).toDouble).reduceByKey{case(tot1, tot2) => tot1 + tot2}
totalsByProd.foreach(println)
//(34,62592.43000000001) ...
- textFile : 기본적으로 HDFS에 있는 파일을 읽는다. 로컬에 있는 파일을 읽고 싶다면 경로 앞에 file:// 을 붙여준다.
- map : 인자를 받아서 새로운 RDD를 만든다.
- mapValues : 기존 값을 변경한다.
- reduceByKey : 동일한 키에 대한 값을 합친다. (더하거나 빼거나 등등)
- case :
조인
// 판매한 상품
val totalsAndProds = totalsByProd.join(products)
totalsAndProds.foreach(println)
// (34,(62592.43000000001,[Ljava.lang.String;@35a8d5ad)) ...
totalsAndProds.lookup(34)
// res72: Seq[(Double, Array[String])] = ArrayBuffer((62592.43000000001,Array(34, GAM X360 Assassins Creed 3, 6363.95, 9)))
// 판매한 상품 + 판매하지 않은 상품
//val totalsWithMissingProds = products.leftOuterJoin(totalsByProd)
val totalsWithMissingProds = totalsByProd.rightOuterJoin(products)
totalsWithMissingProds.foreach(println)
//(7,(Some(74330.11),[Ljava.lang.String;@2b651059)), (3,(None,[Ljava.lang.String;@5e5a2d8)) ...
- inner join : A, B 의 key 값이 동일한 내용만 출력됨
- A left join B : B에 없는 내용도 출력됨
- B right join A : B에 없는 내용도 출력됨
판매하지 않은 상품 목록만 출력 (None 포함된 row만 출력)
// 판매하지 않은 상품 목록
val missingProds = totalsWithMissingProds.
filter(x => x._2._1 == None).
map(x => x._2._2)
missingProds.foreach(p => println(p.mkString(", ")))
//val missingProds = products.subtractByKey(totalsByProd).values
- filter
- mkString : 배열을 문자열로 바꾼다.
- A subtractByKey B : A에서 B 내용을 뺌
조인
// 판매한 상품 + 판매하지 않은 상품
val prodTotCogroup = totalsByProd.cogroup(products)
- cogroup : 조인 연산자이다. 조인할 녀석들의 키 타입이 동일해야 한다.
판매하지 않은 상품 & 판매한 상품 출력
// 판매하지 않은 상품 목록
prodTotCogroup.filter(x => x._2._1.isEmpty).
foreach(x => println(x._2._2.head.mkString(",")))
// 판매한 상품
val totalsAndProds = prodTotCogroup.filter(x => !x._2._1.isEmpty).
map(x => (x._2._2.head(0).toInt, (x._2._1.head, x._2._2.head)))
- filter
- mkString : 배열을 문자열로 바꾼다.
상품 이름으로 (알파벳순) 정렬
// 상품 이름으로 정렬
val sortedProds = totalsAndProds.sortBy(_._2._2(1))
sortedProds.collect()
- sortBy
- collect : 단일 배열으로 바꾼다. [[a,a,a],[b,b,b]] => [a,a,a,b,b,b]
반응형
'BigData 기술 > Spark' 카테고리의 다른 글
[spark] spark streaming + kafka (547) | 2020.08.10 |
---|---|
SPARK에서 저장소(HDFS, Hive 등)에 접근하는 방식 (745) | 2020.07.31 |
[Spark] Spark 예제 - 고객별 구매횟수, 구매금액 등 구해보기 (count, sum, sort) (4) | 2020.07.06 |
[Spark] Spark 예제 - json 데이터에서 원하는 데이터만 추출 (2) | 2020.06.25 |
[spark] 파일 한 줄씩 읽기 (scala fromFile) (4) | 2020.06.24 |
댓글