Amazon DynamoDB is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (dynamodb_pyio) aims to integrate with the database service by supporting source and sink connectors. Currently, the sink connector is available.

Installation

The connector can be installed from PyPI.

1pip install dynamodb_pyio

Usage

Sink Connector

It has the main composite transform (WriteToDynamoDB), and it expects a list or tuple PCollection element. If the element is a tuple, the tuple’s first element is taken. If the element is not of the accepted types, you can apply the GroupIntoBatches or BatchElements transform beforehand. Then, the records of the element are written to a DynamoDB table with help of the batch_writer of the boto3 package. Note that the batch writer will automatically handle buffering and sending items in batches. In addition, it will also automatically handle any unprocessed items and resend them as needed.

The transform also has an option that handles duplicate records - see the example below for more details.

  • dedup_pkeys - List of keys to be used for deduplicating items in buffer.

Sink Connector Example

The example shows how to write records in batch to a DynamoDB table using the sink connector. The source can be found in the examples folder of the connector repository.

The pipeline begins with creating a configurable number records (default 500) where each record has two attributes (pk and sk). The attribute values are configured so that the first half of the records have incremental values while the remaining half have a single value of 1. Therefore, as the attributes are the hash and sort key of the table, we can expect only 250 records in the DynamoDB table if we configure 500 records. Moreover, in spite of duplicate values, we do not encounter an error if we specify the dedup_pkeys value correctly.

After creating elements, we apply the BatchElements transform where the minimum and maximum batch sizes are set to 100 and 200 respectively. Note that it prevents individual dictionary elements from being pushed into the WriteToDynamoDB transform. Finally, batches of elements are written to the DynamoDB table using the WriteToDynamoDB transform.

  1# pipeline.py
  2import argparse
  3import decimal
  4import logging
  5
  6import boto3
  7from boto3.dynamodb.types import TypeDeserializer
  8
  9import apache_beam as beam
 10from apache_beam.transforms.util import BatchElements
 11from apache_beam.options.pipeline_options import PipelineOptions
 12from apache_beam.options.pipeline_options import SetupOptions
 13
 14from dynamodb_pyio.io import WriteToDynamoDB
 15
 16TABLE_NAME = "dynamodb-pyio-test"
 17
 18
 19def get_table(table_name):
 20    resource = boto3.resource("dynamodb")
 21    return resource.Table(table_name)
 22
 23
 24def create_table(table_name):
 25    client = boto3.client("dynamodb")
 26    try:
 27        client.describe_table(TableName=table_name)
 28        table_exists = True
 29    except Exception:
 30        table_exists = False
 31    if not table_exists:
 32        print(">> create table...")
 33        params = {
 34            "TableName": table_name,
 35            "KeySchema": [
 36                {"AttributeName": "pk", "KeyType": "HASH"},
 37                {"AttributeName": "sk", "KeyType": "RANGE"},
 38            ],
 39            "AttributeDefinitions": [
 40                {"AttributeName": "pk", "AttributeType": "S"},
 41                {"AttributeName": "sk", "AttributeType": "N"},
 42            ],
 43            "BillingMode": "PAY_PER_REQUEST",
 44        }
 45        client.create_table(**params)
 46        get_table(table_name).wait_until_exists()
 47
 48
 49def to_int_if_decimal(v):
 50    try:
 51        if isinstance(v, decimal.Decimal):
 52            return int(v)
 53        else:
 54            return v
 55    except Exception:
 56        return v
 57
 58
 59def scan_table(**kwargs):
 60    client = boto3.client("dynamodb")
 61    paginator = client.get_paginator("scan")
 62    page_iterator = paginator.paginate(**kwargs)
 63    items = []
 64    for page in page_iterator:
 65        for document in page["Items"]:
 66            items.append(
 67                {
 68                    k: to_int_if_decimal(TypeDeserializer().deserialize(v))
 69                    for k, v in document.items()
 70                }
 71            )
 72    return sorted(items, key=lambda d: d["sk"])
 73
 74
 75def truncate_table(table_name):
 76    records = scan_table(TableName=TABLE_NAME)
 77    table = get_table(table_name)
 78    with table.batch_writer() as batch:
 79        for record in records:
 80            batch.delete_item(Key=record)
 81
 82
 83def mask_secrets(d: dict):
 84    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}
 85
 86
 87def run(argv=None, save_main_session=True):
 88    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
 89    parser.add_argument(
 90        "--table_name", default=TABLE_NAME, type=str, help="DynamoDB table name"
 91    )
 92    parser.add_argument(
 93        "--num_records", default="500", type=int, help="Number of records"
 94    )
 95    known_args, pipeline_args = parser.parse_known_args(argv)
 96
 97    pipeline_options = PipelineOptions(pipeline_args)
 98    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
 99    print(f"known_args - {known_args}")
