Automated Traffic Mirroring in AWS at Scale

Other Posts in This Series

  1. Automated Traffic Mirroring in AWS
  2. Automated Traffic Mirroring in AWS at Scale (you are here)
  3. Automated Traffic Mirroring in AWS with Full Coverage
  4. Automated Traffic Mirroring in AWS with Tags

Not Just Scale for Scale’s Sake!
Pricing for AWS Lambda is based on the number of requests for your functions and their duration, i.e. the time it takes for your code to execute. The first post in the series, Automated Traffic Mirroring in AWS, laid a solid foundation, but it is missing the mechanics needed to scale. If you never expect to launch more than a handful of instances simultaneously and/or your applications are not prone to a high degree of dynamism, then you’re probably okay.

However, if you are dynamically scaling your instances horizontally (e.g. EC2 Auto Scaling), and/or your organization enforces policies like rehydration where instances are periodically terminated and re-launched from golden images, you would absolutely need to decouple the per-instance work to ensure maximum coverage and minimum cost.
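To make the request-plus-duration pricing concrete, here is a minimal sketch of the arithmetic. Compute time is billed in GB-seconds (allocated memory in GB multiplied by execution time in seconds), plus a per-request charge. The estimateLambdaCost name is hypothetical, the rates are left as parameters because they vary by region and change over time, and the sketch ignores the free tier and duration rounding.

```javascript
// Hypothetical helper: estimates Lambda cost from request count and duration.
// Rates are caller-supplied assumptions; free tier and rounding are ignored.
function estimateLambdaCost({ requests, avgDurationMs, memoryMb, pricePerMillionRequests, pricePerGbSecond }) {
  // Compute is billed in GB-seconds: (memory in GB) x (duration in seconds)
  const gbSeconds = requests * (memoryMb / 1024) * (avgDurationMs / 1000)
  const requestCost = (requests / 1e6) * pricePerMillionRequests
  const computeCost = gbSeconds * pricePerGbSecond
  return { gbSeconds, requestCost, computeCost, total: requestCost + computeCost }
}

// Example with placeholder rates (NOT real AWS prices)
const estimate = estimateLambdaCost({
  requests: 2000000,
  avgDurationMs: 500,
  memoryMb: 2048,
  pricePerMillionRequests: 1,
  pricePerGbSecond: 0.5
})
console.log(estimate)
```

Plug in the current rates for your region to get a real estimate.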

I will be using the documented Lambda Invoke API to call one Lambda function from another, firing the calls off asynchronously (in parallel) from JavaScript. I realize there are potentially more robust ways to accomplish this using SNS, AWS Step Functions, or even a simple Lambda Application.

However, for simplicity, and because we only have two functions, I’ve opted for this approach to minimize setup and the number of moving parts for now. I may add a bonus post to the end of this series using one of these methods, but for now let’s keep things simple, shall we?
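For reference, the Lambda Invoke API accepts an InvocationType of RequestResponse (the default, which waits for the function and returns its result in Payload) or Event (which queues the call and returns immediately without the function's result). The manager in Step 3 relies on the default because it reads each response. A minimal sketch of both parameter shapes, using a hypothetical buildInvokeParams helper:

```javascript
// Hypothetical helper that builds parameters for the Lambda Invoke API.
// 'RequestResponse' (the default) waits for the function and returns its result
// in Payload; 'Event' queues the call and returns without the function's result.
function buildInvokeParams(functionName, payload, { fireAndForget = false } = {}) {
  return {
    FunctionName: functionName,
    InvocationType: fireAndForget ? 'Event' : 'RequestResponse',
    Payload: JSON.stringify(payload)
  }
}

const request = { instanceId: 'i-0123456789abcdef0' }
const syncParams = buildInvokeParams('rx-vpctm-create', request)
const eventParams = buildInvokeParams('rx-vpctm-create', request, { fireAndForget: true })
console.log(syncParams.InvocationType, eventParams.InvocationType) // RequestResponse Event

// With the AWS SDK for JavaScript v2, either object is passed straight to invoke():
// const response = await new AWS.Lambda().invoke(syncParams).promise()
```

With Event, Lambda responds with a 202 status code and no function result, so the manager could no longer log per-instance success or failure; that is the trade-off behind sticking with the default here.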

Step 1: Update IAM Policy
We need to add the lambda:InvokeFunction and ec2:DescribeInstances actions to our IAM policy: the first allows our rx-vpctm-manager function to call the new rx-vpctm-create function, and the second allows the new function to get full details for an EC2 instance. I’m using a single IAM role as the execution role for both functions. If you prefer a distinct role for each function, then instead of updating the existing policy attached to the existing role, you would need to create a new policy and role before creating your Lambda function in Step 2.

Again, for simplicity’s sake I will stick with a single role/policy for both, so you only need to update the existing policy we created in the first post of the series to include the new actions and the resource ARN for our second function:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "ec2:CreateTags",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:lambda:*:012345678901:function:rx-vpctm-create",
        "arn:aws:ec2:*:012345678901:traffic-mirror-session/*",
        "arn:aws:logs:*:012345678901:log-group:/aws/lambda/rx-vpctm-*:log-stream:*",
        "arn:aws:logs:*:012345678901:log-group:/aws/lambda/rx-vpctm-*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "ec2:DescribeInstances",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeTrafficMirrorTargets",
        "ec2:DescribeTrafficMirrorFilters",
        "ec2:CreateTrafficMirrorSession"
      ],
      "Resource": "*"
    }
  ]
}
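Before moving on, it can be worth sanity-checking that the updated policy actually allows the new actions. A minimal sketch, assuming the policy document is available as a JavaScript object: the hypothetical missingActions helper only checks Allow statements for exact action names (no wildcards, Deny statements, conditions, or resource matching), so it is a quick lint, not a substitute for the IAM policy simulator. The trimmed copy deliberately leaves out ec2:DescribeNetworkInterfaces to show a miss.

```javascript
// Hypothetical lint: lists required actions that no Allow statement grants.
// Simplifications: exact (case-insensitive) action names only; wildcards, Deny
// statements, conditions, and resource ARNs are NOT evaluated.
function missingActions(policy, required) {
  const allowed = new Set()
  // Statement may be a single object or an array of objects
  const statements = Array.isArray(policy.Statement) ? policy.Statement : [policy.Statement]
  for (const statement of statements) {
    if (statement.Effect !== 'Allow') continue
    // Action may be a single string or an array of strings
    const actions = Array.isArray(statement.Action) ? statement.Action : [statement.Action]
    actions.forEach(action => allowed.add(action.toLowerCase()))
  }
  return required.filter(action => !allowed.has(action.toLowerCase()))
}

// Trimmed copy of the policy above; ec2:DescribeNetworkInterfaces is left out on purpose
const policy = {
  Version: '2012-10-17',
  Statement: [
    {
      Effect: 'Allow',
      Action: ['lambda:InvokeFunction', 'ec2:CreateTags'],
      Resource: ['arn:aws:lambda:*:012345678901:function:rx-vpctm-create']
    },
    {
      Effect: 'Allow',
      Action: ['ec2:DescribeInstances', 'ec2:CreateTrafficMirrorSession'],
      Resource: '*'
    }
  ]
}

console.log(missingActions(policy, ['lambda:InvokeFunction', 'ec2:DescribeInstances', 'ec2:DescribeNetworkInterfaces']))
// [ 'ec2:DescribeNetworkInterfaces' ]
```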

Step 2: Create a new Function in Lambda
Now we’re ready to create our new function, which needs nothing more than a single instanceId to do all the heavy lifting for that instance and return a success/failure status. I’m calling my function rx-vpctm-create; it will be called by my existing rx-vpctm-manager function:

IMPORTANT: When creating your Lambda function, be sure to choose the same role you updated or created in the previous step as the Execution Role instead of having a new basic execution role created for you, which is the default.

'use strict';

const AWS = require('aws-sdk')
const EC2 = new AWS.EC2({apiVersion:'2016-11-15'})

exports.handler = async (event) =>
{
	try
	{
		// Simple Safety Net
		if (! event.instanceId || ! /^i-/.test(event.instanceId)) {throw `Invalid Request: ${JSON.stringify(event)}`}

		// Initiate parallel setup calls
		let [instance, targets, filterId] = await Promise.all([

			// Get the full [instance] object using [instanceId]
			getInstance(event.instanceId),

			// Get all available traffic mirror [targets] in this REGION
			getTargets(),

			// Get the traffic mirror [filter] for this REGION
			getFilterId()
		])

		// Set the best [target] for each ENI attached to the [instance]
		instance = await setTargets(instance, targets)

		// Create a new VPC TM Session for each [eni] attached to [instance]
		await Promise.all(instance.nics.map(eni =>
		{
			let options = {
				NetworkInterfaceId: eni.id,
				SessionNumber: 1,
				TrafficMirrorFilterId: filterId,
				TrafficMirrorTargetId: eni.target,
				Description: 'Managed'
			}

			if (instance.tags.Name)
			{
				options.TagSpecifications = [
					{ResourceType:'traffic-mirror-session', Tags:[{Key:'Name', Value:instance.tags.Name}]}
				]
			}

			return EC2.createTrafficMirrorSession(options).promise().catch(err =>
			{
				// Catch and safely ignore "already in use" errors
				if (/SessionNumber [0-9]+ already in use/i.test(err.message)) {return}

				// Re-throw the Error itself so the handler reports it as a 500
				throw err
			})
		}))

		// All done ...
		return {
			statusCode: 200,
			body: `VPC Traffic Mirroring enabled for EC2 Instance: ${event.instanceId}`
		}
	}

	catch (err)
	{
		console.error(err.message || err)

		return {
			statusCode: (err.message ? 500 : 400),
			body: `Error enabling VPC Traffic Mirroring for EC2 Instance: ${event.instanceId}`,
			error: err.message || err
		}
	}
}

/* ========================================================================== >>
   WORKER FUNCTIONS
============================================================================= */
async function getInstance (instanceId)
{
	// Get the full [instance] object using [instanceId]
	const data = await EC2.describeInstances({'InstanceIds': [instanceId]}).promise()

	// Extract the single [instance] from the response [data]
	let instance = data.Reservations[0].Instances[0]

	// Ensure this is a Nitro instance
	if (! isNitro(instance.InstanceType))
	{throw `Instance not eligible for VPC Traffic Mirroring [isNitro]: ${instanceId} (${instance.InstanceType})`}

	// Setup Utility Variables
	instance.az = instance.Placement.AvailabilityZone
	instance.nics = getNics(instance.NetworkInterfaces)
	instance.tags = getTags(instance.Tags)

	return instance
}

async function getTargets ()
{
	// Get all available traffic mirror [targets] in this REGION
	const data = await EC2.describeTrafficMirrorTargets().promise()

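	// Convert [TrafficMirrorTargets] into a hash table of [targets] by ENI,
	// skipping non-ENI targets (e.g. Network Load Balancer) that have no ENI ID
	let targets = {}
	data.TrafficMirrorTargets.filter(t => t.NetworkInterfaceId).forEach(t => (targets[t.NetworkInterfaceId] = {
		'id': t.TrafficMirrorTargetId
	}))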

	return targets
}

async function getFilterId ()
{
	const data = await EC2.describeTrafficMirrorFilters().promise()

	if (! data.TrafficMirrorFilters || ! data.TrafficMirrorFilters.length)
	{throw `No Traffic Mirror Filters available in this region`}

	// Take the first [TrafficMirrorFilters] entry in this region
	return data.TrafficMirrorFilters[0].TrafficMirrorFilterId
}

async function setTargets (instance, targets)
{
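	// Fail fast if there are no ENI [targets], rather than calling
	// DescribeNetworkInterfaces with an empty ID list that would not restrict the results
	if (! Object.keys(targets).length) {throw `No Traffic Mirror Targets available in this region`}

	// Get full detail for each [target] ENI
	const data = await EC2.describeNetworkInterfaces({
		NetworkInterfaceIds: Object.keys(targets)
	}).promise()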

	// Add applicable ENI properties to [targets] from [data], and filter out
	// [targets] not in the same [AvailabilityZone] as the [instance]
	for (const eni of data.NetworkInterfaces)
	{
		if (eni.AvailabilityZone !== instance.az)
		{
			delete targets[eni.NetworkInterfaceId]
			continue
		}

		targets[eni.NetworkInterfaceId] = {
			'id': targets[eni.NetworkInterfaceId].id,
			'subnet': eni.SubnetId,
			'vpc': eni.VpcId
		}
	}

	// Convert [targets] hash table to an Array of [targets]
	targets = Object.values(targets)
	if (! targets.length) {throw `No Traffic Mirror Targets available: ${instance.az}`}

	// Set the [target] for each [nic] on the [instance]
	instance.nics = instance.nics.map(nic =>
	{
		nic.target = false

		// Determine if a [target] exists in the same [subnet]
		for (const target of targets)
		{
			// Stop instantly: a local target is always ideal
			if (target.subnet === nic.subnet)
			{
				nic.target = target.id
				break
			}

			// Set the [target] based on VPC until something better is found
			if (target.vpc === nic.vpc) {nic.target = target.id}
		}

		// No [targets] in local [subnet] or [vpc], use first found in this AZ
		if (! nic.target) {nic.target = targets[0].id}

		return nic
	})

	return instance
}

/* ========================================================================== >>
   HELPER FUNCTIONS
============================================================================= */

// The [hypervisor] property on the [instance] object is not useful for this, so
// it's necessary to check the instance family manually.
// Reference: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-types.html#ec2-nitro-instances
function isNitro (instanceType)
{
	// Nitro families/types as of January 2020; newer families are not listed
	// and may not match
	const nitro = [
		'A1', 'C5', 'C5d', 'C5n', 'G4', 'I3en', 'Inf1', 'M5', 'M5a', 'M5ad', 'M5d',
		'M5dn', 'M5n', 'p3dn.24xlarge', 'R5', 'R5a', 'R5ad', 'R5d', 'R5dn', 'R5n',
		'T3', 'T3a', 'z1d', 'a1.metal', 'c5.metal', 'c5d.metal', 'c5n.metal',
		'i3.metal', 'i3en.metal', 'm5.metal', 'm5d.metal', 'r5.metal', 'r5d.metal',
		'u-6tb1.metal', 'u-9tb1.metal', 'u-12tb1.metal', 'u-18tb1.metal',
		'u-24tb1.metal', 'z1d.metal'
	]
	return (new RegExp(`^(?:${nitro.join('|')})`, 'i').test(instanceType))
}

// Converts a [networkInterfaceSet] into a simple Array of [nics]
function getNics (networkInterfaceSet)
{
  if (! networkInterfaceSet || ! networkInterfaceSet.length) {return false}
  return networkInterfaceSet.map(nic => ({
    'id': nic.NetworkInterfaceId,
    'subnet': nic.SubnetId,
    'vpc': nic.VpcId,
    'owner': nic.OwnerId,
    'status': nic.Status
  }))
}

// Collapses a [tagSet] into a more usable hash table of [tags]
function getTags (tagSet)
{
  if (tagSet === undefined || ! tagSet.length) {return false}
  let tags = {}
  tagSet.forEach(tag => tags[tag.Key] = tag.Value)
  return tags
}

So what’s really happening here …
The code is heavily commented, so I’d ask you to give it a read; you’ll be surprised how simple this all really is. For a more comprehensive walkthrough, refer to the first post in the series: the functionality and order of operations haven’t changed, they’ve just been decoupled into their own function for scale purposes. Moving along …
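To see the target-selection rule in setTargets() in isolation, here is a minimal, dependency-free sketch of the same preference order: a target in the NIC's subnet wins outright, otherwise one in the same VPC, otherwise the first target in the Availability Zone. pickTarget and the sample IDs are hypothetical, and like setTargets() it assumes the list was already filtered to the instance's AZ. One small difference: among several same-VPC targets this sketch takes the first, whereas the loop in setTargets() ends up with the last one it sees.

```javascript
// Hypothetical standalone version of the selection rule used in setTargets():
// same subnet > same VPC > first target in the AZ. Returns a target id or null.
function pickTarget(nic, targets) {
  if (!targets.length) return null
  const sameSubnet = targets.find(t => t.subnet === nic.subnet)
  if (sameSubnet) return sameSubnet.id
  const sameVpc = targets.find(t => t.vpc === nic.vpc)
  if (sameVpc) return sameVpc.id
  return targets[0].id
}

// Illustrative targets, already filtered to the instance's Availability Zone
const targets = [
  { id: 'tmt-aaa', subnet: 'subnet-1', vpc: 'vpc-A' },
  { id: 'tmt-bbb', subnet: 'subnet-2', vpc: 'vpc-B' },
  { id: 'tmt-ccc', subnet: 'subnet-3', vpc: 'vpc-B' }
]

console.log(pickTarget({ subnet: 'subnet-3', vpc: 'vpc-B' }, targets)) // tmt-ccc (same subnet)
console.log(pickTarget({ subnet: 'subnet-9', vpc: 'vpc-B' }, targets)) // tmt-bbb (same VPC)
console.log(pickTarget({ subnet: 'subnet-9', vpc: 'vpc-Z' }, targets)) // tmt-aaa (fallback)
```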

Step 3: Update the existing Manager Function in Lambda
As you would expect, now that the createTrafficMirrorSession logic has all been decoupled, you’ll need to update the original rx-vpctm-manager function with the following:

'use strict';

const AWS = require('aws-sdk')
const LAMBDA = new AWS.Lambda({apiVersion:'2015-03-31'})

// The name/alias of the Lambda function to invoke for creating VPC TM Sessions
const LAMBDA_CREATE = 'rx-vpctm-create'

exports.handler = async (event) =>
{
  try
  {
    // Safety Net for unsupported Operations
    if (event.detail.eventName !== 'RunInstances') {throw `Unsupported Operation: ${event.detail.eventName}`}

    // Convert [instancesSet] to an Array of [instanceIds]
    const instanceIds = event.detail.responseElements.instancesSet.items.map(i => (i.instanceId))

    // Create a VPC Traffic Mirror Session for all [instanceIds]
    await Promise.all(
      instanceIds.map(async id =>
        LAMBDA.invoke({
          FunctionName: LAMBDA_CREATE,
          Payload: JSON.stringify({instanceId:id})
        }).promise().then(response =>
        {
          // Record SUCCESS/ERROR for each [instanceId] individually
          let payload = JSON.parse(response.Payload || '{}')
          if (payload.statusCode === 200) {console.log(payload.body)}
          else {console.error(payload.body, payload.statusCode, payload.error)}
        }).catch(err =>
        {
          // Log a failed invocation (e.g. throttling) without aborting the others
          console.error(`Failed to invoke ${LAMBDA_CREATE} for EC2 Instance: ${id}`, err.message || err)
        })
      )
    )
  }
  catch (err) {console.error(err.message || err)}
}
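For local testing, it helps to see the shape of the event the manager expects. The trimmed, illustrative event below follows the "AWS API Call via CloudTrail" format that EventBridge delivers for a RunInstances call; only the fields the manager reads are included, and the instance IDs are made up. getInstanceIds is a hypothetical helper that repeats the handler's safety net and ID extraction so they can be checked without AWS.

```javascript
// Trimmed, illustrative EventBridge event for a CloudTrail-recorded RunInstances call
const sampleEvent = {
  'detail-type': 'AWS API Call via CloudTrail',
  source: 'aws.ec2',
  detail: {
    eventSource: 'ec2.amazonaws.com',
    eventName: 'RunInstances',
    responseElements: {
      instancesSet: {
        items: [{ instanceId: 'i-0aaaaaaaaaaaaaaa1' }, { instanceId: 'i-0bbbbbbbbbbbbbbb2' }]
      }
    }
  }
}

// Hypothetical helper mirroring the manager's safety net and ID extraction
function getInstanceIds(event) {
  if (event.detail.eventName !== 'RunInstances') {
    throw new Error(`Unsupported Operation: ${event.detail.eventName}`)
  }
  return event.detail.responseElements.instancesSet.items.map(i => i.instanceId)
}

console.log(getInstanceIds(sampleEvent))
```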

So what’s really happening here …
The rx-vpctm-manager function is quickly turning into a proper manager now that all EC2 API calls have moved to the new rx-vpctm-create function. It now simply collects all the instanceIds into an Array, then asynchronously (i.e. in parallel, non-blocking, fancy) passes each one off for processing.
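Firing every invocation at once is fine for modest bursts, but a very large launch fans out into roughly as many concurrent rx-vpctm-create executions as there are new instances, all counting against the account's Lambda concurrency. If that becomes a concern, one option is to cap how many invocations are in flight at once. mapWithLimit below is a hypothetical, dependency-free helper; the limit of 2 in the demo is arbitrary, and in the manager the worker would wrap the existing LAMBDA.invoke call.

```javascript
// Hypothetical helper: runs async `worker` over `items` with at most `limit`
// calls in flight, preserving input order in the results.
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length)
  let next = 0
  // Each runner repeatedly claims the next unprocessed index until none remain
  async function runner() {
    while (next < items.length) {
      const index = next++
      results[index] = await worker(items[index], index)
    }
  }
  const runners = Array.from({ length: Math.min(limit, items.length) }, () => runner())
  await Promise.all(runners)
  return results
}

// Demo: track peak concurrency with a fake asynchronous worker
let inFlight = 0
let peak = 0
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const done = mapWithLimit(['i-1', 'i-2', 'i-3', 'i-4', 'i-5'], 2, async id => {
  inFlight++
  peak = Math.max(peak, inFlight)
  await sleep(10)
  inFlight--
  return `processed ${id}`
})

done.then(results => console.log(results, 'peak concurrency:', peak))
```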

Conclusion, for now …
Now this solution can really scale! There are always lots of other optimizations we can make here. For instance, we could make a single EC2.DescribeInstances API call in the rx-vpctm-manager Lambda function for all instances being launched, then pass those full objects to the rx-vpctm-create function instead of having rx-vpctm-create call the API for each instance itself. In a scenario where you just launched 1000 instances, you would condense 1000 API calls into one call (or a handful of batched calls) that returns details for all 1000 instanceIds. However, simplicity is the name of the game for this tutorial, so I opted to fully decouple all EC2 API calls to the second function.
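The batching optimization described above could look roughly like the following sketch. Assumptions: describeInstancesBatch is a hypothetical helper, the EC2 client is injected and follows the AWS SDK v2 describeInstances(params).promise() style, and the default chunk size of 100 is arbitrary rather than a documented per-call limit. Note that in the real API a single unknown instance ID fails the whole request, which matters right after RunInstances because of EC2's eventual consistency.

```javascript
// Hypothetical batched lookup: fetches full instance objects for many IDs using
// as few DescribeInstances calls as the chunk size allows.
// Assumes an AWS SDK v2-style client: describeInstances(params).promise().
async function describeInstancesBatch(ec2, instanceIds, chunkSize = 100) {
  const byId = {}
  for (let start = 0; start < instanceIds.length; start += chunkSize) {
    const chunk = instanceIds.slice(start, start + chunkSize)
    const data = await ec2.describeInstances({ InstanceIds: chunk }).promise()
    // Flatten Reservations[].Instances[] into a hash table keyed by InstanceId
    for (const reservation of data.Reservations) {
      for (const instance of reservation.Instances) {
        byId[instance.InstanceId] = instance
      }
    }
  }
  return byId
}

// Stub client for local testing: counts calls and echoes minimal instance objects
let calls = 0
const fakeEc2 = {
  describeInstances: ({ InstanceIds }) => ({
    promise: async () => {
      calls++
      return { Reservations: [{ Instances: InstanceIds.map(id => ({ InstanceId: id, InstanceType: 'm5.large' })) }] }
    }
  })
}

const ids = Array.from({ length: 250 }, (_, n) => `i-${String(n).padStart(17, '0')}`)
const lookup = describeInstancesBatch(fakeEc2, ids, 100)
lookup.then(byId => console.log(Object.keys(byId).length, 'instances in', calls, 'calls'))
```

Chunks are fetched one after another here, which trades a little latency for a lower risk of EC2 API throttling.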

The next post in this series focuses on increasing the coverage of this solution by catching other events, such as the StartInstances operation for previously stopped instances that are started again, the AttachNetworkInterface operation for cases where one or more ENIs are attached to running instances, and even the DeleteTrafficMirrorSession operation to prevent accidental or malicious removal of traffic mirror sessions.


UPDATE [2020/01/06]: Both of the Lambda functions in this post were refactored based on awesome unicast feedback with a gentle reminder for this ex-software engineer to get with the times and use proper async and await syntax! Additionally, the isNitro() helper function in the rx-vpctm-create Lambda function was updated with the full list of Nitro instance types, and refactored to use a single RegExp test instead of Array.indexOf() because it’s 2020 now.
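Keeping a hard-coded list like the one in isNitro() current is a manual chore as new instance families launch. One alternative, sketched here under assumptions, is the EC2 DescribeInstanceTypes API, whose InstanceTypeInfo results report Hypervisor as 'nitro' or 'xen' and flag bare-metal types with BareMetal: true. The isNitroViaApi name is hypothetical, treating bare-metal types as eligible simply mirrors the .metal entries in the original list, and using it would require adding ec2:DescribeInstanceTypes to the IAM policy from Step 1. The EC2 client is passed in so the sketch can be exercised with a stub.

```javascript
// Hypothetical alternative to the hard-coded isNitro() list.
// Assumes an AWS SDK v2-style EC2 client: describeInstanceTypes(params).promise().
// Requires the ec2:DescribeInstanceTypes permission (not in the Step 1 policy).
async function isNitroViaApi(ec2, instanceType) {
  const data = await ec2.describeInstanceTypes({ InstanceTypes: [instanceType] }).promise()
  const info = (data.InstanceTypes || [])[0]
  if (!info) return false
  // Treat bare-metal types as eligible, mirroring the .metal entries in isNitro()
  return info.Hypervisor === 'nitro' || info.BareMetal === true
}

// Stub client for local testing (no AWS calls); the data below is illustrative
const fakeEc2 = {
  describeInstanceTypes: ({ InstanceTypes }) => ({
    promise: async () => ({
      InstanceTypes: InstanceTypes.map(type => ({
        InstanceType: type,
        Hypervisor: type.startsWith('m5') ? 'nitro' : 'xen',
        BareMetal: false
      }))
    })
  })
}

isNitroViaApi(fakeEc2, 'm5.large').then(result => console.log('m5.large', result))
isNitroViaApi(fakeEc2, 'm4.large').then(result => console.log('m4.large', result))
```

Inside getInstance(), the eligibility check would then become `if (! await isNitroViaApi(EC2, instance.InstanceType))`.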
