Loading
BLOG 開発者ブログ

2024年6月24日

DynamoDB Streams & Lambdaのローカル環境を作成する

DynamoDB Streams & Lambdaのローカル環境を作成する

Amazon DynamoDBには、DynamoDB Streamsというデータ更新を契機にAWSの各種リソースと連携する機能があります。
今回は、DynamoDB Streamsを用いたシステムをAWS上に開発するにあたり、ローカルの開発環境を組んでみます。

目次

 

はじめに

こんにちは。
クラウドソリューション第二グループのimai.kです。

業務でDynamoDB Streamsをコアとしたシステムを開発するにあたり、クラウド上の処理をローカルで再現する必要がありました。
そのため、実務で経験した内容をアウトプットしたいと思い、記載します。

今回のゴールは、以下のような構成です。

図の通りではありますが、実現したいことはDynamoDBのデータ変更を起点に、Lambdaを起動することにあります。

実際のAWS環境であれば、DynamoDB StreamとLambdaを接続することで比較的簡易に実現可能ですが、同様の仕組みをローカルで実現する場合、Streamsをポーリング・監視する仕組みを構築する必要があり、工夫が必要です。

では、やっていきます。

DynamoDB Localのインストールと起動

まずはDynamoDB Localをインストールしましょう。

インストール方法は、公式のページを参照してください。

インストール方法にもよりますが、DockerやJREの環境が必要です。別途ご準備ください。
なお、後ほどDockerを利用するため、ご利用の環境に合わせてDocker Desktopのインストールを行っておいてください。

DynamoDB Localのインストールができたら、立ち上げを行なっておきましょう。

以下の例では、jarのDynamoDB Localを起動しています。


$ java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb

Initializing DynamoDB Local with the following configuration:
Port:	8000
InMemory:	false
Version:	2.3.0
DbPath:	null
SharedDb:	true
shouldDelayTransientStatuses:	false
CorsParams:	null

localhostのポート8000で動作していますね!

AWS SAMのインストールとセットアップ

次に、Streamsの変更を契機に起動するローカルのLambda関数を作成するために、AWS SAMをインストールします。

インストール方法は、DynamoDB Local同様に公式のページを参照してください。

AWS SAMのインストールができたら、ローカルで動作するLambda関数を用意します。

簡単のために、AWS SAMのサンプルのプロジェクトを作成します。
基本的に、こちらのページを参考に作成します。

任意のディレクトリに移動して、以下のように実行します。
今回はランタイムにPython(3.12)を使用します。


$ sam init

You can preselect a particular runtime or package type when using the `sam init` experience.
Call `sam init --help` to learn more.

Which template source would you like to use?
	1 - AWS Quick Start Templates
	2 - Custom Template Location
Choice: 1

Choose an AWS Quick Start application template
	1 - Hello World Example
	2 - Data processing
	3 - Hello World Example with Powertools for AWS Lambda
	4 - Multi-step workflow
	5 - Scheduled task
	6 - Standalone function
	7 - Serverless API
	8 - Infrastructure event management
	9 - Lambda Response Streaming
	10 - Serverless Connector Hello World Example
	11 - Multi-step workflow with Connectors
	12 - GraphQLApi Hello World Example
	13 - Full Stack
	14 - Lambda EFS example
	15 - Hello World Example With Powertools for AWS Lambda
	16 - DynamoDB Example
	17 - Machine Learning
Template: 1

Use the most popular runtime and package type? (Python and zip) [y/N]:

Which runtime would you like to use?
	1 - aot.dotnet7 (provided.al2)
	2 - dotnet8
	3 - dotnet6
	4 - go1.x
	5 - go (provided.al2)
	6 - go (provided.al2023)
	7 - graalvm.java11 (provided.al2)
	8 - graalvm.java17 (provided.al2)
	9 - java21
	10 - java17
	11 - java11
	12 - java8.al2
	13 - nodejs20.x
	14 - nodejs18.x
	15 - nodejs16.x
	16 - python3.9
	17 - python3.8
	18 - python3.12
	19 - python3.11
	20 - python3.10
	21 - ruby3.2
	22 - rust (provided.al2)
	23 - rust (provided.al2023)
