firehose_pyio


Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service and Amazon OpenSearch Serverless. The Apache Beam Python I/O connector for Amazon Data Firehose (firehose_pyio) provides a data sink feature that facilitates integration with those services.

Installation

$ pip install firehose_pyio

Usage

The connector provides one main composite transform, WriteToFirehose, which expects each PCollection element to be a list or a tuple. If the element is a tuple, its second element (e.g. the batch of values emitted by GroupIntoBatches) is taken. If elements are not of an accepted type, apply the GroupIntoBatches or BatchElements transform beforehand. The records of each element are then sent to a Firehose delivery stream using the put_record_batch method of the boto3 package. These batch transforms can also help keep requests within the API limits listed below.

  • Each PutRecordBatch request supports up to 500 records. Each record in the request can be as large as 1,000 KB (before base64 encoding), up to a limit of 4 MB for the entire request. These limits cannot be changed.
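To show how these three limits interact, the sketch below splits a sequence of byte records into batches that each fit a single PutRecordBatch request. It is a hypothetical, library-independent helper (chunk_for_put_record_batch is not part of firehose_pyio), and it assumes KB and MB are 1,024-based, which is worth checking against the AWS documentation.

```python
from typing import Iterable, List

# Limits stated for PutRecordBatch; KB/MB are assumed to be 1,024-based here.
MAX_RECORDS_PER_REQUEST = 500
MAX_RECORD_BYTES = 1_000 * 1024
MAX_REQUEST_BYTES = 4 * 1024 * 1024


def chunk_for_put_record_batch(records: Iterable[bytes]) -> List[List[bytes]]:
    """Split byte records into batches that fit one PutRecordBatch call.

    Hypothetical helper for illustration; not part of firehose_pyio.
    """
    batches: List[List[bytes]] = []
    current: List[bytes] = []
    current_bytes = 0
    for record in records:
        size = len(record)
        if size > MAX_RECORD_BYTES:
            # A single oversized record can never be sent, so fail early.
            raise ValueError(f"record of {size} bytes exceeds the per-record limit")
        # Close the current batch if adding this record would break either limit.
        if current and (
            len(current) == MAX_RECORDS_PER_REQUEST
            or current_bytes + size > MAX_REQUEST_BYTES
        ):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(record)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```

For example, 1,200 one-byte records yield batches of 500, 500 and 200 records, while five 1,000 KB records yield batches of four and one because of the 4 MB cap on the whole request.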

The transform also provides options for processing individual records and for handling failed ones.

  • jsonify - A flag that indicates whether to convert a record into JSON. A record must be bytes, a bytearray or a file-like object; if it is of another type (e.g. an integer), set this flag to True to convert it into a JSON string.

  • multiline - A flag that indicates whether to append a newline character (\n) to each record. This is useful when records are saved as CSV or JSON Lines files.

  • max_trials - The maximum number of attempts made when one or more records fail. Defaults to 3. Records that still fail after all attempts are returned, so users can decide how to handle them.

  • append_error - Whether to append error details to failed records. Defaults to True.
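The effect of jsonify and multiline on a single record can be sketched as follows. prepare_record is a hypothetical helper, not firehose_pyio's code, and it rests on assumptions: bytes and bytearray records pass through unchanged, file-like objects are read fully into memory, jsonify applies json.dumps with UTF-8 encoding to any other value, and multiline appends b"\n" to the payload.

```python
import io
import json
from typing import Any


def prepare_record(record: Any, jsonify: bool = False, multiline: bool = False) -> bytes:
    """Hypothetical sketch of the jsonify/multiline options; not firehose_pyio's code."""
    if isinstance(record, (bytes, bytearray)):
        # Supported binary types are sent as-is.
        data = bytes(record)
    elif isinstance(record, io.IOBase):
        # File-like objects are read fully; text streams are encoded as UTF-8.
        content = record.read()
        data = content.encode("utf-8") if isinstance(content, str) else bytes(content)
    elif jsonify:
        # Any other JSON-serializable value (e.g. an int or str) becomes a JSON string.
        data = json.dumps(record).encode("utf-8")
    else:
        raise TypeError(
            f"unsupported record type {type(record).__name__}; set jsonify=True to convert it"
        )
    if multiline:
        # A trailing newline keeps records on separate lines (CSV, JSON Lines).
        data += b"\n"
    return data
```

Under these assumptions, prepare_record("one", jsonify=True, multiline=True) returns b'"one"\n', and prepare_record(1) raises a TypeError because an integer is not a supported type without jsonify.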

As mentioned above, records that still fail are returned as a tagged output, named write-to-firehose-failed-output by default. A different name can be set with the failed_output argument.
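The retry-and-return behaviour described by max_trials, append_error and the failed output can be sketched without Beam. put_with_retries is a hypothetical illustration, not firehose_pyio's implementation: it calls a boto3-style client's put_record_batch, re-sends only the records whose RequestResponses entries carry an ErrorCode, and after max_trials attempts returns what still fails, optionally paired with the error details. FakeFirehoseClient is a hypothetical test double standing in for boto3, so the sketch runs without AWS.

```python
from typing import Any, Dict, List


def put_with_retries(client: Any, delivery_stream_name: str, records: List[bytes],
                     max_trials: int = 3, append_error: bool = True) -> List[Any]:
    """Hypothetical sketch: retry failed records, return those that never succeed."""
    pending = list(records)
    errors: List[Dict[str, str]] = []
    for _ in range(max_trials):
        if not pending:
            break
        response = client.put_record_batch(
            DeliveryStreamName=delivery_stream_name,
            Records=[{"Data": data} for data in pending],
        )
        # RequestResponses follows the request order; failed entries have an ErrorCode.
        failed = [
            (data, result)
            for data, result in zip(pending, response["RequestResponses"])
            if "ErrorCode" in result
        ]
        pending = [data for data, _ in failed]
        errors = [
            {"ErrorCode": r["ErrorCode"], "ErrorMessage": r.get("ErrorMessage", "")}
            for _, r in failed
        ]
    if append_error:
        # Pair each remaining record with the error from its last attempt.
        return list(zip(pending, errors))
    return pending


class FakeFirehoseClient:
    """Test double: fails each listed record a fixed number of times, then accepts it."""

    def __init__(self, failures_per_record: Dict[bytes, int]):
        self.remaining = dict(failures_per_record)
        self.calls = 0

    def put_record_batch(self, DeliveryStreamName: str, Records: List[Dict[str, bytes]]):
        self.calls += 1
        responses = []
        for record in Records:
            data = record["Data"]
            if self.remaining.get(data, 0) > 0:
                self.remaining[data] -= 1
                responses.append({"ErrorCode": "ServiceUnavailableException",
                                  "ErrorMessage": "Slow down."})
            else:
                responses.append({"RecordId": "id-" + data.decode()})
        failed_count = sum("ErrorCode" in r for r in responses)
        return {"FailedPutCount": failed_count, "Encrypted": False,
                "RequestResponses": responses}


if __name__ == "__main__":
    client = FakeFirehoseClient({b"a": 1, b"b": 5})
    # b"a" succeeds on its second attempt; b"b" still fails after three attempts.
    print(put_with_retries(client, "my-delivery-stream", [b"a", b"b", b"c"]))
```

Returning the leftovers instead of raising mirrors the design described above: the caller, not the sink, decides whether failed records are logged, re-queued or dropped.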

Example

If PCollection elements are key-value pairs (i.e. a keyed stream), they can be batched per key with the GroupIntoBatches transform before being passed to the main transform.

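import apache_beam as beam
from apache_beam import GroupIntoBatches
from apache_beam.options.pipeline_options import PipelineOptions
from firehose_pyio.io import WriteToFirehose

# Placeholders: replace with your own pipeline options and delivery stream name.
pipeline_options = PipelineOptions()
delivery_stream_name = "my-delivery-stream"

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | beam.Create([(1, "one"), (2, "three"), (1, "two"), (2, "four")])
        # Emits (key, [values]) tuples with up to two values per key.
        | GroupIntoBatches(batch_size=2)
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,  # values are strings, so serialize them to JSON
            multiline=True,  # newline-delimited output
            max_trials=3
        )
    )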

For an unkeyed stream, the BatchElements transform can be applied instead to group elements into lists.

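import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.util import BatchElements
from firehose_pyio.io import WriteToFirehose

# Placeholders: replace with your own pipeline options and delivery stream name.
pipeline_options = PipelineOptions()
delivery_stream_name = "my-delivery-stream"

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | beam.Create(["one", "two", "three", "four"])
        # Emits lists of at most two elements.
        | BatchElements(min_batch_size=2, max_batch_size=2)
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,  # values are strings, so serialize them to JSON
            multiline=True,  # newline-delimited output
            max_trials=3
        )
    )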

See Introduction to Firehose PyIO Sink Connector for more examples.

Contributing

Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.

License

firehose_pyio was created as part of the Apache Beam Python I/O Connectors project. It is licensed under the terms of the Apache License 2.0.

Credits

firehose_pyio was created with cookiecutter and the pyio-cookiecutter template.