100    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")
101
102    with beam.Pipeline(options=pipeline_options) as p:
103        (
104            p
105            | "CreateElements"
106            >> beam.Create(
107                [
108                    {
109                        "pk": str(int(1 if i >= known_args.num_records / 2 else i)),
110                        "sk": int(1 if i >= known_args.num_records / 2 else i),
111                    }
112                    for i in range(known_args.num_records)
113                ]
114            )
115            | "BatchElements" >> BatchElements(min_batch_size=100, max_batch_size=200)
116            | "WriteToDynamoDB"
117            >> WriteToDynamoDB(
118                table_name=known_args.table_name, dedup_pkeys=["pk", "sk"]
119            )
120        )
121
122        logging.getLogger().setLevel(logging.INFO)
123        logging.info("Building pipeline ...")
124
125
126if __name__ == "__main__":
127    create_table(TABLE_NAME)
128    print(">> start pipeline...")
129    run()
130    print(">> check number of records...")
131    print(len(scan_table(TableName=TABLE_NAME)))
132    print(">> truncate table...")
133    truncate_table(TABLE_NAME)

We can run the pipeline on any runner that supports the Python SDK. Below shows an example of running the example pipeline on the Apache Flink Runner. Note that AWS related values (e.g. aws_access_key_id) can be specified as pipeline arguments because the package has a dedicated pipeline option (DynamoDBOptions) that parses them. Once the pipeline runs successfully, the script checks the number of records that are created by the connector followed by truncating the table.

