본문으로 바로가기
본문으로 바로가기

고속 시계열 분석을 위한 materialized view 기반 롤업 구축

이 튜토리얼에서는 materialized views를 사용하여 대용량 이벤트 테이블에서 미리 집계된 롤업 데이터를 관리하는 방법을 설명합니다. 원본 테이블, 롤업 테이블, 그리고 롤업 테이블에 자동으로 데이터를 쓰는 materialized view의 세 가지 객체를 생성합니다.

이 패턴을 사용할 때

다음과 같은 경우에 이 패턴을 사용합니다.

  • 추가 전용 이벤트 스트림(클릭, 페이지뷰, IoT, 로그)이 있는 경우
  • 대부분의 쿼리가 시간 범위(분/시간/일 단위)를 기준으로 한 집계인 경우
  • 모든 원본 행을 재스캔하지 않고 일관된 1초 미만 읽기 성능을 원하는 경우

원시 이벤트 테이블 생성

CREATE TABLE events_raw
(
    event_time   DateTime,
    user_id      UInt64,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    value        Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
TTL event_time + INTERVAL 90 DAY DELETE

참고

  • PARTITION BY toYYYYMM(event_time)은 파티션 크기를 작게 유지해 손쉽게 삭제할 수 있습니다.
  • ORDER BY (event_time, user_id)는 시간 범위 기반 쿼리와 2차 필터링을 지원합니다.
  • LowCardinality(String)를 사용하면 범주형 차원을 저장할 때 메모리를 절약할 수 있습니다.
  • TTL은 90일 후 원시 데이터를 정리합니다(데이터 보존 요구 사항에 맞게 기간을 조정하십시오).

롤업(집계) 테이블 설계

시간별 단위로 사전 집계합니다. 가장 일반적인 분석 기간에 맞춰 집계 단위(grain)를 선택하세요.

CREATE TABLE events_rollup_1h
(
    bucket_start  DateTime,            -- start of the hour
    country       LowCardinality(String),
    event_type    LowCardinality(String),
    users_uniq    AggregateFunction(uniqExact, UInt64),
    value_sum     AggregateFunction(sum, Float64),
    value_avg     AggregateFunction(avg, Float64),
    events_count  AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type)

집계 상태(예: AggregateFunction(sum, ...))를 저장합니다. 이는 부분 집계를 간결하게 표현하며 나중에 병합하거나 최종화할 수 있습니다.

롤업을 채우는 구체화된 뷰(Materialized View) 생성하기

이 materialized view는 events_raw에 데이터가 삽입될 때 자동으로 실행되며 집계 상태를 롤업에 기록합니다.

CREATE MATERIALIZED VIEW mv_events_rollup_1h
TO events_rollup_1h
AS
SELECT
    toStartOfHour(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id)   AS users_uniq,
    sumState(value)           AS value_sum,
    avgState(value)           AS value_avg,
    countState()              AS events_count
FROM events_raw
GROUP BY bucket_start, country, event_type;

샘플 데이터 삽입하기

샘플 데이터를 삽입하세요:

INSERT INTO events_raw VALUES
    (now() - INTERVAL 4 SECOND, 101, 'US', 'view', 1),
    (now() - INTERVAL 3 SECOND, 101, 'US', 'click', 1),
    (now() - INTERVAL 2 SECOND, 202, 'DE', 'view', 1),
    (now() - INTERVAL 1 SECOND, 101, 'US', 'view', 1);

롤업 쿼리 실행하기

읽기 시점에 상태를 **병합(merge)**하거나 **최종화(finalize)**할 수 있습니다:

SELECT
    bucket_start,
    country,
    event_type,
    uniqExactMerge(users_uniq) AS users,
    sumMerge(value_sum)        AS value_sum,
    avgMerge(value_avg)        AS value_avg,
    countMerge(events_count)   AS events
FROM events_rollup_1h
WHERE bucket_start >= now() - INTERVAL 1 DAY
GROUP BY ALL
ORDER BY bucket_start, country, event_type;

읽기가 항상 롤업을 참조할 것으로 예상되는 경우, 동일한 1시간 단위로 최종 확정된 값을 "일반" MergeTree 테이블에 기록하는 두 번째 materialized view를 생성할 수 있습니다. 상태(State)는 더 많은 유연성을 제공하는 반면, 최종 확정된 값은 읽기를 약간 더 단순하게 만듭니다.

최상의 성능을 위해 기본 키 필드로 필터링하세요

EXPLAIN 명령을 사용하여 인덱스가 데이터 프루닝에 어떻게 사용되는지 확인하실 수 있습니다:

EXPLAIN indexes=1
SELECT *
FROM events_rollup_1h
WHERE bucket_start BETWEEN now() - INTERVAL 3 DAY AND now()
  AND country = 'US';
        ┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
    1.  │ Expression ((Project names + Projection))                                                                                          │
    2.  │   Expression                                                                                                                       │
    3.  │     ReadFromMergeTree (default.events_rollup_1h)                                                                                   │
    4.  │     Indexes:                                                                                                                       │
    5.  │       MinMax                                                                                                                       │
    6.  │         Keys:                                                                                                                      │
    7.  │           bucket_start                                                                                                             │
    8.  │         Condition: and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))                                 │
    9.  │         Parts: 1/1                                                                                                                 │
    10. │         Granules: 1/1                                                                                                              │
    11. │       Partition                                                                                                                    │
    12. │         Keys:                                                                                                                      │
    13. │           toYYYYMM(bucket_start)                                                                                                   │
    14. │         Condition: and((toYYYYMM(bucket_start) in (-Inf, 202509]), (toYYYYMM(bucket_start) in [202509, +Inf)))                     │
    15. │         Parts: 1/1                                                                                                                 │
    16. │         Granules: 1/1                                                                                                              │
    17. │       PrimaryKey                                                                                                                   │
    18. │         Keys:                                                                                                                      │
    19. │           bucket_start                                                                                                             │
    20. │           country                                                                                                                  │
    21. │         Condition: and((country in ['US', 'US']), and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))) │
    22. │         Parts: 1/1                                                                                                                 │
    23. │         Granules: 1/1                                                                                                              │
        └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

