Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service and Amazon OpenSearch Serverless. The Apache Beam Python I/O connector for Amazon Data Firehose (firehose_pyio) provides a data sink feature that facilitates integration with those services.

Installation

The connector can be installed from PyPI.

1pip install firehose_pyio

Usage

Sink Connector

It has the main composite transform (WriteToFirehose), and it expects a list or tuple PCollection element. If the element is a tuple, the tuple’s first element is taken. If the element is not of the accepted types, you can apply the GroupIntoBatches or BatchElements transform beforehand. Then, the element is sent into a Firehose delivery stream using the put_record_batch method of the boto3 package. Note that the above batch transforms can also be useful to overcome the API limitation listed below.

  • Each PutRecordBatch request supports up to 500 records. Each record in the request can be as large as 1,000 KB (before base64 encoding), up to a limit of 4 MB for the entire request. These limits cannot be changed.

The transform also has options that control individual records as well as handle failed records.

  • jsonify - A flag that indicates whether to convert a record into JSON. Note that a record should be of bytes, bytearray or file-like object, and, if it is not of a supported type (e.g. integer), we can convert it into a Json string by specifying this flag to True.
  • multiline - A flag that indicates whether to add a new line character (\n) to each record. It is useful to save records into a CSV or Jsonline file.
  • max_trials - The maximum number of trials when there is one or more failed records - it defaults to 3. Note that failed records after all trials are returned, which allows users to determine how to handle them subsequently.
  • append_error - Whether to append error details to failed records. Defaults to True.

As mentioned earlier, failed elements are returned by a tagged output where it is named as write-to-firehose-failed-output by default. You can change the name by specifying a different name using the failed_output argument.

Sink Connector Example

The example shows how to put records to a Firehose delivery stream that delivers data into an S3 bucket. We first need to create a delivery stream and related resources using the following Python script. The source can be found in the examples folder of the connector repository.

 1# examples/create_resources.py
 2import json
 3import boto3
 4from botocore.exceptions import ClientError
 5
 6COMMON_NAME = "firehose-pyio-test"
 7
 8
 9def create_destination_bucket(bucket_name):
10    client = boto3.client("s3")
11    suffix = client._client_config.region_name
12    client.create_bucket(Bucket=f"{bucket_name}-{suffix}")
13
14
15def create_firehose_iam_role(role_name):
16    assume_role_policy_document = json.dumps(
17        {
18            "Version": "2012-10-17",
19            "Statement": [
20                {
21                    "Effect": "Allow",
22                    "Principal": {"Service": "firehose.amazonaws.com"},
23                    "Action": "sts:AssumeRole",
24                }
25            ],
26        }
27    )
28    client = boto3.client("iam")
29    try:
30        return client.get_role(RoleName=role_name)
31    except ClientError as error:
32        if error.response["Error"]["Code"] == "NoSuchEntity":
33            resp = client.create_role(
34                RoleName=role_name, AssumeRolePolicyDocument=assume_role_policy_document
35            )
36            client.attach_role_policy(
37                RoleName=role_name,
38                PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
39            )
40            return resp
41
42
43def create_delivery_stream(delivery_stream_name, role_arn, bucket_name):
44    client = boto3.client("firehose")
45    suffix = client._client_config.region_name
46    try:
47        client.create_delivery_stream(
48            DeliveryStreamName=delivery_stream_name,
49            DeliveryStreamType="DirectPut",
50            S3DestinationConfiguration={
51                "RoleARN": role_arn,
52                "BucketARN": f"arn:aws:s3:::{bucket_name}-{suffix}",
53                "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 0},
54            },
55        )
56    except ClientError as error:
57        if error.response["Error"]["Code"] == "ResourceInUseException":
58            pass
59        else:
60            raise error
61
62
63if __name__ == "__main__":
64    print("create a destination bucket...")
65    create_destination_bucket(COMMON_NAME)
66    print("create an iam role...")
67    iam_resp = create_firehose_iam_role(COMMON_NAME)
68    print("create a delivery stream...")
69    create_delivery_stream(COMMON_NAME, iam_resp["Role"]["Arn"], COMMON_NAME)

The main example script is constructed so that it (1) deletes all existing objects in the S3 bucket, (2) runs the example pipeline and (3) prints contents of the object(s) created by the pipeline.

