AWS examples in C# – AWS CLI commands

Last Updated on by

Post summary: Important AWS CLI commands used in AWS examples in C#.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Introduction

In AWS examples in C# – run the solution post I have described how to install/uninstall current examples. In the current post, I am going to show in detail individual commands used. The configuration parameters in the command below will be given with capital letters and starting with a dollar sign, e.g. $CONFIGURATION_PARAMETER. Each AWS command has its code representation in the SDK for the desired programming language.

AWS Command Line Interface

The AWS Command Line Interface (CLI) is a unified tool to manage AWS services. Control of multiple AWS services from the command line and automate them through scripts can be done with just one tool to download and configure. The full list of services that can be controlled is listed in the AWS Command Line Interface reference page. Each service has a subpage with a list of all available commands. All commands return JSON as a response. In a subsequent post, I will describe how to manage the JSON in the command line. All operations in the current post are done after AWS credentials are set as environment variables:

export AWS_ACCESS_KEY_ID=KIA57FV4.....
export AWS_SECRET_ACCESS_KEY=mSgsxOWVh...
export AWS_DEFAULT_REGION=us-east-1

SQS operations

The full list can be found in aws sqs CLI reference page. More information about SQS can be found in AWS examples in C# – create a service working with SQS post.

Create

Initially, all queues are listed with list-queues, in order to check if the queue already exists.

aws sqs list-queues

The queue is created with create-queue command, the result of the command returns the queue URL.

aws sqs create-queue --queue-name $QUEUE_NAME

After queues are created, the re-drive policy has to be set up. The ARN of the dead-letter queue can be obtained with get-queue-attributes command by providing the queue URL.

aws sqs get-queue-attributes \
	--queue-url $DEAD_LETTER_QUEUE_URL \
	--attribute-names QueueArn

The re-drive policy is set with set-queue-attributes command.

aws sqs set-queue-attributes \
	--queue-url $QUEUE_URL \
	--attributes "{\"RedrivePolicy\":\"{\\\"maxReceiveCount\\\":\\\"3\\\",\\\"deadLetterTargetArn\\\":\\\"$DEAD_LETTER_QUEUE_ARN\\\"}\",\"ReceiveMessageWaitTimeSeconds\":\"$LONG_POLLING_TIMEOUT\"}"

Delete

In order to delete the queue, its URL is needed. The URL is obtained with get-queue-url command.

aws sqs get-queue-url --queue-name $QUEUE_NAME

Deletion happens with delete-queue command.

aws sqs delete-queue --queue-url $QUEUE_URL

DynamoDB operations

The full list can be found in aws dynamodb CLI reference page. More information about DynamoDB can be found in AWS examples in C# – create a service working with DynamoDB post.

Create

The table data is obtained with describe-table command.

aws dynamodb describe-table --table-name $TABLE_NAME

If the table does not exist, it is created with create-table command. The table command has all the data needed. See more about table attributes in AWS examples in C# – create a service working with DynamoDB post.

aws dynamodb create-table \
	--table-name $TABLE_NAME \
	--attribute-definitions 'AttributeName=FirstName,AttributeType=S' 'AttributeName=LastName,AttributeType=S' \
	--key-schema 'AttributeName=FirstName,KeyType=HASH' 'AttributeName=LastName,KeyType=RANGE' \
	--provisioned-throughput 'ReadCapacityUnits=5,WriteCapacityUnits=5' \
	--stream-specification 'StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES'

Delete

The table is deleted by name with delete-table command.

aws dynamodb delete-table --table-name $TABLE_NAME

IAM roles operations

The full list can be found in aws iam CLI reference page.

Create

Roles are listed with list-roles command to check if the role exists.

aws iam list-roles

The role is created with create-role command.

aws iam create-role \
	--role-name $ROLE_NAME \
	--assume-role-policy-document file://assume-role-policy-document.json

This is the only case in the current examples where an additional JSON document is needed alongside a command. It is not possible to pass this JSON inline as it is with aws sqs set-queue-attributes command. This JSON allows certain services to be accessed by this role.

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
				"Service": [
					"lambda.amazonaws.com",
					"ec2.amazonaws.com",
					"ecs.amazonaws.com",
					"ecs-tasks.amazonaws.com",
					"batch.amazonaws.com"
				]
			},
			"Action": "sts:AssumeRole"
		}
	]
}

List policies to get the policy ARN with list-policies command. Basically, to make things easier, AdministratorAccess existing policy is used with its ARN.

aws iam list-policies

Attach the policy to the role with attach-role-policy command.

aws iam attach-role-policy \
	--role-name $ROLE_NAME \
	--policy-arn $POLICY_ARN

Delete

List policies with list-policies command to get the ARN, then detach the policy from the role.

aws iam detach-role-policy \
	--role-name $ROLE_NAME \
	--policy-arn $POLICY_ARN

After the policy is detached, role is deleted with delete-role command.

aws iam delete-role --role-name $ROLE_NAME

AWS Lambda operations

The full list can be found in aws lambda CLI reference page.

Create

List functions with list-functions command to check if the function exists.

aws lambda list-functions

Creating a function is done with create-function command and takes many arguments. Most of the parameters are self-explanatory. Timeout is important, the lambda function execution is suspended after the timeout passes, in current examples, it is 30 seconds, I found that cold start could take up to 15 seconds some times. The lambda configurations are described in AWS examples in C# – create basic Lambda function post.

aws lambda create-function \
	--function-name $FUNCTION_NAME \
	--runtime dotnetcore2.1 \
	--role $ROLE_ARN \
	--handler $HANDLER_STRING_WITH_NAMESPACE_CLASS_METHOD \
	--environment "Variables={AWS_SQS_QUEUE_NAME=$QUEUE_NAME, AWS_SQS_IS_FIFO=$IS_QUEUE_FIFO}" \
	--timeout $FUNCTION_TIMEOUT \
	--zip-file fileb://$PATH_TO_ZIP_FILE)

