Loading
BLOG 開発者ブログ

2024年6月24日

DynamoDB Streams & Lambdaのローカル環境を作成する

DynamoDB Streams & Lambdaのローカル環境を作成する

Amazon DynamoDBには、DynamoDB Streamsというデータ更新を契機にAWSの各種リソースと連携する機能があります。
今回は、DynamoDB Streamsを用いたシステムをAWS上に開発するにあたり、ローカルの開発環境を組んでみます。

目次

 

はじめに

こんにちは。
クラウドソリューション第二グループのimai.kです。

業務でDynamoDB Streamsをコアとしたシステムを開発するにあたり、クラウド上の処理をローカルで再現する必要がありました。
そのため、実務で経験した内容をアウトプットしたいと思い、記載します。

今回のゴールは、以下のような構成です。

図の通りではありますが、実現したいことはDynamoDBのデータ変更を起点に、Lambdaを起動することにあります。

実際のAWS環境であれば、DynamoDB StreamとLambdaを接続することで比較的簡易に実現可能ですが、同様の仕組みをローカルで実現する場合、Streamsをポーリング・監視する仕組みを構築する必要があり、工夫が必要です。

では、やっていきます。

DynamoDB Localのインストールと起動

まずはDynamoDB Localをインストールしましょう。

インストール方法は、公式のページを参照してください。

インストール方法にもよりますが、DockerやJREの環境が必要です。別途ご準備ください。
なお、後ほどDockerを利用するため、ご利用の環境に合わせてDocker Desktopのインストールを行っておいてください。

DynamoDB Localのインストールができたら、立ち上げを行なっておきましょう。

以下の例では、jarのDynamoDB Localを起動しています。


$ java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb


Initializing DynamoDB Local with the following configuration:
Port: 8000
InMemory: false
Version: 2.3.0
DbPath: null
SharedDb: true
shouldDelayTransientStatuses: false
CorsParams: null

localhostのポート8000で動作していますね!

AWS SAMのインストールとセットアップ

次に、Streamsの変更を契機に起動するローカルのLambda関数を作成するために、AWS SAMをインストールします。

インストール方法は、DynamoDB Local同様に公式のページを参照してください。

AWS SAMのインストールができたら、ローカルで動作するLambda関数を用意します。

簡単のために、AWS SAMのサンプルのプロジェクトを作成します。
基本的に、こちらのページを参考に作成します。

任意のディレクトリに移動して、以下のように実行します。
今回はランタイムにPython(3.12)を使用します。


$ sam init


You can preselect a particular runtime or package type when using the `sam init` experience.
Call `sam init --help` to learn more.


Which template source would you like to use?
1 - AWS Quick Start Templates
2 - Custom Template Location
Choice: 1


Choose an AWS Quick Start application template
1 - Hello World Example
2 - Data processing
3 - Hello World Example with Powertools for AWS Lambda
4 - Multi-step workflow
5 - Scheduled task
6 - Standalone function
7 - Serverless API
8 - Infrastructure event management
9 - Lambda Response Streaming
10 - Serverless Connector Hello World Example
11 - Multi-step workflow with Connectors
12 - GraphQLApi Hello World Example
13 - Full Stack
14 - Lambda EFS example
15 - Hello World Example With Powertools for AWS Lambda
16 - DynamoDB Example
17 - Machine Learning
Template: 1


Use the most popular runtime and package type? (Python and zip) [y/N]:


Which runtime would you like to use?
1 - aot.dotnet7 (provided.al2)
2 - dotnet8
3 - dotnet6
4 - go1.x
5 - go (provided.al2)
6 - go (provided.al2023)
7 - graalvm.java11 (provided.al2)
8 - graalvm.java17 (provided.al2)
9 - java21
10 - java17
11 - java11
12 - java8.al2
13 - nodejs20.x
14 - nodejs18.x
15 - nodejs16.x
16 - python3.9
17 - python3.8
18 - python3.12
19 - python3.11
20 - python3.10
21 - ruby3.2
22 - rust (provided.al2)
23 - rust (provided.al2023)
Runtime: 18


What package type would you like to use?
1 - Zip
2 - Image
Package type: 1


Based on your selections, the only dependency manager available is pip.
We will proceed copying the template using pip.


Would you like to enable X-Ray tracing on the function(s) in your application? [y/N]: N


Would you like to enable monitoring using CloudWatch Application Insights?
For more info, please view https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch-application-insights.html [y/N]: N


Would you like to set Structured Logging in JSON format on your Lambda functions? [y/N]: N


Project name [sam-app]:


-----------------------
Generating application:
-----------------------
Name: sam-app
Runtime: python3.12
Architectures: x86_64
Dependency Manager: pip
Application Template: hello-world
Output Directory: .
Configuration file: sam-app/samconfig.toml


Next steps can be found in the README file at sam-app/README.md




Commands you can use next
=========================
[*] Create pipeline: cd sam-app && sam pipeline init --bootstrap
[*] Validate SAM template: cd sam-app && sam validate
[*] Test Function in the Cloud: cd sam-app && sam sync --stack-name {stack-name} --watch

サンプルアプリが作成できたら、アプリケーションの動作確認をしておきましょう。


$ cd sam-app
$ sam local invoke

初めて実行するときはDockerコンテナのビルドが走るため、少々時間がかかりますが、以下のようにメッセージが表示されれば問題ありません。


{"statusCode": 200, "body": "{\"message\": \"hello world!\"}"}

DynamoDB Streams変更監視スクリプトの追加

では、今回の構成のコアになる変更監視スクリプトを作成・配置していきます。

上記で作成したsam-app直下に、以下スクリプト(stream_observer.py)を配置します。


import boto3
import time
import subprocess


