
I have a Firehose stream that is intended to ingest millions of events from different sources and of different event types. The stream should deliver all data to one S3 bucket as a store of raw, unaltered data.

I was thinking of partitioning this data in S3 based on metadata embedded within the event message, like event-source, event-type, and event-date.

However, Firehose follows its default partitioning based on record arrival time. Is it possible to customize this partitioning behavior to fit my needs?

Update: the accepted answer has been changed, since a newer answer notes that this feature has been available since September 2021.

mowienay
  • Similar to: [Partitioning AWS Kinesis Firehose data to s3 by payload](https://stackoverflow.com/q/45432265/174777) – John Rotenstein Jul 13 '18 at 04:33
  • @JohnRotenstein Unfortunately the answers there do not address the question. Both suggest attaching a Lambda function that routes the incoming data to different streams based on a particular ID. This question and that one both ask whether it is possible to define the partitioning methodology for Firehose itself. Thank you for the reference, though! – mowienay Jul 13 '18 at 13:23

5 Answers


Since September 1st, 2021, AWS Kinesis Firehose supports this feature. Read the announcement blog post here.

From the documentation:

You can use the Key and Value fields to specify the data record parameters to be used as dynamic partitioning keys and jq queries to generate dynamic partitioning key values. ...

Here is how it looks in the UI:

[Screenshots: dynamic partitioning keys and jq queries configured in the Firehose console]
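
As a rough illustration (the field names and prefix template here are invented for the example, not taken from the announcement), a Key of event_source paired with the jq query .event_source makes that field available to the S3 prefix through the partitionKeyFromQuery namespace. Conceptually:

// Conceptual sketch only: Firehose evaluates the jq queries server-side.
// Field names and the prefix template below are invented for this example.
const record = { event_source: 'billing', event_type: 'invoice' };

// Key/Value pairs as configured in the console:
//   Key: event_source   Value (jq): .event_source
//   Key: event_type     Value (jq): .event_type
const keys = { event_source: record.event_source, event_type: record.event_type };

// S3 prefix template:
//   source=!{partitionKeyFromQuery:event_source}/type=!{partitionKeyFromQuery:event_type}/
const prefix = `source=${keys.event_source}/type=${keys.event_type}/`;
console.log(prefix); // source=billing/type=invoice/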

Vlad Holubiev

No. You cannot 'partition' based upon event content.

Some options are:

  • Send to separate Firehose streams
  • Send to a Kinesis Data Stream (instead of Firehose) and write your own custom Lambda function to process and save the data (see: AWS Developer Forums: Athena and Kinesis Firehose); a minimal sketch follows this list
  • Use Kinesis Analytics to process the message and 'direct' it to different Firehose streams

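A minimal sketch of the custom-Lambda option, assuming a JSON payload; the bucket name and the eventSource/eventType/eventDate fields are assumptions about your events, not anything Firehose or Kinesis defines:

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import type { KinesisStreamEvent } from 'aws-lambda';

const s3 = new S3Client({});

// Consumes a Kinesis Data Stream and writes each record to S3 under a
// prefix derived from metadata embedded in the event itself.
export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    // Kinesis delivers the record payload base64-encoded.
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    const body = JSON.parse(payload);
    const key =
      `raw/source=${body.eventSource}/type=${body.eventType}` +
      `/date=${body.eventDate}/${record.kinesis.sequenceNumber}.json`;
    await s3.send(
      new PutObjectCommand({
        Bucket: 'my-raw-events-bucket', // illustrative bucket name
        Key: key,
        Body: payload,
      })
    );
  }
};
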
If you are going to use the output with Amazon Athena or Amazon EMR, you could also consider converting it into Parquet format, which has much better performance. This would require post-processing of the data in S3 as a batch rather than converting the data as it arrives in a stream.
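
For the Parquet conversion, one batch-oriented option is an Athena CTAS statement. This is only a sketch; the table names and output location are assumptions:

// Sketch of a batch conversion to Parquet via an Athena CTAS query.
const ctasSql = `
  -- Table names and the output location are assumptions for illustration.
  CREATE TABLE events_parquet
  WITH (format = 'PARQUET', external_location = 's3://my-bucket/parquet/')
  AS SELECT * FROM raw_json_events
`;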

John Rotenstein

As of this writing, the dynamic partitioning feature Vlad mentioned is still pretty new, and I needed it to be part of a CloudFormation template, which was not yet properly documented. I had to add DynamicPartitioningConfiguration to get it working properly; the MetadataExtractionQuery syntax was also not properly documented.

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        # client_id is the partition key produced by the
        # MetadataExtractionQuery processor below;
        # !{timestamp:...} resolves to the record's arrival time.
        Prefix: "clients/client_id=!{client_id}/dt=!{timestamp:yyyy-MM-dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            - Type: AppendDelimiterToRecord
            - Type: MetadataExtraction
              Parameters:
                # jq expression mapping partition-key name -> record field
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{client_id:.client_id}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6

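For a hypothetical record such as {"client_id": "acme", "reading": 7}, objects would land under a prefix along these lines (the date comes from the !{timestamp} namespace, i.e. arrival time, not the payload):

// Hypothetical input record:  {"client_id": "acme", "reading": 7}
// Delivered under a key like: clients/client_id=acme/dt=2021-09-05/...
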
Murali Varma
    It is April 2022 and there is still no documentation I could find on MetadataExtractionQuery. This answer was really helpful to figure out the syntax. Thanks! – BjornO Apr 22 '22 at 08:54

To build on John's answer, if you don't have near-real-time streaming requirements, we've found batch processing with Athena to be a simple solution.

Kinesis streams to a given table, unpartitioned_event_data, which can make use of the native record-arrival-time partitioning.

We define another Athena table, partitioned_event_table, with custom partition keys, and make use of Athena's INSERT INTO capabilities. Athena will automatically repartition your data in the format you want without requiring any custom consumers or new infrastructure to manage. This can be scheduled with a cron, SNS, or something like Airflow.
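
A sketch of what the scheduled repartitioning might look like (the two table names come from this answer; the column names, arrival_date partition column, and result location are assumptions):

import { AthenaClient, StartQueryExecutionCommand } from '@aws-sdk/client-athena';

const athena = new AthenaClient({});

// Rewrites yesterday's arrivals from the arrival-time-partitioned table
// into the custom-partitioned table.
export async function repartitionYesterday(): Promise<void> {
  await athena.send(
    new StartQueryExecutionCommand({
      QueryString: `
        INSERT INTO partitioned_event_table
        SELECT event_source, event_type, payload, event_date
        FROM unpartitioned_event_data
        WHERE arrival_date = date_add('day', -1, current_date)
      `,
      ResultConfiguration: { OutputLocation: 's3://my-athena-results/' }, // illustrative
    })
  );
}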

What's cool is you can create a view that does a UNION of the two tables to query historical and real-time data in one place.
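
That view could look something like this (a sketch with assumed column names; you may need a predicate to exclude rows that have already been repartitioned):

// Sketch of the UNION view over the two tables.
const createViewSql = `
  CREATE OR REPLACE VIEW all_event_data AS
  SELECT event_source, event_type, payload, event_date FROM partitioned_event_table
  UNION ALL
  SELECT event_source, event_type, payload, event_date FROM unpartitioned_event_data
`;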

We actually dealt with this problem at Radar and talk about more trade-offs in this blog post.

J Kao

To expand on Murali's answer, we have implemented it in CDK:

Our incoming JSON data looks something like this:

{
    "data": {
        "timestamp": 1633521266990,
        "defaultTopic": "Topic",
        "data": {
            "OUT1": "Inactive",
            "Current_mA": 3.92
        }
    }
}

The CDK code looks as follows:

// CDK v2 import; in CDK v1 this class lives in '@aws-cdk/aws-kinesisfirehose'.
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,         // an existing s3.Bucket
    roleArn: deliveryStreamRole.roleArn, // an existing iam.Role
    // The key referenced via partitionKeyFromQuery must match the key name
    // produced by the MetadataExtractionQuery below ('defaultTopic').
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              // jq expression; the output key name is what the prefix refers to
              parameterValue: '{defaultTopic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            },
          ],
        },
      ],
    },
  },
})
  • Do you know how to use 2 fields as 2 separate values? – ArielB Dec 15 '21 at 09:20
  • Sample parameter with 2 fields: { parameterName: 'MetadataExtractionQuery', parameterValue: '{Topic: .data.defaultTopic, out1: .data.data.OUT1}' } – MikA Apr 25 '22 at 16:14