Runtime: 18

What package type would you like to use?
	1 - Zip
	2 - Image
Package type: 1

Based on your selections, the only dependency manager available is pip.
We will proceed copying the template using pip.

Would you like to enable X-Ray tracing on the function(s) in your application?  [y/N]: N

Would you like to enable monitoring using CloudWatch Application Insights?
For more info, please view https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch-application-insights.html [y/N]: N

Would you like to set Structured Logging in JSON format on your Lambda functions?  [y/N]: N

Project name [sam-app]:

    -----------------------
    Generating application:
    -----------------------
    Name: sam-app
    Runtime: python3.12
    Architectures: x86_64
    Dependency Manager: pip
    Application Template: hello-world
    Output Directory: .
    Configuration file: sam-app/samconfig.toml

    Next steps can be found in the README file at sam-app/README.md


Commands you can use next
=========================
[*] Create pipeline: cd sam-app && sam pipeline init --bootstrap
[*] Validate SAM template: cd sam-app && sam validate
[*] Test Function in the Cloud: cd sam-app && sam sync --stack-name {stack-name} --watch

サンプルアプリが作成できたら、アプリケーションの動作確認をしておきましょう。


$ cd sam-app
$ sam local invoke

初めて実行するときはDockerコンテナのビルドが走るため、少々時間がかかりますが、以下のようにメッセージが表示されれば問題ありません。


{"statusCode": 200, "body": "{\"message\": \"hello world!\"}"}

DynamoDB Streams変更監視スクリプトの追加

では、今回の構成のコアになる変更監視スクリプトを作成・配置していきます。

上記で作成したsam-app直下に、以下スクリプト(stream_observer.py)を配置します。


import boto3
import time
import subprocess

table = 'test'
dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:8000')
dynamodb_streams = boto3.client('dynamodbstreams', endpoint_url='http://localhost:8000')

latest_stream_arn = dynamodb.describe_table(TableName=table)['Table']['LatestStreamArn']
shards = dynamodb_streams.describe_stream(StreamArn=latest_stream_arn)['StreamDescription']['Shards']

shard_iterators = {}
for shard in shards:
	shard_id = shard['ShardId']
	shard_iterators[shard_id] = dynamodb_streams.get_shard_iterator(
		StreamArn=latest_stream_arn,
		ShardId=shard_id,
		ShardIteratorType='LATEST',
	)['ShardIterator']

def invoke_function(records):
	print(records)

	# AWS SAMで作成した関数をローカルで呼び出す
	subprocess.call(["sam", "local", "invoke"])

while True:
	if shard_iterators == []:
		break

	for shard_id, iterator in [*shard_iterators.items()]:
		if iterator is None:
			del shard_iterators[shard_id]
			continue

		get_records_result = dynamodb_streams.get_records(ShardIterator=iterator)
		next_shard_iterator = get_records_result.get('NextShardIterator')

		records = get_records_result['Records']

		while next_shard_iterator == iterator:
			get_records_result = dynamodb_streams.get_records(ShardIterator=next_shard_iterator)
			next_shard_iterator = get_records_result.get('NextShardIterator')
			records.append(get_records_result['Records'])

		shard_iterators[shard_id] = next_shard_iterator
		if records == []:
			continue

		invoke_function(records)

	time.sleep(5)

DynamoDBにテーブル追加 & Streamsの有効化

次に、DynamoDBにテーブルを作成し、作成したテーブルのStreamsを有効化しましょう。

今回は、NoSQL WorkbenchからDynamoDBのローカル環境にテーブルを作ってみます。

NoSQL Workbenchの導入につきましては、ぜひこちらの記事をご参照ください。