Once the function is created, it can be linked to an event source, such as DynamoDB. This happens by DynamoDB stream ARN. Once a record is inserted, updated or deleted in DynamoDB, the lambda function is called with this event.

aws lambda create-event-source-mapping \
	--function-name $FUNCTION_NAME \
	--event-source-arn $DYNAMODB_STREAM_ARN \
	--starting-position LATEST)

In case of function already exists, but its code has to be updated, this is done with update-function-code command.

aws lambda update-function-code \
	--function-name $FUNCTION_NAME \
	--zip-file fileb://$PATH_TO_ZIP_FILE)

Along with the code, function configuration can be updated as well with update-function-configuration command.

aws lambda update-function-configuration \
	--function-name $FUNCTION_NAME  \
	--role $ROLE_ARN\
	--handler $HANDLER_STRING_WITH_NAMESPACE_CLASS_METHOD  \
	--environment "Variables={AWS_SQS_QUEUE_NAME=$QUEUE_NAME, AWS_SQS_IS_FIFO=$IS_QUEUE_FIFO}" \
	--timeout $FUNCTION_TIMEOUT

Delete

In order to delete, then the event source UUID has to be obtained, this is done with list-event-source-mappings command.

aws lambda list-event-source-mappings --function-name $FUNCTION_NAME

Then event source mapping is deleted with delete-event-source-mapping command.

aws lambda delete-event-source-mapping --uuid $EVNET_SOURCE_UUID

And finally, the function itself is deleted with delete-function command.

aws lambda delete-function --function-name $FUNCTION_NAME

ECS (Elastic Container Service) operations

The full list can be found in aws ecs CLI reference page.

Create

Before doing anything with ECR, docker login command should be created with get-login, so docker is authenticated with AWS ECR. With eval function, the docker login command is directly executed.

eval $(aws ecr get-login --no-include-email)

Clusters are first listed, in order to evaluate if the application is already deployed.

aws ecs list-clusters

Cluster is created with create-cluster command. A cluster consists of services.

aws ecs create-cluster --cluster-name $CLUSTER_NAME

Existing task definitions are listed, to evaluate whether they are published or not. Task definitions are Docker configurations.

aws ecs describe-task-definition --task-definition $TASK_DEFINITION_NAME

Task definition is created with register-task-definition command.

aws ecs register-task-definition \
	--family $TASK_DEFINITION_NAME \
	--execution-role-arn $ROLE_ARN\
	--network-mode awsvpc \
	--container-definitions $CONTAINER_DEFINITIONS \
	--requires-compatibilities "FARGATE" \
	--cpu "256" \
	--memory "512"

$CONTAINER_DEFINITIONS is a Docker configuration which defines the task definition:

name=$TASK_DEFINITION_NAME,\
image=$IMAGE_TAG,\
environment=[\
	{name=AwsQueueIsFifo,value=$_IS_QUEUE_FIFO},\
	{name=AwsRegion,value=$REGION},\
	{name=AwsQueueName,value=$QUEUE_NAME},\
	{name=AwsAccessKey,value=$AWS_ACCESS_KEY},\
	{name=AwsSecretKey,value=$AWS_SECRET_KEY},\
	{name=AwsQueueAutomaticallyCreate,value=$AWS_QUEUE_AUTO_CREATE},\
	{name=AwsQueueLongPollTimeSeconds,value=$AWS_POLL_TIME_SECONDS}\
],\
logConfiguration={\
	logDriver=awslogs,\
	options={\
		awslogs-group=ecs/$SERVICE_NAME,\
		awslogs-region=$REGION,\
		awslogs-stream-prefix=ecs\
	}\
}

Before creating a service, existing ones are listed with describe-services command. Service has one or more running instances of a task definition. This is how service can scale.

aws ecs describe-services \
	--cluster $CLUSTER_NAME\
	--services $SERVICE_NAME

Creating a service is done with create-service command. $TASK_REVISION is the result of the register-task-definition command. $SUBNET_ID is returned by aws ec2 describe-subnets command.

aws ecs create-service --cluster $CLUSTER_NAME \
	--service-name $SERVICE_NAME \
	--task-definition "$TASK_DEFINITION_NAME:$TASK_REVISION" \
	--desired-count 1 \
	--launch-type "FARGATE" \
	--network-configuration "awsvpcConfiguration={subnets=[$SUBNET_ID],securityGroups=[$SECURITY_GROUP_ID],assignPublicIp=ENABLED}")

Updating of the service is done with a very update-service similar command.

aws ecs update-service --cluster $CLUSTER_NAME \
	--service $SERVICE_NAME \
	--task-definition "$TASK_DEFINITION_NAME:$TASK_REVISION" \
	--desired-count 1 \
	--network-configuration "awsvpcConfiguration={subnets=[$SUBNET_ID],securityGroups=[$SECURITY_GROUP_ID],assignPublicIp=ENABLED}")

Delete

In order to delete task definitions, they should be first listed with list-task-definitions command, so the task definition version is available.

aws ecs list-task-definitions

Removing of the task definition is done with deregister-task-definition command. Note that the command does what it says, deregister, it does not delete. The task definition is kept in history in status INACTIVE.

aws ecs deregister-task-definition --task-definition "$TASK_DEFINITION_VERSION"

Deleting the service is done with the delete-service command, the –force parameter also stops the running tasks.

aws ecs delete-service \
	--cluster $CLUSTER_NAME \
	--service $SERVICE_NAME \
	--force

In the end, the whole cluster is deleted with delete-cluster command.

aws ecs delete-cluster --cluster $CLUSTER_NAME

ECR (Elastic Container Registry) operations

The full list can be found in aws ecr CLI reference page.

Delete

