top of page
Search

Exploring Strategies for Data Replication from RDS to Kinesis

Conner Verret, Senior Software Engineer


When it comes to managed database offerings from AWS, DynamoDB and RDS are the clear category leaders in their respective domains: schema-flexible NoSQL databases and relational databases. While both support data replication, DynamoDB simplifies it with DynamoDB Streams, a straightforward and versatile option for replicating data to various targets. In contrast, RDS lacks a true one-size-fits-all approach to data replication, requiring consideration between several different options.


To meet regulatory requirements, a client sought a method to implement Change Data Capture (CDC), a specific type of data replication that tracks and captures changes in data, allowing those changes to be replicated to other systems. In this case, these CDC events were to be propagated to a Kinesis data stream for consumption by downstream systems for auditing and business intelligence purposes.


In pursuing the optimal solution for our client, we explored several data replication implementations that propagate CDC events from an RDS instance to Kinesis data stream.


  1. RDS Activity Streams

  2. Invoking an AWS Lambda function from an RDS for PostgreSQL DB instance

  3. Database Migration Service


RDS Activity Streams would have been the preferred option, as it involves minimal setup, supports CDC and is free to use (other than the cost to write records to the destination Kinesis data stream). However, RDS Activity Streams are only supported by Oracle or SQL Server engines when using RDS, and MySQL or PostgreSQL engines when using Amazon Aurora on RDS. Migrating the database to Aurora PostgreSQL was not an option for our client, who was due to launch their service soon, so we continued weighing other options. However, RDS Activity Streams is an excellent choice for anyone using a supported database engine.


The next option we explored was to invoke AWS Lambda functions from our RDS for PostgreSQL DB instance. It was relatively straightforward to set up; we simply attached an IAM role to our RDS instance, giving it Lambda invocation permissions. From there, we connected to our database instance with psql to install the aws_lambda Postgres extension, which made it possible to invoke a Lambda function from the database with a simple command.

SELECT * from 
aws_lambda.invoke(aws_commons.create_lambda_function_arn('arn:aws:lambda:aws-region:444455556666:function:simple', 'us-west-1'), '{"body": "Hello from Postgres!"}'::json );

From there, we wrote a PostgreSQL function called invoke_lambda_on_change(). This function is meant to be called by a PostgreSQL trigger whenever an insert, update, or delete occurs on a given database table. The function then asynchronously invokes a Lambda which receives and writes the payload to our Kinesis data stream.

CREATE OR REPLACE FUNCTION invoke_lambda_on_change()
RETURNS trigger AS $$
DECLARE
  row_data json;
  lambda_payload json;
