Amazon DynamoDB is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (dynamodb_pyio) aims to integrate with the database service by supporting source and sink connectors. Currently, the sink connector is available.
Installation
The connector can be installed from PyPI.

pip install dynamodb_pyio
Usage
Sink Connector
It has the main composite transform (WriteToDynamoDB), and it expects a list or tuple PCollection element. If the element is a tuple, the tuple's second element is taken. If the element is not of the accepted types, you can apply the GroupIntoBatches or BatchElements transform beforehand. Then, the records of the element are written to a DynamoDB table with the help of the batch_writer of the boto3 package. Note that the batch writer automatically handles buffering and sending items in batches. In addition, it automatically handles any unprocessed items and resends them as needed.
The transform also has an option that handles duplicate records - see the example below for more details.
- dedup_pkeys - List of keys to be used for deduplicating items in the buffer; a rough sketch of how this maps onto the boto3 batch writer follows.
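To make this concrete, below is a rough sketch of the equivalent boto3 calls for a single batch. It is not the connector's source code, and it assumes that dedup_pkeys corresponds to the batch writer's overwrite_by_pkeys argument; the table name and records are made up for illustration.

import boto3

# A table with hash key "pk" (string) and sort key "sk" (number) is assumed to exist.
table = boto3.resource("dynamodb").Table("dynamodb-pyio-test")

# The last record duplicates the second one on both key attributes.
records = [{"pk": "0", "sk": 0}, {"pk": "1", "sk": 1}, {"pk": "1", "sk": 1}]

# overwrite_by_pkeys de-duplicates buffered items that share the listed key
# attributes, while the batch writer buffers puts, sends them in batches, and
# resends any unprocessed items.
with table.batch_writer(overwrite_by_pkeys=["pk", "sk"]) as batch:
    for record in records:
        batch.put_item(Item=record)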
Sink Connector Example
The example shows how to write records in batches to a DynamoDB table using the sink connector. The source can be found in the examples folder of the connector repository.
The pipeline begins with creating a configurable number of records (default 500), where each record has two attributes (pk and sk). The attribute values are configured so that the first half of the records have incremental values while the remaining half have a single value of 1. Therefore, as the attributes are the hash and sort keys of the table, we can expect only 250 records in the DynamoDB table if we configure 500 records. Moreover, in spite of the duplicate values, we do not encounter an error if we specify the dedup_pkeys value correctly.
After creating the elements, we apply the BatchElements transform where the minimum and maximum batch sizes are set to 100 and 200 respectively. Note that this prevents individual dictionary elements from being pushed into the WriteToDynamoDB transform. Finally, batches of elements are written to the DynamoDB table using the WriteToDynamoDB transform.
# pipeline.py
import argparse
import decimal
import logging

import boto3
from boto3.dynamodb.types import TypeDeserializer

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from dynamodb_pyio.io import WriteToDynamoDB

TABLE_NAME = "dynamodb-pyio-test"


def get_table(table_name):
    resource = boto3.resource("dynamodb")
    return resource.Table(table_name)


def create_table(table_name):
    client = boto3.client("dynamodb")
    try:
        client.describe_table(TableName=table_name)
        table_exists = True
    except Exception:
        table_exists = False
    if not table_exists:
        print(">> create table...")
        params = {
            "TableName": table_name,
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "N"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }
        client.create_table(**params)
        get_table(table_name).wait_until_exists()


def to_int_if_decimal(v):
    try:
        if isinstance(v, decimal.Decimal):
            return int(v)
        else:
            return v
    except Exception:
        return v


def scan_table(**kwargs):
    client = boto3.client("dynamodb")
    paginator = client.get_paginator("scan")
    page_iterator = paginator.paginate(**kwargs)
    items = []
    for page in page_iterator:
        for document in page["Items"]:
            items.append(
                {
                    k: to_int_if_decimal(TypeDeserializer().deserialize(v))
                    for k, v in document.items()
                }
            )
    return sorted(items, key=lambda d: d["sk"])


def truncate_table(table_name):
    records = scan_table(TableName=table_name)
    table = get_table(table_name)
    with table.batch_writer() as batch:
        for record in records:
            batch.delete_item(Key=record)


def mask_secrets(d: dict):
    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--table_name", default=TABLE_NAME, type=str, help="DynamoDB table name"
    )
    parser.add_argument(
        "--num_records", default="500", type=int, help="Number of records"
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known_args - {known_args}")
    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "CreateElements"
            >> beam.Create(
                [
                    {
                        "pk": str(int(1 if i >= known_args.num_records / 2 else i)),
                        "sk": int(1 if i >= known_args.num_records / 2 else i),
                    }
                    for i in range(known_args.num_records)
                ]
            )
            | "BatchElements" >> BatchElements(min_batch_size=100, max_batch_size=200)
            | "WriteToDynamoDB"
            >> WriteToDynamoDB(
                table_name=known_args.table_name, dedup_pkeys=["pk", "sk"]
            )
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    create_table(TABLE_NAME)
    print(">> start pipeline...")
    run()
    print(">> check number of records...")
    print(len(scan_table(TableName=TABLE_NAME)))
    print(">> truncate table...")
    truncate_table(TABLE_NAME)
We can run the pipeline on any runner that supports the Python SDK. Below is an example of running the example pipeline on the Apache Flink Runner. Note that AWS related values (e.g. aws_access_key_id) can be specified as pipeline arguments because the package has a dedicated pipeline option (DynamoDBOptions) that parses them. Once the pipeline runs successfully, the script checks the number of records created by the connector and then truncates the table.
python examples/pipeline.py \
  --runner=FlinkRunner \
  --parallelism=1 \
  --aws_access_key_id=$AWS_ACCESS_KEY_ID \
  --aws_secret_access_key=$AWS_SECRET_ACCESS_KEY \
  --region_name=$AWS_DEFAULT_REGION
>> create table...
>> start pipeline...
known_args - Namespace(table_name='dynamodb-pyio-test', num_records=500)
pipeline options - {'runner': 'FlinkRunner', 'save_main_session': True, 'parallelism': 1, 'aws_access_key_id': 'xxxxxxxxxxxxxxxxxxxx', 'aws_secret_access_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'region_name': 'us-east-1'}
...
INFO:root:total 100 elements processed...
INFO:root:total 100 elements processed...
INFO:root:total 200 elements processed...
INFO:root:total 100 elements processed...
INFO:root:BatchElements statistics: element_count=500 batch_count=4 next_batch_size=101 timings=[(100, 0.9067182540893555), (200, 1.815131664276123), (100, 0.22190475463867188)]
...
>> check number of records...
...
250
>> truncate table...
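The final count of 250 matches the number of distinct (pk, sk) pairs produced by the record-generation logic of the pipeline. The following standalone snippet, which is not part of the example script, reproduces that count.

# Reproduce the record-generation logic and count distinct key pairs.
num_records = 500
records = [
    {
        "pk": str(int(1 if i >= num_records / 2 else i)),
        "sk": int(1 if i >= num_records / 2 else i),
    }
    for i in range(num_records)
]
print(len({(r["pk"], r["sk"]) for r in records}))  # 250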
Note that the following warning messages are printed if the grpcio (1.65.x) package is installed (see this GitHub issue). You can downgrade the package version to avoid those messages (e.g. pip install grpcio==1.64.1).
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1721526572.886302 78332 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
I0000 00:00:1721526573.143589 78363 subchannel.cc:806] subchannel 0x7f4010001890 {address=ipv6:%5B::1%5D:58713, args={grpc.client_channel_factory=0x2ae79b0, grpc.default_authority=localhost:58713, grpc.internal.channel_credentials=0x297dda0, grpc.internal.client_channel_call_destination=0x7f407f3b23d0, grpc.internal.event_engine=0x7f4010013870, grpc.internal.security_connector=0x7f40100140e0, grpc.internal.subchannel_pool=0x21d3d00, grpc.max_receive_message_length=-1, grpc.max_send_message_length=-1, grpc.primary_user_agent=grpc-python/1.65.1, grpc.resource_quota=0x2a99310, grpc.server_uri=dns:///localhost:58713}}: connect failed (UNKNOWN:Failed to connect to remote host: connect: Connection refused (111) {created_time:"2024-07-21T11:49:33.143192444+10:00"}), backing off for 1000 ms
More Sink Connector Examples
More usage examples can be found in the unit test cases. Some of them are covered here.
- The transform can process many records, thanks to the batch writer.
def test_write_to_dynamodb_with_large_items(self):
    # batch writer automatically handles buffering and sending items in batches
    records = [{"pk": str(i), "sk": i} for i in range(5000)]
    with TestPipeline(options=self.pipeline_opts) as p:
        (p | beam.Create([records]) | WriteToDynamoDB(table_name=self.table_name))
    self.assertListEqual(records, scan_table(TableName=self.table_name))
- Only list and tuple PCollection elements are supported. In the following example, individual dictionary elements are passed to WriteToDynamoDB, and it raises a DynamoDBClientError.
def test_write_to_dynamodb_with_unsupported_record_type(self):
    # supported types are list or tuple where the second element is a list!
    records = [{"pk": str(i), "sk": i} for i in range(20)]
    with self.assertRaises(DynamoDBClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (p | beam.Create(records) | WriteToDynamoDB(table_name=self.table_name))
- Incorrect key attribute data types throw an error.
def test_write_to_dynamodb_with_wrong_data_type(self):
    # pk and sk should be string and number respectively
    records = [{"pk": i, "sk": str(i)} for i in range(20)]
    with self.assertRaises(DynamoDBClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (
                p
                | beam.Create([records])
                | WriteToDynamoDB(table_name=self.table_name)
            )
- Duplicate records are not allowed unless the dedup_pkeys value is correctly specified.
def test_write_to_dynamodb_duplicate_records_without_dedup_keys(self):
    records = [{"pk": str(1), "sk": 1} for _ in range(20)]
    with self.assertRaises(DynamoDBClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (
                p
                | beam.Create([records])
                | WriteToDynamoDB(table_name=self.table_name)
            )

def test_write_to_dynamodb_duplicate_records_with_dedup_keys(self):
    records = [{"pk": str(1), "sk": 1} for _ in range(20)]
    with TestPipeline(options=self.pipeline_opts) as p:
        (
            p
            | beam.Create([records])
            | WriteToDynamoDB(table_name=self.table_name, dedup_pkeys=["pk", "sk"])
        )
    self.assertListEqual(records[:1], scan_table(TableName=self.table_name))
- We can control batches of elements further with the BatchElements or GroupIntoBatches transform.
def test_write_to_dynamodb_with_batch_elements(self):
    records = [{"pk": str(i), "sk": i} for i in range(20)]
    with TestPipeline(options=self.pipeline_opts) as p:
        (
            p
            | beam.Create(records)
            | BatchElements(min_batch_size=10, max_batch_size=10)
            | WriteToDynamoDB(table_name=self.table_name)
        )
    self.assertListEqual(records, scan_table(TableName=self.table_name))

def test_write_to_dynamodb_with_group_into_batches(self):
    records = [(i % 2, {"pk": str(i), "sk": i}) for i in range(20)]
    with TestPipeline(options=self.pipeline_opts) as p:
        (
            p
            | beam.Create(records)
            | GroupIntoBatches(batch_size=10)
            | WriteToDynamoDB(table_name=self.table_name)
        )
    self.assertListEqual(
        [r[1] for r in records], scan_table(TableName=self.table_name)
    )