table = 'test'
dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:8000')
dynamodb_streams = boto3.client('dynamodbstreams', endpoint_url='http://localhost:8000')


latest_stream_arn = dynamodb.describe_table(TableName=table)['Table']['LatestStreamArn']
shards = dynamodb_streams.describe_stream(StreamArn=latest_stream_arn)['StreamDescription']['Shards']


shard_iterators = {}
for shard in shards:
shard_id = shard['ShardId']
shard_iterators[shard_id] = dynamodb_streams.get_shard_iterator(
StreamArn=latest_stream_arn,
ShardId=shard_id,
ShardIteratorType='LATEST',
)['ShardIterator']


def invoke_function(records):
print(records)


# AWS SAMで作成した関数をローカルで呼び出す
subprocess.call(["sam", "local", "invoke"])


while True:
if shard_iterators == []:
break


for shard_id, iterator in [*shard_iterators.items()]:
if iterator is None:
del shard_iterators[shard_id]
continue


get_records_result = dynamodb_streams.get_records(ShardIterator=iterator)
next_shard_iterator = get_records_result.get('NextShardIterator')


records = get_records_result['Records']


while next_shard_iterator == iterator:
get_records_result = dynamodb_streams.get_records(ShardIterator=next_shard_iterator)
next_shard_iterator = get_records_result.get('NextShardIterator')
records.append(get_records_result['Records'])


shard_iterators[shard_id] = next_shard_iterator
if records == []:
continue


invoke_function(records)


time.sleep(5)

DynamoDBにテーブル追加 & Streamsの有効化

次に、DynamoDBにテーブルを作成し、作成したテーブルのStreamsを有効化しましょう。

今回は、NoSQL WorkbenchからDynamoDBのローカル環境にテーブルを作ってみます。

NoSQL Workbenchの導入につきましては、ぜひこちらの記事をご参照ください。

では、簡単のために、NoSQL Workbenchで用意されているSampleを利用していきます。

NoSQL Workbenchのトップページにある「Sample data models」から「AWS Discussion Forum Data Model」を選択し、「Import」してみましょう。

インポートすると「Data modeler」の画面に遷移すると思います。

次に、「Visualize data model」を選択し、「Visualizer」の画面に遷移しましょう。

では、サンプルのテーブル・データをDynamoDB Localにコミットしていきます。
「Commit to Amazon DynamoDB」を選択しましょう。

以下のようなダイアログが表示されると思いますので、「Add a new DynamoDB local connection」から、コネクションを作成しましょう。
今回は、localhostの8000番ポートに対してコネクションを作成し、コミットします。

コミットに成功すると、成功のダイアログが表示されると思います。

DynamoDB Localにテーブルが作成できたら、Streamsの有効化を行いましょう。

今回は、該当のテーブルのStreamsを有効化するPythonのスクリプトで有効化します。

任意のディレクトリに以下スクリプト(enable_stream.py)を配置します。


import boto3

dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

# NoSQL Workbenchで作成したテーブルのStreamを有効化(一度実行すれば良い)
response = dynamodb.update_table(TableName='Forum', StreamSpecification={
'StreamEnabled': True,
'StreamViewType': 'NEW_IMAGE'
})

配置できたら、スクリプトを実行しておきましょう。


$ python enable_stream.py

動作確認

ここまでの作業で、下準備が整いましたので、動作確認を行います。

まずは、先ほど作成したstream_observer.pyを起動しておきます。


$ python stream_observer.py

次に、NoSQL Workbenchからテーブルのデータを更新してみます。

「Operation builder」から先ほど作成したコネクションを使用して、DynamoDB Localに接続します。

接続できたら、以下のようにデータを操作してみましょう。

データが更新されると、stream_observer.py側で以下のように出力されると思います。


python stream_observer.py


[{'eventID': '71ee59c3-6170-4a4a-8bef-013823613f8e', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'ddblocal', 'dynamodb': {'ApproximateCreationDateTime': datetime.datetime(2024, 6, 19, 19, 44, tzinfo=tzlocal()), 'Keys': {'ForumName': {'S': 'AWS Data Pipeline'}}, 'NewImage': {'Threads': {'N': '20'}, 'Category': {'S': 'Amazon Web Services'}, 'Messages': {'N': '9'}, 'Views': {'N': '500'}, 'ForumName': {'S': 'AWS Data Pipeline'}}, 'SequenceNumber': '000000000000000000040', 'SizeBytes': 105, 'StreamViewType': 'NEW_IMAGE'}}]
Invoking app.lambda_handler (python3.12)
Local image is out of date and will be updated to the latest runtime. To skip this, pass in the
parameter --skip-pull-image
Building image.......................................................................................
.....................................................................................................
.....................................................................................................
..........................................................................
Using local image: public.ecr.aws/lambda/python:3.12-rapid-x86_64.

Mounting /Users/imai.k/Documents/Isoroot/engineeer_blog/DynamoDB Streams With NoSQL
Workbench/sam-app/hello_world as /var/task:ro,delegated, inside runtime container
START RequestId: 5be7f9d9-47a9-4013-917d-26bd98db748e Version: $LATEST
END RequestId: c7280d65-713d-4b5a-a0fb-9a9274729879
REPORT RequestId: c7280d65-713d-4b5a-a0fb-9a9274729879 Init Duration: 0.87 ms Duration: 485.93 ms Billed Duration: 486 ms Memory Size: 128 MB Max Memory Used: 128 MB
{"statusCode": 200, "body": "{\"message\": \"hello world\"}"}

さいごに

DynamoDBの更新を契機に、Lambda関数を起動するフローをローカルに構築することができました。
NoSQL Workbenchを利用すると、DynamoDBのデータ更新をGUIで操作できるため、何かと便利だと思います。

以上です。


imai.kのブログ