The pipeline begins with creating sample elements where each element is a dictionary that has the id, name and created_at (created date time) attributes. Then, we apply the following two transforms before we apply the main transform (WriteToFirehose).

  • DatetimeToStr - It converts the created_at attribute values into string because the Python datetime class cannot be converted into Json by default.
  • BatchElements - It batches the elements into the minimum batch size of 50. It prevents the individual dictionary element from being pushed into the WriteToFirehose transform.

In the WriteToFirehose transform, it is configured that individual records are converted into JSON (jsonify=True) as well as a new line character is appended (multiline=True). The former is required because the Python dictionary is not a supported data type while the latter makes the records are saved as JSONLines.

  1# examples/pipeline.py
  2import argparse
  3import datetime
  4import random
  5import string
  6import logging
  7import boto3
  8import time
  9
 10import apache_beam as beam
 11from apache_beam.transforms.util import BatchElements
 12from apache_beam.options.pipeline_options import PipelineOptions
 13from apache_beam.options.pipeline_options import SetupOptions
 14
 15from firehose_pyio.io import WriteToFirehose
 16
 17
 18def get_all_contents(bucket_name):
 19    client = boto3.client("s3")
 20    bucket_objects = client.list_objects_v2(
 21        Bucket=f"{bucket_name}-{client._client_config.region_name}"
 22    )
 23    return bucket_objects.get("Contents") or []
 24
 25
 26def delete_all_objects(bucket_name):
 27    client = boto3.client("s3")
 28    contents = get_all_contents(bucket_name)
 29    for content in contents:
 30        client.delete_object(
 31            Bucket=f"{bucket_name}-{client._client_config.region_name}",
 32            Key=content["Key"],
 33        )
 34
 35
 36def print_bucket_contents(bucket_name):
 37    client = boto3.client("s3")
 38    contents = get_all_contents(bucket_name)
 39    for content in contents:
 40        resp = client.get_object(
 41            Bucket=f"{bucket_name}-{client._client_config.region_name}",
 42            Key=content["Key"],
 43        )
 44        print(f"Key - {content['Key']}")
 45        print(resp["Body"].read().decode())
 46
 47
 48def create_records(n=100):
 49    return [
 50        {
 51            "id": i,
 52            "name": "".join(random.choices(string.ascii_letters, k=5)).lower(),
 53            "created_at": datetime.datetime.now(),
 54        }
 55        for i in range(n)
 56    ]
 57
 58
 59def convert_ts(record: dict):
 60    record["created_at"] = record["created_at"].isoformat(timespec="milliseconds")
 61    return record
 62
 63
 64def mask_secrets(d: dict):
 65    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}
 66
 67
 68def run(argv=None, save_main_session=True):
 69    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
 70    parser.add_argument(
 71        "--stream_name",
 72        default="firehose-pyio-test",
 73        type=str,
 74        help="Delivery stream name",
 75    )
 76    parser.add_argument(
 77        "--num_records", default="100", type=int, help="Number of records"
 78    )
 79    known_args, pipeline_args = parser.parse_known_args(argv)
 80
 81    pipeline_options = PipelineOptions(pipeline_args)
 82    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
 83    print(f"known_args - {known_args}")
 84    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")
 85
 86    with beam.Pipeline(options=pipeline_options) as p:
 87        (
 88            p
 89            | "CreateElements" >> beam.Create(create_records(known_args.num_records))
 90            | "DatetimeToStr" >> beam.Map(convert_ts)
 91            | "BatchElements" >> BatchElements(min_batch_size=50)
 92            | "WriteToFirehose"
 93            >> WriteToFirehose(
 94                delivery_stream_name=known_args.stream_name,
 95                jsonify=True,
 96                multiline=True,
 97                max_trials=3,
 98            )
 99        )
100
101        logging.getLogger().setLevel(logging.WARN)
102        logging.info("Building pipeline ...")
103
104
105if __name__ == "__main__":
106    BUCKET_NAME = "firehose-pyio-test"
107    print(">> delete existing objects...")
108    delete_all_objects(BUCKET_NAME)
109    print(">> start pipeline...")
110    run()
111    time.sleep(1)
112    print(">> print bucket contents...")
113    print_bucket_contents(BUCKET_NAME)

We can run the pipeline on any runner that supports the Python SDK. Below shows an example of running the example pipeline on the Apache Flink Runner. Note that AWS related values (e.g. aws_access_key_id) can be specified as pipeline arguments because the package has a dedicated pipeline option (FirehoseOptions) that parses them. Once the pipeline runs successfully, the script continues to read the contents of the file object(s) that are created by the connector.

