Feel The Power of DuckDB#

Let’s feel the power of DuckDB!

[1]:
import typing as T
import textwrap
from datetime import date, timedelta

import mpire
import numpy as np
import pandas as pd
import polars as pl
from faker import Faker
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager
from fixa.timer import DateTimeTimer

import duckdb

Prepare#

[2]:
# define the boto3 session for this POC
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")

# set the boto3 session as the default for s3pathlib
context.attach_boto_session(bsm.boto_ses)

# this is the bucket where we upload test data
s3bucket = S3Path(f"s3://{bsm.aws_account_id}-{bsm.aws_region}-data/")

# this is the S3 location for the "database"
s3dir_root = s3bucket.joinpath("projects", "duckdb", "db").to_dir()

# this is the S3 location for the "table"
s3dir_t_poc = s3dir_root.joinpath("poc").to_dir()

# we need the faker library to generate dummy text data
fake = Faker()

print(f"preview s3dir_root: {s3dir_root.console_url}")
preview s3dir_root: https://console.aws.amazon.com/s3/buckets/807388292768-us-east-1-data?prefix=projects/duckdb/db/

Create Test Data#

The test data has four columns:

  • id: integer, 1, 2, 3, …; the globally unique id for each record

  • number: integer, a random value between 0 and 1,000,000; we can use this column for range queries

  • time_str: string, a timestamp in %Y-%m-%d %H:%M:%S.%f format; the value ranges from 2022-01-01 00:00:00 to 2023-01-01 00:00:00, so we can use this column for range queries

  • text: string, a random sentence; we can use this column for full-text-search queries

The create_data function creates a data.parquet file at the s3://my-bucket/prefix/date=${date}/data.parquet location.

[5]:
def delete_all_data():
    s3dir_root.delete()


def create_data(ith: int, date: str, dry_run: bool = False):
    """
    Create one day's data (1M rows) and write it to
    s3://.../poc/date=${date}/data.parquet.
    """
    print(f"working on {date} ...")
    n_row = 1000000
    df = pd.DataFrame()
    # globally unique, monotonically increasing id across all files
    df["id"] = range((ith - 1) * n_row + 1, ith * n_row + 1)
    # random integer between 0 and 1,000,000, for range queries
    df["number"] = np.random.randint(0, 1000000, n_row)
    # random timestamps between 2022-01-01 and 2023-01-01, for range queries
    df["time_str"] = pd.to_datetime(
        np.random.choice(
            pd.date_range("2022-01-01", "2023-01-01", periods=n_row),
            n_row,
            replace=False,
        )
    )
    # random sentences, for full-text-search queries
    df["text"] = [fake.sentence() for _ in range(n_row)]
    df = pl.from_pandas(df)
    s3path = s3dir_t_poc.joinpath(f"date={date}", "data.parquet")
    if dry_run is False:
        with s3path.open("wb") as f:
            df.write_parquet(f, row_group_size=250000)
    return df

Below is sample data created by the create_data function.

[9]:
# create one day's data, just for preview
df = create_data(1, date="2021-01-01", dry_run=True)
with pl.Config(fmt_str_lengths=120):
    print(df)
working on 2021-01-01
shape: (1_000_000, 4)
┌─────────┬────────┬───────────────────────────────┬───────────────────────────────────────────────┐
│ id      ┆ number ┆ time_str                      ┆ text                                          │
│ ---     ┆ ---    ┆ ---                           ┆ ---                                           │
│ i64     ┆ i64    ┆ datetime[ns]                  ┆ str                                           │
╞═════════╪════════╪═══════════════════════════════╪═══════════════════════════════════════════════╡
│ 1       ┆ 679064 ┆ 2022-08-08 09:02:44.106164108 ┆ Particularly second accept record             │
│         ┆        ┆                               ┆ professional actually scene.                  │
│ 2       ┆ 615755 ┆ 2022-09-03 13:20:54.432054432 ┆ Something democratic benefit training common. │
│ 3       ┆ 344864 ┆ 2022-05-07 07:29:50.609390610 ┆ Value yes soldier inside worker successful    │
│         ┆        ┆                               ┆ term number.                                  │
│ 4       ┆ 42400  ┆ 2022-10-18 12:31:44.301104300 ┆ Even vote issue trial door.                   │
│ …       ┆ …      ┆ …                             ┆ …                                             │
│ 999997  ┆ 583034 ┆ 2022-01-12 09:25:34.152334152 ┆ Say fill move direction who each.             │
│ 999998  ┆ 724490 ┆ 2022-02-02 19:26:13.874773875 ┆ Candidate second character nor.               │
│ 999999  ┆ 480276 ┆ 2022-02-10 18:15:54.785754786 ┆ Present serve then commercial accept weight   │
│         ┆        ┆                               ┆ fly.                                          │
│ 1000000 ┆ 892577 ┆ 2022-12-02 12:37:13.917433916 ┆ Hold any security pull.                       │
└─────────┴────────┴───────────────────────────────┴───────────────────────────────────────────────┘

create_all_data is a wrapper around the create_data function. It leverages multiple CPU cores to create 100 files in parallel.

[ ]:
def create_all_data():
    print(f"preview at: {s3dir_t_poc.console_url}")
    # wipe out any existing data first
    s3dir_t_poc.delete()

    # build one kwargs dict per day, 100 days starting from 2021-01-01
    kwargs_list = list()
    n_date = 100
    start_date = date(2021, 1, 1)
    for i in range(n_date):
        new_date = start_date + timedelta(days=i)
        kwargs = {"ith": i + 1, "date": new_date}
        kwargs_list.append(kwargs)

    # create the 100 files in parallel on 12 worker processes
    with mpire.WorkerPool(n_jobs=12) as pool:
        pool.map(create_data, kwargs_list)
[10]:
# create all test data
# create_all_data()

Below is the spec of the test dataset:

  • We have 100 days of data, from 2021-01-01 to 2021-04-10; the data is partitioned by date

  • Each day has 1M records; the total size for each day is around 25 MB

  • The total size for 100 days' data is around 2.5 GB

[33]:
n_file, total_size = s3dir_t_poc.calculate_total_size(for_human=True)
print(f"n_file = {n_file}")
print(f"total_size = {total_size}")
n_file = 100
total_size = 2.54 GB
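
To spot-check the hive-style partition layout (one date=YYYY-MM-DD folder per day, one data.parquet per folder), you can list a few of the objects with the s3pathlib API already imported above; a small sketch:

[ ]:
import itertools

# a small sketch: print a few keys to confirm the hive-style
# "date=YYYY-MM-DD/data.parquet" layout described above
for s3path in itertools.islice(s3dir_t_poc.iter_objects(), 3):
    print(s3path.uri)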

Declare Common SQL Snippet#

Here we declare some common SQL snippets that will be used frequently.

[4]:
# common SQL snippet
# enable the httpfs (HTTP file system plugin https://duckdb.org/docs/extensions/httpfs), so we can read data from AWS S3
sql_httpfs = textwrap.dedent(
    f"""
INSTALL httpfs;
LOAD httpfs;
"""
)

# enable the fts (full text search plugin https://duckdb.org/docs/extensions/full_text_search), so we can build a full text search index
sql_fts = textwrap.dedent(
    f"""
INSTALL fts;
LOAD fts;
"""
)

# set AWS credential
sql_credential = textwrap.dedent(
    f"""
SET s3_region='us-east-1';
SET s3_access_key_id='{bsm.boto_ses.get_credentials().access_key}';
SET s3_secret_access_key='{bsm.boto_ses.get_credentials().secret_key}';
"""
)

# this is the data source for all 100 days' data, leveraging the "date" hive partition
sql_from_table_all_parquet = f"read_parquet('{s3dir_t_poc.uri}*/*.parquet', hive_partitioning=1)"

# this is only one file
s3path = s3dir_t_poc.joinpath("date=2021-02-01/data.parquet")
sql_from_table_one_parquet = f"read_parquet('{s3path.uri}')"
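
The fts snippet is declared here but not used in the examples below. In case you are curious, a minimal sketch of how the text column could be indexed and searched with the fts extension might look like this (the local table name poc and the search term 'soldier' are just for illustration):

[ ]:
# a minimal sketch, not part of the benchmarks below:
# materialize one day's data into a local DuckDB table, build a
# full-text-search index on the "text" column, then run a BM25 query
duckdb.sql(sql_httpfs)
duckdb.sql(sql_credential)
duckdb.sql(sql_fts)
duckdb.sql(f"CREATE TABLE poc AS SELECT * FROM {sql_from_table_one_parquet};")
duckdb.sql("PRAGMA create_fts_index('poc', 'id', 'text');")
duckdb.sql(
    """
    SELECT id, text, score
    FROM (
        SELECT *, fts_main_poc.match_bm25(id, 'soldier') AS score
        FROM poc
    ) sq
    WHERE score IS NOT NULL
    ORDER BY score DESC
    LIMIT 5;
    """
).show()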

Example 1 - Filter by Partition#

This query leverages the Hive partition and the parquet row group metadata (the number of records in each row group) to avoid scanning data.

[39]:
sql = textwrap.dedent(
    f"""
SELECT
    COUNT(t.id) as count
FROM {sql_from_table_all_parquet} t
WHERE
    (t.date BETWEEN '2021-02-01' AND '2021-02-07')
;
"""
)
with DateTimeTimer():
    duckdb.sql(sql_httpfs)
    duckdb.sql(sql_credential)
    duckdb.sql(sql).show()
┌─────────┐
│  count  │
│  int64  │
├─────────┤
│ 7000000 │
└─────────┘

from 2023-09-14 02:36:16.344082 to 2023-09-14 02:36:18.235196 elapsed 1.891114 second.
[40]:
# this is how people usually do this without DuckDB: download each file and count the rows with polars
with DateTimeTimer():
    count = 0
    for s3path in s3dir_t_poc.iter_objects():
        if "date=2021-02-01" <= s3path.dirname <= "date=2021-02-07":
            with s3path.open("rb") as f:
                df = pl.read_parquet(f)
                count += df.shape[0]
    print(f"count: {count}")
count: 7000000
from 2023-09-14 02:36:38.875767 to 2023-09-14 02:37:09.511694 elapsed 30.635927 second.
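
If you want to confirm that the date filter is handled at the scan level rather than by reading all 100 files, you can prefix the same query with EXPLAIN; a quick sketch:

[ ]:
# a quick sketch: print the physical query plan and inspect how the
# parquet scan and the "date" partition filter are organized
duckdb.sql(f"EXPLAIN {sql}").show()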

Example 2 - Filter by Partition and Integer Column#

This query leverages the Hive partition and the parquet row group statistics (the min and max values of the number column in each row group) to reduce the amount of data scanned.

[44]:
sql = textwrap.dedent(
    f"""
SELECT
    count(t.id) as count
FROM {sql_from_table_all_parquet} t
WHERE
    (t.date BETWEEN '2021-02-01' AND '2021-02-07')
    AND t.number BETWEEN 600000 AND 700000
;
"""
)
with DateTimeTimer():
    duckdb.sql(sql_httpfs)
    duckdb.sql(sql_credential)
    duckdb.sql(sql).show()
┌────────┐
│ count  │
│ int64  │
├────────┤
│ 701051 │
└────────┘

from 2023-09-14 02:39:15.366255 to 2023-09-14 02:39:17.383965 elapsed 2.017710 second.
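
To see the statistics this relies on, below is a small sketch that inspects one file with pyarrow (an extra dependency, not imported above) and prints the per-row-group min / max values of the number column:

[ ]:
import pyarrow.parquet as pq

# a small sketch: print per-row-group statistics of the "number" column;
# row groups whose [min, max] range falls outside 600000 ~ 700000 can be
# skipped without reading their data pages
s3path_one_file = s3dir_t_poc.joinpath("date=2021-02-01", "data.parquet")
with s3path_one_file.open("rb") as f:
    pf = pq.ParquetFile(f)
    col_idx = pf.schema_arrow.names.index("number")
    meta = pf.metadata
    for i in range(meta.num_row_groups):
        stats = meta.row_group(i).column(col_idx).statistics
        print(f"row group {i}: n_rows = {meta.row_group(i).num_rows}, min = {stats.min}, max = {stats.max}")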

Example 3 - Filter by Partition and Sortable String Column#

Just another example of a range query, this time on a sortable string column.

[45]:
sql = textwrap.dedent(
    f"""
SELECT
    count(t.id) as count
FROM {sql_from_table_all_parquet} t
WHERE
    (t.date BETWEEN '2021-02-01' AND '2021-02-07')
    AND (t.time_str BETWEEN '2022-06-01' AND '2022-07-01')
;
"""
)
with DateTimeTimer():
    duckdb.sql(sql_httpfs)
    duckdb.sql(sql_credential)
    duckdb.sql(sql).show()
┌────────┐
│ count  │
│ int64  │
├────────┤
│ 575337 │
└────────┘

from 2023-09-14 02:39:18.727477 to 2023-09-14 02:39:22.165245 elapsed 3.437768 second.
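
A range filter like this works on %Y-%m-%d %H:%M:%S.%f values because the zero-padded string representation sorts lexicographically in the same order as the timestamps it encodes; a tiny sanity check in plain Python:

[ ]:
# tiny sanity check: lexicographic order of zero-padded timestamp strings
# matches chronological order, which is what makes a plain
# BETWEEN '2022-06-01' AND '2022-07-01' range filter meaningful
assert "2022-05-31 23:59:59.999999" < "2022-06-01"
assert "2022-06-01" < "2022-06-15 08:30:00.000000" < "2022-07-01"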

Conclusion#

If the data involved in the computation can be narrowed down using partitions and row group statistics, and it fits in memory, DuckDB is as fast as Athena (maybe even faster, because you don't need to read the result back from S3).
