Polars vs Pandas Benchmark in AWS Lambda#
Keywords: Polars, Pandas, AWS, Lambda, Glue, ETL, Demo
Overview#
Polars is a lightning-fast DataFrame library written in Rust, with a Python SDK that calls the compiled Rust code under the hood. It has the following advantages over pandas:

- Thanks to the underlying arrow2 implementation, it is multi-thread friendly and uses multiple CPU cores by default. It is usually 2-5x faster than pandas when reading a columnar data format (Parquet), and 8-10x faster when reading row-oriented data formats (CSV, JSON).
- Its in-memory data model is more compact and efficient. As a result, the memory consumption of a Polars DataFrame is usually about the same as the raw data size (sometimes even less, due to compression). In comparison, pandas usually uses about double the raw data size when string columns are present.
- Most column-oriented transformations use vectorized operations and multiple CPU cores out of the box, which makes them 4-8x faster than pandas.
- Polars uses lazy evaluation and zero-copy techniques heavily. Like the transformation and action concepts in Spark, it only executes a transformation when necessary, so there is no need to copy the data at intermediate steps. Compared to pandas, it usually uses 1/4 to 1/10 the memory for a transformation, depending on the number of intermediate steps.
AWS Glue is a service that lets developers run Spark ETL jobs without provisioning any infrastructure. However, the Glue development experience is not great. By default, the development environment is not interactive: you usually have to wait 2-5 minutes to execute even a single line of code. Glue does offer a Jupyter Notebook option for development, but it requires some setup, and the Glue job runtime and the Jupyter Notebook runtime are not exactly the same, which can cause unexpected behavior in production. The last challenge is deployment and DevOps. Most modern applications support immutable deployment, version control, blue/green deployment, and canary deployment, but AWS Glue ETL jobs don't support any of these out of the box.
AWS Lambda is a service that lets developers run code in a container runtime without provisioning any infrastructure or setting up a language runtime. However, it has hard limits: at most 10GB of memory and 15 minutes of execution time.

Usually, AWS Lambda is not a good option for a big data ETL process. However, in my career experience, most ETL jobs need less than 1M rows of data. For example, your data lake may have 1B rows, but your ETL job may only need 1M of them for the calculation.

In this document, I would like to explore the possibility of using AWS Lambda plus the Polars Python library to perform medium-sized dataset ETL jobs.
Experimental Design#
Data Schema:

- Has 25 columns.
- 5 columns are int64, values between 1 and 1,000,000, example: 397647.
- 5 columns are float, values between 0 and 1, example: 0.44934012731611805.
- 5 columns are short strings, values are UUID strings, example: daa03354-c777-4b4a-b649-30998f7bd9e3.
- 5 columns are long strings, values are lorem ipsum text of 3-6 sentences, example: Picture wait add environment PM weight music. Type tax chair friend. Data might read value three involve.
- 5 columns are timestamps, values are random datetimes at microsecond precision from 2000-01-01 to 2023-01-01, example: 2008-11-08T14:37:77.638096Z.
Data Files:

- We create 100 files.
- Each file has 100,000 rows.
- The file format is Parquet with snappy compression.
- Each file is about 60MB snappy-compressed, 85MB uncompressed.
Lambda Function:

- Runtime: Python 3.9
- Memory: 10238 MB (the cap is 10GB)
- Architecture: x86_64

Python Libraries (released near 2023-01-01):

- polars == 1.7.15
- pandas == 1.5.3, pyarrow == 9.0.0
We try to read as many files as we can. If we can fit 10M rows in memory, we can usually handle a 1M-row (1/10) dataset in production, because we may have to create copies of the data during transformation.
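That rule of thumb can be written as a one-liner; the factor of 10 is a rough, conservative assumption, not a law:

```python
def usable_rows(rows_that_fit_in_memory: int, copy_factor: int = 10) -> int:
    """Estimate how many rows an ETL job can safely process, assuming
    transformations may materialize up to copy_factor copies of the data."""
    return rows_that_fit_in_memory // copy_factor

print(usable_rows(10_000_000))  # 1000000
```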
Experiment Result#
Result sheet description (all measurements are the average of 10 Lambda invocations):

- engine: polars or pandas
- n_files: how many files we read
- n_rows: how many rows we read (we can handle 1/10 of this number in production)
- raw_size: the total raw Parquet file size (uncompressed)
- polars_time: how long it takes to read all files with polars
- pandas_time: how long it takes to read all files with pandas
- polars_mem: polars memory usage
- pandas_mem: pandas memory usage
| n_files | n_rows | raw_size (MB) | polars_time (sec) | pandas_time (sec) | polars_mem (MB) | pandas_mem (MB) |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 100,000 | 85 | 0.8 | 1.3 | 325 | 600 |
| 10 | 1,000,000 | 850 | 8 | 13 | 1,200 | 1,900 |
| 20 | 2,000,000 | 1,700 | 16 | 25 | 2,200 | 3,450 |
| 30 | 3,000,000 | 2,550 | 23 | 40 | 3,050 | 4,900 |
| 40 | 4,000,000 | 3,400 | 32 | 51 | 4,000 | 6,400 |
| 50 | 5,000,000 | 4,250 | 42 | 63 | 5,000 | 7,900 |
| 60 | 6,000,000 | 5,100 | 53 | 85 | 5,950 | 9,400 |
| 70 | 7,000,000 | 5,950 | … | OOM | … | OOM |
| 80 | 8,000,000 | 6,800 | … | OOM | … | OOM |
| 90 | 9,000,000 | 7,650 | … | OOM | … | OOM |
| 100 | 10,000,000 | 8,500 | 110 | OOM | 9,500 | OOM |
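For reference, memory numbers like those above can be sampled from inside a Lambda function with the standard library alone. This is one possible way to measure peak usage, not necessarily how this table was produced:

```python
import resource

def peak_memory_mb() -> float:
    # On Linux (the Lambda runtime), ru_maxrss is reported in kilobytes.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024

print(f"peak memory: {peak_memory_mb():.1f} MB")
```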
Conclusion#
- Thanks to its compact in-memory data structures, the DataFrame size in Polars is similar to the raw data size, while pandas uses about 2x the raw data size.
- Including S3 file IO time, Polars reads Parquet files 1.5-2x faster than pandas when the dataset has lots of strings. In the official benchmark, Polars is 8-10x faster than pandas when reading CSV / JSON.
- With Polars, we can process a 1M-row dataset in a Lambda function, and potentially more: because Polars uses lazy evaluation and zero-copy techniques to reduce memory usage, it is less likely that you really need to copy the data.
- With pandas, we can process a 650K-row dataset in a Lambda function.
- If your ETL job's source dataset is less than 1M rows (decrease this number if your average row is larger than in this experiment, and vice versa), and your job doesn't require a special write engine like Delta Lake, Hudi, or Iceberg, you can consider using Lambda + Polars for ETL jobs that would otherwise be done in AWS Glue. You get these goodies:
  - a better development experience in Lambda
  - easy testing: you can fully cover your code with unit tests
  - better deployment strategies (versioned deployment, blue/green, canary, all out of the box)
  - easy orchestration
  - easy integration with other services
Additional Thought#
If your input data is not a list of files but the result of a SQL query, you can use AWS Athena to run the query (there is a 200 concurrent query limit) and load the result into the Lambda function.
Code Example#
Data generator script: creates 100 Parquet files (plus CSV and JSON copies) and uploads them to S3 in parallel.

```python
# -*- coding: utf-8 -*-

import os
import uuid
import random

import numpy as np
import pandas as pd
import polars as pl
from faker import Faker
from mpire import WorkerPool
from fixa.timer import DateTimeTimer
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager


bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
context.attach_boto_session(bsm.boto_ses)
fake = Faker()

s3dir_root = S3Path(
    f"s3://{bsm.aws_account_id}-{bsm.aws_region}-data"
    "/projects/polars_benchmark_in_aws_lambda/"
).to_dir()
print(f"preview at: {s3dir_root.console_url}")


n_files = 100
n_rows = 100000


def generate_one_file(ith_file: int):
    print(f"working on the {ith_file}th file")

    df = pl.DataFrame()
    # col_1 .. col_5: int64
    for id in range(1, 1 + 5):
        col = f"col_{id}"
        df = df.with_columns(
            pl.Series(
                name=col,
                values=np.random.randint(1, 1000000, size=n_rows),
            )
        )

    # col_6 .. col_10: float
    for id in range(6, 6 + 5):
        col = f"col_{id}"
        df = df.with_columns(
            pl.Series(
                name=col,
                values=np.random.rand(n_rows),
            )
        )

    # col_11 .. col_15: short string (uuid)
    for id in range(11, 11 + 5):
        col = f"col_{id}"
        df = df.with_columns(
            pl.Series(
                name=col,
                values=[str(uuid.uuid4()) for _ in range(n_rows)],
            )
        )

    # col_16 .. col_20: long string (lorem ipsum)
    for id in range(16, 16 + 5):
        col = f"col_{id}"
        df = df.with_columns(
            pl.Series(
                name=col,
                values=[" ".join(fake.sentences()) for _ in range(n_rows)],
            )
        )

    # col_21 .. col_25: timestamp
    for id in range(21, 21 + 5):
        col = f"col_{id}"
        start = "{}-{}-{}".format(
            random.randint(2001, 2020),
            random.randint(1, 12),
            random.randint(1, 28),
        )
        df = df.with_columns(
            pl.Series(
                name=col,
                values=pd.date_range(start=start, periods=n_rows, freq="S"),
            )
        )

    s3path = s3dir_root.joinpath(
        "parquet",
        f"{str(ith_file).zfill(9)}.snappy.parquet",
    )
    with s3path.open("wb") as f:
        df.write_parquet(
            f,
            compression="snappy",  # 60MB
            # compression="uncompressed",  # 85MB
        )

    s3path = s3dir_root.joinpath(
        "csv",
        f"{str(ith_file).zfill(9)}.csv",
    )
    with s3path.open("wb") as f:
        df.write_csv(f, has_header=True)

    s3path = s3dir_root.joinpath(
        "json",
        f"{str(ith_file).zfill(9)}.json",
    )
    with s3path.open("wb") as f:
        df.write_ndjson(f)


kwargs = [{"ith_file": ith_file} for ith_file in range(1, 1 + n_files)]
with DateTimeTimer():
    with WorkerPool(n_jobs=os.cpu_count()) as pool:
        results = pool.map(generate_one_file, kwargs)
```
Lambda function that reads the Parquet files with Polars (downloading each file to `/tmp` first):

```python
# -*- coding: utf-8 -*-

from datetime import datetime
from pathlib import Path

import boto3
import polars as pl
from s3pathlib import S3Path, context

boto_ses = boto3.session.Session()
context.attach_boto_session(boto_ses)
s3_client = boto_ses.client("s3")

path_tmp_parquet = Path("/tmp/temp.parquet")


def lambda_handler(event, context):
    df_list = list()
    n = 1  # adjust the number of files per benchmark run
    start = datetime.utcnow()
    for ith_file in range(1, 1 + n):
        print(f"read {ith_file} file")
        s3path = S3Path(
            f"s3://807388292768-us-east-1-data"
            f"/projects/polars_benchmark_in_aws_lambda/parquet"
            f"/{str(ith_file).zfill(9)}.snappy.parquet"
        )
        path_tmp_parquet.unlink(missing_ok=True)
        s3_client.download_file(
            s3path.bucket,
            s3path.key,
            str(path_tmp_parquet),
        )
        df = pl.read_parquet(str(path_tmp_parquet))

        # alternative: stream directly from S3 without the /tmp file
        # with s3path.open("rb") as f:
        #     df = pl.read_parquet(f)

        df_list.append(df)

    elapsed = int((datetime.utcnow() - start).total_seconds())
    print(f" done, elapsed {elapsed} seconds")
    # print(df.shape)


# lambda_handler(None, None)
```
Lambda function that reads the Parquet files with pandas via awswrangler:

```python
# -*- coding: utf-8 -*-

from datetime import datetime

import awswrangler as wr


def lambda_handler(event, context):
    df_list = list()
    n = 1  # adjust the number of files per benchmark run
    start = datetime.utcnow()
    for ith_file in range(1, 1 + n):
        print(f"read {ith_file} file")

        uri = (
            f"s3://807388292768-us-east-1-data"
            f"/projects/polars_benchmark_in_aws_lambda/parquet"
            f"/{str(ith_file).zfill(9)}.snappy.parquet"
        )
        df = wr.s3.read_parquet(uri)
        df_list.append(df)

    elapsed = int((datetime.utcnow() - start).total_seconds())
    print(f" done, elapsed {elapsed} seconds")
    print(df.shape)


lambda_handler(None, None)
```