본문 바로가기
BigData 기술/Spark

[Spark] Spark 예제 - 데이터 로딩, 조인, 필터링, 정렬

by 잇서니 2020. 7. 10.
반응형

 

 

샘플데이터

data_products (상품데이터)

  • 1#ROBITUSSIN PEAK COLD NIGHTTIME COLD PLUS FLU#9721.89#10
  • 상품ID#상품이름#가격#수량?

data_transaction (구매데이터)

  • 2015-03-30#6:55 AM#51#68#1#9506.21
  • 일자#시간#고객id#상품id#구매수량#가격

ch04_data_products.txt
0.00MB
ch04_data_transactions.txt
0.03MB

 

 

데이터 로딩 및 준비

//구매데이터 로딩
val tranFile = sc.textFile("/spark-in-action/ch04_data_transactions.txt")
val tranData = tranFile.map(_.split("#"))

//상품목록 로딩
val products = sc.textFile("/spark-in-action/ch04_data_products.txt").
    map(line => line.split("#")).
    map(p => (p(0).toInt, p))
    
//상품ID, 구매데이터 배열
val transByProd = tranData.map(tran => (tran(3).toInt, tran))

//상품ID별 매출액 합계
val totalsByProd = transByProd.mapValues(t => t(5).toDouble).reduceByKey{case(tot1, tot2) => tot1 + tot2} 
totalsByProd.foreach(println)
//(34,62592.43000000001) ...

  • textFile : 기본적으로 HDFS에 있는 파일을 읽는다. 로컬에 있는 파일을 읽고 싶다면 경로 앞에 file:// 을 붙여준다.
  • map : 인자를 받아서 새로운 RDD를 만든다.
  • mapValues : 기존 값을 변경한다.
  • reduceByKey : 동일한 키에 대한 값을 합친다. (더하거나 빼거나 등등)
  • case : 

 

 

 

조인

// 판매한 상품
val totalsAndProds = totalsByProd.join(products)
totalsAndProds.foreach(println)
// (34,(62592.43000000001,[Ljava.lang.String;@35a8d5ad)) ...
totalsAndProds.lookup(34)
// res72: Seq[(Double, Array[String])] = ArrayBuffer((62592.43000000001,Array(34, GAM X360 Assassins Creed 3, 6363.95, 9)))


// 판매한 상품 + 판매하지 않은 상품
//val totalsWithMissingProds = products.leftOuterJoin(totalsByProd)
val totalsWithMissingProds = totalsByProd.rightOuterJoin(products)

totalsWithMissingProds.foreach(println)
//(7,(Some(74330.11),[Ljava.lang.String;@2b651059)), (3,(None,[Ljava.lang.String;@5e5a2d8)) ...
  • inner join : A, B 의 key 값이 동일한 내용만 출력됨
  • A left join B : B에 없는 내용도 출력됨
  • B right join A : B에 없는 내용도 출력됨

 

 

판매하지 않은 상품 목록만 출력 (None 포함된 row만 출력)

// 판매하지 않은 상품 목록
val missingProds = totalsWithMissingProds.
  filter(x => x._2._1 == None).
  map(x => x._2._2)

missingProds.foreach(p => println(p.mkString(", ")))

//val missingProds = products.subtractByKey(totalsByProd).values
  • filter
  • mkString : 배열을 문자열로 바꾼다.
  • A subtractByKey B : A에서 B 내용을 뺌 

 

 

조인

// 판매한 상품 + 판매하지 않은 상품
val prodTotCogroup = totalsByProd.cogroup(products)
  • cogroup : 조인 연산자이다. 조인할 녀석들의 키 타입이 동일해야 한다.

 

 

판매하지 않은 상품 & 판매한 상품 출력

// 판매하지 않은 상품 목록
prodTotCogroup.filter(x => x._2._1.isEmpty).
    foreach(x => println(x._2._2.head.mkString(",")))
    
// 판매한 상품
val totalsAndProds = prodTotCogroup.filter(x => !x._2._1.isEmpty).
    map(x => (x._2._2.head(0).toInt, (x._2._1.head, x._2._2.head)))
  • filter
  • mkString : 배열을 문자열로 바꾼다.

 

 

상품 이름으로 (알파벳순) 정렬

// 상품 이름으로 정렬
val sortedProds = totalsAndProds.sortBy(_._2._2(1))
sortedProds.collect()
  • sortBy
  • collect : 단일 배열으로 바꾼다. [[a,a,a],[b,b,b]] => [a,a,a,b,b,b]

 

 

 

 

반응형

댓글