BEGIN
  IF TG_OP = 'INSERT' THEN
    row_data := row_to_json(NEW);
  ELSIF TG_OP = 'UPDATE' THEN
    row_data := json_build_object('old', row_to_json(OLD), 'new', row_to_json(NEW));
  ELSIF TG_OP = 'DELETE' THEN
    row_data := row_to_json(OLD);
  ELSE
    RETURN NULL;
  END IF;

  lambda_payload := json_build_object('type', TG_OP, 'data', row_data);
  PERFORM aws_lambda.invoke(
      aws_commons.create_lambda_function_arn('arn:aws:lambda:aws-region:444455556666:function:simple', 'us-west-1'),
    lambda_payload,
    'Event'
  ');
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

After the function is created on the PostgreSQL instance, the next step is to create a trigger attached to any tables that should trigger the function. In the example below, the function has been configured to trigger the Lambda after an insert, update or delete operation occurs on table_a.

CREATE TRIGGER table_a_trigger
AFTER INSERT OR UPDATE OR DELETE ON table_a
FOR EACH ROW EXECUTE FUNCTION invoke_lambda_on_change();

This solution would have been perfect if not for the caveat that PostgreSQL does not support asynchronous invocation of triggers. So, while the Lambda invocation within the trigger's function was itself asynchronous, the trigger was still synchronous, and thus impacted the response times for any insert, update, or delete operation on the table. In other words, while it showed promise initially, this was not a viable solution for our use case and would not have functioned well at scale.


If you're reading this and have worked with PostgreSQL in the past, you'd likely point out that LISTEN/NOTIFY is a solid option for this use case. LISTEN/NOTIFY allows a PostgreSQL database to send notifications to connected clients whenever a specified event occurs. By leveraging this mechanism in Postgres, we could have triggered a notification for every data change event on our tables and have an external service, which is constantly "listening" for these notifications, process and forward the data to our Kinesis data stream. This approach is ideal because LISTEN/NOTIFY operates asynchronously, unlike PostgreSQL triggers.


However, despite its suitability, we opted against this approach because it would necessitate building (and maintaining) a custom endpoint to handle these notifications. This added a layer of complexity, and the associated maintenance overhead did not align with our client's project timeline and operational goals. Thus, we were still in search of a more streamlined solution that could be implemented quickly and easily maintained. This is what led us to AWS Database Migration Service.


AWS Database Migration Service offers both a traditional DMS replication instance that requires manual provisioning, scaling, and ongoing management and a serverless DMS replication instance. The serverless offering supports fewer source and target endpoints compared to the traditional replication instance, but it does support RDS as a source and Kinesis as a target, so we decided to try it, as it provided auto-scaling and seemed like the better option in terms of maintenance over time.


Working with a serverless DMS replication instance is relatively simple. Before replication can begin, a source DMS endpoint and a target DMS endpoint must be created. These are configurations that provide connection information to DMS for your data store. In the case of RDS, our source endpoint needed a reference to the database endpoint, the database port, a username, and a password. To connect to our Kinesis data stream, our target endpoint simply needed the ARN of our data stream.


Beyond that, the DMS instance also needs a Security Group, so that the RDS instance's security group can whitelist ingress traffic from DMS. Additionally, since our DMS instance was running in a private VPC, and Kinesis is a public AWS service, we had to provision a VPC endpoint so that our DMS instance could securely connect to Kinesis from our VPC's private subnet. Finally, we had to create and associate an IAM role to our DMS instance that had an IAM policy that allowed our instance to write records to our Kinesis stream.


Once we validated the connection between our source DMS endpoint, serverless DMS replication instance, and target DMS endpoint, we were ready to configure the replication settings. DMS supports 3 types of replication, full load and change data capture, full load, and change data capture. Our client was only concerned with ongoing change data capture, and full load was not necessary, so the DMS instance was configured for CDC.


Before replication can begin, you must describe to DMS which source tables to perform replication on, and how that data should be transformed upon reaching its target. DMS allows you to define a JSON configuration file containing table mapping rules. Below is an example of a configuration file used for Change Data Capture from an RDS instance to a Kinesis Data Stream.

{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "CaptureTableASource",
      "rule-action": "include",
      "object-locator": {
        "schema-name": "public",
        "table-name": "table_a"
      }
    },
    {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "MapDBRecordToKinesisRecord",
      "rule-action": "map-record-to-record",
      "object-locator": {
        "schema-name": "public",
        "table-name": "table_a"
      }
    }
  ]
}

DMS offers plenty of rule types, especially for full database migrations, but we only used two types of rules for CDC replication to Kinesis. The first is a selection rule instructing DMS which table(s) to capture events from. The second is an object-mapping rule, which instructs DMS to convert the database record into a JSON record, before finally writing the record to Kinesis.


In terms of pricing, DMS uses DMS Capacity Units, also known as DCU, to price its serverless offering. Each DCU is equal to 2GB of RAM. You can define a minimum and maximum DCU threshold, and the serverless DMS replication instance will autoscale within those boundaries.


After configuring the replication mappings and auto-scaling constraints, all that was left was to enable the replication. It takes around 45 minutes to prepare the resources the first time the replication is enabled. The AWS console for DMS provides a streamlined view of table statistics in the console so that you can easily track the number of inserts, updates, and deletes that have been replicated over time. It has worked well for our client as a solution to replicate data to a shared Kinesis data stream.


I hope that you come away from this blog post with a better understanding of the various options available to you for data replication in the context of RDS. 


Happy solution building!



References:


Comments


bottom of page