dynamodb_pyio
Amazon DynamoDB is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (dynamodb_pyio) aims to integrate with the database service by supporting source and sink connectors. Currently, only the sink connector is available.
Installation
The connector can be installed from PyPI.
$ pip install dynamodb_pyio
Usage
Sink Connector
The sink connector provides one main composite transform (WriteToDynamoDB), which expects each PCollection element to be a list or a tuple. If the element is a tuple, the tuple's first element is taken. If the elements are not of an accepted type, apply the GroupIntoBatches or BatchElements transform beforehand. The records in each element are then written to a DynamoDB table using the batch_writer of the boto3 package. The batch writer buffers items and sends them in batches automatically, and it resends any unprocessed items as needed.
The transform also has an option that handles duplicate records.
dedup_pkeys - List of keys to be used for deduplicating items in buffer.
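The buffering and deduplication described above can be illustrated with a small, self-contained sketch. It is not the connector's or boto3's implementation: BufferedWriterSketch, flush_amount, and sent_batches are hypothetical names used only for illustration, and the chunk size of 25 is an assumption chosen to mirror the DynamoDB BatchWriteItem limit of 25 requests per call.

```python
from typing import Any, Dict, List, Optional


class BufferedWriterSketch:
    """Toy stand-in for a batch writer (hypothetical, for illustration only).

    Items are buffered and "sent" in chunks of at most flush_amount.
    If dedup_pkeys is given, a new item replaces any buffered item that
    has the same values for those keys.
    """

    def __init__(self, flush_amount: int = 25, dedup_pkeys: Optional[List[str]] = None):
        self.flush_amount = flush_amount
        self.dedup_pkeys = dedup_pkeys
        self.buffer: List[Dict[str, Any]] = []
        # Stands in for network calls: each entry is one batch that was sent.
        self.sent_batches: List[List[Dict[str, Any]]] = []

    def put_item(self, item: Dict[str, Any]) -> None:
        if self.dedup_pkeys:
            key = [item[k] for k in self.dedup_pkeys]
            # Drop buffered items with the same key values so the newest wins.
            self.buffer = [
                b for b in self.buffer if [b[k] for k in self.dedup_pkeys] != key
            ]
        self.buffer.append(item)
        if len(self.buffer) >= self.flush_amount:
            self.flush()

    def flush(self) -> None:
        # Send at most flush_amount items and keep the rest buffered.
        if self.buffer:
            self.sent_batches.append(self.buffer[: self.flush_amount])
            self.buffer = self.buffer[self.flush_amount:]

    def close(self) -> None:
        # Send whatever is still buffered.
        while self.buffer:
            self.flush()


# 500 distinct records are sent as 20 batches of 25.
writer = BufferedWriterSketch()
for i in range(500):
    writer.put_item({"pk": str(i), "sk": i})
writer.close()
print(len(writer.sent_batches))  # 20

# 20 identical records collapse to a single item when deduplicated.
dedup_writer = BufferedWriterSketch(dedup_pkeys=["pk", "sk"])
for _ in range(20):
    dedup_writer.put_item({"pk": "1", "sk": 1})
dedup_writer.close()
print(dedup_writer.sent_batches)  # [[{'pk': '1', 'sk': 1}]]
```

Note that deduplication in this sketch only applies to items that are still in the buffer, which matches the "in buffer" wording of the option description. Items that were already flushed are not compared.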
Sink Connector Example
The transform can process many records, thanks to the batch writer.
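import apache_beam as beam
from dynamodb_pyio.io import WriteToDynamoDB

# Placeholder: replace with the name of an existing DynamoDB table.
table_name = "my-table"

records = [{"pk": str(i), "sk": i} for i in range(500)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create([records])  # a single element holding the list of records
        | WriteToDynamoDB(table_name=table_name)
    )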
Duplicate records can be handled using the dedup_pkeys option.
import apache_beam as beam
from dynamodb_pyio.io import WriteToDynamoDB

# Placeholder: replace with the name of an existing DynamoDB table.
table_name = "my-table"

# 20 copies of the same record.
records = [{"pk": str(1), "sk": 1} for _ in range(20)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create([records])
        | WriteToDynamoDB(table_name=table_name, dedup_pkeys=["pk", "sk"])
    )
Batches of elements can be controlled further with the BatchElements or GroupIntoBatches transform.
import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from dynamodb_pyio.io import WriteToDynamoDB

# Placeholder: replace with the name of an existing DynamoDB table.
table_name = "my-table"

records = [{"pk": str(i), "sk": i} for i in range(100)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(records)  # one element per record
        | BatchElements(min_batch_size=50, max_batch_size=50)  # group records into lists
        | WriteToDynamoDB(table_name=table_name)
    )
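As a rough illustration of what batching does to the element shape, the following plain-Python sketch groups individual records into fixed-size lists, which is the form the sink expects. The helper fixed_size_batches is hypothetical and is not part of Apache Beam. In a real pipeline, BatchElements works within runner bundles, so actual batch sizes may differ from this idealized picture.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def fixed_size_batches(elements: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to batch_size elements (hypothetical helper)."""
    batch: List[T] = []
    for element in elements:
        batch.append(element)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Emit the final, possibly smaller, batch.
    if batch:
        yield batch


records = [{"pk": str(i), "sk": i} for i in range(100)]
batches = list(fixed_size_batches(records, 50))
print([len(b) for b in batches])  # [50, 50]
```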
See Introduction to DynamoDB PyIO Sink Connector for more examples.
Contributing
Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.
License
dynamodb_pyio was created as part of the Apache Beam Python I/O Connectors project. It is licensed under the terms of the Apache License 2.0.
Credits
dynamodb_pyio was created with cookiecutter and the pyio-cookiecutter template.