Lambda実行時間の最大15分制限より、15分以内に処理完了するために、同一Lambdaを並行で処理する対策を説明します。
- AWS Lambda 関数:動的並列数制御処理
- AWS Lambda 関数:業務処理
- AWS Step Functions ステートマシン
動的並列数制御処理に処理する全体のデータ量より、並列数、各並列起動で処理するデータを決めて、Step Functions の OutputPathに並列数と同じなサイズのJsonリストで返却します。
Map stateがJsonリストの要素ごとに、各並列起動情報を引数として、業務処理を起動します。
業務処理に振り分けされたそれぞれの処理対象を処理して、15分以内に完了するように実現できます。
全件105のデータを多重で処理して、一つ並行処理ジョブに最大10件を処理する場合、
以下の制御情報リストを作成とします。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| [
{
'oneJobProcessingCount': 10,
'offset': 0
},
{
'oneJobProcessingCount': 10,
'offset': 10
},
{
'oneJobProcessingCount': 10,
'offset': 20
}
・・・・・・
{
'oneJobProcessingCount': 5,
'offset': 100
}
]
|
下記は制御情報を返却する実装例です。
note 並行処理ジョブ数を指定することより、平均的に全てのデータを分割して、並行処理することも可能です。
note 一つ並行処理ジョブに、処理件数をできるだけ15分完了近くまでに調整すれば、Lambdaの利用料金を減らすことが可能です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| import json
def lambda_handler(event, context):
totalProcessRecordCount = 105
oneJobProcessMaxCount = 10
jobCount = totalProcessRecordCount // oneJobProcessMaxCount
controlInfoList = []
for jobIndex in range(jobCount):
controlInfo = {
'oneJobProcessingCount': oneJobProcessMaxCount,
'offset': jobIndex * oneJobProcessMaxCount
}
controlInfoList.append(controlInfo)
if totalProcessRecordCount % oneJobProcessMaxCount != 0:
controlInfo = {
'oneJobProcessingCount': totalProcessRecordCount % oneJobProcessMaxCount,
'offset': jobCount * oneJobProcessMaxCount
}
controlInfoList.append(controlInfo)
return controlInfoList
|
下記は業務処理の実装例です。
並列実行しているLambdaに処理される対象レコードを返却する。
1
2
3
4
5
6
7
8
9
| def lambda_handler(event, context):
oneJobProcessingCount = int(event['oneJobProcessingCount'])
offset = int(event['offset'])
processed = []
for i in range(oneJobProcessingCount):
processed.append(offset + i)
return processed
|
動的並列数制御処理で、生成した並列情報リストをLoopして、非同期で業務処理を起動する。
Mapを利用して、作成したステートマシンは、以下のようなイメージです。
下記はステートマシンの記述です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| {
"Comment": "A dynamically parallel process example of the Amazon States Language using Map",
"StartAt": "dynamically-parallel-processing-control",
"States": {
"dynamically-parallel-processing-control": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:370382556331:function:dynamically-parallel-processing-control"
},
"Next": "dynamically-parallel-processing-iterator"
},
"dynamically-parallel-processing-iterator": {
"Type": "Map",
"InputPath": "$",
"ItemsPath": "$",
"MaxConcurrency": 0,
"Iterator": {
"StartAt": "dynamically-parallel-processing",
"States": {
"dynamically-parallel-processing": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:370382556331:function:business-logic-process"
},
"End": true
}
}
},
"End": true
}
}
}
|
- ステートマシンのログ出力より、動的並列数制御処理の並列数処理結果は、以下です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| {
"name": "dynamically-parallel-processing-control",
"output": [
{
"oneJobProcessingCount": 10,
"offset": 0
},
{
"oneJobProcessingCount": 10,
"offset": 10
},
{
"oneJobProcessingCount": 10,
"offset": 20
},
{
"oneJobProcessingCount": 10,
"offset": 30
},
{
"oneJobProcessingCount": 10,
"offset": 40
},
{
"oneJobProcessingCount": 10,
"offset": 50
},
{
"oneJobProcessingCount": 10,
"offset": 60
},
{
"oneJobProcessingCount": 10,
"offset": 70
},
{
"oneJobProcessingCount": 10,
"offset": 80
},
{
"oneJobProcessingCount": 10,
"offset": 90
},
{
"oneJobProcessingCount": 5,
"offset": 100
}
],
"outputDetails": {
"truncated": false
}
}
|
- 並列起動される業務処理の入力パラメーター例は以下となります。
1
2
3
4
5
6
7
8
9
10
11
12
13
| {
"name": "dynamically-parallel-processing",
"input": {
"ParallelIndex": 0,
"ParallelControlInfo": {
"oneJobProcessingCount": 10,
"offset": 0
}
},
"inputDetails": {
"truncated": false
}
}
|
- 並列で起動された業務処理の実行結果例は以下となります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| {
"resourceType": "lambda",
"resource": "invoke",
"output": {
"ExecutedVersion": "$LATEST",
"Payload": [
0,
1,
2,
3,
4,
5,
6,
7,
8,
9
],
・・・
|
上記の簡単なサンプルより、MAPの利用で、Lambda処理を動的並行で処理されることを確認しました。
実際の業務開発に、並行処理が複数、処理前後依存などケースがあるかもしれないので、状況よりご活用ください。