Hive 트랜잭션 테이블 (Hive update, delete)

잇서니 2021. 1. 3. 13:26



Hive에서 update 또는 delete 쿼리를 사용하기 위해서는 트랜잭션 설정이 필요하다.

Hive 트랜잭션 테이블의 개념과 설정하는 방법을 알아본다.




DB 트랜잭션의 주요한 특징이다.

  • Atomicity
    an operation either succeeds completely or fails, it does not leave partial data
    작업은 성공하거나 실패하거나 둘 중 하나이다.

  • Consistency
    once an application performs an operation the results of that operation are visible to it in every subsequent operation
    하나의 작업이 전체 시스템에 반영된다.

  • Isolation
    an incomplete operation by one user does not cause unexpected side effects for other users
    한 사용자의 작업이 아직 끝나지 않았으면, 다른 사용자의 작업에 영향을 미치지 않는다.

  • Durability
    once an operation is complete it will be preserved even in the face of machine or system failure
    작업이 완료되면, 시스템 에러가 발생하더라도 그 상태를 유지한다.



Hive transaction with ACID 왜 필요할까?

  • it is now possible to provide full ACID semantics at the row level, so that one application can add rows while another reads from the same partition without interfering with each other.
  • 기존 hive에서 불가능했던 update, delete를 할 수 있다.



Basic Design of Hive transaction with ACID

(1) Base and Delta Directories

  • HDFS는 파일 수정이 불가능하다. 이 제약사항을 극복하기 위해 hive는 다른 warehouse tool들의 접근법을 사용한다.

  • base

    • Data for the table or partition is stored in a set of base files
  • delta (변경분 데이터 저장)

    • New records, updates, and deletes are stored in delta files.
    • A new set of delta files is created for each transaction that alters a table or partition.


(2) compactor

  • 테이블 수정 작업이 늘어날수록 delta 파일이 게속 쌓인다. 그러니 HDFS 성능을 위해 압축이 필요하다. (파일 개수 많으면 네임노드에 부하)
  • metastore에서 백그라운드 프로세스로 구동된다.
    • 이 프로세스는 아래 4개 + 알파 로 구성된다.


1) initiator

  • 압축이 필요한 테이블 또는 파티션을 자동으로 찾아준다.
    • hive.compactor.initiator.on = true
  • n번 이상 압축에 실패하면 자동 압축을 중지한다.
    • hive.compactor.initiator.failed.compacts.threshold


2) worker

  • 1개의 worker는 1개의 압축 작업을 한다.
    • 압축 작업은 1개의 파티션만 다룬다. 파티션이 없는 경우 전체 테이블을 다룬다.
    • 압축 작업은 mapreduce job이다.
      • queue를 지정할 수 있다. hive.compactor.job.queue
  • 각 metastore에서 구동할 worker 개수를 설정한다.
    • hive.compactor.worker.threads
    • 동시에 돌릴 수 있는 압축 작업 개수가 결정된다.


3) cleaner

  • 압축을 완료한 후 delta 파일을 삭제한다.


4) AcidHouseKeeperService

  • heartbeat 주고 받을 때 사용하는 프로세스이다.
    • hive.txn.heartbeat.threadpool.size
  • heartbeat 가 오지 않는 transaction을 찾아서 중지한다.
    • hive.txn.timeout


(3) Transaction/Lock Manager

현재 hive transaction 사용시 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager을 사용해야 한다.

  • transaction 관리
    • 테이블 옵션 필요 (transactional = true)
  • DbLockManager 를 사용하여 lock 을 관리한다.
    • 트랜잭션이 아닌 테이블에 대해서도 lock이 적용된다. 


기본값은 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager 이다. 트랜잭션이 비활성화 된다.



(4) hive lock

insert, delete, update 등 쓰기 작업을 하면 해당 테이블은 EXCLUSIVE(WRITE) lock을 얻는다. 다른 트랜잭션에서 해당 테이블을 읽거나 쓸 수 없다. 즉 lock을 얻을 수 없어 기다려야 한다.

select 등 읽기 작업을 하면 해당 테이블은 SHARED_READ lock을 얻는다. 

해당 테이블을 동시에 읽을 수 있다. 즉, 다른 트랜잭션에서 SHARED_READ lock을 얻을 수 있다. 단, 변경은 불가하다.




hive transaction table 사용하기


  • managed table만 가능
  • load data 불가능
  • non-ACID 세션에서 ACID 테이블 읽기/쓰기 불가
  • 테이블 저장형식 orc만 가능
  • bucketing 필수
  • 테이블 필수옵션 필요
  • BEGIN, COMMIT, ROLEBACK 안 됨. auto-commit임 (추후 개발 에정)
presto 331 버전부터 hive 트랜잭션 테이블을 사용할 수 있다.

  • Use Presto version 331 or higher
  • Use Hive 3 Metastore Server. Presto does not support Hive transactional tables created with Hive before version 3.



(1) hive transaction 설정 (/etc/hive/conf/hive-site.xml)

  • hive.compactor.initiator.on=true (for metastore)
  • hive.compactor.worker.threads=10 (for metastore)
  • (for hive-server2,client)
  • hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager (for hive-server2, client)
  • hive.exec.dynamic.partition.mode = nonstrict (for hive-server2, client)





















설정 후 hive-metastore, hive-server2 서비스 재시작


(2) hive 쿼리 실행


(a) 테이블 생성

CREATE TABLE sunnytest.trx_0831(
    cust_id string,
    number string)
   clustered by (cust_id) into 16 buckets
   stored as orc
  • buckets
    • insert 작업시 buckets 개수만큼 reduce 생김


(b) update 쿼리

UPDATE sunnytest.trx_0831 SET number = 'updated'
where cust_id in ('sunny')
  • partition 컬럼, buckets 컬럼은 update 할 수 없다.
  • update 쿼리시, bucket은 1개만 생성된다.


(c) merge 쿼리

MERGE INTO trx_application_body AS T
USING merge_source_application_body AS S
ON T.invention_title = S.invention_title
  • 첫번째 job에서 buckets 개수만큼 맵 생기고
  • 다음 job에서 buckets 개수만큼 리듀스 생김


(d) 기타 쿼리

  • show compactions
  • show transactions
  • show locks
  • abort transactions