The repository is created by Docker when the image is pushed to it. Repository and images inside are deleted with delete-repository command.

aws ecr delete-repository \
	--repository-name $REPOSITORY_NAME \
	--force

EC2 (Elastic Compute Cloud) operations

The full list can be found in aws ec2 CLI reference page.

Create

EC2 is responsible for security groups, which expose the service to the world by applying firewall rules. Before creating the group, it is first searched for presence with describe-security-groups command.

aws ec2 describe-security-groups

The security group is created with create-security-group command.

aws ec2 create-security-group \
	--description $SECIRITY_GROUP_DESCRIPTION\
	--group-name $SECIRITY_GROUP_NAME

Inbound rules are defined with authorize-security-group-ingress command, where ip_permission is a bash function generation the JSON for better reuse.

aws ec2 authorize-security-group-ingress \
	--group-id $SECURITY_GROUP_ID \
	--ip-permissions "[$(ip_permission $SERVICE_PORT)]"

Function generation firewall rule JSON, $1 is an argument given to the function.

function ip_permission() {
	echo "{\"IpProtocol\": \"tcp\", \"FromPort\": $1, \"ToPort\": $1, \"IpRanges\": [{\"CidrIp\": \"0.0.0.0/0\", \"Description\": \"Port $1\"}]}"
}

Subnets are listed with describe-subnets command. Each subnet has 3 availability zones.

aws ec2 describe-subnets

Finally, in order to report the IP of the deployed service, describe-network-interfaces command is used.

aws ec2 describe-network-interfaces --filters "Name=network-interface-id,Values=$networkInterfaceId"

Delete

A security group is deleted by name with delete-security-group command.

aws ec2 delete-security-group --group-name $SECURITY_GROUP

CloudWatch operations

The full list can be found in aws logs CLI reference page.

Delete

CloudWatch logs are created by default from the services. Deleting the logs is done with delete-log-group command. Note that I am using Git Bash on Windows and MSYS_NO_PATHCONV=1 is mandatory because the log group name starts with /.

MSYS_NO_PATHCONV=1 aws logs delete-log-group --log-group-name ecs/$SERVICE_NAME

Conclusion

AWS command-line interface provides tooling to handle all needed operations of the AWS services. It is the preferred way to manage services over the Web user interface.

Related Posts

Read more...

AWS examples in C# – create a service working with SQS

Last Updated on by

Post summary: To give a basic overview of AWS SQS, how to write a message to it and how to make a consumer that constantly polls the queue for new messages.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Event-driven architecture

I would like to briefly touch the topic of event-driven architecture since message service providers, such as SQS or RabbitMQ are the basis of its implementation. This is a software architecture paradigm promoting the production, detection, consumption of, and reaction to events. An event is a significant change in the state of an object, to which someone might be interested in. All communication happens asynchronously and systems are loosely coupled. An event-driven system typically consists of event emitters, event consumers, and event channels. Emitters have the responsibility to detect, gather, and transfer events. Emitters do not know the consumers of the events, they do not even know if a consumer exists. Consumers have the responsibility of applying a reaction as soon as an event is presented in a dedicated channel. This leads to the pattern commonly known as eventual consistency, which pushes the complexity of consistency to the application tier, which is the biggest challenge to solve in an event-driven architecture.

Apart from SQS, there is even more sophisticated service from AWS called EventBridge, which makes it easy to build event-driven applications because it takes care of event ingestion and delivery, security, authorization, and error handling. It is basically a serverless event bus that makes it easy to connect applications together using data from its own applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

AWS SQS

SQS stands for Simple Queue Service, it is a fully managed message queuing service that enables to decouple and scale microservices, distributed systems, and serverless applications. SQS eliminates the complexity and overhead associated with managing and operating message-oriented middleware and empowers developers to focus on differentiating work.

Types of queues

SQS offers two types of message queues:

  • Standard queues – they offer maximum throughput, best-effort ordering, and at-least-once delivery. This means there is no guaranteed order and messages can be duplicated.
  • FIFO queues – they are designed to guarantee that messages are processed exactly once, in the exact order that they are sent.

Dead-letter queue

In addition to those, there is a special type of queues, called dead-letter queues. They are used mainly for debugging and failure proofing applications. If a message cannot be successfully processed after several retries from one of the source queues above, it ends in the dead-letter queue, from which it can be analyzed and returned back to source queue for reprocessing.

Message processing

It is important to know how SQS operates, in order to make good architectural decisions. When a message is published to the queue it becomes visible. When some consumer reads the message, then the message becomes not visible, but still present in the queue, its status now is in-flight. There is visibility timeout which by default is 30 seconds, the maximum value is 12 hours. After the visibility timeout passes then the message is visible again to be read by consumers. In case there is no dead-letter queue, this process happens over and over until the message retention period is reached, afterward message gets automatically deleted. The retention period default value is 4 days, the maximum value is 14 days. In case of a dead-letter queue, after the message cannot be processed for more than maximum receive count times, then it goes to dead-letter queue and stays in the dead-letter for its message retention period. See more info on SQS on How Amazon SQS Works page.

Architectural approaches

One queue or many queues

Since many event emitters can write messages to the queue it gets tricky to process the messages properly. One option is to have a separate queue for separate types of messages, another option is to put some metadata into the messages. I have decided to go for the solution with one queue because I have just one consumer which knows which message processor to call and thus simplify the code. In the case of many SQS queues, there should be many consumers defined in the code, which is better to split into many micro-services, for each SQS queue.

Dead-letter

I would say a dead-letter queue with the maximum retention period of 14 days is a good idea. In this case, messages can be quarantined which will not slow down the normal queue operations. In the case of no dead-letter queue and default timeouts, if a message cannot be processed, then it will appear every 30 seconds for a period of 4 days, this makes 2880 times a day, 11520 times in total. Now imagine there are thousands of messages like this one. I have decided to go for a dead-letter queue with the default retention period.

