Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service and Amazon OpenSearch Serverless. The Apache Beam Python I/O connector for Amazon Data Firehose (firehose_pyio) provides a data sink feature that facilitates integration with those services.
Installation
The connector can be installed from PyPI.
pip install firehose_pyio
Usage
Sink Connector
The connector's main composite transform (WriteToFirehose) expects a PCollection whose elements are lists or tuples. If an element is a tuple, its value part (the second item) is taken. If the elements are not of the accepted types, you can apply the GroupIntoBatches or BatchElements transform beforehand. The records of each element are then sent to a Firehose delivery stream using the put_record_batch method of the boto3 package. Note that these batch transforms are also useful for staying within the API limit quoted below (see the sketch that follows it).
- Each PutRecordBatch request supports up to 500 records. Each record in the request can be as large as 1,000 KB (before base64 encoding), up to a limit of 4 MB for the entire request. These limits cannot be changed.
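For example, a preceding BatchElements step with max_batch_size=500 keeps each list element within the 500-record request limit. The snippet below is a sketch of that idea with made-up records, not code taken from the connector.
# a sketch: keep each batch within the 500-record PutRecordBatch limit
import apache_beam as beam
from apache_beam.transforms.util import BatchElements

with beam.Pipeline() as p:
    batches = (
        p
        | beam.Create([{"id": i} for i in range(2000)])          # individual records
        | BatchElements(min_batch_size=100, max_batch_size=500)  # list elements of at most 500 records
    )
    # each list element can then be handed to WriteToFirehose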
The transform also has options that control how individual records are processed and how failed records are handled.
- jsonify - A flag that indicates whether to convert a record into JSON. A record should be bytes, a bytearray or a file-like object; if it is not of a supported type (e.g. an integer), setting this flag to True converts it into a JSON string.
- multiline - A flag that indicates whether to append a new line character (\n) to each record. It is useful when saving records into a CSV or JSON Lines file.
- max_trials - The maximum number of trials when there are one or more failed records; it defaults to 3. Records that still fail after all trials are returned, which allows users to decide how to handle them subsequently.
- append_error - Whether to append error details to failed records. Defaults to True.
As mentioned earlier, failed elements are returned via a tagged output, which is named write-to-firehose-failed-output by default. You can change the name by specifying a different value in the failed_output argument. The sketch below shows how these options fit together.
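The following is a minimal usage sketch rather than an official example from the package; the delivery stream name and the input records are placeholders, and AWS credentials are assumed to be available in the environment.
# a minimal usage sketch (placeholder stream name and records)
import apache_beam as beam
from apache_beam.transforms.util import BatchElements

from firehose_pyio.io import WriteToFirehose


def run():
    with beam.Pipeline() as p:
        output = (
            p
            | beam.Create([{"id": i, "name": f"name-{i}"} for i in range(100)])
            | BatchElements(min_batch_size=50)  # convert individual dicts into list elements
            | WriteToFirehose(
                delivery_stream_name="my-delivery-stream",  # placeholder delivery stream
                jsonify=True,       # dicts are not bytes-like, so convert them into JSON strings
                multiline=True,     # append '\n' so the records form JSON Lines
                max_trials=3,       # retry failed records up to three times
                append_error=True,  # attach error details to failed records
            )
        )
        # records that still fail after all trials are emitted on the tagged output
        _ = output["write-to-firehose-failed-output"] | beam.Map(print)


if __name__ == "__main__":
    run()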
Sink Connector Example
This example shows how to put records into a Firehose delivery stream that delivers data into an S3 bucket. We first need to create a delivery stream and related resources using the following Python script. The source can be found in the examples folder of the connector repository.
# create_resources.py
import json
import boto3
from botocore.exceptions import ClientError

COMMON_NAME = "firehose-pyio-test"


def create_destination_bucket(bucket_name):
    client = boto3.client("s3")
    suffix = client._client_config.region_name
    client.create_bucket(Bucket=f"{bucket_name}-{suffix}")


def create_firehose_iam_role(role_name):
    assume_role_policy_document = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "firehose.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
    )
    client = boto3.client("iam")
    try:
        return client.get_role(RoleName=role_name)
    except ClientError as error:
        if error.response["Error"]["Code"] == "NoSuchEntity":
            resp = client.create_role(
                RoleName=role_name, AssumeRolePolicyDocument=assume_role_policy_document
            )
            client.attach_role_policy(
                RoleName=role_name,
                PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
            )
            return resp


def create_delivery_stream(delivery_stream_name, role_arn, bucket_name):
    client = boto3.client("firehose")
    suffix = client._client_config.region_name
    try:
        client.create_delivery_stream(
            DeliveryStreamName=delivery_stream_name,
            DeliveryStreamType="DirectPut",
            S3DestinationConfiguration={
                "RoleARN": role_arn,
                "BucketARN": f"arn:aws:s3:::{bucket_name}-{suffix}",
                "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 0},
            },
        )
    except ClientError as error:
        if error.response["Error"]["Code"] == "ResourceInUseException":
            pass
        else:
            raise error


if __name__ == "__main__":
    print("create a destination bucket...")
    create_destination_bucket(COMMON_NAME)
    print("create an iam role...")
    iam_resp = create_firehose_iam_role(COMMON_NAME)
    print("create a delivery stream...")
    create_delivery_stream(COMMON_NAME, iam_resp["Role"]["Arn"], COMMON_NAME)
The main example script is constructed so that it (1) deletes all existing objects in the S3 bucket, (2) runs the example pipeline, and (3) prints the contents of the object(s) created by the pipeline.
The pipeline begins by creating sample elements where each element is a dictionary with the id, name and created_at (creation date time) attributes. Then, we apply the following two transforms before the main transform (WriteToFirehose).
- DatetimeToStr - It converts the created_at attribute values into strings because the Python datetime class cannot be converted into JSON by default.
- BatchElements - It batches the elements with a minimum batch size of 50, which prevents individual dictionary elements from being pushed into the WriteToFirehose transform.
The WriteToFirehose transform is configured to convert individual records into JSON (jsonify=True) and to append a new line character to each record (multiline=True). The former is required because a Python dictionary is not a supported data type, while the latter ensures the records are saved as JSON Lines.
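Roughly speaking, each dictionary record then ends up as a JSON string followed by a new line character, as in the illustrative snippet below (it mimics the resulting payload and is not the connector's internal code).
# illustrative only: the shape of a record after jsonify=True and multiline=True
import json

record = {"id": 0, "name": "abcde", "created_at": "2024-07-21T11:49:32.536"}
payload = json.dumps(record) + "\n"
print(payload)  # {"id": 0, "name": "abcde", "created_at": "2024-07-21T11:49:32.536"}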
# pipeline.py
import argparse
import datetime
import random
import string
import logging
import boto3
import time

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from firehose_pyio.io import WriteToFirehose


def get_all_contents(bucket_name):
    client = boto3.client("s3")
    bucket_objects = client.list_objects_v2(
        Bucket=f"{bucket_name}-{client._client_config.region_name}"
    )
    return bucket_objects.get("Contents") or []


def delete_all_objects(bucket_name):
    client = boto3.client("s3")
    contents = get_all_contents(bucket_name)
    for content in contents:
        client.delete_object(
            Bucket=f"{bucket_name}-{client._client_config.region_name}",
            Key=content["Key"],
        )


def print_bucket_contents(bucket_name):
    client = boto3.client("s3")
    contents = get_all_contents(bucket_name)
    for content in contents:
        resp = client.get_object(
            Bucket=f"{bucket_name}-{client._client_config.region_name}",
            Key=content["Key"],
        )
        print(f"Key - {content['Key']}")
        print(resp["Body"].read().decode())


def create_records(n=100):
    return [
        {
            "id": i,
            "name": "".join(random.choices(string.ascii_letters, k=5)).lower(),
            "created_at": datetime.datetime.now(),
        }
        for i in range(n)
    ]


def convert_ts(record: dict):
    record["created_at"] = record["created_at"].isoformat(timespec="milliseconds")
    return record


def mask_secrets(d: dict):
    return {k: (v if k.find("aws") < 0 else "x" * len(v)) for k, v in d.items()}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--stream_name",
        default="firehose-pyio-test",
        type=str,
        help="Delivery stream name",
    )
    parser.add_argument(
        "--num_records", default="100", type=int, help="Number of records"
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known_args - {known_args}")
    print(f"pipeline options - {mask_secrets(pipeline_options.display_data())}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "CreateElements" >> beam.Create(create_records(known_args.num_records))
            | "DatetimeToStr" >> beam.Map(convert_ts)
            | "BatchElements" >> BatchElements(min_batch_size=50)
            | "WriteToFirehose"
            >> WriteToFirehose(
                delivery_stream_name=known_args.stream_name,
                jsonify=True,
                multiline=True,
                max_trials=3,
            )
        )

    logging.getLogger().setLevel(logging.WARN)
    logging.info("Building pipeline ...")


if __name__ == "__main__":
    BUCKET_NAME = "firehose-pyio-test"
    print(">> delete existing objects...")
    delete_all_objects(BUCKET_NAME)
    print(">> start pipeline...")
    run()
    time.sleep(1)
    print(">> print bucket contents...")
    print_bucket_contents(BUCKET_NAME)
We can run the pipeline on any runner that supports the Python SDK. Below is an example of running the pipeline on the Apache Flink Runner. Note that AWS-related values (e.g. aws_access_key_id) can be specified as pipeline arguments because the package has a dedicated pipeline option (FirehoseOptions) that parses them. Once the pipeline completes successfully, the script continues to read the contents of the file object(s) created by the connector.
python examples/pipeline.py \
    --runner=FlinkRunner \
    --parallelism=1 \
    --aws_access_key_id=$AWS_ACCESS_KEY_ID \
    --aws_secret_access_key=$AWS_SECRET_ACCESS_KEY \
    --region_name=$AWS_DEFAULT_REGION
>> start pipeline...
known_args - Namespace(stream_name='firehose-pyio-test', num_records=100)
pipeline options - {'runner': 'FlinkRunner', 'save_main_session': True, 'parallelism': 1, 'aws_access_key_id': 'xxxxxxxxxxxxxxxxxxxx', 'aws_secret_access_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'region_name': 'us-east-1'}
>> print bucket contents...
Key - 2024/07/21/01/firehose-pyio-test-1-2024-07-21-01-49-51-5fd0fc8d-b656-4f7b-9bc6-5039975d941f
{"id": 50, "name": "amlis", "created_at": "2024-07-21T11:49:32.536"}
{"id": 51, "name": "bqvhr", "created_at": "2024-07-21T11:49:32.536"}
{"id": 52, "name": "stsbv", "created_at": "2024-07-21T11:49:32.536"}
{"id": 53, "name": "rttjg", "created_at": "2024-07-21T11:49:32.536"}
{"id": 54, "name": "avozb", "created_at": "2024-07-21T11:49:32.536"}
{"id": 55, "name": "fyesu", "created_at": "2024-07-21T11:49:32.536"}
{"id": 56, "name": "pvxlw", "created_at": "2024-07-21T11:49:32.536"}
{"id": 57, "name": "qyjlo", "created_at": "2024-07-21T11:49:32.536"}
{"id": 58, "name": "smhns", "created_at": "2024-07-21T11:49:32.536"}
...
Note that the following warning messages are printed if the grpcio (1.65.x) package is installed (see this GitHub issue). You can downgrade the package to avoid those messages (e.g. pip install grpcio==1.64.1).
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1721526572.886302 78332 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
I0000 00:00:1721526573.143589 78363 subchannel.cc:806] subchannel 0x7f4010001890 {address=ipv6:%5B::1%5D:58713, args={grpc.client_channel_factory=0x2ae79b0, grpc.default_authority=localhost:58713, grpc.internal.channel_credentials=0x297dda0, grpc.internal.client_channel_call_destination=0x7f407f3b23d0, grpc.internal.event_engine=0x7f4010013870, grpc.internal.security_connector=0x7f40100140e0, grpc.internal.subchannel_pool=0x21d3d00, grpc.max_receive_message_length=-1, grpc.max_send_message_length=-1, grpc.primary_user_agent=grpc-python/1.65.1, grpc.resource_quota=0x2a99310, grpc.server_uri=dns:///localhost:58713}}: connect failed (UNKNOWN:Failed to connect to remote host: connect: Connection refused (111) {created_time:"2024-07-21T11:49:33.143192444+10:00"}), backing off for 1000 ms
More Sink Connector Examples
More usage examples can be found in the unit test cases, and some of them are covered here.
- Only the list or tuple types are supported as PCollection elements. In the following example, individual string elements are applied to WriteToFirehose, and it raises the TypeError (the test asserts a FirehoseClientError).
def test_write_to_firehose_with_unsupported_types(self):
    # only the list type is supported!
    with self.assertRaises(FirehoseClientError):
        with TestPipeline(options=self.pipeline_opts) as p:
            (
                p
                | beam.Create(["one", "two", "three", "four"])
                | WriteToFirehose(
                    delivery_stream_name=self.delivery_stream_name,
                    jsonify=False,
                    multiline=False,
                )
            )
- Jsonify the element if it is not bytes, a bytearray or a file-like object. In this example, the second element is a list of integers, and it should be converted into JSON (jsonify=True). Alternatively, we can convert it into a string manually.
def test_write_to_firehose_with_list_elements(self):
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create([["one", "two", "three", "four"], [1, 2, 3, 4]])
            | WriteToFirehose(
                delivery_stream_name=self.delivery_stream_name,
                jsonify=True,
                multiline=False,
            )
        )
        assert_that(output[None], equal_to([]))

    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
    self.assertSetEqual(
        set(bucket_contents), set(['"one""two""three""four"', "1234"])
    )
- If an element is a tuple, its value part (the second item) is applied to the WriteToFirehose transform.
def test_write_to_firehose_with_tuple_elements(self):
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create([(1, ["one", "two", "three", "four"]), (2, [1, 2, 3, 4])])
            | WriteToFirehose(
                delivery_stream_name=self.delivery_stream_name,
                jsonify=True,
                multiline=False,
            )
        )
        assert_that(output[None], equal_to([]))

    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
    self.assertSetEqual(
        set(bucket_contents), set(['"one""two""three""four"', "1234"])
    )
- We can batch elements if they are not of the supported types. Note that a new line character (\n) is appended to each record (multiline=True), which is particularly useful for saving a CSV or JSON Lines file to S3.
def test_write_to_firehose_with_list_multilining(self):
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create(["one", "two", "three", "four"])
            | BatchElements(min_batch_size=2, max_batch_size=2)
            | WriteToFirehose(
                delivery_stream_name=self.delivery_stream_name,
                jsonify=False,
                multiline=True,
            )
        )
        assert_that(output[None], equal_to([]))

    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
    self.assertSetEqual(set(bucket_contents), set(["one\ntwo\n", "three\nfour\n"]))


def test_write_to_firehose_with_tuple_multilining(self):
    with TestPipeline(options=self.pipeline_opts) as p:
        output = (
            p
            | beam.Create([(1, "one"), (2, "three"), (1, "two"), (2, "four")])
            | GroupIntoBatches(batch_size=2)
            | WriteToFirehose(
                delivery_stream_name=self.delivery_stream_name,
                jsonify=False,
                multiline=True,
            )
        )
        assert_that(output[None], equal_to([]))

    bucket_contents = collect_bucket_contents(self.s3_client, self.bucket_name)
    self.assertSetEqual(set(bucket_contents), set(["one\ntwo\n", "three\nfour\n"]))
- Failed records that remain after all trials are returned via a tagged output. We can configure the maximum number of trials (max_trials) and whether to append error details to failed records (append_error).
class TestRetryLogic(unittest.TestCase):
    # default failed output name
    failed_output = "write-to-firehose-failed-output"

    def test_write_to_firehose_retry_with_no_failed_element(self):
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(["one", "two", "three", "four"])
                | BatchElements(min_batch_size=4)
                | WriteToFirehose(
                    delivery_stream_name="non-existing-delivery-stream",
                    jsonify=False,
                    multiline=False,
                    max_trials=3,
                    append_error=False,
                    fake_config={"num_success": 2},
                )
            )
            assert_that(output[self.failed_output], equal_to([]))

    def test_write_to_firehose_retry_failed_element_without_appending_error(self):
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(["one", "two", "three", "four"])
                | BatchElements(min_batch_size=4)
                | WriteToFirehose(
                    delivery_stream_name="non-existing-delivery-stream",
                    jsonify=False,
                    multiline=False,
                    max_trials=3,
                    append_error=False,
                    fake_config={"num_success": 1},
                )
            )
            assert_that(output[self.failed_output], equal_to(["four"]))

    def test_write_to_firehose_retry_failed_element_with_appending_error(self):
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(["one", "two", "three", "four"])
                | BatchElements(min_batch_size=4)
                | WriteToFirehose(
                    delivery_stream_name="non-existing-delivery-stream",
                    jsonify=False,
                    multiline=False,
                    max_trials=3,
                    append_error=True,
                    fake_config={"num_success": 1},
                )
            )
            assert_that(
                output[self.failed_output],
                equal_to(
                    [("four", {"ErrorCode": "Error", "ErrorMessage": "This error"})]
                ),
            )