1python examples/pipeline.py \
2    --runner=FlinkRunner \
3    --parallelism=1 \
4    --aws_access_key_id=$AWS_ACCESS_KEY_ID \
5    --aws_secret_access_key=$AWS_SECRET_ACCESS_KEY \
6    --region_name=$AWS_DEFAULT_REGION
 1>> start pipeline...
 2known_args - Namespace(stream_name='firehose-pyio-test', num_records=100)
 3pipeline options - {'runner': 'FlinkRunner', 'save_main_session': True, 'parallelism': 1, 'aws_access_key_id': 'xxxxxxxxxxxxxxxxxxxx', 'aws_secret_access_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'region_name': 'us-east-1'}
 4>> print bucket contents...
 5Key - 2024/07/21/01/firehose-pyio-test-1-2024-07-21-01-49-51-5fd0fc8d-b656-4f7b-9bc6-5039975d941f
 6{"id": 50, "name": "amlis", "created_at": "2024-07-21T11:49:32.536"}
 7{"id": 51, "name": "bqvhr", "created_at": "2024-07-21T11:49:32.536"}
 8{"id": 52, "name": "stsbv", "created_at": "2024-07-21T11:49:32.536"}
 9{"id": 53, "name": "rttjg", "created_at": "2024-07-21T11:49:32.536"}
10{"id": 54, "name": "avozb", "created_at": "2024-07-21T11:49:32.536"}
11{"id": 55, "name": "fyesu", "created_at": "2024-07-21T11:49:32.536"}
12{"id": 56, "name": "pvxlw", "created_at": "2024-07-21T11:49:32.536"}
13{"id": 57, "name": "qyjlo", "created_at": "2024-07-21T11:49:32.536"}
14{"id": 58, "name": "smhns", "created_at": "2024-07-21T11:49:32.536"}
15...

Note that the following warning messages are printed if it installs the grpcio (1.65.x) package (see this GitHub issue). You can downgrade the package version to avoid those messages (eg pip install grpcio==1.64.1).

1WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
2I0000 00:00:1721526572.886302   78332 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
3I0000 00:00:1721526573.143589   78363 subchannel.cc:806] subchannel 0x7f4010001890 {address=ipv6:%5B::1%5D:58713, args={grpc.client_channel_factory=0x2ae79b0, grpc.default_authority=localhost:58713, grpc.internal.channel_credentials=0x297dda0, grpc.internal.client_channel_call_destination=0x7f407f3b23d0, grpc.internal.event_engine=0x7f4010013870, grpc.internal.security_connector=0x7f40100140e0, grpc.internal.subchannel_pool=0x21d3d00, grpc.max_receive_message_length=-1, grpc.max_send_message_length=-1, grpc.primary_user_agent=grpc-python/1.65.1, grpc.resource_quota=0x2a99310, grpc.server_uri=dns:///localhost:58713}}: connect failed (UNKNOWN:Failed to connect to remote host: connect: Connection refused (111) {created_time:"2024-07-21T11:49:33.143192444+10:00"}), backing off for 1000 ms

More Sink Connector Examples

More usage examples can be found in the unit testing cases. Some of them are covered here.

  1. Only the list or tuple types are supported PCollection elements. In the following example, individual string elements are applied in the WriteToFirehose, and it raises the TypeError.
 1def test_write_to_firehose_with_unsupported_types(self):
 2    # only the list type is supported!
 3    with self.assertRaises(FirehoseClientError):
 4        with TestPipeline(options=self.pipeline_opts) as p:
 5            (
 6                p
 7                | beam.Create(["one", "two", "three", "four"])
 8                | WriteToFirehose(
 9                    delivery_stream_name=self.delivery_stream_name,
10                    jsonify=False,
11                    multiline=False,
12                )
13            )
  1. Jsonify the element if it is not of the bytes, bytearray or file-like object. In this example, the second element is a list of integers, and it should be converted into JSON (jsonify=True). Or we can convert it into string manually.
 1def test_write_to_firehose_with_list_elements(self):
 2    with TestPipeline(options=self.pipeline_opts) as p:
 3        output = (
 4            p
 5            | beam.Create([["one", "two", "three", "four"], [1, 2, 3, 4]])
 6            | WriteToFirehose(
 7                delivery_stream_name=self.delivery_stream_name,
 8                jsonify=True,
 9                multiline=False,
10            )
11        )
12        assert_that(output[None], equal_to([]))
13
14    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
15    self.assertSetEqual(
16        set(bucket_contents), set(['"one""two""three""four"', "1234"])
17    )
  1. If an element is a tuple, its first element is applied to the WriteToFirehose transform.
 1def test_write_to_firehose_with_tuple_elements(self):
 2    with TestPipeline(options=self.pipeline_opts) as p:
 3        output = (
 4            p
 5            | beam.Create([(1, ["one", "two", "three", "four"]), (2, [1, 2, 3, 4])])
 6            | WriteToFirehose(
 7                delivery_stream_name=self.delivery_stream_name,
 8                jsonify=True,
 9                multiline=False,
10            )
11        )
12        assert_that(output[None], equal_to([]))
13
14    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
15    self.assertSetEqual(
16        set(bucket_contents), set(['"one""two""three""four"', "1234"])
17    )
  1. We can batch an element if it is not of the supported types. Note that, a new line character (\n) is appended to each record, and it is particularly useful for saving a CSV or JSONLines file to S3.
 1def test_write_to_firehose_with_list_multilining(self):
 2    with TestPipeline(options=self.pipeline_opts) as p:
 3        output = (
 4            p
 5            | beam.Create(["one", "two", "three", "four"])
 6            | BatchElements(min_batch_size=2, max_batch_size=2)
 7            | WriteToFirehose(
 8                delivery_stream_name=self.delivery_stream_name,
 9                jsonify=False,
10                multiline=True,
11            )
12        )
13        assert_that(output[None], equal_to([]))
14
15    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
16    self.assertSetEqual(set(bucket_contents), set(["one\ntwo\n", "three\nfour\n"]))
17
18def test_write_to_firehose_with_tuple_multilining(self):
19    with TestPipeline(options=self.pipeline_opts) as p:
20        output = (
21            p
22            | beam.Create([(1, "one"), (2, "three"), (1, "two"), (2, "four")])
23            | GroupIntoBatches(batch_size=2)
24            | WriteToFirehose(
25                delivery_stream_name=self.delivery_stream_name,
26                jsonify=False,
27                multiline=True,
28            )
29        )
30        assert_that(output[None], equal_to([]))
31
32    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
33    self.assertSetEqual(set(bucket_contents), set(["one\ntwo\n", "three\nfour\n"]))
  1. Failed records after all trials are returned by a tagged output. We can configure the number of trials (max_trials) and whether to append error details (append_error).
 1class TestRetryLogic(unittest.TestCase):
 2    # default failed output name
 3    failed_output = "write-to-firehose-failed-output"
 4
 5    def test_write_to_firehose_retry_with_no_failed_element(self):
 6        with TestPipeline() as p:
 7            output = (
 8                p
 9                | beam.Create(["one", "two", "three", "four"])
10                | BatchElements(min_batch_size=4)
11                | WriteToFirehose(
12                    delivery_stream_name="non-existing-delivery-stream",
13                    jsonify=False,
14                    multiline=False,
15                    max_trials=3,
16                    append_error=False,
17                    fake_config={"num_success": 2},
18                )
19            )
20            assert_that(output[self.failed_output], equal_to([]))
21
22    def test_write_to_firehose_retry_failed_element_without_appending_error(self):
23        with TestPipeline() as p:
24            output = (
25                p
26                | beam.Create(["one", "two", "three", "four"])
27                | BatchElements(min_batch_size=4)
28                | WriteToFirehose(
29                    delivery_stream_name="non-existing-delivery-stream",
30                    jsonify=False,
31                    multiline=False,
32                    max_trials=3,
33                    append_error=False,
34                    fake_config={"num_success": 1},
35                )
36            )
37            assert_that(output[self.failed_output], equal_to(["four"]))
38
39    def test_write_to_firehose_retry_failed_element_with_appending_error(self):
40        with TestPipeline() as p:
41            output = (
42                p
43                | beam.Create(["one", "two", "three", "four"])
44                | BatchElements(min_batch_size=4)
45                | WriteToFirehose(
46                    delivery_stream_name="non-existing-delivery-stream",
47                    jsonify=False,
48                    multiline=False,
49                    max_trials=3,
50                    append_error=True,
51                    fake_config={"num_success": 1},
52                )
53            )
54            assert_that(
55                output[self.failed_output],
56                equal_to(
57                    [("four", {"ErrorCode": "Error", "ErrorMessage": "This error"})]
58                ),
59            )