Long polling

Long polling is another aspect that has to be considered. It can be enabled in two ways. One is on a queue level, by setting the ReceiveMessageWaitTimeSeconds when creating the queue, it can be from 1 to 20 seconds. Other way to enable it is when messages are read from the queue, there is WaitTimeSeconds setting in the request, which can be from 1 to 20 seconds. In case both options are combined, then WaitTimeSeconds takes precedence.

Unknown messages

Another architectural decision in case there is only one queue is what to be done with unknown messages. In the case of no dead-letter queue, messages are good to be deleted, otherwise, they will keep showing for the queue’s retention period. I throw an error in the logs and after 3 unsuccessful attempts, which is the receive count times I have configured, the message goes to the dead-letter queue.

Standard vs. FIFO queues

SQS is able to handle a high amount of messages, theoretically an unlimited amount of messages per second. Standard SQS queue does not maintain any order of messages and also it is possible that there is a duplication of messages delivery. For this reason, AWS offers a FIFO (First-In-First-Out) queue, they provide message order and ensure exactly-once processing. The limitation of the FIFO queue is its number of transactions per second, which are 300 messages per second or 3000 if they are in batched mode.

SQS queue operations at a glance

In AWS examples in C# – basic SQS queue operations post following the operations briefed below were described in more details:

  • Create queue with dead-letter queue
  • Read messages from the queue
  • Write a message to the queue (comes in two flavors)
  • Delete messages to the queue
  • Move messages from dead-letter to source queue

Creating SQS message consumer

In order to read the messages, there should be a consumer that constantly polls the queue and processes the messages. ProcessMessageAsync uses the strategy design pattern to get the proper message processor based on MessageType attribute. Processors are stored in _messageProcessors which is IEnumerable<IMessageProcessor> and is injected by .NET Core dependency injection. If a processor is found, then the processor is invoked, if not an error is shown in the logs. This logic can be subject to change if unknown messages are tolerated in the queue. In ProcessAsync method there is a while loop, which constantly reads for messages by _sqsClient which SqsClient class described in previous sections. SQS returns the response if there are some messages or if WaitTimeSeconds time expired when reading the message or ReceiveMessageWaitTimeSeconds configured by AwsQueueLongPollTimeSeconds environment variable has expired. This while loop is a little tricky to unit test though as it consumes the main thread, and the mocked object should be instructed to wait. Everything is controlled by a CancellationTokenSource, when this is canceled, then consumption is stopped.

ProcessMessageAsync

private async Task ProcessMessageAsync(Message message)
{
	try
	{
		var messageType = message.MessageAttributes.GetMessageTypeAttributeValue();
		if (messageType == null)
		{
			throw new Exception($"No 'MessageType' attribute present in message {JsonConvert.SerializeObject(message)}");
		}

		var processor = _messageProcessors.SingleOrDefault(x => x.CanProcess(messageType));
		if (processor == null)
		{
			throw new Exception($"No processor found for message type '{messageType}'");
		}

		await processor.ProcessAsync(message);
		await _sqsClient.DeleteMessageAsync(message.ReceiptHandle);
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Cannot process message [id: {message.MessageId}, receiptHandle: {message.ReceiptHandle}, body: {message.Body}] from queue {_sqsClient.GetQueueName()}");
	}
}

ProcessAsync

private async void ProcessAsync()
{
	try
	{
		while (!_tokenSource.Token.IsCancellationRequested)
		{
			var messages = await _sqsClient.GetMessagesAsync(_tokenSource.Token);
			messages.ForEach(async x => await ProcessMessageAsync(x));
		}
	}
	catch (OperationCanceledException)
	{
		//operation has been canceled but it shouldn't be propagated
	}
}

StartConsuming

public void StartConsuming()
{
	if (!IsConsuming())
	{
		_tokenSource = new CancellationTokenSource();
		ProcessAsync();
	}
}

private bool IsConsuming()
{
	return _tokenSource != null && !_tokenSource.Token.IsCancellationRequested;
}

Message processors

In the current example, I have taken the architectural design decision to have one queue and different messages into it. For each different type of message, there is a relevant processor. With the strategy design pattern, the appropriate message processor is picked based on MessageType attribute. Processors implement a very simple interface IMessageProcessor. In the current example, they take the message as a string, serialize it to an object and save this object to DynamoDB. A sample implementation is shown below:

IMessageProcessor

public interface IMessageProcessor
{
	bool CanProcess(string messageType);
	Task ProcessAsync(Message message);
}

ActorMessageProcessor

public bool CanProcess(string messageType)
{
	return messageType == typeof(Actor).Name;
}

public async Task ProcessAsync(Message message)
{
	var actor = JsonConvert.DeserializeObject<Actor>(message.Body);
	await _actorsRepository.SaveActorAsync(actor);
	_logger.LogInformation($"ActorMessageProcessor invoked with: {message.Body}");
}

AWS ECS and AWS ECR

ECS stands for Elastic Container Service is a fully managed container orchestration service. Containers can be run in clusters using AWS Fargate, which is a serverless compute for containers. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design.

ECR stands for Elastic Container Registry is a fully-managed Docker container registry that makes it easy for developers to store, manage, and deploy Docker container images. ECR is integrated with ECS, eliminating the need to operate own container repositories or worry about scaling the underlying infrastructure.

SqsWriter and SqsReader

SqsWriter is a .NET Core 3.0 application, that is dockerized and run in AWS ECS with Fargate, and its container images are stored in ECR. It exposes an API that can be used to publish Actor or Movie objects as messages with separate MessageType attributes in the SQS queue.

