Amazon Simple Queue Service (Amazon SQS) offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components. The Apache Beam Python I/O connector for Amazon SQS (sqs_pyio) aims to integrate with the queue service by supporting source and sink connectors. Currently, the sink connector is available.

Installation

The connector can be installed from PyPI.

pip install sqs_pyio

Usage

Sink Connector

The connector's main composite transform is WriteToSqs, and it expects a list or tuple PCollection element. If the element is a tuple, the tuple's second element (the grouped records) is taken. If the elements are not of an accepted type, you can apply the GroupIntoBatches or BatchElements transform beforehand. Each element is then sent to an SQS queue using the send_message_batch method of the boto3 package. Note that the batch transforms above are also useful for staying within the API limitation listed below (a size-aware batching sketch follows the list).

  • Each SendMessageBatch request supports up to 10 messages. The maximum allowed individual message size and the maximum total payload size (the sum of the individual lengths of all the batched messages) are both 256 KiB (262,144 bytes).
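
The following is a minimal sketch, not part of sqs_pyio, of a helper that splits an already-batched list of entries into sub-batches of at most 10 messages and at most 256 KiB of message body payload; it could be applied with beam.FlatMap between a batching transform and WriteToSqs. The constants and the helper name are illustrative, and it assumes each entry is a dictionary with the Id and MessageBody attributes as used throughout this post.

# a hypothetical helper, not part of sqs_pyio
MAX_BATCH_ENTRIES = 10
MAX_BATCH_BYTES = 262_144  # 256 KiB


def split_by_sqs_limits(entries):
    batch, batch_bytes = [], 0
    for entry in entries:
        size = len(entry["MessageBody"].encode("utf-8"))
        # start a new sub-batch if adding this entry would breach either limit
        if batch and (len(batch) == MAX_BATCH_ENTRIES or batch_bytes + size > MAX_BATCH_BYTES):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(entry)
        batch_bytes += size
    if batch:
        yield batch


# usage sketch: ... | BatchElements(max_batch_size=100) | beam.FlatMap(split_by_sqs_limits) | WriteToSqs(...)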

The transform also provides options for handling failed records, as listed below.

  • max_trials - The maximum number of trials when one or more records fail; it defaults to 3. Note that records that still fail after all trials are returned by a tagged output, which allows users to decide how to handle them subsequently.
  • append_error - Whether to append error details to failed records; it defaults to True.

As mentioned earlier, failed elements are returned by a tagged output, which is named write-to-sqs-failed-output by default. You can change the name using the failed_output argument.
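
As a quick illustration, the following is a minimal sketch of batching elements, writing them with WriteToSqs, and printing whatever ends up in the failed output. The queue name and the failed output name are example values, and it assumes the queue already exists and AWS credentials are available to the pipeline.

import apache_beam as beam
from apache_beam.transforms.util import BatchElements

from sqs_pyio.io import WriteToSqs

# a minimal sketch; "sqs-pyio-test" and "my-failed-output" are example values
with beam.Pipeline() as p:
    output = (
        p
        | beam.Create([{"Id": str(i), "MessageBody": str(i)} for i in range(100)])
        | BatchElements(min_batch_size=10, max_batch_size=10)
        | WriteToSqs(
            queue_name="sqs-pyio-test",
            max_trials=3,
            append_error=True,
            failed_output="my-failed-output",
        )
    )
    # the main output is tagged None; failed elements are under the tag given above
    _ = output["my-failed-output"] | beam.Map(print)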

Sink Connector Example

The example shows how to send messages in batches to an SQS queue using the sink connector and check the approximate number of messages in the queue. The source can be found in the examples folder of the connector repository.

The pipeline begins by creating sample elements, where each element is a dictionary with the Id and MessageBody attributes. Then, we apply the BatchElements transform with the minimum and maximum batch sizes both set to 10. This prevents individual dictionary elements from being pushed into the WriteToSqs transform and keeps each batch within the API limits. Finally, the WriteToSqs transform is configured to make at most three trials when there are failed elements (max_trials=3) and to append error details to failed elements (append_error=True).

import argparse
import time
import logging

import boto3
from botocore.exceptions import ClientError

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from sqs_pyio.io import WriteToSqs

QUEUE_NAME = "sqs-pyio-test"


def split_into_chunks(max_num, size=10):
    lst = list(range(max_num))
    for i in range(0, max_num, size):
        yield lst[i : i + size]


def send_message_batch(queue_name, max_num):
    client = boto3.client("sqs")
    queue_url = get_queue_url(queue_name)
    chunks = split_into_chunks(max_num)
    for chunk in chunks:
        print(f"sending {len(chunk)} messages...")
        records = [{"Id": str(i), "MessageBody": str(i)} for i in chunk]
        client.send_message_batch(QueueUrl=queue_url, Entries=records)


def get_queue_url(queue_name):
    client = boto3.client("sqs")
    try:
        return client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    except ClientError as error:
        if error.response["Error"]["QueryErrorCode"] == "QueueDoesNotExist":
            client.create_queue(QueueName=queue_name)
            return client.get_queue_url(QueueName=queue_name)["QueueUrl"]
        else:
            raise error


def purge_queue(queue_name):
    client = boto3.client("sqs")
    queue_url = get_queue_url(queue_name)
    try:
        client.purge_queue(QueueUrl=queue_url)
    except ClientError as error:
        if error.response["Error"]["QueryErrorCode"] != "PurgeQueueInProgress":
            raise error


def check_number_of_messages(queue_name):
    client = boto3.client("sqs")
    queue_url = get_queue_url(queue_name)
    resp = client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
    )
    return f'{resp["Attributes"]["ApproximateNumberOfMessages"]} messages are found approximately!'


def mask_secrets(d: dict):
    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--queue_name", default=QUEUE_NAME, type=str, help="SQS queue name"
    )
    parser.add_argument(
        "--num_records", default="100", type=int, help="Number of records"
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known_args - {known_args}")
    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "CreateElements"
            >> beam.Create(
                [
                    {"Id": str(i), "MessageBody": str(i)}
                    for i in range(known_args.num_records)
                ]
            )
            | "BatchElements" >> BatchElements(min_batch_size=10, max_batch_size=10)
            | "WriteToSqs"
            >> WriteToSqs(
                queue_name=known_args.queue_name,
                max_trials=3,
                append_error=True,
                failed_output="my-failed-output",
            )
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    # check if queue exists
    get_queue_url(QUEUE_NAME)
    print(">> start pipeline...")
    run()
    time.sleep(1)
    print(check_number_of_messages(QUEUE_NAME))
    print(">> purge existing messages...")
    purge_queue(QUEUE_NAME)

We can run the pipeline on any runner that supports the Python SDK, and the example below runs it on the Apache Flink Runner. Note that AWS-related values (e.g. aws_access_key_id) can be specified as pipeline arguments because the package has a dedicated pipeline option (SqsOptions) that parses them. Once the pipeline completes successfully, the script checks the approximate number of messages created by the connector.

python examples/pipeline.py \
    --runner=FlinkRunner \
    --parallelism=1 \
    --aws_access_key_id=$AWS_ACCESS_KEY_ID \
    --aws_secret_access_key=$AWS_SECRET_ACCESS_KEY \
    --region_name=$AWS_DEFAULT_REGION
>> start pipeline...
known_args - Namespace(queue_name='sqs-pyio-test', num_records=100)
pipeline options - {'runner': 'FlinkRunner', 'save_main_session': True, 'parallelism': 1, 'aws_access_key_id': 'xxxxxxxxxxxxxxxxxxxx', 'aws_secret_access_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'region_name': 'us-east-1'}
...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:total 10, succeeded 10, failed 0...
INFO:root:BatchElements statistics: element_count=100 batch_count=10 next_batch_size=10 timings=[(10, 0.44828295707702637), (10, 0.4457244873046875), (10, 0.4468190670013428), (10, 0.4548039436340332), (10, 0.4473147392272949), (10, 0.4668114185333252), (10, 0.4462318420410156), (10, 0.4522392749786377), (10, 0.44727039337158203)]
...
100 messages are found approximately!
>> purge existing messages...

Note that the following warning messages are printed if the grpcio package (1.65.x) is installed (see this GitHub issue). You can downgrade the package to avoid those messages (e.g. pip install grpcio==1.64.1).

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1721526572.886302   78332 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
I0000 00:00:1721526573.143589   78363 subchannel.cc:806] subchannel 0x7f4010001890 {address=ipv6:%5B::1%5D:58713, args={grpc.client_channel_factory=0x2ae79b0, grpc.default_authority=localhost:58713, grpc.internal.channel_credentials=0x297dda0, grpc.internal.client_channel_call_destination=0x7f407f3b23d0, grpc.internal.event_engine=0x7f4010013870, grpc.internal.security_connector=0x7f40100140e0, grpc.internal.subchannel_pool=0x21d3d00, grpc.max_receive_message_length=-1, grpc.max_send_message_length=-1, grpc.primary_user_agent=grpc-python/1.65.1, grpc.resource_quota=0x2a99310, grpc.server_uri=dns:///localhost:58713}}: connect failed (UNKNOWN:Failed to connect to remote host: connect: Connection refused (111) {created_time:"2024-07-21T11:49:33.143192444+10:00"}), backing off for 1000 ms

More Sink Connector Examples

More usage examples can be found in the unit test cases, some of which are covered here.

  1. Only the list and tuple types are supported as PCollection elements. In the following example, individual dictionary elements are applied to the WriteToSqs transform, and it raises the SqsClientError.
def test_write_to_sqs_with_unsupported_record_type(self):
    # only the list type is supported!
    records = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]
    with self.assertRaises(SqsClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (p | beam.Create(records) | WriteToSqs(queue_name=self.queue_name))
  2. Both the list and tuple types are supported. Note that the main output is tagged as None, and we can check its elements by specifying the tag value (i.e. output[None]).
def test_write_to_sqs_with_list_element(self):
    records = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]
    with TestPipeline(options=self.pipeline_opts) as p:
        output = p | beam.Create([records]) | WriteToSqs(queue_name=self.queue_name)
        assert_that(output[None], equal_to([]))

def test_write_to_sqs_with_tuple_element(self):
    records = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create([("key", records)])
            | WriteToSqs(queue_name=self.queue_name)
        )
        assert_that(output[None], equal_to([]))
  3. The Id and MessageBody attributes are mandatory, and they should be of the string type.
def test_write_to_sqs_with_incorrect_message_data_type(self):
    # Id should be string
    records = [{"Id": i, "MessageBody": str(i)} for i in range(3)]
    with self.assertRaises(SqsClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (p | beam.Create([records]) | WriteToSqs(queue_name=self.queue_name))

    # MessageBody should be string
    records = [{"Id": str(i), "MessageBody": i} for i in range(3)]
    with self.assertRaises(SqsClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (p | beam.Create([records]) | WriteToSqs(queue_name=self.queue_name))
  4. We can batch the elements with the BatchElements or GroupIntoBatches transform.
def test_write_to_sqs_with_batch_elements(self):
    # BatchElements groups unkeyed elements into a list
    records = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create(records)
            | BatchElements(min_batch_size=2, max_batch_size=2)
            | WriteToSqs(queue_name=self.queue_name)
        )
        assert_that(output[None], equal_to([]))

def test_write_to_sqs_with_group_into_batches(self):
    # GroupIntoBatches groups keyed elements into a list
    records = [(i % 2, {"Id": str(i), "MessageBody": str(i)}) for i in range(3)]
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create(records)
            | GroupIntoBatches(batch_size=2)
            | WriteToSqs(queue_name=self.queue_name)
        )
        assert_that(output[None], equal_to([]))
  5. Failed records after all trials are returned by a tagged output. We can configure the number of trials (max_trials) and whether to append error details (append_error).
class TestRetryLogic(unittest.TestCase):
    # default failed output name
    failed_output = "write-to-sqs-failed-output"

    def test_write_to_sqs_retry_no_failed_element(self):
        records = [{"Id": str(i), "MessageBody": str(i)} for i in range(4)]
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(records)
                | BatchElements(min_batch_size=4)
                | WriteToSqs(
                    queue_name="test-sqs-queue",
                    max_trials=3,
                    append_error=True,
                    fake_config={"num_success": 2},
                )
            )
            assert_that(output[None], equal_to([]))

    def test_write_to_sqs_retry_failed_element_without_appending_error(self):
        records = [{"Id": str(i), "MessageBody": str(i)} for i in range(4)]
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(records)
                | BatchElements(min_batch_size=4)
                | WriteToSqs(
                    queue_name="test-sqs-queue",
                    max_trials=3,
                    append_error=False,
                    fake_config={"num_success": 1},
                )
            )

            assert_that(
                output[self.failed_output],
                equal_to([{"Id": "3", "MessageBody": "3"}]),
            )

    def test_write_to_sqs_retry_failed_element_with_appending_error(self):
        records = [{"Id": str(i), "MessageBody": str(i)} for i in range(4)]
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(records)
                | BatchElements(min_batch_size=4)
                | WriteToSqs(
                    queue_name="test-sqs-queue",
                    max_trials=3,
                    append_error=True,
                    fake_config={"num_success": 1},
                )
            )
            assert_that(
                output[self.failed_output],
                equal_to(
                    [
                        (
                            {"Id": "3", "MessageBody": "3"},
                            {
                                "Id": "3",
                                "SenderFault": False,
                                "Code": "error-code",
                                "Message": "error-message",
                            },
                        ),
                    ]
                ),
            )