Process Super Big JSON File
Sometimes we need to process huge JSON files. If the file is multi-line JSON (one document per line), the job is easy: you simply iterate over the lines, and it only costs time. But if the file is one giant object, with an array nested deep inside, and the entire JSON sits on a single line, things get much harder.
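For the easy multi-line case, a minimal sketch (assuming a hypothetical data.jsonl file with one JSON document per line) could be as simple as:

import json

# iterate over a multi-line JSON file (one document per line) without
# loading the whole file into memory; "data.jsonl" is a hypothetical example
with open("data.jsonl") as f:
    for line in f:
        record = json.loads(line)
        print(record)  # process one record at a time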
Main Idea
Usually a JSON file is big because some array contains an enormous number of items, or some dictionary has a huge number of keys (1, 2, 3, ...). So as long as we can split those items out into small files, and then write the remaining data into another file, the problem is solved.
Tools
ijson is a C-based Python library that parses JSON by reading the byte stream sequentially, which saves a great deal of memory. With ijson, we can locate the array node that holds all those items and split its data out. In other words, we trade extra time for memory, as the sketch below shows.
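For example, a minimal sketch of streaming items out of a nested array with ijson (the file name data.json and the {"data": {"records": [...]}} shape are assumptions, matching the test data used later):

import ijson

# stream the items of the array at json path "data.records" one at a time;
# the file is parsed incrementally, so it is never fully loaded into memory
with open("data.json", "rb") as f:
    for item in ijson.items(f, "data.records.item"):
        print(item)  # each item arrives as a fully parsed Python object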
Below are two scripts: one processes a local file, and the other processes a file on S3. The local version first:
# -*- coding: utf-8 -*-

"""
This solution splits a large JSON file into smaller chunks (when there is a
big array node) without using much memory.

Use case 1:

Input::

    {
        "id": 1,
        "records": [
            {"key": 1},
            {"key": 2},
            {"key": 3},
            ...
        ],
        "name": "alice"
    }

Output::

    # data.json
    {
        "id": 1,
        "name": "alice"
    }

    # arrays/1.json
    [
        {"key": 1},
        {"key": 2},
        ...
        {"key": 10}
    ]

    # arrays/2.json
    [
        {"key": 11},
        {"key": 12},
        ...
        {"key": 20}
    ]

    # arrays/10.json
    [
        {"key": 91},
        {"key": 92},
        ...
        {"key": 100}
    ]

Use case 2:

Input::

    {
        "id": 1,
        "data": {
            "date": "2000-01-01",
            "records": [
                {"key": 1},
                {"key": 2},
                {"key": 3},
                ...
            ],
        },
        "name": "alice"
    }

Output::

    # data.json
    {
        "id": 1,
        "data": {
            "date": "2000-01-01",
        },
        "name": "alice"
    }

    # arrays/1.json
    [
        {"key": 1},
        {"key": 2},
        ...
        {"key": 10}
    ]

    # arrays/2.json
    [
        {"key": 11},
        {"key": 12},
        ...
        {"key": 20}
    ]

    # arrays/10.json
    [
        {"key": 91},
        {"key": 92},
        ...
        {"key": 100}
    ]

Benchmark result:

- Input file: 1G, split into 10 files
- Machine: MacBook Pro, 32G memory + SSD
- Time: 12s
"""

import typing as T
import json
import shutil
import itertools
from pathlib import Path
from datetime import datetime

import ijson

dir_here = Path(__file__).parent


def make_data(path: Path):
    """
    Generate a ~1G test file: 1000 records, each holding a 1MB string value.
    """
    # small test data
    # n_records = 10
    # str_length = 10

    n_records = 1000
    str_length = 1000000

    data = {
        "id": 1,
        "data": {
            "date": "2000-01-01",
            "records": [
                {"k": i, "v": "a" * str_length} for i in range(1, 1 + n_records)
            ],
        },
        "name": "alice",
    }

    # flat variant for use case 1
    # data = {
    #     "id": 1,
    #     "records": [
    #         {"k": i, "v": "a" * str_length}
    #         for i in range(1, 1 + n_records)
    #     ],
    #     "name": "alice",
    # }

    with path.open("w") as f:
        json.dump(data, f)


def delete_node(
    p_in: Path,
    json_path: str,
) -> dict:
    """
    Read a JSON file, delete the node at the given json path (dot notation),
    and return the remaining data with that node removed.

    Example::

        # example 1
        >>> input_data = {
        ...     "id": 1,
        ...     "delete": [],
        ... }
        >>> json_path = "delete"
        >>> print(output_data)
        {
            "id": 1
        }

        # example 2
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "delete": []
        ...     },
        ... }
        >>> json_path = "a.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {}
        }

        # example 3
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "a_value": 2,
        ...         "b": {
        ...             "delete": [],
        ...             "b_value": 3,
        ...         },
        ...     },
        ... }
        >>> json_path = "a.b.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {
                "a_value": 2,
                "b": {
                    "b_value": 3
                }
            }
        }
    """
    # for json_path "a.b.delete", build [("", "a"), ("a", "b"), ("a.b", "delete")],
    # i.e. one (parent prefix, key) pair for each level from the root down to
    # the node to delete
    parts = json_path.split(".")
    prefix_and_key_pairs = []
    lst = list()
    for part in parts:
        prefix = ".".join(lst)
        key = part
        prefix_and_key_pairs.append((prefix, key))
        lst.append(part)

    new_data = dict()
    parent_data = new_data
    for prefix, key in prefix_and_key_pairs:
        # rebuild the dict at this prefix, skipping the key on the path to
        # the node to delete; ijson streams the file, so the big array under
        # that key is never fully loaded into memory
        data = dict()
        with p_in.open("r") as f_in:
            for k, v in ijson.kvitems(f_in, prefix):
                if k != key:
                    data[k] = v
        if prefix == "":
            new_data = data
        else:
            parent_data[prefix.split(".")[-1]] = data
        parent_data = data

    return new_data


def take(n: int, iterable: T.Iterable):
    """
    Return the first n items of the iterable as a list.
    """
    return list(itertools.islice(iterable, n))


def split_json(
    p_in: Path,
    dir_out: Path,
    json_path: str,
    chunk_size: int,
):
    """
    :param p_in: input data path
    :param dir_out: output data directory, it should not exist yet
    :param json_path: the json path in dot notation to the array you want to split
    :param chunk_size: group items in the array into chunks of this size
    """
    if dir_out.exists():
        raise FileExistsError(f"{dir_out} already exists")

    path_data = dir_out.joinpath("data.json")
    dir_arrays = dir_out.joinpath("arrays")
    dir_arrays.mkdir(parents=True)

    # split the big json array into many small json arrays
    with p_in.open("r") as f_in:
        iterator = ijson.items(f_in, f"{json_path}.item")
        # cap at 999 chunk files; raise the bound if you expect more chunks
        for ith in range(1, 1 + 999):
            items = take(chunk_size, iterator)
            if len(items) == 0:
                break
            path_out = dir_arrays.joinpath(f"{ith}.json")
            with path_out.open("w") as f_out:
                json.dump(items, f_out)

    # delete the big json array node from the original json
    data = delete_node(p_in=p_in, json_path=json_path)
    with path_data.open("w") as f_out:
        json.dump(data, f_out)


if __name__ == "__main__":
    dir_output = dir_here / "output"
    path_data = dir_here / "data.json"

    def test_delete_node():
        path = dir_here.joinpath("test_delete_node.json")
        # (input_data, expected_output_data, json_path) cases
        input_output_jsonpath = [
            (
                {
                    "id": 1,
                    "delete": [],
                },
                {
                    "id": 1,
                },
                "delete",
            ),
            (
                {
                    "id": 1,
                    "a": {"delete": []},
                },
                {
                    "id": 1,
                    "a": {},
                },
                "a.delete",
            ),
            (
                {
                    "id": 1,
                    "a": {
                        "a_value": 2,
                        "b": {
                            "delete": [],
                            "b_value": 3,
                        },
                    },
                },
                {
                    "id": 1,
                    "a": {
                        "a_value": 2,
                        "b": {
                            "b_value": 3,
                        },
                    },
                },
                "a.b.delete",
            ),
        ]
        for input_data, output_data, jsonpath in input_output_jsonpath:
            path.write_text(json.dumps(input_data))
            result = delete_node(path, json_path=jsonpath)
            assert result == output_data

    def test_split_json():
        shutil.rmtree(dir_output, ignore_errors=True)
        st = datetime.utcnow()
        split_json(
            p_in=path_data,
            dir_out=dir_output,
            json_path="data.records",
            chunk_size=120,
        )
        et = datetime.utcnow()
        elapse = (et - st).total_seconds()
        print(f"elapsed time: {elapse:.2f} seconds")

    # uncomment to run:
    # test_delete_node()
    # make_data(path_data)
    # test_split_json()
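One way to run the script above is to uncomment the calls at the bottom of the __main__ block; note that make_data generates a roughly 1G test file on disk:

# inside the __main__ block:
test_delete_node()   # check delete_node against three small cases
make_data(path_data) # generate the ~1G test file next to the script
test_split_json()    # split "data.records" into chunks of 120 items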
And here is the S3 version, packaged as an AWS Lambda function:

# -*- coding: utf-8 -*-

"""
This solution splits a large JSON file on S3 into smaller chunks (when there
is a big array node) without using much memory.

Requirements::

    ijson

**Example 1**

- Input file: 1G, split into 10 files
- Memory: 10G

if only split:

- Duration: 25s
- Max Memory Used: 540 MB

if also delete the node:

- Duration: 50s
- Max Memory Used: 1037 MB

**Example 2**

- Input file: 1G, split into 10 files
- Memory: 2G

if only split:

- Duration: 25s
- Max Memory Used: 540 MB

if also delete the node:

- Duration: 50s
- Max Memory Used: 1037 MB
"""

import typing as T
import json
import dataclasses
import itertools

import boto3
import ijson

s3_client = boto3.client("s3")


def split_s3_uri(uri: str) -> T.Tuple[str, str]:
    """
    Split an s3 uri like "s3://bucket/path/to/key" into (bucket, key).
    """
    parts = uri.split("/", 3)
    bucket = parts[2]
    key = parts[3]
    return bucket, key


def get_object(uri: str):
    bucket, key = split_s3_uri(uri)
    return s3_client.get_object(
        Bucket=bucket,
        Key=key,
    )


def put_object(uri: str, body):
    bucket, key = split_s3_uri(uri)
    return s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
    )


def delete_node(
    s3uri: str,
    json_path: str,
) -> dict:
    """
    Read a JSON file from S3, delete the node at the given json path
    (dot notation), and return the JSON data with the node deleted.

    Example::

        # example 1
        >>> input_data = {
        ...     "id": 1,
        ...     "delete": [],
        ... }
        >>> json_path = "delete"
        >>> print(output_data)
        {
            "id": 1
        }

        # example 2
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "delete": []
        ...     },
        ... }
        >>> json_path = "a.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {}
        }

        # example 3
        >>> input_data = {
        ...     "id": 1,
        ...     "a": {
        ...         "a_value": 2,
        ...         "b": {
        ...             "delete": [],
        ...             "b_value": 3,
        ...         },
        ...     },
        ... }
        >>> json_path = "a.b.delete"
        >>> print(output_data)
        {
            "id": 1,
            "a": {
                "a_value": 2,
                "b": {
                    "b_value": 3
                }
            }
        }
    """
    # for json_path "a.b.delete", build [("", "a"), ("a", "b"), ("a.b", "delete")],
    # i.e. one (parent prefix, key) pair for each level from the root down to
    # the node to delete
    parts = json_path.split(".")
    prefix_and_key_pairs = []
    lst = list()
    for part in parts:
        prefix = ".".join(lst)
        key = part
        prefix_and_key_pairs.append((prefix, key))
        lst.append(part)

    new_data = dict()
    parent_data = new_data
    for prefix, key in prefix_and_key_pairs:
        # rebuild the dict at this prefix, skipping the key on the path to
        # the node to delete; ijson streams the S3 object body, so the big
        # array under that key is never fully loaded into memory
        data = dict()
        with get_object(s3uri)["Body"] as f_in:
            for k, v in ijson.kvitems(f_in, prefix):
                if k != key:
                    data[k] = v
        if prefix == "":
            new_data = data
        else:
            parent_data[prefix.split(".")[-1]] = data
        parent_data = data

    return new_data


def take(n: int, iterable: T.Iterable):
    """
    Return the first n items of the iterable as a list.
    """
    return list(itertools.islice(iterable, n))


def split_json(
    s3file_input: str,
    s3dir_output: str,
    json_path: str,
    chunk_size: int,
):
    """
    :param s3file_input: the s3 uri of the input JSON file
    :param s3dir_output: the s3 uri of the output directory (must end with
        "/"); it is supposed to be empty
    :param json_path: the json path in dot notation to the array you want to split
    :param chunk_size: group items in the array into chunks of this size
    """
    json_path = json_path.strip(".")

    s3file_data = f"{s3dir_output}data.json"
    s3dir_arrays = f"{s3dir_output}arrays/"

    # split the big json array into many small json arrays
    with get_object(s3file_input)["Body"] as f_in:
        iterator = ijson.items(f_in, f"{json_path}.item")
        # cap at 999 chunk files; raise the bound if you expect more chunks
        for ith in range(1, 1 + 999):
            items = take(chunk_size, iterator)
            if len(items) == 0:
                break
            s3path_output = f"{s3dir_arrays}{ith}.json"
            # note: unlike the local version, each chunk is written as
            # newline-delimited JSON (one object per line), not a JSON array
            put_object(
                s3path_output,
                "\n".join([json.dumps(item) for item in items]),
            )

    # delete the big json array node from the original json
    data = delete_node(s3uri=s3file_input, json_path=json_path)
    put_object(s3file_data, json.dumps(data))


@dataclasses.dataclass
class Request:
    """
    Lambda request event.

    :param s3file_input: the s3 uri of the input JSON file
    :param s3dir_output: the s3 uri of the output directory (must end with
        "/"); it is supposed to be empty
    :param json_path: the json path in dot notation to the array you want to split
    :param chunk_size: group items in the array into chunks of this size
    """

    s3file_input: str
    s3dir_output: str
    json_path: str
    chunk_size: int


def lambda_handler(event, context):
    """
    Example event::

        {
            "s3file_input": "s3://807388292768-us-east-1-data/tmp/data.json",
            "s3dir_output": "s3://807388292768-us-east-1-data/tmp/output/",
            "json_path": "data.records",
            "chunk_size": 120
        }
    """
    request = Request(**event)
    split_json(
        s3file_input=request.s3file_input,
        s3dir_output=request.s3dir_output,
        json_path=request.json_path,
        chunk_size=request.chunk_size,
    )
    return {"statusCode": 200}
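For a quick test outside Lambda, the handler can also be invoked directly with a sample event. This assumes AWS credentials are configured and the input object already exists; the bucket below is just the placeholder from the docstring:

event = {
    "s3file_input": "s3://807388292768-us-east-1-data/tmp/data.json",
    "s3dir_output": "s3://807388292768-us-east-1-data/tmp/output/",
    "json_path": "data.records",
    "chunk_size": 120,
}
print(lambda_handler(event, context=None))  # expects {"statusCode": 200}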