SqsReader is a .NET Core 3.0 application, that is dockerized and run in AWS ECS with Fargate, and its container images are stored in ECR. It has a consumer that listens to the SQS queue and processes the messages by writing them into appropriate AQS DynamoDB tables. It also exposes API to stop or start processing, as long as reprocess the dead-letter queue or simply get the queue status.

More information on how to run the solution can be found in AWS examples in C# – run the solution post.

Conclusion

In the current post, I have given some concepts of event-driven architecture and how SQS fits in it. Also, I have described some architectural considerations when using SQS queues, such as dead-letter queues, one queue with different message type or several queues, etc. In the end, I have given practical code on how to make a consumer for the SQS queue.

Related Posts

Read more...

AWS examples in C# – basic SQS queue operations

Last Updated on by

Post summary: Code examples of how to perform basic SQS queue operations like reading, writing, deleting, creating a queue, etc.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository. In the current post, I will put in practice example basic SQS operations, a more detailed description of their usage is available in AWS examples in C# – create a service working with SQS post.

Instantiate Amazon SQS client

In the current examples, I use a configuration class called AppConfig. Its values are injected from the environment variables by .NET Core framework in Startup class. In order to work with SQS, a client is needed. The SQS client interface is called IAmazonSQS and comes from AWS C# SDK. The NuGet package is called AWSSDK.SQS, which in the current example comes as a sub-reference from Automationrhapsody.Aws.Examples.Models NuGet package. The concrete AWS client implementation is AmazonSQSClient and a singleton object is instantiated in SqsClientFactory class, where RegionEndpoint is used to instantiate AmazonSQSConfig. I use the AwsCredentials class which extends the AWS’ abstract AWSCredentials in order to manage the credentials.

SqsClientFactory.cs

public static AmazonSQSClient CreateClient(AppConfig appConfig)
{
	var sqsConfig = new AmazonSQSConfig
	{
		RegionEndpoint = RegionEndpoint.GetBySystemName(appConfig.AwsRegion)
	};
	var awsCredentials = new AwsCredentials(appConfig);
	return new AmazonSQSClient(awsCredentials, sqsConfig);
}

AwsCredentials.cs

public class AwsCredentials : AWSCredentials
{
	private readonly AppConfig _appConfig;

	public AwsCredentials(AppConfig appConfig)
	{
		_appConfig = appConfig;
	}

	public override ImmutableCredentials GetCredentials()
	{
		return new ImmutableCredentials(_appConfig.AwsAccessKey,
						_appConfig.AwsSecretKey, null);
	}
}

AppConfig.cs

public class AppConfig
{
	private const string FifoSuffix = ".fifo";
	private string _queueName;

	public string AwsRegion { get; set; }
	public string AwsAccessKey { get; set; }
	public string AwsSecretKey { get; set; }
	public string AwsQueueName
	{
		get => AwsQueueIsFifo ? _queueName + FifoSuffix : _queueName;
		set => _queueName = value;
	}
	public string AwsDeadLetterQueueName
	{
		get
		{
			var deadLetter = _queueName + "-exceptions";
			return AwsQueueIsFifo ? deadLetter + FifoSuffix : deadLetter;
		}
	}

	public bool AwsQueueAutomaticallyCreate { get; set; }
	public bool AwsQueueIsFifo { get; set; }
	public int AwsQueueLongPollTimeSeconds { get; set; }
}

Startup.cs

public Startup()
{
	var configurationBuilder = new ConfigurationBuilder()
		.AddEnvironmentVariables();
}

Local SqsClient dependencies

This sample code shows what external dependencies the SqsClient class needs. They are injected into the constructor by .NET Core dependency injection.

private readonly AppConfig _appConfig;
private readonly IAmazonSQS _sqsClient;
private readonly ILogger<SqsClient> _logger;
private readonly ConcurrentDictionary<string, string> _queueUrlCache;

public SqsClient(IOptions<AppConfig> awsConfig, 
	IAmazonSQS sqsClient, ILogger<SqsClient> logger)
{
	_appConfig = awsConfig.Value;
	_sqsClient = sqsClient;
	_logger = logger;
	_queueUrlCache = new ConcurrentDictionary<string, string>();
}

Create SQS queue and dead-letter queue

Queues can be created programmatically, something that will be described in the current post. Another option is to create them from the AWS CLI, see more information in AWS examples in C# – deploy with AWS CLI commands post.

Once the client is in place, then the queue and dead-letter queue is created with the code below. The code snippet also enables long polling for the queue, which allows reducing costs while allowing consumers to receive messages as soon as they arrive in the queue. Basically SQS waits until a message is available in a queue before sending a response.

public async Task CreateQueueAsync()
{
	const string arnAttribute = "QueueArn";

	try
	{
		var createQueueRequest = new CreateQueueRequest();
		if (_appConfig.AwsQueueIsFifo)
		{
			createQueueRequest.Attributes.Add("FifoQueue", "true");
		}

		createQueueRequest.QueueName = _appConfig.AwsQueueName;
		var createQueueResponse = await _sqsClient.CreateQueueAsync(createQueueRequest);
		createQueueRequest.QueueName = _appConfig.AwsDeadLetterQueueName;
		var createDeadLetterQueueResponse = await _sqsClient.CreateQueueAsync(createQueueRequest);

		// Get the the ARN of dead letter queue and configure main queue to deliver messages to it
		var attributes = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
		{
			QueueUrl = createDeadLetterQueueResponse.QueueUrl,
			AttributeNames = new List<string> { arnAttribute }
		});
		var deadLetterQueueArn = attributes.Attributes[arnAttribute];

		// RedrivePolicy on main queue to deliver messages to dead letter queue if they fail processing after 3 times
		var redrivePolicy = new
		{
			maxReceiveCount = "3",
			deadLetterTargetArn = deadLetterQueueArn
		};
		await _sqsClient.SetQueueAttributesAsync(new SetQueueAttributesRequest
		{
			QueueUrl = createQueueResponse.QueueUrl,
			Attributes = new Dictionary<string, string>
			{
				{"RedrivePolicy", JsonConvert.SerializeObject(redrivePolicy)},
				// Enable Long polling
				{"ReceiveMessageWaitTimeSeconds", _appConfig.AwsQueueLongPollTimeSeconds.ToString()}
			}
		});
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Error when creating SQS queue {_appConfig.AwsQueueName} and {_appConfig.AwsDeadLetterQueueName}");
	}
}