1python examples/pipeline.py \
2    --runner=FlinkRunner \
3    --parallelism=1 \
4    --aws_access_key_id=$AWS_ACCESS_KEY_ID \
5    --aws_secret_access_key=$AWS_SECRET_ACCESS_KEY \
6    --region_name=$AWS_DEFAULT_REGION
 1>> create table...
 2>> start pipeline...
 3known_args - Namespace(table_name='dynamodb-pyio-test', num_records=500)
 4pipeline options - {'runner': 'FlinkRunner', 'save_main_session': True, 'parallelism': 1, 'aws_access_key_id': 'xxxxxxxxxxxxxxxxxxxx', 'aws_secret_access_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'region_name': 'us-east-1'}
 5...
 6INFO:root:total 100 elements processed...
 7INFO:root:total 100 elements processed...
 8INFO:root:total 200 elements processed...
 9INFO:root:total 100 elements processed...
10INFO:root:BatchElements statistics: element_count=500 batch_count=4 next_batch_size=101 timings=[(100, 0.9067182540893555), (200, 1.815131664276123), (100, 0.22190475463867188)]
11...
12>> check number of records...
13...
14250
15>> truncate table...

Note that the following warning messages are printed if it installs the grpcio (1.65.x) package (see this GitHub issue). You can downgrade the package version to avoid those messages (e.g. pip install grpcio==1.64.1).

1WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
2I0000 00:00:1721526572.886302   78332 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
3I0000 00:00:1721526573.143589   78363 subchannel.cc:806] subchannel 0x7f4010001890 {address=ipv6:%5B::1%5D:58713, args={grpc.client_channel_factory=0x2ae79b0, grpc.default_authority=localhost:58713, grpc.internal.channel_credentials=0x297dda0, grpc.internal.client_channel_call_destination=0x7f407f3b23d0, grpc.internal.event_engine=0x7f4010013870, grpc.internal.security_connector=0x7f40100140e0, grpc.internal.subchannel_pool=0x21d3d00, grpc.max_receive_message_length=-1, grpc.max_send_message_length=-1, grpc.primary_user_agent=grpc-python/1.65.1, grpc.resource_quota=0x2a99310, grpc.server_uri=dns:///localhost:58713}}: connect failed (UNKNOWN:Failed to connect to remote host: connect: Connection refused (111) {created_time:"2024-07-21T11:49:33.143192444+10:00"}), backing off for 1000 ms

More Sink Connector Examples

More usage examples can be found in the unit testing cases. Some of them are covered here.

  1. The transform can process many records, thanks to the batch writer.
1def test_write_to_dynamodb_with_large_items(self):
2    # batch writer automatically handles buffering and sending items in batches
3    records = [{"pk": str(i), "sk": i} for i in range(5000)]
4    with TestPipeline(options=self.pipeline_opts) as p:
5        (p | beam.Create([records]) | WriteToDynamoDB(table_name=self.table_name))
6    self.assertListEqual(records, scan_table(TableName=self.table_name))
  1. Only the list or tuple types are supported PCollection elements. In the following example, individual dictionary elements are applied in the WriteToDynamoDB, and it raises the DynamoDBClientError.
1def test_write_to_dynamodb_with_unsupported_record_type(self):
2    # supported types are list or tuple where the second element is a list!
3    records = [{"pk": str(i), "sk": i} for i in range(20)]
4    with self.assertRaises(DynamoDBClientError):
5        with TestPipeline(options=self.pipeline_opts) as p:
6            (p | beam.Create(records) | WriteToDynamoDB(table_name=self.table_name))
  1. Incorrect key attribute data types throws an error.
 1def test_write_to_dynamodb_with_wrong_data_type(self):
 2    # pk and sk should be string and number respectively
 3    records = [{"pk": i, "sk": str(i)} for i in range(20)]
 4    with self.assertRaises(DynamoDBClientError):
 5        with TestPipeline(options=self.pipeline_opts) as p:
 6            (
 7                p
 8                | beam.Create([records])
 9                | WriteToDynamoDB(table_name=self.table_name)
10            )
  1. Duplicate records are not allowed unless the dedup_pkeys value is correctly specified.
 1def test_write_to_dynamodb_duplicate_records_without_dedup_keys(self):
 2    records = [{"pk": str(1), "sk": 1} for _ in range(20)]
 3    with self.assertRaises(DynamoDBClientError):
 4        with TestPipeline(options=self.pipeline_opts) as p:
 5            (
 6                p
 7                | beam.Create([records])
 8                | WriteToDynamoDB(table_name=self.table_name)
 9            )
10
11def test_write_to_dynamodb_duplicate_records_with_dedup_keys(self):
12    records = [{"pk": str(1), "sk": 1} for _ in range(20)]
13    with TestPipeline(options=self.pipeline_opts) as p:
14        (
15            p
16            | beam.Create([records])
17            | WriteToDynamoDB(table_name=self.table_name, dedup_pkeys=["pk", "sk"])
18        )
19    self.assertListEqual(records[:1], scan_table(TableName=self.table_name))
  1. We can control batches of elements further with the BatchElements or GroupIntoBatches transform.
 1def test_write_to_dynamodb_with_batch_elements(self):
 2    records = [{"pk": str(i), "sk": i} for i in range(20)]
 3    with TestPipeline(options=self.pipeline_opts) as p:
 4        (
 5            p
 6            | beam.Create(records)
 7            | BatchElements(min_batch_size=10, max_batch_size=10)
 8            | WriteToDynamoDB(table_name=self.table_name)
 9        )
10    self.assertListEqual(records, scan_table(TableName=self.table_name))
11
12def test_write_to_dynamodb_with_group_into_batches(self):
13    records = [(i % 2, {"pk": str(i), "sk": i}) for i in range(20)]
14    with TestPipeline(options=self.pipeline_opts) as p:
15        (
16            p
17            | beam.Create(records)
18            | GroupIntoBatches(batch_size=10)
19            | WriteToDynamoDB(table_name=self.table_name)
20        )
21    self.assertListEqual(
22        [r[1] for r in records], scan_table(TableName=self.table_name)
23    )