반응형
테스트환경
- zeppelin 0.8.1
- spark 2.4.0
예제
- github 이벤트 관련 json 데이터를 사용한다.
- push 횟수가 많은 순서대로 이름을 추출하고 정렬한다.
- 그 중에서 특정 명단에 있는 사람들만 뽑는다.
코드
%spark
//최종버전
import scala.io.Source.fromFile
object App {
def main(args : Array[String]) {
// git push 횟수가 많은 순서대로 이름을 뽑는다.
val inputPath = "/spark-in-action/2015-03-01-23.json"
val ghLog = spark.read.json(inputPath)
val pushes = ghLog.filter("type = 'PushEvent'")
val grouped = pushes.groupBy("actor.login").count
val ordered = grouped.orderBy(grouped("count").desc)
// 특정 명단들
val empPath = "/root/sunny/spark-in-action/ghEmployess.txt"
val employees = Set() ++ (
for {
line <- fromFile(empPath).getLines
} yield line.trim
)
//공유변수
val bcEmployees = sc.broadcast(employees)
import spark.implicits._
val isEmp = user => bcEmployees.value.contains(user)
//udf 등록
val isEmployee = spark.udf.register("SetContainsUdf", isEmp)
val filtered = ordered.filter(isEmployee($"login"))
filtered.show()
}
}
// 호출
App.main(null)
결과
+---------------+-----+
| login|count|
+---------------+-----+
| KenanSulayman| 72|
|direwolf-github| 24|
| lukeis| 12|
| keum| 8|
| zhixinwen| 5|
| jaanos| 5|
| fbennett| 4|
| Valicek1| 4|
| samnazarko| 3|
| shryme| 2|
| uqs| 2|
| y-yagi| 2|
| listingslab| 1|
| Somasis| 1|
| Juxnist| 1|
| danielrasmuson| 1|
| sloria| 1|
|EmanueleMinotto| 1|
| kWhittington| 1|
| dinoboff| 1|
+---------------+-----+
반응형
'BigData 기술 > Spark' 카테고리의 다른 글
[Spark] Spark 예제 - 데이터 로딩, 조인, 필터링, 정렬 (4) | 2020.07.10 |
---|---|
[Spark] Spark 예제 - 고객별 구매횟수, 구매금액 등 구해보기 (count, sum, sort) (4) | 2020.07.06 |
[spark] 파일 한 줄씩 읽기 (scala fromFile) (4) | 2020.06.24 |
histogram in spark (scala) using zeppelin (4) | 2020.06.23 |
jupyter 노트북에서 pyspark 사용하기 (2) | 2020.04.28 |
댓글