Oracle에서 Snowflake로 CDC(변경 데이터 캡처) 데이터 처리
CDC(Change Data Capture)는 데이터베이스에서 데이터 변경 사항(삽입, 업데이트, 삭제)을 추적하고 캡처하는 기술이나 프로세스를 의미한다. CDC는 실시간 데이터 동기화, 데이터 복제, 데이터 웨어하우스 로드, 로그 분석 등에 사용된다. 오늘날 CDC의 중요성은 데이터 중심 비즈니스 환경과 실시간 데이터 요구사항이 점점 더 증가하는 상황이다. 기업은 데이터의 신속한 활용과 통합을 통해 더 나은 의사결정을 내리고 경쟁 우위를 확보하려 하기 때문에 CDC는 필수적인 기술로 자리 잡고 있다. 그럼 이번 글에서는 Oracle에서 Snowflake로 CDC 데이터 처리를 어떻게 하는지 알아보도록 하자. 본 글에서는 실제로 수행했던 사례 기반으로 작성할 예정이다.
위 그림은 Oracle의 데이터를 AWS DMS를 통해 Snowflake로 데이터를 적재하는 아키텍처이다. DMS는 Data Migration Service의 약자로 AWS에서 제공하는 데이터 마이그레이션 서비스를 말한다. AWS내의 인스턴스 사이의 데이터 마이그레이션뿐만 아니라 온프레미스와 클라우드 사이에서의 데이터 마이그레이션도 지원한다.
데이터를 대용량으로 적재하게 되면 시간이 지남에 따라 Oracle의 데이터가 커지기 때문에 DMS로 데이터를 가져올 때 소스 데이터베이스에 과부하가 발생할 수밖에 없고, S3에 쌓인 데이터를 Snowflake로 적재하려고 할 때도 데이터가 크기 때문에 많은 시간과 비용이 발생할 수밖에 없다. 따라서 이렇게 대용량으로 적재하는 방식보다 변경분 데이터만 캡처해서 가져오는 CDC 방식을 고려해야 한다. 그럼 아래에서 어떻게 CDC를 적용시키는지 자세히 살펴보자.
옵션 1. Dynamic Table을 통한 CDC 로그 적용
Snowflake의 Dynamic Table은 실시간으로 변경된 데이터를 자동으로 처리하고, 사용자 정의 쿼리를 기반으로 최신 상태의 데이터를 유지할 수 있도록 도와주는 테이블이다. 정리하자면, 전통적인 테이블과는 다르게 데이터의 변동을 자동으로 감지하고 이를 기반으로 테이블을 지속적으로 업데이트한다. 이러한 Dynamic Table을 활용하여 CDC를 적용시키는 방법에 대해 알아보자.
SQS는 AWS Simple Queue Service의 약자로, 서버들끼리 사용할 수 있는 메세지 큐를 제공하는 서비스이다. Snowpipe는 데이터가 오브젝트 스토리지(S3)에 업로드 되면 이를 감지하여 마이크로 배치로 자동으로 Snowflake 테이블로 로드하는 역할을 하게 된다. 따라서 DMS를 통해 초기 원본 데이터를 S3로 가져오고 소스 데이터베이스에 데이터 변경이 발생하면 SQS는 이러한 메세지를 Snowpipe로 전달한다. 이후 Snowpipe를 통해 변경분 데이터를 Snowflake 물리 테이블로 지속적 복제를 진행하게 되고 Dynamic Table을 활용하여 일정한 주기별로 최신 데이터를 반영하게 된다. 아키텍처만 봐서는 되게 복잡해보이고 어려워보일 수가 있는데 실제로 구현해보면 큰 어려움 없이 간단하게 CDC를 구현할 수 있을 것이다. 아래에서 단계별로 자세히 살펴보자.
DMS에서 S3로 데이터가 적재될 때, S3에는 두 종류로 나뉘어 데이터가 적재된다. 'LOAD' 로 시작하는 데이터는 초기 원본 데이터를 의미하게 되고 '20240819-081446041'처럼 날짜로 시작하는 데이터는 변경분 데이터를 의미한다. 데이터가 크면 클수록 당연히 초기 원본 데이터는 많아질 것이고 변경분 데이터는 지속적으로 쌓이게 될 것이다.
먼저 초기 원본 데이터를 Snowflake 물리 테이블로 적재한 다음, 변경분 데이터가 S3에 적재되면 Snowpipe는 이를 감지하여 물리 테이블에 지속적으로 적재하게 된다. 각각 데이터를 조회해보면 실 데이터뿐만 아니라 TRANSACTIONS 컬럼과 TRANSACTION_ID 컬럼이 추가 된 것을 볼 수 있다. 이는 DMS를 통해 데이터를 FETCH 할 때 추가한 컬럼으로 TRANSACTIONS 컬럼은 트랜잭션(INSERT, UPDATE, DELETE) FLAG 정보를 가지고 있는 컬럼이고, TRANSACTION_ID 컬럼은 타임스탬프와 자동 증분 숫자로 구성된 정보를 가지고 있다. 이러한 컬럼을 추가한 이유는 어떤 트랜잭션이 발생한 것인지 판별하기 위함도 있고 실제 마이그레이션 프로젝트를 진행하다보면 언제 최신화가 진행되었는지를 판별할 수 있는 UPDATE_DT 컬럼 등이 있는데 해당 컬럼의 데이터는 정상적으로 최신화가 되지 않아 이를 활용할 수 없는 경우가 많았다. 따라서 이를 해결하기 위해 두 컬럼을 추가한 것이다. 이로 인해 대상 테이블에 UPDATE 시간 정보를 가지고 있는 컬럼이 없어도 해당 값을 통해 Snowflake에는 최신 데이터를 반영할 수 있게 된다. 그럼 마지막으로 Dynamic Table을 활용하여 최신 데이터를 어떻게 반영하게 되는지 확인해보자.
앞서 말했듯이 Snowflake의 Dynamic Table은 실시간으로 변경된 데이터를 자동으로 처리하고, 사용자 정의 쿼리를 기반으로 최신 상태의 데이터를 유지할 수 있도록 도와주는 테이블이다. 따라서 위 쿼리처럼 최신 데이터를 반영하기 위해 순서를 부여하는 컬럼을 생성하여 최신 데이터는 반영하고 삭제된 데이터는 반영하지 않는 로직을 통해 최신 데이터를 반영하게 된다. 또한 실제로 사용할 컬럼만 지정하여 불필요한 메타 컬럼은 반영하지 않도록 한다.
옵션 2. External Table을 통한 CDC 로그 적용
Snowflake의 External Table은 데이터를 Snowflake에 적재하지 않고도 S3의 데이터를 조회할 수 있는 테이블이다. 두 번째 옵션으로는 External Table을 활용하여 CDC를 적용시켜보자.
Dynamic Table을 활용하여 CDC를 적용시켰던 아키텍처와 크게 다르지 않다. External Table은 S3의 데이터를 바로 조회할 수 있기 때문에 Dynamic Table을 활용할 때와 달리 Snowpipe는 필요 없게 된다. Stream은 테이블의 변경사항을 추적하고 변경분 데이터를 저장하는 Snowflake의 객체로 External Table의 변경 사항을 캡쳐하고 저장하게 된다. 이후 Stream이 저장하고 있는 최신 데이터를 사용자가 바라보는 최종 테이블인 물리 테이블로 적재하여 이를 통해 사용자는 최신 데이터를 가지고 있는 테이블을 사용할 수 있게 된다. 단계별로 살펴보자.
DMS를 통해 S3로 데이터를 가져오는 것은 동일하다. 초기 원본 데이터와 변경분 데이터가 지속적으로 적재된다.
External Table을 조회해보면 위와 같다. 초기 원본 데이터와 변경분 데이터를 가지고 있는 것을 확인할 수 있다. 역시나 TRASACTIONS 컬럼과 TRANSACTION_ID 컬럼을 추가하였고 앞서 설명한 로그성 테이블과 같은 역할을 하게 된다.
이후 External Table에 대한 변경분을 캡쳐하는 Stream을 조회하면 위와 같은 것을 확인할 수 있다. Stream은 External Table과 동일한 형식을 가지게 되며 오로지 변경분에 대한 데이터만 갖고 있는다. 즉, 변경분에 대한 데이터만 갖고 있음으로써, 이후 이어지는 과정에서 속도를 향상시킬 수 있게 된다. 그럼 마지막 과정을 살펴보자.
최종적으로 MERGE INTO 작업을 통해 타겟 테이블에 최신 데이터를 반영하게 된다. 하지만 이는 External Table이 아닌 Stream에서 MERGE를 치기 때문에 반영 속도를 향상시킬 수 있게 된다. 앞서 말했듯이 Stream은 변경분 데이터만 가지고 있기 때문에 이러한 성능 향상을 가져올 수 있었다. 또한 Dynamic Table을 생성할 때와 동일하게 불필요한 메타 컬럼을 제외하고 최신 데이터를 반영한 테이블을 조회하면 아래와 같은 데이터를 확인할 수 있게 된다.
이번 글에서는 Oracle에서 Snowflake로 CDC 데이터 처리하는 방법을 정리해보았다. Dynamic Table을 활용하는 방법 또는 External Table & Stream을 활용하는 방법 두 가지 모두 직접 수행해보면 크게 어렵지 않다는 것을 알 수 있게 된다.