Read messages from the SQS queue

Reading is done with the given code, where _queueUrlCache is ConcurrentDictionary<string, string>. Queue URL is cached for better performance in GetQueueUrl method.

GetMessagesAsync

public async Task<List<Message>> GetMessagesAsync(string queueName, CancellationToken cancellationToken = default)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
		{
			QueueUrl = queueUrl,
			WaitTimeSeconds = _appConfig.AwsQueueLongPollTimeSeconds,
			AttributeNames = new List<string> { "ApproximateReceiveCount" },
			MessageAttributeNames = new List<string> { "All" }
		}, cancellationToken);

		if (response.HttpStatusCode != HttpStatusCode.OK)
		{
			throw new AmazonSQSException($"Failed to GetMessagesAsync for queue {queueName}. Response: {response.HttpStatusCode}");
		}

		return response.Messages;
	}
	catch (TaskCanceledException)
	{
		_logger.LogWarning($"Failed to GetMessagesAsync for queue {queueName} because the task was canceled");
		return new List<Message>();
	}
	catch (Exception)
	{
		_logger.LogError($"Failed to GetMessagesAsync for queue {queueName}");
		throw;
	}
}

GetQueueUrl

private async Task<string> GetQueueUrl(string queueName)
{
	if (string.IsNullOrEmpty(queueName))
	{
		throw new ArgumentException("Queue name should not be blank.");
	}

	if (_queueUrlCache.TryGetValue(queueName, out var result))
	{
		return result;
	}

	try
	{
		var response = await _sqsClient.GetQueueUrlAsync(queueName);
		return _queueUrlCache.AddOrUpdate(queueName, response.QueueUrl, (q, url) => url);
	}
	catch (QueueDoesNotExistException ex)
	{
		throw new InvalidOperationException($"Could not retrieve the URL for the queue '{queueName}' as it does not exist or you do not have access to it.", ex);
	}
}

Write a message to the SQS queue

The current example is to write a single message to the queue. AWS SDK offers a method called SendMessageBatchAsync, which can send a group of messages. Because of the nature of the example application, the use of SendMessageBatchAsync is not needed. Writing comes in two flavors. With generic method accepting object instance or with method accepting message text and message type.

In the case of a FIFO queue, there are two more values to be set. One is the MessageGroupId, so messages from the same group are always processed one by one. In the current example, messages are grouped by type. Another mandatory thing is MessageDeduplicationId, which used by SQS for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren’t delivered during the 5-minute deduplication interval.

PostMessageAsync<T>

public async Task PostMessageAsync<T>(string queueName, T message)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var sendMessageRequest = new SendMessageRequest
		{
			QueueUrl = queueUrl,
			MessageBody = JsonConvert.SerializeObject(message),
			MessageAttributes = SqsMessageTypeAttribute.CreateAttributes<T>()
		};
		if (_appConfig.AwsQueueIsFifo)
		{
			sendMessageRequest.MessageGroupId = typeof(T).Name;
			sendMessageRequest.MessageDeduplicationId = Guid.NewGuid().ToString();
		}

		await _sqsClient.SendMessageAsync(sendMessageRequest);
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Failed to PostMessagesAsync to queue '{queueName}'. Exception: {ex.Message}");
		throw;
	}
}

PostMessageAsync

public async Task PostMessageAsync(string queueName, string messageBody, string messageType)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var sendMessageRequest = new SendMessageRequest
		{
			QueueUrl = queueUrl,
			MessageBody = messageBody,
			MessageAttributes = SqsMessageTypeAttribute.CreateAttributes(messageType)
		};
		if (_appConfig.AwsQueueIsFifo)
		{
			sendMessageRequest.MessageGroupId = messageType;
			sendMessageRequest.MessageDeduplicationId = Guid.NewGuid().ToString();
		}

		await _sqsClient.SendMessageAsync(sendMessageRequest);
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Failed to PostMessagesAsync to queue '{queueName}'. Exception: {ex.Message}");
		throw;
	}
}

Distinguishing messages in the queue

Since many event emitters can write messages to the queue it gets tricky to process the messages properly. One option is to have a separate queue for separate types of messages, another option is to put some metadata into the messages. I have decided to go for the solution with one queue because I have just one consumer which knows which message processor to call. In the case of many consumers, it is recommended to have several SQS queues, so the consumer does not need to read and disregard messages, this is not optimal.

Every message is added additional MessageAttributes. In the example above this is done with SqsMessageTypeAttribute.CreateAttributes(messageType) extension method, available in Automationrhapsody.Aws.Examples.Models NuGet package, which is also part of the examples code, is located in Models project. What this method does is to add MessageType string attribute, where the value is typeof(T).Name.

public static class SqsMessageTypeAttribute
{
	private const string AttributeName = "MessageType";

	public static string GetMessageTypeAttributeValue(this Dictionary<string, MessageAttributeValue> attributes)
	{
		return attributes.SingleOrDefault(x => x.Key == AttributeName).Value?.StringValue;
	}

	public static Dictionary<string, MessageAttributeValue> CreateAttributes<T>()
	{
		return CreateAttributes(typeof(T).Name);
	}

	public static Dictionary<string, MessageAttributeValue> CreateAttributes(string messageType)
	{
		return new Dictionary<string, MessageAttributeValue>
		{
			{
				AttributeName, new MessageAttributeValue
				{
					DataType = nameof(String),
					StringValue = messageType
				}
			}
		};
	}
}