では、簡単のために、NoSQL Workbenchで用意されているSampleを利用していきます。

NoSQL Workbenchのトップページにある「Sample data models」から「AWS Discussion Forum Data Model」を選択し、「Import」してみましょう。

インポートすると「Data modeler」の画面に遷移すると思います。

次に、「Visualize data model」を選択し、「Visualizer」の画面に遷移しましょう。

では、サンプルのテーブル・データをDynamoDB Localにコミットしていきます。
「Commit to Amazon DynamoDB」を選択しましょう。

以下のようなダイアログが表示されると思いますので、「Add a new DynamoDB local connection」から、コネクションを作成しましょう。
今回は、localhostの8000番ポートに対してコネクションを作成し、コミットします。

コミットに成功すると、成功のダイアログが表示されると思います。

DynamoDB Localにテーブルが作成できたら、Streamsの有効化を行いましょう。

今回は、該当のテーブルのStreamsを有効化するPythonのスクリプトで有効化します。

任意のディレクトリに以下スクリプト(enable_stream.py)を配置します。


	import boto3
	dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

# NoSQL Workbenchで作成したテーブルのStreamを有効化(一度実行すれば良い) response = dynamodb.update_table(TableName='Forum', StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'NEW_IMAGE' })

配置できたら、スクリプトを実行しておきましょう。


$ python enable_stream.py

動作確認

ここまでの作業で、下準備が整いましたので、動作確認を行います。

まずは、先ほど作成したstream_observer.pyを起動しておきます。


$ python stream_observer.py

次に、NoSQL Workbenchからテーブルのデータを更新してみます。

「Operation builder」から先ほど作成したコネクションを使用して、DynamoDB Localに接続します。

接続できたら、以下のようにデータを操作してみましょう。

データが更新されると、stream_observer.py側で以下のように出力されると思います。


	python stream_observer.py

	[{'eventID': '71ee59c3-6170-4a4a-8bef-013823613f8e', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'ddblocal', 'dynamodb': {'ApproximateCreationDateTime': datetime.datetime(2024, 6, 19, 19, 44, tzinfo=tzlocal()), 'Keys': {'ForumName': {'S': 'AWS Data Pipeline'}}, 'NewImage': {'Threads': {'N': '20'}, 'Category': {'S': 'Amazon Web Services'}, 'Messages': {'N': '9'}, 'Views': {'N': '500'}, 'ForumName': {'S': 'AWS Data Pipeline'}}, 'SequenceNumber': '000000000000000000040', 'SizeBytes': 105, 'StreamViewType': 'NEW_IMAGE'}}]
	Invoking app.lambda_handler (python3.12)
	Local image is out of date and will be updated to the latest runtime. To skip this, pass in the
	parameter --skip-pull-image
	Building image.......................................................................................
	.....................................................................................................
	.....................................................................................................
	..........................................................................
	Using local image: public.ecr.aws/lambda/python:3.12-rapid-x86_64.

Mounting /Users/imai.k/Documents/Isoroot/engineeer_blog/DynamoDB Streams With NoSQL Workbench/sam-app/hello_world as /var/task:ro,delegated, inside runtime container START RequestId: 5be7f9d9-47a9-4013-917d-26bd98db748e Version: $LATEST END RequestId: c7280d65-713d-4b5a-a0fb-9a9274729879 REPORT RequestId: c7280d65-713d-4b5a-a0fb-9a9274729879 Init Duration: 0.87 ms Duration: 485.93 ms Billed Duration: 486 ms Memory Size: 128 MB Max Memory Used: 128 MB {"statusCode": 200, "body": "{\"message\": \"hello world\"}"}

さいごに

DynamoDBの更新を契機に、Lambda関数を起動するフローをローカルに構築することができました。
NoSQL Workbenchを利用すると、DynamoDBのデータ更新をGUIで操作できるため、何かと便利だと思います。

以上です。


imai.kのブログ