Process Super Big JSON File

Sometimes we need to process a huge JSON file. If it is a multi-line JSON file, things are easy: you can simply iterate over all the lines, it just takes some time. But if the file is one giant object, with an array nested somewhere deep inside, and the whole JSON sits on a single line, things get much harder.

Main Idea

A JSON file is usually big because some array contains a very large number of items, or some dictionary contains a very large number of keys (1, 2, 3, ...). So all we need to do is split those items out into small files, and write the remaining data into a separate file.
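To make the idea concrete, here is the transformation we are after, sketched in plain Python on a toy document. This sketch loads everything into memory and is only an illustration; the real scripts below do the same thing while streaming the input with ijson.

doc = {"id": 1, "records": [{"key": i} for i in range(1, 101)], "name": "alice"}

chunk_size = 10
records = doc.pop("records")  # take the big array out of the document
chunks = [
    # each chunk becomes one small output file: arrays/1.json, arrays/2.json, ...
    records[i : i + chunk_size]
    for i in range(0, len(records), chunk_size)
]
remainder = doc  # what is left goes to data.json: {"id": 1, "name": "alice"}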

Tools

ijson is a C-backed Python library that parses JSON by reading the byte stream sequentially, which saves a great deal of memory. With ijson we can locate the array node that holds the many items and split that data out. In other words, we spend extra time to save memory.
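To give a feel for the ijson API before reading the full scripts, here is a minimal sketch of the two calls they rely on: ijson.items() streams the elements of an array identified by a dotted prefix, and ijson.kvitems() streams the (key, value) pairs of the object at a prefix. The toy document here is just an in-memory bytes buffer.

import io

import ijson

doc = b'{"id": 1, "data": {"date": "2000-01-01", "records": [{"k": 1}, {"k": 2}, {"k": 3}]}}'

# "data.records.item" means: each element of the array at data.records
for record in ijson.items(io.BytesIO(doc), "data.records.item"):
    print(record)  # {'k': 1}, then {'k': 2}, then {'k': 3}

# stream the (key, value) pairs of the object at "data"; this is what the
# delete_node function below uses to copy everything except one key
for key, value in ijson.kvitems(io.BytesIO(doc), "data"):
    print(key)  # 'date', then 'records'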

Below are two scripts: one processes a local file, the other processes a file on S3.

# -*- coding: utf-8 -*-

"""
This is a solution that can split a large JSON file into smaller chunks (if
there is a big array node) without using much memory.

Use case 1:

Input::

    {
        "id": 1,
        "records": [
            {"key": 1},
            {"key": 2},
            {"key": 3},
            ...
        ],
        "name": "alice"
    }

Output::

    # data.json
    {
        "id": 1,
        "name": "alice"
    }

    # arrays/1.json
    [
        {"key": 1},
        {"key": 2},
        ...
        {"key": 10}
    ]

    # arrays/2.json
    [
        {"key": 11},
        {"key": 12},
        ...
        {"key": 20}
    ]

    # arrays/10.json
    [
        {"key": 91},
        {"key": 92},
        ...
        {"key": 100}
    ]

Use case 2:

Input::

    {
        "id": 1,
        "data": {
            "date": "2000-01-01",
            "records": [
                {"key": 1},
                {"key": 2},
                {"key": 3},
                ...
            ],
        },
        "name": "alice"
    }

Output::

    # data.json
    {
        "id": 1,
        "data": {
            "date": "2000-01-01",
        },
        "name": "alice"
    }

    # arrays/1.json
    [
        {"key": 1},
        {"key": 2},
        ...
        {"key": 10}
    ]

    # arrays/2.json
    [
        {"key": 11},
        {"key": 12},
        ...
        {"key": 20}
    ]

    # arrays/10.json
    [
        {"key": 91},
        {"key": 92},
        ...
        {"key": 100}
    ]

Benchmark result:

- Input file: 1G, split into 10 files
- MacBook Pro: 32 GB memory + SSD
- Time: 12s
"""

import typing as T
import json
import ijson
import shutil
import itertools
from pathlib import Path
from datetime import datetime

dir_here = Path(__file__).parent


def make_data(path: Path):
    """
    Generate a test JSON file that has a big nested array node.
    """
    # n_records = 10
    # str_length = 10

    n_records = 1000
    str_length = 1000000

    data = {
        "id": 1,
        "data": {
            "date": "2000-01-01",
            "records": [
                {"k": i, "v": "a" * str_length} for i in range(1, 1 + n_records)
            ],
        },
        "name": "alice",
    }

    # data = {
    #     "id": 1,
    #     "records": [
    #         {"k": i, "v": "a" * str_length}
    #         for i in range(1, 1 + n_records)
    #     ],
    #     "name": "alice",
    # }

    with path.open("w") as f:
        json.dump(data, f)


def delete_node(
    p_in: Path,
    json_path: str,
) -> dict:
    """
    Read the JSON file, drop the node at the given json path, and return the
    remaining data. The file is re-read once per level of the json path, so
    memory usage stays low at the cost of extra IO.

    Example::

        # example 1
        >>> input_data = {
        ...     "id": 1,
        ...     "delete": [],
        ... }
        >>> json_path = "delete"
        >>> print(output_data)
        {
            "id": 1
        }

        # example 2
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "delete": []
        ...     },
        ... }
        >>> json_path = "a.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {}
        }

        # example 3
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "a_value": 2,
        ...         "b": {
        ...             "delete": [],
        ...             "b_value": 3,
        ...         },
        ...     },
        ... }
        >>> json_path = "a.b.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {
                "a_value": 2,
                "b": {
                    "b_value": 3
                }
            }
        }
    """
    # build (prefix, key) pairs for every level of the json path, for example
    # "a.b.delete" -> [("", "a"), ("a", "b"), ("a.b", "delete")]
    parts = json_path.split(".")
    prefix_and_key_pairs = []
    lst = list()
    for part in parts:
        prefix = ".".join(lst)
        key = part
        prefix_and_key_pairs.append((prefix, key))
        lst.append(part)

    new_data = dict()
    parent_data = new_data
    for prefix, key in prefix_and_key_pairs:
        # print(f"------ prefix = {prefix}, key = {key} ------")
        data = dict()
        with p_in.open("r") as f_in:
            for k, v in ijson.kvitems(f_in, prefix):
                if k != key:
                    data[k] = v
        if prefix == "":
            new_data = data
        else:
            parent_data[prefix.split(".")[-1]] = data
        parent_data = data
        # print("new_data:", new_data)

    return new_data


def take(n: int, iterable: T.Iterable):
    """
    Return first n items of the iterable as a list.
    """
    return list(itertools.islice(iterable, n))


def split_json(
    p_in: Path,
    dir_out: Path,
    json_path: str,
    chunk_size: int,
):
    """
    :param p_in: input data path
    :param dir_out: output data directory, it should not exist
    :param json_path: the json path in dot notation to the array you want to split
    :param chunk_size: group items in the array into chunks of this size
    """
    if dir_out.exists():
        raise FileExistsError(f"{dir_out} already exists")

    path_data = dir_out.joinpath("data.json")
    dir_arrays = dir_out.joinpath("arrays")
    dir_arrays.mkdir(parents=True)

    # split the big json array into many small json arrays (at most 999 chunk files)
    with p_in.open("r") as f_in:
        iterator = ijson.items(f_in, f"{json_path}.item")
        for ith in range(1, 1 + 999):
            items = take(chunk_size, iterator)
            path_out = dir_arrays.joinpath(f"{ith}.json")
            if len(items) == 0:
                break
            else:
                with path_out.open("w") as f_out:
                    json.dump(items, f_out)

    # delete the big json array node from the original json
    data = delete_node(p_in=p_in, json_path=json_path)
    with path_data.open("w") as f_out:
        json.dump(data, f_out)


if __name__ == "__main__":
    dir_output = dir_here / "output"
    path_data = dir_here / "data.json"

    def test_delete_node():
        path = dir_here.joinpath("test_delete_node.json")
        input_output_jsonpath = [
            (
                {
                    "id": 1,
                    "delete": [],
                },
                {
                    "id": 1,
                },
                "delete",
            ),
            (
                {
                    "id": 1,
                    "a": {"delete": []},
                },
                {
                    "id": 1,
                    "a": {},
                },
                "a.delete",
            ),
            (
                {
                    "id": 1,
                    "a": {
                        "a_value": 2,
                        "b": {
                            "delete": [],
                            "b_value": 3,
                        },
                    },
                },
                {
                    "id": 1,
                    "a": {
                        "a_value": 2,
                        "b": {
                            "b_value": 3,
                        },
                    },
                },
                "a.b.delete",
            ),
        ]
        for input_data, output_data, jsonpath in input_output_jsonpath:
            path.write_text(json.dumps(input_data))
            result = delete_node(path, json_path=jsonpath)
            assert result == output_data

    def test_split_json():
        shutil.rmtree(dir_output, ignore_errors=True)
        st = datetime.utcnow()
        split_json(
            p_in=path_data,
            dir_out=dir_output,
            json_path="data.records",
            chunk_size=120,
        )
        et = datetime.utcnow()
        elapse = (et - st).total_seconds()
        print(f"elapsed time: {elapse:.2f} seconds")

    # test_delete_node()
    # make_data(path_data)
    # test_split_json()
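If you ever need the original document back, the output of split_json can be merged again. Below is a hedged sketch of such a merge helper; the function name merge_json is my own and is not part of the script above. Note that it loads everything back into memory, so it is mainly useful for verifying the split on data that still fits in memory.

import json
from pathlib import Path


def merge_json(dir_out: Path, json_path: str) -> dict:
    # read the remainder document
    data = json.loads(dir_out.joinpath("data.json").read_text())
    # read the array chunks in numeric order: arrays/1.json, arrays/2.json, ...
    items = []
    for p in sorted(dir_out.joinpath("arrays").glob("*.json"), key=lambda p: int(p.stem)):
        items.extend(json.loads(p.read_text()))
    # put the array back at the json path, e.g. "data.records"
    parts = json_path.split(".")
    node = data
    for part in parts[:-1]:
        node = node.setdefault(part, {})
    node[parts[-1]] = items
    return data

The second script below applies the same idea to a file stored on S3, packaged as an AWS Lambda function.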
# -*- coding: utf-8 -*-

"""
This is a solution that can split a large JSON file into smaller chunks (if
there is a big array node) without using much memory.

Requirements::

    ijson

**Example 1**

- Input file: 1G, split into 10 files
- Memory: 10G

if only split:

- Duration: 25s
- Max Memory Used: 540 MB

if also delete the node:

- Duration: 50s
- Max Memory Used: 1037 MB

**Example 2**

- Input file: 1G, split into 10 files
- Memory: 2G

if only split:

- Duration: 25s
- Max Memory Used: 540 MB

if also delete the node:

- Duration: 50s
- Max Memory Used: 1037 MB
"""

import typing as T
import json
import ijson
import dataclasses
import itertools

import boto3


s3_client = boto3.client("s3")


def split_s3_uri(uri: str) -> T.Tuple[str, str]:
    # "s3://bucket/path/to/key" -> ("bucket", "path/to/key")
    parts = uri.split("/", 3)
    bucket = parts[2]
    key = parts[3]
    return bucket, key


def get_object(uri: str):
    bucket, key = split_s3_uri(uri)
    return s3_client.get_object(
        Bucket=bucket,
        Key=key,
    )


def put_object(uri: str, body):
    bucket, key = split_s3_uri(uri)
    return s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
    )


def delete_node(
    s3uri: str,
    json_path: str,
) -> dict:
    """
    Read the JSON file from S3, delete the node at the given json path, and
    return the json data with the node deleted.

    Example::

        # example 1
        >>> input_data = {
        ...     "id": 1,
        ...     "delete": [],
        ... }
        >>> json_path = "delete"
        >>> print(output_data)
        {
            "id": 1
        }

        # example 2
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "delete": []
        ...     },
        ... }
        >>> json_path = "a.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {}
        }

        # example 3
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "a_value": 2,
        ...         "b": {
        ...             "delete": [],
        ...             "b_value": 3,
        ...         },
        ...     },
        ... }
        >>> json_path = "a.b.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {
                "a_value": 2,
                "b": {
                    "b_value": 3
                }
            }
        }
    """
    # build (prefix, key) pairs for every level of the json path, for example
    # "a.b.delete" -> [("", "a"), ("a", "b"), ("a.b", "delete")]
    parts = json_path.split(".")
    prefix_and_key_pairs = []
    lst = list()
    for part in parts:
        prefix = ".".join(lst)
        key = part
        prefix_and_key_pairs.append((prefix, key))
        lst.append(part)

    new_data = dict()
    parent_data = new_data
    for prefix, key in prefix_and_key_pairs:
        # print(f"------ prefix = {prefix}, key = {key} ------")
        data = dict()
        with get_object(s3uri)["Body"] as f_in:
            for k, v in ijson.kvitems(f_in, prefix):
                if k != key:
                    data[k] = v
        if prefix == "":
            new_data = data
        else:
            parent_data[prefix.split(".")[-1]] = data
        parent_data = data
        # print("new_data:", new_data)

    return new_data


def take(n: int, iterable: T.Iterable):
    """
    Return first n items of the iterable as a list.
    """
    return list(itertools.islice(iterable, n))


def split_json(
    s3file_input: str,
    s3dir_output: str,
    json_path: str,
    chunk_size: int,
):
    """
    :param s3file_input: the s3 uri of the input JSON file
    :param s3dir_output: the s3 uri of the output directory (ending with "/"),
        it is supposed to be empty
    :param json_path: the json path in dot notation to the array you want to split
    :param chunk_size: group items in the array into chunks of this size
    """
    json_path = json_path.strip(".")

    s3file_data = f"{s3dir_output}data.json"
    s3dir_arrays = f"{s3dir_output}arrays/"

    # split the big json array into many small files (at most 999 chunk files);
    # unlike the local version, each chunk is written as JSON Lines
    # (one JSON object per line) rather than a JSON array
    with get_object(s3file_input)["Body"] as f_in:
        iterator = ijson.items(f_in, f"{json_path}.item")
        for ith in range(1, 1 + 999):
            items = take(chunk_size, iterator)
            s3path_output = f"{s3dir_arrays}{ith}.json"
            if len(items) == 0:
                break
            else:
                put_object(
                    s3path_output,
                    "\n".join([
                        json.dumps(item)
                        for item in items
                    ])
                )

    # delete the big json array node from the original json
    data = delete_node(s3uri=s3file_input, json_path=json_path)
    put_object(s3file_data, json.dumps(data))


@dataclasses.dataclass
class Request:
    """
    Lambda request event

    :param s3file_input: the s3 uri of the input JSON file
    :param s3dir_output: the s3 uri of the output directory (ending with "/"),
        it is supposed to be empty
    :param json_path: the json path in dot notation to the array you want to split
    :param chunk_size: group items in the array into chunks of this size
    """
    s3file_input: str
    s3dir_output: str
    json_path: str
    chunk_size: int


def lambda_handler(event, context):
    """
    Example event::

        {
            "s3file_input": "s3://807388292768-us-east-1-data/tmp/data.json",
            "s3dir_output": "s3://807388292768-us-east-1-data/tmp/output/",
            "json_path": "data.records",
            "chunk_size": 120
        }
    """
    request = Request(**event)
    split_json(
        s3file_input=request.s3file_input,
        s3dir_output=request.s3dir_output,
        json_path=request.json_path,
        chunk_size=request.chunk_size,
    )
    return {"statusCode": 200}
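Finally, a hedged sketch of how the Lambda function might be invoked from another Python script. The function name "split-json" and the bucket in the payload are made-up placeholders; only the event shape matches the lambda_handler above.

import json

import boto3

lambda_client = boto3.client("lambda")

response = lambda_client.invoke(
    FunctionName="split-json",  # hypothetical function name
    InvocationType="RequestResponse",
    Payload=json.dumps({
        "s3file_input": "s3://my-bucket/tmp/data.json",
        "s3dir_output": "s3://my-bucket/tmp/output/",
        "json_path": "data.records",
        "chunk_size": 120,
    }),
)
print(json.loads(response["Payload"].read()))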