Delete message from the queue

Once the message is processed, it should be removed from the queue. This is done with the following method:

public async Task DeleteMessageAsync(string queueName, string receiptHandle)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var response = await _sqsClient.DeleteMessageAsync(queueUrl, receiptHandle);

		if (response.HttpStatusCode != HttpStatusCode.OK)
		{
			throw new AmazonSQSException($"Failed to DeleteMessageAsync with for [{receiptHandle}] from queue '{queueName}'. Response: {response.HttpStatusCode}");
		}
	}
	catch (Exception)
	{
		_logger.LogError($"Failed to DeleteMessageAsync from queue {queueName}");
		throw;
	}
}

Reprocess messages from dead-letter queue

If there is a problem with message processing, they are moved to the dead-letter queue. There might be a specific bug in the consumer application for this particular type of message. This bug might be fixed, new version deployed and now all those messages should be reprocessed. Moving from dead-letter to source queue is done with the following code:

public async Task RestoreFromDeadLetterQueueAsync(CancellationToken cancellationToken = default)
{
	var deadLetterQueueName = _appConfig.AwsDeadLetterQueueName;

	try
	{
		var token = new CancellationTokenSource();
		while (!token.Token.IsCancellationRequested)
		{
			var messages = await GetMessagesAsync(deadLetterQueueName, cancellationToken);
			if (!messages.Any())
			{
				token.Cancel();
				continue;
			}

			messages.ForEach(async message =>
			{
				var messageType = message.MessageAttributes.GetMessageTypeAttributeValue();
				if (messageType != null)
				{
					await PostMessageAsync(message.Body, messageType);
					await DeleteMessageAsync(deadLetterQueueName, message.ReceiptHandle);
				}
			});
		}
	}
	catch (Exception)
	{
		_logger.LogError($"Failed to ReprocessMessages from queue {deadLetterQueueName}");
		throw;
	}
}

SQS queue operations at a glance

All operations described above can be seen in SqsReader SqsClient class and SqsWriter SqsClient class.

Conclusion

In the current post, I have given code examples of how to perform basic SQS queue operations.

Related Posts

Read more...

AWS examples in C# – run the solution

Last Updated on by

Post summary: Explanation of how to install and use the solution in AWS examples in C# blog post series.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository. In the current post, I give information on how to install and run the project.

Disclaimer

The solution has commands to deploy to the cloud as well as to clean resources. Note not all resources are cleaned, read more in the Cleanup section. In order to be run in AWS valid account is needed. I am not going to describe how to create an account. If an account is present, then there is also knowledge and awareness of how to use it.

Important: current examples generate costs on AWS account. Use cautiously at your own risk!

Restrictions

The project was tested to be working on Linux and Windows. For Windows, it is working only with Git Bash. The project requires a valid AWS account.

Required installations

In order to fully run and enjoy the project following needs to be installed:

Configurations

AWS CLI has to be configured in order to run properly. It happens with aws configure. If there is no AWS account, this is not an issue, put some values for access and secret key and put a correct region, like us-east-1.

Import Postman collection, in order to be able to try the examples. Postman collection is in aws.examples.csharp.postman_collection.json file in the code. This is an optional step, below there are cURL examples also.

Run the project in AWS

Running on AWS requires the setting of environment variables:

export AwsAccessKey=KIA57FV4.....
export AwsSecretKey=mSgsxOWVh...
export AwsRegion=us-east-1

Then the solution is deployed to AWS with ./solution-deploy.sh script. Note that the output of the command gives the API Gateway URL and API key, as well as the SqsWriter and SqsReader endpoints. See image below:

Run the project partly in AWS

The most expensive part of the current setup is the running of the docker containers in EC2 (Elastic Compute Cloud). I have prepared a script called ./solution-deploy-hybrid.sh, which runs the containers locally and all other things are in AWS. Still, environment variables from the previous section are mandatory to be set. I believe this is the optimal variant if you want to test and run the code in a “production-like” environment.

Run the project in LocalStack

LocalStack provides an easy-to-use test/mocking framework for developing Cloud applications. It spins up a testing environment on local machines that provide the same functionality and APIs as the real AWS cloud environment. I have experimented with LocalStack, there is a localstack branch in GitHub. The solution can be run with solution-deploy-localstack.sh command. I cannot really estimate if this is a good alternative, because I am running the free tier in AWS and the most expensive part is ECS, which I skip by running the containers locally, instead of AWS. See the previous section on how to run a hybrid.

Usage

There is a Postman collection which allows easy firing of the requests. Another option is to use cURL, examples of all requests with their Postman names are available below.

SqsWriter

SqsWriter is a .NET Core 3.0 application, that is dockerized and run in AWS ECS (Elastic Container Service). It exposes an API that can be used to publish Actor or Movie objects. There is also a health check request. After AWS deployment proper endpoint is needed. The endpoint can be found as an output of deployment scripts. See the image above.

PublishActor

curl --location --request POST 'http://localhost:5100/api/publish/actor' \
--header 'Content-Type: application/json' \
--data-raw '{
	"FirstName": "Bruce",
	"LastName": "Willis"
}'

PublishMovie

curl --location --request POST 'http://localhost:5100/api/publish/movie' \
--header 'Content-Type: application/json' \
--data-raw '{
	"Title": "Die Hard",
	"Genre": "Action Movie"
}'

When Actor or Movie is published, it goes to the SQS queue, SqsReader picks it up from there and processes it. What is visible in the logs is that both LogEntryMessageProcessor and ActorMessageProcessor are invoked. See the screenshot:

SqsWriterHealthCheck

curl --location --request GET 'http://localhost:5100/health'

SqsReader

