본문 바로가기
BigData 기술/Spark

[Spark] Spark 예제 - 고객별 구매횟수, 구매금액 등 구해보기 (count, sum, sort)

by 잇서니 2020. 7. 6.
반응형

 

샘플데이터

  • 2015-03-30#6:55 AM#51#68#1#9506.21
  • 일자#시간#고객id#상품id#구매수량#가격

ch04_data_transactions.txt
0.03MB

 

 

파일 로딩고객 ID 별로 데이터 재생성하기

val tranFile = sc.textFile("/spark-in-action/ch04_data_transactions.txt")
val tranData = tranFile.map(x => x.split("#"))
//val tranData = tranFile.map(_.split("#"))


// key : 고객ID, value : 구매기록 배열
var transByCust = tranData.map(tran => (tran(2).toInt, tran))

//
transByCust.foreach(println)
//
(51,[Ljava.lang.String;@39e216fd)
(99,[Ljava.lang.String;@34eb03fe)
(79,[Ljava.lang.String;@1ce24ab5)
(51,[Ljava.lang.String;@4852b7f1)
..
  • textFile : 기본적으로 HDFS에 있는 파일을 읽는다. 로컬에 있는 파일을 읽고 싶다면 경로 앞에 file:// 을 붙여준다.
  • map : 인자를 받아서 새로운 RDD를 만든다.

 

 

고객수 카운팅, 고객별 구매 횟수, 구매 횟수로 정렬

//고객 수
transByCust.keys.distinct().count()
//고객ID별 구매 횟수
transByCust.countByKey()
//
scala.collection.Map[Int,Long] = Map(69 -> 7, 88 -> 5, 5 -> 11, 10 -> 7, 56 -> 17, 42 -> 7, 24 -> 9, 37 -> 7, 25 -> 12, 52 -> 9, 14 -> 8, 20 -> 8, 46 -> 9, 93 -> 12, 57 -> 8, 78 -> 11, 29 -> 9, 84 -> 9, 61 -> 8, 89 -> 9, 1 -> 9, 74 -> 11, 6 -> 7, 60 -> 4, 85 -> 9, 28 -> 11, 38 -> 9, 70 -> 8, 21 -> 13, 33 -> 9, 92 -> 8, 65 -> 10, 97 -> 12, 9 -> 7, 53 -> 19, 77 -> 11, 96 -> 8, 13 -> 12, 41 -> 12, 73 -> 7, 2 -> 15, 32 -> 14, 34 -> 14, 45 -> 11, 64 -> 10, 17 -> 13, 22 -> 10, 44 -> 8, 59 -> 9, 27 -> 7, 71 -> 10, 12 -> 7, 54 -> 7, 49 -> 8, 86 -> 9, 81 -> 9, 76 -> 15, 7 -> 10, 39 -> 11, 98 -> 11, 91 -> 13, 66 -> 11, 3 -> 13, 80 -> 7, 35 -> 10, 48 -> 5, 63 -> 12, 18 -> 9, 95 -> 8, 50 -> 14, 67 -> 5, 16 -> 8, 31 -> 14, 11 -> 8, 72 -> 7, 43 -> 12, 99 -> 12, 87 -> 10, 40 -> 10, 26 -> 11, 55...
//모든 고객들의 총 구매횟수
transByCust.countByKey().values.sum
//구매 횟수가 가장 많은 고객
val (cid, purch) = transByCust.countByKey().toSeq.sortBy(_._2).last
// 53번 고객의 구매 기록
transByCust.lookup(53)
//
Seq[Array[String]] = WrappedArray(Array(2015-03-30, 6:18 AM, 53, 42, 5, 2197.85), Array(2015-03-30, 4:42 AM, 53, 44, 6, 9182.08), Array(2015-03-30, 2:51 AM, 53, 59, 5, 3154.43), Array(2015-03-30, 5:57 PM, 53, 31, 5, 6649.27), Array(2015-03-30, 6:11 AM, 53, 33, 10, 2353.72), Array(2015-03-30, 9:46 PM, 53, 93, 1, 2889.03), Array(2015-03-30, 4:15 PM, 53, 72, 7, 9157.55), Array(2015-03-30, 2:42 PM, 53, 94, 1, 921.65), Array(2015-03-30, 8:30 AM, 53, 38, 5, 4000.92), Array(2015-03-30, 6:06 AM, 53, 12, 6, 2174.02), Array(2015-03-30, 3:44 AM, 53, 47, 1, 7556.32), Array(2015-03-30, 10:25 AM, 53, 30, 2, 5107.0), Array(2015-03-30, 1:48 AM, 53, 58, 4, 718.93), Array(2015-03-30, 9:31 AM, 53, 18, 4, 8214.79), Array(2015-03-30, 9:04 AM, 53, 68, 4, 9246.59), Array(2015-03-30, 1:51 AM, 53, 40, 1,...
  • countByKey : key를 기준으로 값 개수를 카운팅
  • toSeq : Sequence 형태로 바꿈. Map(69 -> 7 , 88 -> 5 ..) -> ArrayBuffer((69,7), (88,5), ...)
  • sortBy : keyBy + sortByKey

 

구매기록(value) 변경하기

// 구매기록(value) 변경(기존값 변경)
transByCust = transByCust.mapValues(tran => {
    if(tran(3).toInt == 25 && tran(4).toDouble > 1)
        tran(5) = (tran(5).toDouble * 0.95).toString
    tran
})


// 구매기록(value) 변경 (원래값 + 새로운값)
transByCust.flatMapValues(tran =>{
    if(tran(3).toInt == 81 && tran(4).toDouble >=5){
        val cloned = tran.clone()
        cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1";
        List(tran, cloned)
    }
    else
        List(tran)
})
  • mapValues : 기존 값을 변경한다.
  • flatMapValues : 기존 값에 새로운 값을 추가한다. (가져다 붙이기. append 개념)

 

 

고객별 구매금액 합계, 구매금액 높은 순으로 정렬하기

// key : 고객ID, value : 구매가격만 포함
val amounts = transByCust.mapValues(t => t(5).toDouble)
amounts.foreach(println)
(58,2761.68)
(51,9506.21)
(42,4336.81)
..
// 고객 ID별 구매 합계
val totals = amounts.foldByKey(0)((p1, p2) => p1 + p2).collect()
// val totals = amounts.reduceByKey((v1, v2) => v1 + v2).collect()
// Array[(Int, Double)] = Array((34,77332.59), (52,58348.020000000004), (96,36928.57), (4,41801.35), (16,40696.020000000004), (82,58722.58), (66,52130.009999999995), (28,45534.299999999996), (54,36307.04), (80,31794.62), 


// 제일 많이 구매한 고객
totals.toSeq.sortBy(_._2).last
  • reduceByKey : 동일한 키에 대한 값을 합친다.
  • foldByKey : reduceByKey와 기능이 같다.
  • collect : 단일 배열으로 바꾼다. [[a,a,a],[b,b,b]] => [a,a,a,b,b,b]
  • toSeq : Sequence 형태로 바꿈. Array( (34,77332.59), ...) -> WrappedArray((34,77332.59), ...)

 

 

 

구매기록(배열) 합치기, 결과파일 HDFS에 저장하기

// 구매기록 추가
complTrans = complTrans :+ Array("2015-09-30", "11:59 PM","76", "63", "1","0.00")
transByCust = transByCust.union(sc.parallelize(complTrans).map(t => (t(2).toInt, t)))
transByCust.map(t => t._2.mkString("#")).saveAsTextFile("/spark-in-action/ch04output-transByCust2")

// 고객 ID별 구매한 전체 목록
val prods = transByCust.aggregateByKey(List[String]())(
    (prods, tran) => prods ::: List(tran(3)),
    (prods1, prods2) => prods1 ::: prods2
    )
  • saveAsTextFile : 기본적으로 HDFS에 저장한다. 덮어쓰기는 지원 안 한다.
  • aggregateByKey : foldByKey와 기능이 같다. 값의 타입을 바꿀 수 있다.
    • 고객 ID별 구매한 전체 목록 코드는 이해가 잘 안된다.

 

 

반응형

댓글