위의 쿼리 실행 계획은 세 가지 유형의 인덱스가 사용되고 있음을 보여줍니다: MinMax 인덱스, 파티션 인덱스, 그리고 기본 키 인덱스. Each index makes use of fields specified in our primary key: (bucket_start, country, event_type). For best filtering performance you will want to make sure that your queries are making use of primary key fields to prune data.

일반적인 변형 사례

  • 서로 다른 집계 단위: 일 단위 롤업을 추가합니다:
CREATE TABLE events_rollup_1d
(
    bucket_start Date,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    users_uniq   AggregateFunction(uniqExact, UInt64),
    value_sum    AggregateFunction(sum, Float64),
    value_avg    AggregateFunction(avg, Float64),
    events_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type);

그런 다음 두 번째 materialized view:

CREATE MATERIALIZED VIEW mv_events_rollup_1d
TO events_rollup_1d
AS
SELECT
    toDate(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id),
    sumState(value),
    avgState(value),
    countState()
FROM events_raw
GROUP BY ALL;
  • 압축(Compression): 원본 테이블의 대용량 컬럼에 코덱을 적용합니다(예: Codec(ZSTD(3))).
  • 비용 관리: 긴 보존 기간은 원시 테이블에 적용하고, 롤업은 장기간 유지합니다.
  • 백필링(Backfilling): 과거 데이터를 적재할 때는 events_raw에 데이터를 삽입하면 materialized view가 롤업을 자동으로 생성합니다. 기존 행에는 필요에 따라 materialized view를 생성할 때 POPULATE를 사용하거나 INSERT SELECT를 사용합니다.

정리 및 보관

  • 원시 TTL(예: 30/90일)은 늘리되 롤업된 데이터는 더 오래(예: 1년) 유지합니다.
  • 티어링이 활성화되어 있으면 이동용 TTL을 사용하여 오래된 파트를 더 저렴한 스토리지로 이동할 수도 있습니다.

문제 해결

  • materialized view가 업데이트되지 않습니까? 삽입이 롤업 테이블이 아닌 events_raw 테이블로 들어가고 있는지, 그리고 materialized view의 대상이 올바른지(TO events_rollup_1h)를 확인하십시오.
  • 쿼리가 느린 경우, 해당 쿼리가 롤업에 적중하는지(롤업 테이블을 직접 쿼리)와 시간 필터가 롤업 그레인에 맞게 설정되어 있는지 확인하십시오.
  • 백필 결과에 불일치가 있습니까? SYSTEM FLUSH LOGS를 실행한 다음 system.query_log / system.parts를 확인하여 insert와 merge가 수행되었는지 확인하십시오.