SqsReader is a .NET Core 3.0 application, that is dockerized and run in AWS ECS. It has a consumer that listens to the SQS queue and processes the messages by writing them into appropriate AQS DynamoDB tables. It also exposes API to stop or start processing, as long as reprocess the dead-letter queue or simply get the queue status. After AWS deployment proper endpoint is needed. The endpoint can be found as an output of deployment scripts. See the image above.

ConsumerStart

curl --location --request POST 'http://localhost:5200/api/consumer/start' \
--header 'Content-Type: application/json' \
--data-raw ''

ConsumerStop

curl --location --request POST 'http://localhost:5200/api/consumer/stop' \
--header 'Content-Type: application/json' \
--data-raw ''

ConsumerStatus

curl --location --request GET 'http://localhost:5200/api/consumer/status'

ConsumerReprocess
If this one is invoked with no messages in the dead-letter queue then it takes 20 seconds to finish, because it actually waits for long polling timeout.

curl --location --request POST 'http://localhost:5200/api/consumer/reprocess' \
--header 'Content-Type: application/json' \
--data-raw ''

SqsReaderHealthCheck

curl --location --request GET 'http://localhost:5200/health'

ActorsServerlessLambda

This lambda is managed by the Serverless framework. It is exposed as REST API via AWS API Gateway. It also has a custom authorizer as well as API Key attached. Those are described in a further post.

ServerlessActors

In the case of AWS, the API Key and URL are needed, those can be obtained from deployment command logs. See the screenshot above. Put correct values to CHANGE_ME and REGION placeholders. Request is:

curl --location --request POST 'https://CHANGE_ME.execute-api.REGION.amazonaws.com/dev/actors/search' \
--header 'Content-Type: application/json' \
--header 'x-api-key: CHANGE_ME' \
--header 'Authorization: Bearer validToken' \
--data-raw '{
    "FirstName": "Bruce",
    "LastName": "Willis"
}'

MoviesServerlessLambda

ServerlessMovies

Put correct values to CHANGE_ME and REGION placeholders. Request is:

curl --location --request GET 'https://CHANGE_ME.execute-api.REGION.amazonaws.com/dev/movies/title/Die Hard'

Cleanup

Nota bene: This is a very important step, as leaving the solution running in AWS will accumulate costs.

In order to stop and clean up all AWS resources run ./solution-delete.sh script.

Nota bene: There a resource that is not automatically deleted by the scripts. This is a Route53 resource created by AWS Cloud Map. It has to be deleted with the following commands. Note that the id in the delete command comes from the result of list-namespaces command.

aws servicediscovery list-namespaces
aws servicediscovery delete-namespace --id ns-kneie4niu6pwwela

Verify cleanup

In order to be sure there are no leftovers from the examples, following AWS services has to be checked:

  • SQS
  • DynamoDB
  • IAM -> Roles
  • EC2 -> Security Groups
  • ECS -> Clusters
  • ECS -> Task Definitions
  • ECR -> Repositories
  • Lambda -> Functions
  • Lambda -> Applications
  • CloudFormation -> Stacks
  • S3
  • CloudWatch -> Log Groups
  • Route 53
  • AWS Cloud Map

On top of it, Billing should be regularly monitored to ensure no costs are applied.

Conclusion

This post describes how to run and they the solution described in AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series

Related Posts

Read more...

AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS

Last Updated on by

Post summary: Overview of the AWS examples in C# series.

In several blog posts, I give some practical examples of how to use AWS SQS, DynamoDB, Lambda with C# code. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Introduction

AWS stands for Amazon Web Services, it is a subsidiary of Amazon that provides on-demand cloud computing platforms and APIs to individuals, companies, and governments, on a metered pay-as-you-go basis. AWS is one of the big cloud service providers. The others are Microsoft Azure and Google Cloud. All three cloud service providers have functions that are semantically common but differ in practical implementation. Also, every one of them has its own flavors. I have chosen to use AWS for these examples as it is something I have used before and I am most comfortable with it.

Architectural overview

In order to get a full understanding of the architecture, I have prepared this very basic diagram. It illustrates what services are there and how they communicate.

SqsReader and SqsWriter

Both are .NET Core 3.0 microservices running in docker containers. The images are uploaded in AWS ECR (Elastic Container Registry) and containers are run in AWS ECS (Elastic Container Service). SqsReader has a REST endpoint, by which an Actor or Movie can be posted. Both are pushed as a message to AWS SQS (Simple Queue Service). SqsWriter is listening to the SQS and in case of a message, it processes it. If the message is from type Actor or Movie then SqsReader saves it to the respective AWS DynamoDB tables. If the message is LogEntry, then the message is only output into SqsReader logs.

ActorsLambdaFunction and MoviessLambdaFunction

Both are .NET Core 2.1 lambda functions run in AWS Lambda. They listen to Actors and Movies DynamoDB tables changes and in case of new entries, they write to LogEntries DynamoDB table. Also, they write SQS messages from type LogEntry, which are then read by SqsReader.

ActorsServerlessLambda and MoviesServerlessLambda

Those are again lambda functions by are fully managed by the Serverless framework. They have a lambda application defined as well as Cloud Formation templates. They expose a REST API trough AWS API Gateway, by which the Actors table can be queried or a movie can be got from the Movies table.

Post in the series

This is a long series of posts describing into detail all the pieces of the architectural diagram above. Also, every aspect of the code in the repository is explained in detail in subsequent blog posts. It was a very interesting learning opportunity for me, which I would like to share. Here are the posts in the series:

Future plans

There are several topics I would like to go into as well, but there is no code yet for them into the GitHub repository. Those are:

  • AWS examples in C# – manage with Terraform
  • AWS examples in C# – use AWS Cognito for API Gateway authorizer

Conclusion

These series of posts are intended to give some basic overview of important AWS services and how to use them in C# code.

Related Posts

Read more...