Basic overview of software design patterns

Post summary: Basic overview of design patterns.

Although I have touched on the topic of design patterns in my Design patterns every test automation engineer should know post, I have wanted for a long time to write an article on the full topic. It is an important topic for software developers because, whether we know it or not, design patterns are part of our everyday work. We need to be able to properly structure our code, so it is better to be aware of them. In the current post, I will just give an overview of the patterns; the implementation details can be found all over the internet. I like Design Patterns in Java Tutorial because it is simple and is a very good starting point, and The Catalog of Design Patterns gives much more detail and more use cases.

Design patterns

Design patterns give guidance on how to handle common scenarios and how to structure the code during software development. The main goal of using design patterns is to make the code easy to maintain, hence reducing maintenance costs. Design patterns are split into several groups: creational patterns, structural patterns, and behavioral patterns.

Creational patterns

Patterns in this group are concerned with the creation of objects in a common place, rather than using the new operator wherever a new object is needed. This provides better reusability and control over how objects are created. Please check the Creational patterns rules of thumb section, which gives a very good overview of the differences and combinations of the design patterns.

  • Factory Method – One class is responsible for creating different objects which implement the same interface. Object creation is done in one step by the factory method, which accepts arguments to define what type of object is needed.
  • Abstract Factory – A factory of factories used to create objects with similar characteristics. One class is responsible for creating factory objects, which are later used to create objects with their factory methods.
  • Builder – Builds complex objects step by step. The same builder process can be used to create different objects depending on the use case.
  • Object Pool – Caches objects in order to increase performance when many expensive objects are created but only a few of them are in use at the same time.
  • Prototype – Keeps an instance of an object, which is used to create a new object by cloning the cached one. Used in cases where the initial object creation requires significant resources and copying the prototype is a much faster operation.
  • Singleton – Ensures that only one instance of an object is available in the system. The object can be instantiated on demand or on system startup in a static block.
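
To make two of the descriptions above more concrete, here is a minimal Java sketch of Factory Method (in the simple form described in this post) and Builder. All names in it (Shape, Circle, Square, ShapeFactory, ReportBuilder) are hypothetical and chosen only for illustration; they are not taken from the linked tutorials.

```java
import java.util.ArrayList;
import java.util.List;

class CreationalSketch {
    // Common interface implemented by every product of the factory.
    interface Shape {
        String draw();
    }

    static class Circle implements Shape {
        @Override
        public String draw() {
            return "circle";
        }
    }

    static class Square implements Shape {
        @Override
        public String draw() {
            return "square";
        }
    }

    // Factory Method: one class creates different objects behind the same
    // interface; the argument defines which type of object is needed.
    static class ShapeFactory {
        static Shape create(String type) {
            switch (type) {
                case "circle":
                    return new Circle();
                case "square":
                    return new Square();
                default:
                    throw new IllegalArgumentException("Unknown shape: " + type);
            }
        }
    }

    // Builder: the object is assembled step by step and produced by build().
    static class ReportBuilder {
        private final List<String> lines = new ArrayList<>();

        ReportBuilder title(String title) {
            lines.add("# " + title);
            return this;
        }

        ReportBuilder line(String text) {
            lines.add(text);
            return this;
        }

        String build() {
            return String.join("\n", lines);
        }
    }

    public static void main(String[] args) {
        System.out.println(ShapeFactory.create("circle").draw());
        System.out.println(new ReportBuilder().title("Shapes").line("one circle").build());
    }
}
```

The caller never uses the new operator on Circle or Square directly, which is the point of keeping object creation in one place.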

Structural patterns

Patterns in this group are concerned with object composition or representation. They optimize the objects and their relations. Please check the Structural patterns rules of thumb section, which gives a very good overview of the differences and combinations of the design patterns.

  • Adapter – Connects two incompatible interfaces. An object wraps a class implementing one of the interfaces, then this object implements the methods of the other interface.
  • Bridge – Separates an object’s interface from its implementation.
  • Filter – Filters a set of objects based on different criteria or logical operators. Concrete criteria and operators are classes implementing the criteria interface.
  • Composite – Composes objects into tree structures to represent whole-part hierarchies. An object can contain a list of similar objects, which are related somehow to it.
  • Decorator – Adds new functionalities to already existing objects by using composition instead of inheritance. The decorator class holds an instance of the object and uses its functionality; it can also add more functionalities along with the existing ones.
  • Facade – Defines a higher-level interface on top of existing complex interfaces in order to ease the usage of the existing interfaces by hiding the complexity.
  • Flyweight – Reduces the number of created objects by caching and re-using already existing similar objects.
  • Private Class Data – Encapsulates the data of the class into a separate data class and controls access to the data.
  • Proxy – Controls access to another object.
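
As an illustration of the Decorator and Adapter descriptions above, here is a minimal Java sketch. The Notifier, EmailNotifier, LoggingNotifier, LegacyPager, and PagerAdapter names are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;

class StructuralSketch {
    // Hypothetical interface that the rest of the code depends on.
    interface Notifier {
        String send(String message);
    }

    static class EmailNotifier implements Notifier {
        @Override
        public String send(String message) {
            return "email: " + message;
        }
    }

    // Decorator: holds a Notifier, reuses its functionality and adds logging.
    static class LoggingNotifier implements Notifier {
        private final Notifier wrapped;
        final List<String> log = new ArrayList<>();

        LoggingNotifier(Notifier wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public String send(String message) {
            log.add(message);
            return wrapped.send(message);
        }
    }

    // Hypothetical incompatible interface, e.g. coming from an older library.
    interface LegacyPager {
        String page(int priority, String text);
    }

    // Adapter: wraps a LegacyPager and exposes it through the Notifier interface.
    static class PagerAdapter implements Notifier {
        private final LegacyPager pager;

        PagerAdapter(LegacyPager pager) {
            this.pager = pager;
        }

        @Override
        public String send(String message) {
            return pager.page(1, message);
        }
    }

    public static void main(String[] args) {
        LoggingNotifier logged = new LoggingNotifier(new EmailNotifier());
        System.out.println(logged.send("build failed"));
        System.out.println(logged.log);

        Notifier pager = new PagerAdapter((priority, text) -> "pager[" + priority + "]: " + text);
        System.out.println(pager.send("disk full"));
    }
}
```

Both classes rely on composition: the decorator keeps the same interface and adds behavior, while the adapter changes the interface and keeps the behavior.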

Behavioral patterns

Patterns in this group are concerned with communication and interaction between objects. Please check the Behavioral patterns rules of thumb section, which gives a very good overview of the differences and combinations of the design patterns.

  • Chain of responsibility – Provides a series of processing objects which sequentially process a request. Objects in the chain decide which one is the most appropriate to process the request and whether to end processing or continue with the next handler.
  • Command – Decouples the object invoking the operation from the one executing it by encapsulating a command request as an object which then is passed to the invoking object.
  • Interpreter – Provides a way to evaluate a language grammar or expression. The pattern uses a class to represent each grammar rule. Each rule is either composite or terminal; the rules are used together to solve the problem.
  • Iterator – Provides a way to access the elements of a collection in a sequential manner without any need to know its underlying representation.
  • Mediator – Defines an object that encapsulates and manages how a set of objects interact.
  • Memento – Captures an object’s internal state in order to be able to restore it later.
  • Null Object – Acts as a default value of an object. Used when no action is expected by the system in case of null; this removes the need for constant null checking.
  • Observer – Observes an object for changes and propagates each change to all other objects related to it.
  • State – Changes an object’s behavior when its state changes.
  • Strategy – Encapsulates an algorithm inside a class. Creates objects which represent various strategies and a context object whose behavior varies as per its strategy object.
  • Template method – Defines an abstract class that exposes a predefined way (a template) to execute its methods. Its subclasses can override the method implementations as needed, but the invocation happens in the way defined by the abstract class.
  • Visitor – Represents an operation to be performed on the elements of an object structure. Visitor lets you define a new operation without changing the classes of the elements on which it operates.
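
The Strategy and Observer descriptions above can be sketched in a few lines of Java. This is only an illustration under assumed names (DiscountStrategy, NoDiscount, TenPercentOff, Checkout, Subject); none of them come from the linked resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BehavioralSketch {
    // Strategy: each algorithm lives in its own class behind one interface.
    interface DiscountStrategy {
        int apply(int priceInCents);
    }

    static class NoDiscount implements DiscountStrategy {
        @Override
        public int apply(int priceInCents) {
            return priceInCents;
        }
    }

    static class TenPercentOff implements DiscountStrategy {
        @Override
        public int apply(int priceInCents) {
            return priceInCents - priceInCents / 10;
        }
    }

    // Context object whose behavior varies with its current strategy object.
    static class Checkout {
        private DiscountStrategy strategy;

        Checkout(DiscountStrategy strategy) {
            this.strategy = strategy;
        }

        void setStrategy(DiscountStrategy strategy) {
            this.strategy = strategy;
        }

        int total(int priceInCents) {
            return strategy.apply(priceInCents);
        }
    }

    // Observer: the subject propagates every change to all subscribed observers.
    static class Subject {
        private final List<Consumer<String>> observers = new ArrayList<>();

        void subscribe(Consumer<String> observer) {
            observers.add(observer);
        }

        void change(String newValue) {
            observers.forEach(observer -> observer.accept(newValue));
        }
    }

    public static void main(String[] args) {
        Checkout checkout = new Checkout(new NoDiscount());
        System.out.println(checkout.total(1000));
        checkout.setStrategy(new TenPercentOff());
        System.out.println(checkout.total(1000));

        Subject subject = new Subject();
        subject.subscribe(value -> System.out.println("observer got: " + value));
        subject.change("v2");
    }
}
```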

Conclusion

The current post gives a basic overview of a very important topic for software engineers: design patterns. The Internet is full of resources with more details on how they are implemented.

Distributed system observability: extract and visualize metrics from OpenTelemetry spans

Post summary: How to extract metrics from spans with OpenTelemetry Collector, store them in Prometheus, and properly visualize them in Grafana.

This post is part of Distributed system observability: complete end-to-end example series. The code used for this series of blog posts is located in selenium-observability-java GitHub repository.

Prometheus and metrics

Prometheus is an open-source monitoring and alerting toolkit. Prometheus collects and stores its metrics as time series data. Metrics information is stored with the timestamp at which it was recorded, alongside optional key-value pairs called labels. A metric is a way to measure something, e.g. how many people have read the current article. Metrics change over time, and Prometheus records that change and can visualize it graphically.

Extract metrics from spans in OpenTelemetry Collector

OpenTelemetry Collector receives tracing data from the frontend, converts it into Jaeger format, and exports it to the Jaeger backend. Every span has a duration, which is a metric. In order to extract the metric, the Span Metrics Processor from the OpenTelemetry Collector contrib repository is used. The full configuration is in otel-config.yaml. The file configures receivers, processors, exporters, and the service.

There are two receivers: otlp receives the traces; otlp/spanmetrics is a dummy receiver that is never used, but the pipeline requires one to be present.

There are two processors: batch groups the data into batches and optimizes data transmission; spanmetrics extracts the metrics from spans. The spanmetrics configuration must have a metrics_exporter, prometheus in the current case, which has to exist in the exporters section of the configuration. An optional configuration is latency_histogram_buckets, which defines the histogram buckets. This is a very important concept and will be explained later.

There are two exporters: jaeger sends the data to the Jaeger backend; prometheus defines an endpoint from which Prometheus can fetch the metrics, 0.0.0.0:8889 in the current example. Port 8889 also has to be exposed in the docker-compose.yml file.

The service section is used to configure which components are enabled. In the current example, the otlp receiver takes the traces and exports them to jaeger; the traces are also processed by the spanmetrics processor and exported as metrics to the prometheus endpoint. More details can be found in OpenTelemetry Collector configuration.

receivers:
  otlp:
    protocols:
      grpc:
      http:
  otlp/spanmetrics:
    protocols:
      grpc:
        endpoint: 0.0.0.0:12346

processors:
  batch:
  spanmetrics:
    metrics_exporter: prometheus
    latency_histogram_buckets:
      [200ms, 400ms, 800ms, 1s, 1200ms, 1400ms, 1600ms, 1800ms, 2s, 5s, 7s]

exporters:
  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true
  prometheus:
    endpoint: 0.0.0.0:8889
    metric_expiration: 1440m

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [spanmetrics, batch]
      exporters: [jaeger]
    metrics:
      receivers: [otlp/spanmetrics]
      exporters: [prometheus]

Prometheus histogram by OpenTelemetry Collector

A Prometheus histogram collects metrics and counts them in configurable buckets. It also provides a sum of all observed values. Buckets are separate measurable dimensions that metrics are put into. In the current example, the buckets are [200ms, 400ms, 800ms, 1s, 1200ms, 1400ms, 1600ms, 1800ms, 2s, 5s, 7s]. Spans received from the frontend are compared by their duration and counted in the matching buckets. The buckets are cumulative: each bucket counts all requests with a duration less than or equal to its upper bound.

The easiest way to illustrate this is with an example. If a request takes 1.29 seconds, then buckets from 200ms to 1200ms are untouched, and all other buckets from 1400ms to 7s are increased by 1. When the next request comes with a duration of 1.99 seconds, then buckets from 200ms to 1800ms are untouched, and buckets from 2s to 7s are increased by 1. This is hard to understand but is a very important concept.

You can experiment by running the examples, then opening the frontend at http://localhost:3000/ and clicking the “Fetch persons” button. Observe the metrics buckets at OpenTelemetry Collector http://localhost:8889/metrics. The metrics of the two example requests above are visualized in the screenshot below. The buckets are named latency_bucket and have additional labels to identify the correct span. The span name is set in the operation label of the bucket. In the current example, the “GET /api/person-service/persons” span is used.

Along with the configured buckets, there are two additional buckets: 9.223372036854775e+12, which I truly do not know the meaning of, and +Inf, which is the default bucket for all requests that do not fit the predefined buckets, i.e. longer than 7 seconds. There are two more counters: latency_sum is the total time in milliseconds that all the requests took, in our case 3279ms (the rounded durations 1.29s + 1.99s add up to roughly this value); latency_count is the total number of requests, in our case 2.
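
Since the cumulative bucket logic is the hard part, here is a minimal Java sketch that replays the two example requests. It is not the collector's implementation; the HistogramSketch class and its fields are hypothetical, the extra 9.223372036854775e+12 bucket is left out, and the rounded durations 1290ms and 1990ms are used, so the sum comes out as 3280 rather than the 3279 reported above.

```java
import java.util.Arrays;

class HistogramSketch {
    // Upper bounds in milliseconds, matching latency_histogram_buckets in the config.
    static final long[] BOUNDS_MS = {200, 400, 800, 1000, 1200, 1400, 1600, 1800, 2000, 5000, 7000};

    // counts[i] belongs to the bucket le=BOUNDS_MS[i]; the last entry is le=+Inf.
    final long[] counts = new long[BOUNDS_MS.length + 1];
    long sumMs = 0;
    long count = 0;

    void observe(long durationMs) {
        // Cumulative buckets: every bucket whose bound is >= the duration is incremented.
        for (int i = 0; i < BOUNDS_MS.length; i++) {
            if (durationMs <= BOUNDS_MS[i]) {
                counts[i]++;
            }
        }
        counts[BOUNDS_MS.length]++; // +Inf counts every observation
        sumMs += durationMs;
        count++;
    }

    public static void main(String[] args) {
        HistogramSketch histogram = new HistogramSketch();
        histogram.observe(1290); // first example request, 1.29 seconds
        histogram.observe(1990); // second example request, 1.99 seconds
        System.out.println(Arrays.toString(histogram.counts));
        System.out.println("sum=" + histogram.sumMs + " count=" + histogram.count);
    }
}
```

After both observations, the buckets up to 1200ms hold 0, the 1400ms to 1800ms buckets hold 1, and the buckets from 2s upwards, including +Inf, hold 2.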

Visualize in Grafana

The two requests listed above are visualized in Grafana as shown below: one request is in the 1400ms (1200ms-1400ms) bucket, and one request is in the 2000ms (1800ms-2000ms) bucket.

The panel above is defined in Grafana. It is a Bar gauge, the data source is Prometheus, the Metric browser query is latency_bucket{operation="GET /api/person-service/persons",service_name="person-service-frontend",span_kind="SPAN_KIND_CLIENT",status_code="STATUS_CODE_UNSET"}, Legend is {{le}}, Min step is 1, Format is Heatmap.

Working with histogram buckets is a complex task; the How to visualize Prometheus histograms in Grafana post gives good guidance.

A custom dashboard is created in the examples, it is accessible at http://localhost:3001/d/bgZ6Mf5nk/fetch-persons?orgId=1. The dashboard is defined in etc/grafana-custom-dashboard.json file.

Conclusion

In the current post, I have shown how to make OpenTelemetry Collector convert the spans into metrics, which can be fetched by Prometheus and visualized in Grafana.

Distributed system observability: Instrument Cypress tests with OpenTelemetry

Post summary: Instrument Cypress tests with OpenTelemetry and be able to custom trace the tests.

This post is part of Distributed system observability: complete end-to-end example series. The code used for this series of blog posts is located in selenium-observability-java GitHub repository.

Cypress

Cypress is a front-end testing tool built for the modern web. It is most often compared to Selenium; however, Cypress is both fundamentally and architecturally different. I have lots of experience with Cypress, and I have written about it in the Testing with Cypress – lessons learned in a complete framework post. Although it provides some benefits over Selenium, it also comes with its problems. Writing tests in Cypress is more complex than with Selenium. Cypress is more technically complex, which gives more power but makes building decent test automation a real struggle.

Cypress tests custom observability

As stated before, in the case of HTTP calls, the OpenTelemetry binding between both parties is the traceparent header. I want to bind the Cypress tests with the frontend, so the natural idea is to open the URL in the browser and provide this HTTP header. After research, I could not find a way to achieve this. I implemented a custom solution, which is Cypress independent and can be customized as needed. Moreover, it is web automation framework independent; this approach can be used with any web automation tool. See examples of the same approach in Selenium in the Distributed system observability: Instrument Selenium tests with OpenTelemetry post.

Instrument the frontend

In order to achieve linking, a JavaScript function is exposed in the frontend, which creates a parent Span. Then this JS function is called from the tests when needed. This function is named startBindingSpan() and is registered with the window global object. It creates a binding span with the same attributes (traceId, spanId, traceFlags) as the span used in the tests. This span never ends, so it is not recorded in the traces. In order to enable this span, the traceSpan() function has to be manually used in the frontend code, because it links the current frontend context with the binding span. I have added another function, called flushTraces(). It forces the OpenTelemetry library to report the traces to Jaeger. Reporting is done with an HTTP call, and the browser should not exit before all reporting requests are sent.

Note: some people consider exposing such a window-bound function in the frontend to modify React state as an anti-pattern. Frontend code is in src/helpers/tracing/index.ts:

declare const window: any
var bindingSpan: Span | undefined

window.startBindingSpan = (traceId: string, spanId: string, traceFlags: number) => {
  bindingSpan = webTracerWithZone.startSpan('')
  bindingSpan.spanContext().traceId = traceId
  bindingSpan.spanContext().spanId = spanId
  bindingSpan.spanContext().traceFlags = traceFlags
}

window.flushTraces = () => {
  provider.activeSpanProcessor.forceFlush().then(() => console.log('flushed'))
}

export function traceSpan<F extends (...args: any)
    => ReturnType<F>>(name: string, func: F): ReturnType<F> {
  var singleSpan: Span
  if (bindingSpan) {
    const ctx = trace.setSpan(context.active(), bindingSpan)
    singleSpan = webTracerWithZone.startSpan(name, undefined, ctx)
    bindingSpan = undefined
  } else {
    singleSpan = webTracerWithZone.startSpan(name)
  }
  return context.with(trace.setSpan(context.active(), singleSpan), () => {
    try {
      const result = func()
      singleSpan.end()
      return result
    } catch (error) {
      singleSpan.setStatus({ code: SpanStatusCode.ERROR })
      singleSpan.end()
      throw error
    }
  })
}

Instrument Cypress tests

In order to achieve the tracing, OpenTelemetry JavaScript libraries are needed. Those libraries are the same ones used in the frontend and described in the Distributed system observability: Instrument React application with OpenTelemetry post. Those libraries send the data in OpenTelemetry format, so OpenTelemetry Collector is needed to convert the traces into Jaeger format. OpenTelemetry Collector is already started in the Docker Compose setup, so it just needs to be used; its endpoint is http://localhost:4318/v1/trace. There is a function that creates an OpenTelemetry tracer. I have created two implementations of the tracing. One extends the existing Cypress commands. The other creates a tracing wrapper around Cypress. Both of them use the tracer-creating function. Both of them coexist in the same project, but cannot run simultaneously.

import { WebTracerProvider } from '@opentelemetry/sdk-trace-web'
import { Resource } from '@opentelemetry/resources'
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'
import { CollectorTraceExporter } from '@opentelemetry/exporter-collector'
import { ZoneContextManager } from '@opentelemetry/context-zone'

export function initTracer(name) {
  const resource = new Resource({ 'service.name': name })
  const provider = new WebTracerProvider({ resource })

  const collector = new CollectorTraceExporter({
    url: 'http://localhost:4318/v1/trace'
  })
  provider.addSpanProcessor(new SimpleSpanProcessor(collector))
  provider.register({ contextManager: new ZoneContextManager() })

  return provider.getTracer(name)
}

Tracing Cypress tests – override default commands

Cypress allows you to overwrite existing commands. This feature is used to do the tracing: commands perform their normal functions, but they also trace. This is achieved in the cypress-tests/cypress/support/commands_tracing.js file.

import { context, trace } from '@opentelemetry/api'
import { initTracer } from './init_tracing'

const webTracerWithZone = initTracer('cypress-tests-overwrite')

var mainSpan = undefined
var currentSpan = undefined
var mainWindow

function initTracing(name) {
  mainSpan = webTracerWithZone.startSpan(name)
  currentSpan = mainSpan
  trace.setSpan(context.active(), mainSpan)
  mainSpan.end()
}

function initWindow(window) {
  mainWindow = window
}

function createChildSpan(name) {
  const ctx = trace.setSpan(context.active(), currentSpan)
  const span = webTracerWithZone.startSpan(name, undefined, ctx)
  trace.setSpan(context.active(), span)
  return span
}

Cypress.Commands.add('initTracing', name => initTracing(name))

Cypress.Commands.add('initWindow', window => initWindow(window))

Cypress.Commands.overwrite('visit', (originalFn, url, options) => {
  currentSpan = mainSpan
  const span = createChildSpan(`visit: ${url}`)
  currentSpan = span
  const result = originalFn(url, options)
  span.end()
  return result
})

Cypress.Commands.overwrite('get', (originalFn, selector, options) => {
  const span = createChildSpan(`get: ${selector}`)
  currentSpan = span
  const result = originalFn(selector, options)
  span.end()
  mainWindow.startBindingSpan(span.spanContext().traceId,
    span.spanContext().spanId, span.spanContext().traceFlags)
  return result
})

Cypress.Commands.overwrite('click', (originalFn, subject, options) => {
  const span = createChildSpan(`click: ${subject.selector}`)
  const result = originalFn(subject, options)
  span.end()
  return result
})

Cypress.Commands.overwrite('type', (originalFn, subject, text, options) => {
  const span = createChildSpan(`type: ${text}`)
  const result = originalFn(subject, text, options)
  span.end()
  return result
})

This file with the overwritten commands can be conditionally enabled and disabled with an environment variable. The variable is enableTracking and is defined in the cypress.json file. This allows switching tracing on and off. In the cypress.json file there is one more setting, chromeWebSecurity, which works around the CORS problem when traces are sent to the OpenTelemetry Collector. The Cypress get command is the one that is used to do the linking between the tests and the frontend. It calls the window.startBindingSpan function. In order for this to work, a window instance has to be set in the tests with the custom initWindow command.

Note: A special set of Page Objects is used with this implementation.

Tracing Cypress tests – implement a wrapper

The second approach does not overwrite the Cypress commands. Instead, a wrapper class exposes its own visit, get, click, and type methods, which create the tracing spans and then delegate to the corresponding Cypress commands. This is achieved in the cypress-tests/cypress/support/tracing_cypress.js file.

import { context, trace } from '@opentelemetry/api'
import { initTracer } from './init_tracing'

export default class TracingCypress {
  constructor() {
    this.webTracerWithZone = initTracer('cypress-tests-wrapper')
    this.mainSpan = undefined
    this.currentSpan = undefined
  }

  _createChildSpan(name) {
    const ctx = trace.setSpan(context.active(), this.currentSpan)
    const span = this.webTracerWithZone.startSpan(name, undefined, ctx)
    trace.setSpan(context.active(), span)
    return span
  }

  initTracing(name) {
    this.mainSpan = this.webTracerWithZone.startSpan(name)
    this.currentSpan = this.mainSpan
    trace.setSpan(context.active(), this.mainSpan)
    this.mainSpan.end()
  }

  visit(url, options) {
    this.currentSpan = this.mainSpan
    const span = this._createChildSpan(`visit: ${url}`)
    this.currentSpan = span
    const result = cy.visit(url, options)
    span.end()
    return result
  }

  get(selector, options) {
    const span = this._createChildSpan(`get: ${selector}`)
    this.currentSpan = span
    const result = cy.get(selector, options)
    span.end()
    return result
  }

  click(subject, options) {
    const span = this._createChildSpan('click')
    subject.then(element =>
      element[0].ownerDocument.defaultView.startBindingSpan(
        span.spanContext().traceId,
        span.spanContext().spanId,
        span.spanContext().traceFlags
      )
    )
    const result = subject.click(options)
    span.end()
    return result
  }

  type(subject, text, options) {
    const span = this._createChildSpan(`type: ${text}`)
    const result = subject.type(text, options)
    span.end()
    return result
  }
}

In order to make this implementation work, it is mandatory to set the enableTracking variable in the cypress.json file to false. TracingCypress is instantiated in each and every test. An instance of it is provided as a constructor argument to the Page Object for this approach. The important part here is that the binding window.startBindingSpan is called in the click() method.

Note: A special set of Page Objects is used with this implementation.

End-to-end traces in Jaeger

Conclusion

In the given examples, I have shown how to instrument Cypress tests in order to be able to track how they perform. I have provided two approaches: overwriting the default Cypress commands and providing a tracing wrapper for Cypress.

Distributed system observability: Instrument Selenium tests with OpenTelemetry

Post summary: Instrument Selenium tests with OpenTelemetry and be able to custom trace the tests themselves.

This post is part of Distributed system observability: complete end-to-end example series. The code used for this series of blog posts is located in selenium-observability-java GitHub repository.

Selenium

Selenium is browser automation software. It’s been around for many years and is the de facto tool for web automation testing. It has bindings in all popular programming languages, which means people can write web automation tests in those languages.

Selenium observability

Selenium 4 comes with a pack of features. One of those features is the Selenium observability feature. It uses OpenTelemetry to keep track of the request’s lifecycle. This feature was the main driving factor for me to start researching the current examples. I pictured in my head end-to-end observability, from the test action down to the database call. I had to come up with a custom tracing solution, which is described in this post.

Selenium WebDriver architecture

Selenium consists of a client, which is the language bindings, and a server, which is the executable that controls the given browser. Both communicate via HTTP calls with a JSON payload. This is described in detail in the W3C WebDriver specification. I attach a small diagram that I used in a presentation I did a long time ago.

Selenium client instrumentation

Enabling the default Selenium observability is very easy. JAR dependencies have to be added in pom.xml, system properties have to be set, and of course, a running Jaeger instance is needed to collect the traces. Note that this works only for the RemoteWebDriver. It is described in detail in Remote WebDriver.

<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-jaeger</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty</artifactId>
    <version>1.41.0</version>
</dependency>

This goes along with the WebDriver instantiation code.

System.setProperty("otel.traces.exporter", "jaeger");
System.setProperty("otel.exporter.jaeger.endpoint", "http://localhost:14250");
System.setProperty("otel.resource.attributes", "service.name=selenium-java-client");
System.setProperty("otel.metrics.exporter", "none");
WebDriver driver = new RemoteWebDriver(
                new URL("http://localhost:4444"),
                new ImmutableCapabilities("browserName", "chrome"));

Selenium server instrumentation

Server instrumentation examples are shown in manoj9788/tracing-selenium-grid. Both the standalone server and Selenium grid can be instrumented. In the current examples, I am working only with the standalone server. Unlike the examples, I used Docker to do the instrumentation. I take the default selenium/standalone-chrome:4.0.0 image and install Coursier, a dependency resolver tool, on top of it. Then I run the dependency fetch, so this build step gets cached for a faster rebuild. Selenium provides the --ext flag, which can be set after the standalone command option. I could not make this work only by changing the SE_OPTS environment variable, so I rewrote the startup command in the /opt/bin/start-selenium-standalone.sh file. What I did was to change from the java -jar command to the java -cp command, as the -cp flag is ignored when the -jar flag is used.

FROM selenium/standalone-chrome:4.0.0

# Install coursier in order to fetch the dependencies
RUN cd /tmp && curl -k -fLo cs https://git.io/coursier-cli-"$(uname | tr LD ld)" && chmod +x cs && ./cs install cs && rm cs

# Download dependencies, so they are available during run
RUN /home/seluser/.local/share/coursier/bin/cs fetch -p io.opentelemetry:opentelemetry-exporter-jaeger:1.6.0 io.grpc:grpc-netty:1.41.0

# Modify the run command to include dependent JARs in it
RUN sudo sed -i 's~-jar /opt/selenium/selenium-server.jar~-cp "/opt/selenium/selenium-server.jar:$(/home/seluser/.local/share/coursier/bin/cs fetch -p io.opentelemetry:opentelemetry-exporter-jaeger:1.6.0 io.grpc:grpc-netty:1.41.0)" org.openqa.selenium.grid.Main~g' /opt/bin/start-selenium-standalone.sh

# Enable OpenTelemetry
ENV JAVA_OPTS "$JAVA_OPTS \
  -Dotel.traces.exporter=jaeger \
  -Dotel.exporter.jaeger.endpoint=http://jaeger:14250 \
  -Dotel.resource.attributes=service.name=selenium-java-server"

Selenium default traces in Jaeger

The RemoteWebDriver client passes down the traceparent header when making the request to the server; this is why both client and server traces are connected.
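
For readers who have not seen the traceparent header, here is a minimal Java sketch of how its value is composed according to the W3C Trace Context format (version 00): a 32-character trace id, a 16-character parent span id, and a 2-character trace flags field, all lowercase hexadecimal and separated by dashes. The TraceparentSketch class and its methods are hypothetical helpers, not part of Selenium or OpenTelemetry, and the sketch does not reject all-zero ids, which the specification treats as invalid.

```java
class TraceparentSketch {
    // Builds a version-00 value: 00-<trace-id>-<parent-id>-<trace-flags>.
    static String format(String traceId, String spanId, int traceFlags) {
        if (!traceId.matches("[0-9a-f]{32}")) {
            throw new IllegalArgumentException("traceId must be 32 lowercase hex characters");
        }
        if (!spanId.matches("[0-9a-f]{16}")) {
            throw new IllegalArgumentException("spanId must be 16 lowercase hex characters");
        }
        if (traceFlags < 0 || traceFlags > 0xff) {
            throw new IllegalArgumentException("traceFlags must fit into one byte");
        }
        return String.format("00-%s-%s-%02x", traceId, spanId, traceFlags);
    }

    // Splits a version-00 value back into { traceId, spanId, traceFlags }.
    static String[] parse(String header) {
        String[] parts = header.split("-");
        if (parts.length != 4 || !parts[0].equals("00")) {
            throw new IllegalArgumentException("Unsupported traceparent: " + header);
        }
        return new String[] {parts[1], parts[2], parts[3]};
    }

    public static void main(String[] args) {
        // Example ids in the shape used by the W3C specification; flag 01 means "sampled".
        String header = format("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", 1);
        System.out.println("traceparent: " + header);
        System.out.println(String.join(", ", parse(header)));
    }
}
```

These are the same three values (traceId, spanId, traceFlags) that are later passed to the frontend by the custom binding solution.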

Selenium tests custom observability

As stated before, in the case of HTTP calls, the OpenTelemetry binding between both parties is the traceparent header. I want to bind the Selenium tests with the frontend, so the natural idea is to open the URL in the browser and provide this HTTP header. After research, I could not find a way to achieve this. I implemented a custom solution, which is WebDriver independent and can be customized as needed. Moreover, it is web automation framework independent; this approach can be used with any web automation tool. An example of how tracing can be done with Cypress is shown in the Distributed system observability: Instrument Cypress tests with OpenTelemetry post.

Instrument the frontend

In order to achieve linking, a JavaScript function is exposed in the frontend, which creates a parent Span. Then this JS function is called from the tests when needed. This function is named startBindingSpan() and is registered with the window global object. It creates a binding span with the same attributes (traceId, spanId, traceFlags) as the span used in the Selenium tests. This span never ends, so it is not recorded in the traces. In order to enable this span, the traceSpan() function has to be manually used in the frontend code, because it links the current frontend context with the binding span. I have added another function, called flushTraces(). It forces the OpenTelemetry library to report the traces to Jaeger. Reporting is done with an HTTP call, and the browser should not exit before all reporting requests are sent.

Note: some people consider exposing such a window-bound function in the frontend to modify React state as an anti-pattern. Frontend code is in src/helpers/tracing/index.ts:

declare const window: any
var bindingSpan: Span | undefined

window.startBindingSpan = (traceId: string, spanId: string, traceFlags: number) => {
  bindingSpan = webTracerWithZone.startSpan('')
  bindingSpan.spanContext().traceId = traceId
  bindingSpan.spanContext().spanId = spanId
  bindingSpan.spanContext().traceFlags = traceFlags
}

window.flushTraces = () => {
  provider.activeSpanProcessor.forceFlush().then(() => console.log('flushed'))
}

export function traceSpan<F extends (...args: any)
    => ReturnType<F>>(name: string, func: F): ReturnType<F> {
  var singleSpan: Span
  if (bindingSpan) {
    const ctx = trace.setSpan(context.active(), bindingSpan)
    singleSpan = webTracerWithZone.startSpan(name, undefined, ctx)
    bindingSpan = undefined
  } else {
    singleSpan = webTracerWithZone.startSpan(name)
  }
  return context.with(trace.setSpan(context.active(), singleSpan), () => {
    try {
      const result = func()
      singleSpan.end()
      return result
    } catch (error) {
      singleSpan.setStatus({ code: SpanStatusCode.ERROR })
      singleSpan.end()
      throw error
    }
  })
}

Instrument the Selenium tests

I have created a custom TracingWebDriver wrapper over the WebDriver. It instantiates the OpenTelemetry client with the initializeTracer() method. It has built-in custom logic that decides when to generate a tracing span, which span is its parent, and when to link the tests’ span with the frontend span. Finding an element is done with the custom findElement() method. It creates a child span, linking it to the previously defined currentSpan. Then the window.startBindingSpan() function is called in the browser in order to create the binding span in the frontend. This is the way to link the tests and the frontend. In case of error, the Span is recorded as an error, and this can be tracked in Jaeger. On driver quit, on URL change, on page change via a button, or whenever needed, the window.flushTraces() function can be called by invoking the forceFlushTraces() method in the tests. This has 1 second of Thread.sleep(), which waits for the tracing request to be fired from the frontend to Jaeger. Sleeping like this is an anti-pattern for test automation, but I could not find a better way to wait for the traces. If the browser is closed prematurely or the page is navigated away, then the tracing is incorrect.

package com.automationrhapsody.observability;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.openqa.selenium.*;
import org.openqa.selenium.remote.tracing.opentelemetry.OpenTelemetryTracer;
import org.openqa.selenium.support.ui.ExpectedConditions;
import org.openqa.selenium.support.ui.WebDriverWait;

import java.io.File;
import java.time.Duration;
import java.util.List;

public class TracingWebDriver {

    private static final Duration WAIT_SECONDS = Duration.ofSeconds(5);
    private static final String JAEGER_GRPC_URL = "http://localhost:14250";

    private WebDriver driver;
    private Tracer tracer;
    private Span mainSpan;
    private Span currentSpan;

    public TracingWebDriver(boolean isRemote, String className, String methodName) {
        System.setProperty("otel.traces.exporter", "jaeger");
        System.setProperty("otel.exporter.jaeger.endpoint", JAEGER_GRPC_URL);
        System.setProperty("otel.resource.attributes", "service.name=selenium-java-client");
        System.setProperty("otel.metrics.exporter", "none");

        initializeTracer();

        mainSpan = tracer.spanBuilder("webdriver-create").startSpan();
        mainSpan.setAttribute("test.class.name", className);
        mainSpan.setAttribute("test.method.name", methodName);
        setCurrentSpan(mainSpan);
        driver = WebDriverFactory.createDriver(isRemote);
        mainSpan.end();
    }

    public void get(String url) {
        waitToLoad();
        setCurrentSpan(mainSpan);
        Span span = createChildSpan("get: " + url);
        try {
            forceFlushTraces();
            driver.get(url);
            createBrowserBindingSpan(span);
        } catch (Exception ex) {
            span.setStatus(StatusCode.ERROR, ex.getMessage());
            captureScreenshot();
        } finally {
            span.end();
        }
    }

    public WebElement findElement(By by) {
        waitToLoad();
        Span span = createChildSpan("findElement: " + by.toString());
        try {
            createBrowserBindingSpan(span);
            WebDriverWait wait = new WebDriverWait(driver, WAIT_SECONDS);
            return wait.until(ExpectedConditions.visibilityOfElementLocated(by));
        } catch (Exception ex) {
            span.setStatus(StatusCode.ERROR, ex.getMessage());
            captureScreenshot();
            return null;
        } finally {
            span.end();
        }
    }

    public void quit() {
        waitToLoad();
        forceFlushTraces();
        setCurrentSpan(mainSpan);
        Span span = createChildSpan("quit");
        driver.quit();
        span.end();
    }

    public Object executeJavaScript(String script) {
        return ((JavascriptExecutor) driver).executeScript(script);
    }

    public String captureScreenshot() {
        File screenshotFile = ((TakesScreenshot) driver).getScreenshotAs(OutputType.FILE);
        String output = screenshotFile.getAbsolutePath();
        System.out.println(output);
        return output;
    }

    private void waitToLoad() {
        WebDriverWait wait = new WebDriverWait(driver, WAIT_SECONDS);
        wait.until(ExpectedConditions.invisibilityOfElementLocated(By.id("test-progress-indicator")));
    }

    private void initializeTracer() {
        JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().setEndpoint(JAEGER_GRPC_URL).build();
        Resource resource = Resource.builder()
                .put("service.name", "selenium-tests")
                .build();
        SdkTracerProvider provider = SdkTracerProvider.builder()
                .addSpanProcessor(SimpleSpanProcessor.create(exporter))
                .setResource(resource)
                .build();
        OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
                .setTracerProvider(provider)
                .build();
        tracer = openTelemetrySdk.getTracer("io.opentelemetry.jaeger.exporter");
    }

    private Span createChildSpan(String name) {
        Span span = tracer.spanBuilder(name)
                .setParent(Context.current().with(currentSpan))
                .startSpan();
        setCurrentSpan(span);
        return span;
    }

    private void setCurrentSpan(Span span) {
        currentSpan = span;
        OpenTelemetryTracer.getInstance().setOpenTelemetryContext(Context.current().with(span));
    }

    private void createBrowserBindingSpan(Span span) {
        executeJavaScript("window.startBindingSpan('"
                + span.getSpanContext().getTraceId() + "', '"
                + span.getSpanContext().getSpanId() + "', '"
                + span.getSpanContext().getTraceFlags().asHex() + "')");
    }

    private void forceFlushTraces() {
        executeJavaScript("if (window.flushTraces) window.flushTraces()");
        try {
            Thread.sleep(1000);
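        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still react to the interruption
            Thread.currentThread().interrupt();
        }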
    }
}

End-to-end traces in Jaeger

If an error occurs, it is recorded in the trace as well.

Linking default and custom traces

In an ideal world, I would like to make my custom span the parent of the default Selenium tracing spans, so that I can attach the debug information to the custom tracing information. Initially, I was not able to do this, so I raised an issue with Selenium: OpenTelementry tracing: be able to link the default tracing with a custom tracing. I then contributed to Selenium by adding the possibility to link Selenium traces to the custom traces. This is done with the following code: OpenTelemetryTracer.getInstance().setOpenTelemetryContext(Context.current().with(span)). Finally, the full tracing looks as shown in the image:

Conclusion

The out-of-the-box Selenium observability is useful to trace what is happening in a complex grid. It does not give the possibility to trace test performance or how test steps affect the application itself. In the current post, I have described a way to create custom tracing, which provides end-to-end traceability from the tests down to the database calls. This approach gives the flexibility to be customized for different needs. It does require changes in the application’s frontend code, though, which brings the application architecture into the discussion.

Distributed system observability: Instrument React application with OpenTelemetry

Post summary: Create a React web application using the Material UI design system and instrument the application with OpenTelemetry.

This post is part of Distributed system observability: complete end-to-end example series. The code used for this series of blog posts is located in selenium-observability-java GitHub repository.

React

React is a JavaScript library for building user interfaces.

Create React App

Create React App provides a simple way to create React applications from scratch. It also creates and abstracts the whole toolchain needed to develop JavaScript applications, such as webpack and Babel, so the user does not need to bother with configuring those. The application is created with the following command: create-react-app my-app --template typescript.

Project structure

From the projects I have worked on professionally, I am used to a specific folder structure:

  • src/components – re-usable components, building blocks, used across the application
  • src/containers – components used to build the application, e.g. pages
  • src/helpers – functionality not related to the presentation logic
  • src/stylesheets – CSS files, which hold common and re-usable functionality
  • src/types – TypeScript data models, e.g. models used with API communication

Material UI

Material UI is a React design system that provides ready-to-use components. An official example is shown in create-react-app-with-typescript.

TypeScript

TypeScript is a programming language developed and maintained by Microsoft. It is a strict syntactical superset of JavaScript and adds optional static typing to the language. TypeScript is designed for the development of large applications and transcompiles to JavaScript. TypeScript brings some overhead, but for me, this is justified. Because of the static typing, errors are shown at compile time, not at runtime. Also, IntelliSense, the intelligent code completion, kicks in and is of great help.

Code examples

The main file is src/index.tsx. It loads the App component, which uses React Router to load different components based on the path. In the current example, the /about path is covered by a very simple page, and all other paths load PersonsPage.

index.tsx

import ReactDOM from 'react-dom'

import App from 'containers/App'
import reportWebVitals from './reportWebVitals'
import './stylesheets/base.scss'

ReactDOM.render(<App />, document.querySelector('#root'))

// If you want to start measuring performance in your app, pass a function
// to log results (for example: reportWebVitals(console.log))
// or send to an analytics endpoint. Learn more: https://bit.ly/CRA-vitals
reportWebVitals()

App

import { Router, Route, Switch } from 'react-router-dom'
import { createBrowserHistory } from 'history'
import { ThemeProvider } from '@mui/material/styles'
import { CssBaseline } from '@mui/material'

import PersonsPage from 'containers/PersonsPage'

import theme from 'stylesheets/theme'

export default () => (
  <ThemeProvider theme={theme}>
    <CssBaseline />
    <Router history={createBrowserHistory()}>
      <Switch>
        <Route exact path={'/about'}>
          <div>About Page</div>
        </Route>
        <Route>
          <PersonsPage />
        </Route>
      </Switch>
    </Router>
  </ThemeProvider>
)

PersonsPage


import React from 'react'

import { apiFetch } from 'helpers/api'
import { personServiceUrl } from 'helpers/config'
import { IPerson } from 'types/types'

import PersonsList from './PersonsList'

import TracingButton from 'components/TracingButton'
import CreateNewPersonModal from 'containers/CreateNewPersonModal'

import styles from './styles.module.scss'

export default () => {
  const [isModalOpen, setIsModalOpen] = React.useState<boolean>(false)
  const [persons, setPersons] = React.useState<IPerson[]>([])

  const fetchPersons = async () => {
    const persons = await apiFetch<IPerson[]>(`${personServiceUrl}/persons`)
    setPersons(persons)
  }

  return (
    <div className={styles.app}>
      <CreateNewPersonModal open={isModalOpen} onClose={() => setIsModalOpen(false)} />

      <header className={styles.appHeader}>
        <p>Sample Patient Service Frontend</p>
      </header>

      <TracingButton id="test-create-person-button" label={'Create new person'} onClick={() => setIsModalOpen(true)} />

      <TracingButton id="test-fetch-persons-button" label={'Fetch persons'} onClick={fetchPersons} />
      {persons.length > 0 && (
        <React.Fragment>
          <div id="test-persons-count-text">Found {persons.length} persons</div>
          <PersonsList persons={persons} />
        </React.Fragment>
      )}
    </div>
  )
}

Proxy

Cross-Origin Resource Sharing (CORS) is an HTTP-header-based mechanism that allows a server to indicate any origins (domain, scheme, or port) other than its own from which a browser should permit loading resources. In order to allow the frontend to connect to the backend, CORS should be allowed. One option is to instruct the backend to produce CORS headers that allow the frontend URL. Another option is to use Create React App’s mechanism to avoid CORS altogether by defining a proxy. The file that is used is setupProxy.js. In the current examples, the proxy handles connections both to the backend and to the OpenTelemetry collector.

const { createProxyMiddleware } = require('http-proxy-middleware')

const configureProxy = (path, target) =>
  createProxyMiddleware(path, {
    target: target,
    secure: false,
    pathRewrite: { [`^${path}`]: '' }
  })

module.exports = function (app) {
  app.use(configureProxy('/api/person-service', 'http://localhost:8090'))
  app.use(configureProxy('/api/tracing', 'http://localhost:4318'))
}
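
To make the pathRewrite option more concrete, here is a minimal sketch in plain TypeScript of what the rewrite rule above does to a request path. It does not use http-proxy-middleware itself; rewritePath, proxiedUrl, and the sample request paths are hypothetical and only mirror the `^${path}` pattern from configureProxy.

```typescript
// Mirrors the rule pathRewrite: { [`^${path}`]: '' } from configureProxy:
// the proxy prefix is stripped from the start of the request path.
// rewritePath is a hypothetical helper, not part of http-proxy-middleware.
function rewritePath(prefix: string, requestPath: string): string {
  return requestPath.replace(new RegExp(`^${prefix}`), '')
}

// Builds the URL the target server would receive for a proxied request.
function proxiedUrl(prefix: string, target: string, requestPath: string): string {
  return target + rewritePath(prefix, requestPath)
}

// The request paths below are assumptions made for illustration.
const personsUrl = proxiedUrl(
  '/api/person-service',
  'http://localhost:8090',
  '/api/person-service/persons'
)
console.log(personsUrl) // http://localhost:8090/persons

// A path that does not start with the prefix is left unchanged.
console.log(rewritePath('/api/person-service', '/about')) // /about
```

The browser only ever talks to the frontend origin, so no CORS headers are needed, while the backend still sees its own paths without the /api/person-service prefix.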

WebVitals

The default application has built-in support for WebVitals. To put those into operation, a reporter just needs to be registered in the src/index.tsx file by passing a function reference to reportWebVitals(). The easiest option is to log to the console: reportWebVitals(console.log). This can be enhanced further by creating a reporter that makes the data available to Prometheus. Prometheus works on a pull model, so pushing data to it directly is not possible. Prometheus Pushgateway can be used as a metrics cache, from which Prometheus can pull.
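
As an illustration of such a reporter, the sketch below converts a web vitals sample into the Prometheus text exposition format that the Pushgateway accepts. It is only a sketch under assumptions: the VitalSample shape (name and value), the web_vitals_ metric prefix, and the job name are made up for the example, and no HTTP request is actually sent.

```typescript
// Assumed minimal shape of a reported metric: only name and value are used here.
interface VitalSample {
  name: string
  value: number
}

// Prometheus metric names may contain only letters, digits, underscores, and colons,
// so anything else in the vital name is replaced with an underscore.
function toMetricName(vitalName: string): string {
  return 'web_vitals_' + vitalName.toLowerCase().replace(/[^a-z0-9_]/g, '_')
}

// One line of the text exposition format: "<metric name> <value>\n".
function toExpositionLine(sample: VitalSample): string {
  return `${toMetricName(sample.name)} ${sample.value}\n`
}

// The Pushgateway groups pushed metrics by job: /metrics/job/<job name>.
function pushgatewayPath(job: string): string {
  return `/metrics/job/${encodeURIComponent(job)}`
}

// Hypothetical job name, used only for this example.
console.log(pushgatewayPath('person-service-frontend'))
console.log(toExpositionLine({ name: 'LCP', value: 2500 }))
```

A reporter passed to reportWebVitals() could then send the generated line as the body of a POST request to that path on the Pushgateway, for example through a proxy entry similar to the ones in setupProxy.js.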

Docker

The application is Dockerized with Nginx in exactly the same way as described in Dockerize React application with a Docker multi-staged build post.

Instrumentation

Instrumentation is done with the OpenTelemetry JavaScript libraries. The API calls to the backend use the fetch() method. OpenTelemetry has a library that instruments all the calls going through fetch(): @opentelemetry/instrumentation-fetch. A WebTracerProvider is instantiated with a Resource that has the service.name. Span processors are registered with the addSpanProcessor() method. In the code below, a SimpleSpanProcessor wraps the CollectorTraceExporter, which sends the traces to the OpenTelemetry collector. The actual tracer is returned by the getTracer() method of the provider and is used to do the custom tracing.

registerInstrumentations() registers an instance of FetchInstrumentation, which actually traces the API calls. If the API responds with a status code greater than 299, this is considered an error, and the span is marked as ERROR. This is done in the applyCustomAttributesOnSpan function. Another custom change for fetch tracking is that the span name is overwritten in order to have a unique name for each API. This allows separate tracing of each individual API.

A custom traceSpan() method is defined in order to manually trace individual events in the application, such as a button click. In case of an error in the wrapped function func, the span is also marked as an error.

import { context, trace, Span, SpanStatusCode } from '@opentelemetry/api'
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web'
import { Resource } from '@opentelemetry/resources'
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'
import { CollectorTraceExporter } from '@opentelemetry/exporter-collector'
import { ZoneContextManager } from '@opentelemetry/context-zone'
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch'
import { FetchError } from '@opentelemetry/instrumentation-fetch/build/src/types'
import { registerInstrumentations } from '@opentelemetry/instrumentation'

import { tracingUrl } from 'helpers/config'

const resource = new Resource({ 'service.name': 'person-service-frontend' })
const provider = new WebTracerProvider({ resource })

const collector = new CollectorTraceExporter({ url: tracingUrl })
provider.addSpanProcessor(new SimpleSpanProcessor(collector))
provider.register({ contextManager: new ZoneContextManager() })

const webTracerWithZone = provider.getTracer('person-service-frontend')

registerInstrumentations({
  instrumentations: [
    new FetchInstrumentation({
      // RegExp literal: the quoted '/.*/g' is a plain string, not a pattern
      propagateTraceHeaderCorsUrls: [/.*/],
      clearTimingResources: true,
      applyCustomAttributesOnSpan:
      (span: Span, request: Request | RequestInit, result: Response | FetchError) => {
        const attributes = (span as any).attributes
        if (attributes.component === 'fetch') {
          span.updateName(`${attributes['http.method']} ${attributes['http.url']}`)
        }
        if (result.status && result.status > 299) {
          span.setStatus({ code: SpanStatusCode.ERROR })
        }
      }
    })
  ]
})

export function traceSpan<F extends (...args: any)
    => ReturnType<F>>(name: string, func: F): ReturnType<F> {
  const singleSpan = webTracerWithZone.startSpan(name)
  return context.with(trace.setSpan(context.active(), singleSpan), () => {
    try {
      const result = func()
      singleSpan.end()
      return result
    } catch (error) {
      singleSpan.setStatus({ code: SpanStatusCode.ERROR })
      singleSpan.end()
      throw error
    }
  })
}
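
One thing to keep in mind with traceSpan() is that func may return a promise, as an async click handler does. In that case, the try block finishes as soon as the promise is returned, so the span ends before the asynchronous work completes, and a later rejection is not marked as an error. Below is a minimal sketch of a promise-aware variant. To stay self-contained it uses a hypothetical MinimalSpan interface instead of the OpenTelemetry Span; in the real helper, setError() would correspond to setStatus({ code: SpanStatusCode.ERROR }).

```typescript
// Hypothetical stand-in for the two span operations that traceSpan() uses.
interface MinimalSpan {
  setError(): void
  end(): void
}

function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { then?: unknown }).then === 'function'
  )
}

// Runs func and ends the span when the work is really done:
// immediately for synchronous functions, on settlement for promises.
function traceCall<T>(span: MinimalSpan, func: () => T): T {
  try {
    const result = func()
    if (isPromiseLike(result)) {
      result.then(
        () => span.end(),
        () => {
          // The rejection still reaches the caller through the returned promise.
          span.setError()
          span.end()
        }
      )
      return result
    }
    span.end()
    return result
  } catch (error) {
    span.setError()
    span.end()
    throw error
  }
}
```

The caller receives exactly what func returned, so existing call sites would not need to change; only the moment at which the span is closed differs.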

Custom instrumentation

import { Button } from '@mui/material'

import { traceSpan } from 'helpers/tracing'

import styles from './styles.module.scss'

interface Props {
  label: string
  id?: string
  secondary?: boolean
  onClick: () => void
}

export default (props: Props) => {
  const onClick = async () => traceSpan(`'${props.label}' button clicked`, props.onClick)

  return (
    <div className={styles.button}>
      <Button id={props.id} variant={'contained'} color={props.secondary ? 'secondary' : 'primary'} onClick={onClick}>
        {props.label}
      </Button>
    </div>
  )
}

Traceability

Traceability between the frontend and the backend is described in the Trace Context W3C standard. In a nutshell, this is done by adding a traceparent header in the HTTP request to the backend. This is done automatically by @opentelemetry/instrumentation-fetch.
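
For illustration, the sketch below shows how a traceparent header value is put together and parsed, following version 00 of the W3C Trace Context format: version, trace ID, parent span ID, and trace flags, separated by dashes. formatTraceparent and parseTraceparent are hypothetical helpers written for this post; the instrumentation library does this work internally.

```typescript
// Fields carried by a traceparent header (version 00).
interface TraceParent {
  traceId: string // 32 lowercase hex characters
  spanId: string // 16 lowercase hex characters
  sampled: boolean // lowest bit of the trace-flags byte
}

function formatTraceparent(ctx: TraceParent): string {
  const flags = ctx.sampled ? '01' : '00'
  return `00-${ctx.traceId}-${ctx.spanId}-${flags}`
}

// Returns undefined for anything that is not a valid version 00 header.
function parseTraceparent(header: string): TraceParent | undefined {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header)
  if (match === null) {
    return undefined
  }
  const traceId = match[1] ?? ''
  const spanId = match[2] ?? ''
  const flags = match[3] ?? '00'
  // All-zero IDs are invalid according to the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) {
    return undefined
  }
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 }
}

const exampleHeader = formatTraceparent({
  traceId: '4bf92f3577b34da6a3ce929d0e0e4736',
  spanId: '00f067aa0ba902b7',
  sampled: true
})
console.log(exampleHeader) // 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
```

The backend reads the trace ID and the parent span ID from this header and creates its own spans as children, which is how the frontend and backend spans end up in the same trace.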

React component instrumentation

OpenTelemetry provides a library that can instrument React components and monitor their performance, such as load time. This library is called @opentelemetry/plugin-react-load. I tried it and it works properly, but it is not in the current examples for two reasons. The first is that I am not really interested in React component lifecycle events. The more important reason is that this plugin works for React class components only. I started my React journey after version 16.8, which was released on 6 Feb 2019. Prior to this version, functional components were stateless; they were just for data visualization purposes. Version 16.8 introduced hooks, which allow state management inside a functional component. I write all my components as functional components with hooks for state management. I cannot justify whether this is good or bad, I just like it that way. There is one serious drawback: functions defined inside a functional component are re-created every time the component is re-rendered, so in some cases I had to use the useCallback() hook to keep the same function instance between renders.
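
The effect of functions being re-created on every render can be shown without React at all. The sketch below is a plain TypeScript model, not React's implementation: renderWithoutMemo stands for one render of a functional component, and createCallbackMemo is a hypothetical, much simplified stand-in for what useCallback() does with its dependency list.

```typescript
type Handler = () => string

// Each call models one render: a brand-new function instance is created every time.
function renderWithoutMemo(): Handler {
  return () => 'clicked'
}

// Simplified model of useCallback(): the previous function instance is returned
// as long as the dependency list has not changed.
function createCallbackMemo(): (fn: Handler, deps: readonly unknown[]) => Handler {
  let lastDeps: readonly unknown[] | undefined
  let lastFn: Handler | undefined
  return (fn, deps) => {
    const prevDeps = lastDeps
    const prevFn = lastFn
    if (
      prevFn !== undefined &&
      prevDeps !== undefined &&
      prevDeps.length === deps.length &&
      deps.every((dep, i) => dep === prevDeps[i])
    ) {
      return prevFn
    }
    lastDeps = deps
    lastFn = fn
    return fn
  }
}

console.log(renderWithoutMemo() === renderWithoutMemo()) // false

const memoize = createCallbackMemo()
const firstRender = memoize(() => 'clicked', [1])
const secondRender = memoize(() => 'clicked', [1])
console.log(firstRender === secondRender) // true
```

Keeping the same instance matters whenever the function is passed as a prop or used as a dependency of another hook, because a new instance looks like a change even when the code is identical.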

Traces output

In order to monitor a trace, run the examples as described in Distributed system observability: complete end-to-end example with OpenTracing, Jaeger, Prometheus, Grafana, Spring Boot, React and Selenium. Accessing http://localhost:3000/ and clicking the “Fetch persons” button generates a trace in Jaeger:

Conclusion

OpenTelemetry provides libraries to instrument JavaScript applications and to report the traces to an OpenTelemetry collector. Creating an application with React and instrumenting it to collect OpenTelemetry traces is easy. Behind the scenes, the fetch() method is modified to pass the traceparent header in the HTTP request to the backend. This is how tracing across different systems can happen.

Distributed system observability: Instrument Spring Boot application with OpenTelemetry

Post summary: Instrumenting a Spring Boot Java application with OpenTelemetry.

This post is part of Distributed system observability: complete end-to-end example series. The code used for this series of blog posts is located in selenium-observability-java GitHub repository.

Spring Boot

Spring Boot makes it easy to create stand-alone, production-grade Spring-based Applications that you can “just run”. Most Spring Boot applications need minimal Spring configuration. Creating a basic Spring Boot application takes a few steps.

Application

The application class is the entry point. It should have the @SpringBootApplication annotation. I have added the @ServletComponentScan annotation as well, in order to register a custom response filter, because I want to output the TraceId as a response header.

Application

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;

@ServletComponentScan
@SpringBootApplication
public class PersonServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(PersonServiceApplication.class, args);
    }
}

Filter

import io.opentelemetry.api.trace.Span;

import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebFilter("*")
public class AddResponseHeaderFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletResponse httpServletResponse = (HttpServletResponse) response;
        httpServletResponse.setHeader("x-trace-id", Span.current().getSpanContext().getTraceId());
        chain.doFilter(request, response);
    }
}
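
On the consuming side, the x-trace-id header can be used to jump from a response straight to its trace. The sketch below is in TypeScript, like the frontend code in this series, and is only an illustration: traceIdFromHeaders and jaegerTraceUrl are hypothetical helpers, and http://localhost:16686 is assumed to be the address of the Jaeger UI.

```typescript
// Plain map of response headers; header names are compared case-insensitively.
type HeaderMap = { [headerName: string]: string }

function traceIdFromHeaders(headers: HeaderMap): string | undefined {
  for (const key of Object.keys(headers)) {
    if (key.toLowerCase() === 'x-trace-id') {
      return headers[key]
    }
  }
  return undefined
}

// The Jaeger UI shows a single trace under /trace/<trace id>.
function jaegerTraceUrl(jaegerBaseUrl: string, traceId: string): string {
  return `${jaegerBaseUrl}/trace/${traceId}`
}

// Sample header values, made up for the example.
const sampleHeaders: HeaderMap = {
  'Content-Type': 'application/json',
  'X-Trace-Id': '4bf92f3577b34da6a3ce929d0e0e4736'
}
const sampleTraceId = traceIdFromHeaders(sampleHeaders)
if (sampleTraceId !== undefined) {
  console.log(jaegerTraceUrl('http://localhost:16686', sampleTraceId))
}
```

This is handy when debugging a single failing request, since the trace can be opened directly instead of being searched for by service name and time.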

Controller

In the current example, there is only one controller with two APIs, a POST and a GET endpoint with the same path. The class has to be annotated with @RestController. Each API endpoint is annotated with @RequestMapping, which gives more details about the path and the method. I have used Spring’s constructor-based dependency injection. I have omitted the @Autowired annotation because there is just one constructor.

import com.automationrhapsody.observability.services.PersonService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
public class PersonController {

    private static final Logger LOGGER 
            = LoggerFactory.getLogger(PersonController.class);

    private PersonService personService;

    public PersonController(PersonService personService) {
        this.personService = personService;
    }

    @RequestMapping(value = "/persons", method = RequestMethod.GET)
    public List<PersonDto> getPersons() {
        LOGGER.info("Processing GET /persons request.");

        List<PersonDto> persons = personService.getPersons();
        return persons;
    }

    @RequestMapping(value = "/persons", method = RequestMethod.POST)
    public Long savePersons(@RequestBody PersonDto person) {
        LOGGER.info("Processing POST /persons request with {}", person);

        Long resultId = personService.savePerson(person);
        return resultId;
    }
}

Repository

Defining a very basic repository is really easy with Spring: all that is needed is to extend the CrudRepository interface. If fine-tuning and custom methods are needed, create an interface that extends Spring’s Repository interface and define the custom repository methods inside it. Read more in Working with Spring Data Repositories.

import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface FlightRepository extends CrudRepository<PersonEntity, Long> {
}

Service

A service layer is a good idea to handle the business logic between the controller and the repository. In this case, the dependency is injected with the @Autowired annotation. findAll() is a method that comes from the CrudRepository interface. It gets the data from the database. The @WithSpan annotation creates a new span in the OpenTelemetry traces. This is explained in more detail later in this post.

import com.automationrhapsody.observability.controllers.PersonDto;
import com.automationrhapsody.observability.repositories.person.FlightRepository;
import com.automationrhapsody.observability.repositories.person.PersonEntity;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.extension.annotations.WithSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@Service
public class PersonService {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(PersonService.class);

    @Autowired
    private FlightRepository flightRepository;

    public List<PersonDto> getPersons() {
        doSomeWorkNewChildSpan();
        Iterable<PersonEntity> persons = flightRepository.findAll();
        return StreamSupport.stream(persons.spliterator(), false)
                .map(this::toPersonDto)
                .collect(Collectors.toList());
    }

    @WithSpan
    public void doSomeWorkNewChildSpan() {
        LOGGER.info("Doing some work In New child span");
        Span span = Span.current();
        span.setAttribute("template.a2", "some value");
        span.addEvent("template.processing2.start", attributes("321"));
        span.addEvent("template.processing2.end", attributes("321"));
    }

    private Attributes attributes(String id) {
        return Attributes.of(AttributeKey.stringKey("app.id"), id);
    }

    private PersonDto toPersonDto(PersonEntity person) {
        PersonDto personDto = new PersonDto();
        personDto.setFirstName(person.getFirstName());
        personDto.setLastName(person.getLastName());
        personDto.setEmail(person.getEmail());
        return personDto;
    }
}

Instrumentation

OpenTelemetry provides a way for manual instrumentation, which will be covered in the subsequent Selenium-based post. OpenTelemetry also provides a Java agent JAR that can be attached to any Java 8+ application and dynamically injects bytecode to capture telemetry from a number of popular libraries and frameworks. This JAR agent is attached to the Spring Boot application described above. This is done in the Dockerfile. The Jaeger exporter and the Jaeger backend endpoint are configured with the otel.traces.exporter and otel.exporter.jaeger.endpoint system properties, which are passed to the JVM through the JAVA_OPTS environment variable.


# ========= BUILD =========
FROM maven:3-openjdk-11 as builder
WORKDIR /build

COPY pom.xml pom.xml
RUN mvn dependency:resolve

COPY . .
RUN mvn install

# ========= RUN =========
FROM openjdk:11
ENV APP_NAME person-service
# https://github.com/open-telemetry/opentelemetry-java-instrumentation
ENV JAVA_OPTS "$JAVA_OPTS \
  -Dotel.traces.exporter=jaeger \
  -Dotel.exporter.jaeger.endpoint=http://jaeger:14250 \
  -Dotel.metrics.exporter=none \
  -Dotel.resource.attributes="service.name=${APP_NAME}" \
  -Dotel.javaagent.debug=false \
  -javaagent:/app/opentelemetry-javaagent-all.jar"

ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.6.2/opentelemetry-javaagent-all.jar /app/opentelemetry-javaagent-all.jar
COPY --from=builder /build/target/$APP_NAME-*.jar /app/$APP_NAME.jar

CMD java $JAVA_OPTS -jar /app/$APP_NAME.jar

This is it. Just by attaching the Java agent, the application is instrumented and OpenTelemetry is ready to report traces. OpenTelemetry supports a large number of libraries and frameworks; the full list can be found in Supported libraries, frameworks, application servers, and JVMs.

Custom tracking

In many cases, apart from the standard tracing, the application has to report additional traces. This can be very easily done with the @WithSpan annotation. This annotation marks that the execution of this method or constructor should result in a new Span. It can be used to signal OpenTelemetry auto-instrumentation that a new span should be created whenever a marked method is executed. In the example above, a custom attribute has been added to the new Span. This is not mandatory though, just adding the annotation will record the method invocation as a new span in the trace output.

Traces output

In order to monitor a trace, run the examples as described in Distributed system observability: complete end-to-end example with OpenTracing, Jaeger, Prometheus, Grafana, Spring Boot, React and Selenium. Accessing http://localhost:8090/persons generates a trace in Jaeger:

Conclusion

With the steps described above, creating a basic Spring Boot application is fairly easy. It is even easier to instrument the application with OpenTelemetry tracing, just by adding the Java agent JAR. Custom tracing is also made easy with the @WithSpan annotation. Traces give very valuable information about the performance of the different parts of the application.

Distributed system observability: complete end-to-end example with OpenTracing, Jaeger, Prometheus, Grafana, Spring Boot, React and Selenium

Post summary: Code examples and explanations of an end-to-end example showcasing distributed system observability, from the Selenium tests through the React frontend, all the way to the database calls of a Spring Boot application. Examples are implemented with the OpenTelemetry toolset and traces are saved in Jaeger. This example also shows a complete observability setup including tools like Grafana, Prometheus, Loki, and Promtail.

This post is part of Distributed system observability: complete end-to-end example series. The code used for this series of blog posts is located in selenium-observability-java GitHub repository.

Introduction

Nowadays, the microservices architecture is very popular. It certainly has its benefits, allowing companies to deliver products to the market faster. It is much easier to manage several small applications, each one of them with isolated responsibilities, than one big fat monolithic application. Microservices architecture has its challenges as well. One of those challenges is traceability. What happens in case of an error: where did it occur, which microservices were involved, what was the request flow through the system, where is the stack trace? In a monolithic application, the stack trace is shown in the logs, giving the exact location of the error. In a microservices landscape, errors are in many cases meaningless unless there is full traceability of the request flow.

Observability and distributed tracing

Distributed tracing, also called distributed request tracing, is a method used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance. Logs, metrics, and traces are often known as the three pillars of observability. Further reading on observability can be done in The Three Pillars of Observability article.

OpenTracing

OpenTracing is an API specification with accompanying libraries that enables the instrumentation of distributed applications. It is not locked to any particular vendor and allows flexibility just by changing the configuration of already instrumented applications. More details can be found in Instrumenting your application and What is Distributed Tracing?. OpenTracing has since merged with OpenCensus to form OpenTelemetry, and the current examples are based on the OpenTelemetry libraries and tools.

End-to-end traceability and observability

In the current examples, I am going to give an end-to-end solution showing how observability can be achieved in a distributed system. I have used the mnadeem/boot-opentelemetry-tempo project as a basis and have extended it with a React frontend and Selenium tests, to provide a complete end-to-end example. Below is a diagram of the full setup. All applications involved are explained at a high level.

PostgreSQL and pgAdmin

The basic examples used PostgreSQL. I thought of changing it to MySQL, but after some short research, I found that PostgreSQL has some advantages. PostgreSQL is an object-relational database, while MySQL is a purely relational database. This means that Postgres includes features like table inheritance and function overloading, which can be important to certain applications. Postgres also adheres more closely to SQL standards. See more in MySQL vs PostgreSQL — Choose the Right Database for Your Project.

pgAdmin is the default user interface to manage a PostgreSQL database, so it is present in the architecture as well.

Spring Boot backend

Spring Boot is used as the backend. I wanted to get some exposure to the technology, so I created a very basic application in Spring Boot. It uses the PostgreSQL database for reading and writing data. The Spring Boot application is instrumented with the OpenTelemetry Java library and exports the traces in Jaeger format directly to the Jaeger backend. It also writes application log files to the file system. The backend exposes APIs, which are consumed by the frontend. More details on the backend can be found in Distributed system observability: Instrument Spring Boot application with OpenTelemetry post.

React frontend

I am very experienced with React, so this was the natural choice for the frontend technology. The frontend uses fetch() to consume the backend APIs. It is instrumented with OpenTelemetry JavaScript libraries to trace all communication happening through fetch() and to export the traces in OpenTelemetry format to the OpenTelemetry collector. The frontend also has manual instrumentation, which traces the actions performed by end-users. More details on the frontend can be found in Distributed system observability: Instrument React application with OpenTelemetry post.

OpenTelemetry collector

The OpenTelemetry collector converts the data received from the frontend in OpenTelemetry format into Jaeger format and exports it to the Jaeger backend. The collector also extracts the span metrics, which are read by Prometheus. Read more in Distributed system observability: extract and visualize metrics from OpenTelemetry spans post. Configurations are described in the collector configuration. Local configurations are in otel-config.yaml.

Selenium tests

Selenium was chosen as the web testing framework because of its observability feature. Actually, this was the reason I created the current examples. After getting to know the tracing features of Selenium better, I do not find them very useful. Selenium does not provide traceability of the tests, but rather of its internal operations and performance. Having started with the tracing and the whole project, I could not ditch it in the middle, so I had to create a custom way to make Selenium trace the tests. Selenium tests export tracing information in Jaeger format directly to the Jaeger backend. More details on the tests can be found in Distributed system observability: Instrument Selenium tests with OpenTelemetry post.

Cypress tests

Cypress is a front-end testing tool built for the modern web. It is most often compared to Selenium. The initial driver of the current post series was Selenium observability. After I got a better understanding of the observability topic, I decided to add examples of Cypress test observability for completeness. Cypress interacts with the frontend and exports its traces to the OpenTelemetry collector, which then forwards the traces to Jaeger. More details on the tests can be found in Distributed system observability: Instrument Cypress tests with OpenTelemetry post.

Jaeger

Jaeger, inspired by Dapper and OpenZipkin, is an open-source distributed tracing system. It is used for monitoring and troubleshooting microservices-based distributed systems. Jaeger collects all the traces and provides a search and visualization of the traces. In the original examples, Grafana Tempo was used as a backend and Jaeger UI via the jaeger-query module to open the traces. I initially started with it, but Tempo does not provide a possibility to search the traces. I find this rather inconvenient, so I switched completely to Jaeger.

Promtail

Promtail is an agent that ships the contents of the Spring Boot backend logs to a Loki instance. It is usually deployed to every machine that runs applications that need to be monitored. Local configurations are in promtail-local.yaml.

Loki

Grafana Loki is a log aggregation system inspired by Prometheus. It does not index the contents of the logs, but rather a set of labels for each log stream. Log data itself is then compressed and stored in chunks. In the current example, logs are pushed to Loki by Promtail. Local configurations are in loki-local.yaml.
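
The label-based approach can be illustrated with a small TypeScript sketch. This is a hypothetical in-memory model, not Loki's real storage format or API: the LogStream shape and the selectStreams and grep helpers are made up for the example. The point is that only the labels are used to find candidate streams, and the log lines themselves are scanned afterwards.

```typescript
// Hypothetical in-memory model of label-indexed log streams.
interface LogStream {
  labels: { [name: string]: string };
  lines: string[]; // in Loki these would be compressed chunks
}

// Step 1: use only the labels (the "index") to pick candidate streams.
function selectStreams(streams: LogStream[], selector: { [name: string]: string }): LogStream[] {
  return streams.filter((stream) =>
    Object.keys(selector).every((name) => stream.labels[name] === selector[name])
  );
}

// Step 2: scan the lines of the selected streams for a substring.
function grep(streams: LogStream[], needle: string): string[] {
  const matches: string[] = [];
  for (const stream of streams) {
    for (const line of stream.lines) {
      if (line.indexOf(needle) !== -1) {
        matches.push(line);
      }
    }
  }
  return matches;
}

// Made-up example data.
const exampleStreams: LogStream[] = [
  { labels: { job: 'person-service', level: 'info' }, lines: ['Person 1 created', 'Person 2 created'] },
  { labels: { job: 'person-service', level: 'error' }, lines: ['Person 3 not found'] },
  { labels: { job: 'other-service', level: 'error' }, lines: ['Person cache not found'] },
];

const selectedStreams = selectStreams(exampleStreams, { job: 'person-service' });
console.log(grep(selectedStreams, 'not found'));
```

Keeping the index limited to labels makes it small and cheap to maintain, at the cost of scanning line content at query time.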

Prometheus

Prometheus is an open-source monitoring and alerting toolkit. Prometheus collects and stores its metrics as time-series data, i.e. metrics information is stored with the timestamp at which it was recorded, alongside optional key-value pairs called labels. In the current example, Prometheus is monitoring the Spring Boot backend, Loki, Jaeger, and the OpenTelemetry collector. It pulls the metrics data from those applications at a regular interval and stores them in its database. Alerts can be configured based on the metrics. Local configurations are in prometheus.yaml.

Grafana

Grafana is an open-source solution for running data analytics, pulling up metrics from different data sources, and monitoring applications with the help of customizable dashboards. The tool helps to study, analyze and monitor data over a period of time, technically called time-series analytics. In the current example, Grafana pulls data from Prometheus, Jaeger, and Loki. Local configurations are in grafana-dashboards.yaml and grafana-datasource.yml.

Explore the example

Running the example is very easy. What is needed is Docker Compose and an IDE that can run JUnit tests; I prefer IntelliJ IDEA. Run the examples:

  1. Check out the source code from https://github.com/llatinov/selenium-observability-java
  2. Run: docker-compose build
  3. Run: docker-compose up
  4. Open selenium-tests Maven project and run all the unit tests

Explore the example artifacts:

pgAdmin

pgAdmin is accessible at http://localhost:8005/. In order to log in, use the following credentials: pgadmin4@pgadmin.org / admin. This is needed only if the database records have to be read or modified.

Jaeger

Jaeger is accessible at http://localhost:16686. The home page shows rich search functionality. There is a dropdown with all available services, then operations performed by the selected service can be also filtered.

A trace can be opened from the search results. It shows all the actions for this trace that have been recorded.

Grafana

Grafana is accessible at http://localhost:3001. Different data sources can be accessed from the Explore menu, the small compass icon in the left-hand side menu. At the top, there is a dropdown with the available data sources.

Grafana -> Loki

From Grafana, select Loki as the data source. Search for {job="person-service"}; this shows all logs for the Spring Boot backend.

Grafana -> Jaeger

Jaeger data source can open a trace by its id. This data source can be used in conjunction with Loki. Search logs in Loki, then open a log, this exposes a Jaeger button.

Jaeger data source can be opened directly from the dropdown, then type the TraceID.

Grafana -> Prometheus

From Grafana, select Prometheus as the data source. Search for {job="person-service"}; this shows all metrics for the Spring Boot backend.

Prometheus

Prometheus is accessible at http://localhost:9090/. Search for {job="person-service"}; this shows all metrics for the Spring Boot backend.

Further posts with details

This is an introductory post. More details, explanations, and code examples of the actual implementation can be found in the Distributed system observability posts referenced in the sections above.

Conclusion

Microservices architecture is used more and more often. Alongside its advantages, it comes with specific challenges. Observability is one of those challenges and is a very important topic in a distributed software system. In the current example, I have shown end-to-end observability achieved with popular open-source tools. The main objective of my experiments was to be able to trace Selenium test execution through all the systems involved in the distributed architecture.

How to use AWS Transcribe in real-time with React and .NET Core

Post summary: Practical code example how to use AWS Transcribe from an application with React frontend and .NET Core backend.

AWS Transcribe

Amazon Transcribe makes it easy for developers to add speech to text capabilities to their applications. Amazon Transcribe uses a deep learning process called automatic speech recognition (ASR) to convert speech to text quickly and accurately. Amazon Transcribe can be used to transcribe customer service calls, automate subtitling, and generate metadata for media assets to create a fully searchable archive. You can use Amazon Transcribe Medical to add medical speech to text capabilities to clinical documentation applications.

Real-time usage

Streaming Transcription utilizes HTTP/2's implementation of bidirectional streams to handle streaming audio and transcripts between your application and the Amazon Transcribe service. Bidirectional streams allow your application to handle sending and receiving data at the same time, resulting in quicker, more reactive results. Read more, along with a Java example, in Amazon Transcribe now supports real-time transcriptions article. The way to achieve bidirectional communication in the browser is to use WebSocket.

How to use AWS Transcribe

The first thing that is needed is an AWS account with sufficient Transcribe privileges, read more in How Amazon Transcribe Works with IAM article. Once you have this, generate AWS AccessKey and SecretKey. With the AccessKey, SecretKey, and Region, a special pre-signed URL is generated, which is a rather complex process. A WebSocket connection is opened with this pre-signed URL. A full explanation of the process can be found in Using Amazon Transcribe Streaming with WebSockets article. Another very useful example of how to generate the pre-signed URL and open a WebSocket connection is shown in amazon-archives/amazon-transcribe-websocket-static GitHub repo.

Issues with generating pre-signed URL in the browser

So far so good, things are starting to make sense. In order to generate a pre-signed URL in the browser, AWS AccessKey and SecretKey are needed. One option is to make the users provide them every time they want to use the application, which is not really user friendly. Another option is to have the web application generate it automatically, which exposes the AWS credentials and is not really an option. The solution is to have a backend application, which can be even a Lambda function, to calculate the pre-signed URL.

Generate the pre-signed URL in C#

Below is an example .NET Core controller that generates the pre-signed URL in C#:

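using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;

public class PresignedUrlController : ControllerBase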
{
	private const string Service = "transcribe";
	private const string Path = "/stream-transcription-websocket";
	private const string Scheme = "AWS4";
	private const string Algorithm = "HMAC-SHA256";
	private const string Terminator = "aws4_request";
	private const string HmacSha256 = "HMACSHA256";

	private readonly string _region;
	private readonly string _awsAccessKey;
	private readonly string _awsSecretKey;

	public PresignedUrlController(IOptions<Config> options)
	{
		_region = options.Value.AWS_SPEECH_REGION;
		_awsAccessKey = options.Value.AWS_SPEECH_ACCESS_KEY;
		_awsSecretKey = options.Value.AWS_SPEECH_SECRET_KEY;
	}

	[HttpPost]
	public ActionResult<string> GetPresignedUrl()
	{
		return GenerateUrl();
	}

	private string GenerateUrl()
	{
		var host = $"transcribestreaming.{_region}.amazonaws.com:8443";
		var dateNow = DateTime.UtcNow;
		var dateString = dateNow.ToString("yyyyMMdd");
		var dateTimeString = dateNow.ToString("yyyyMMddTHHmmssZ");
		var credentialScope = $"{dateString}/{_region}/{Service}/{Terminator}";
		var query = GenerateQueryParams(dateTimeString, credentialScope);
		var signature = GetSignature(host, dateString, dateTimeString, credentialScope);
		return $"wss://{host}{Path}?{query}&X-Amz-Signature={signature}";
	}

	private string GenerateQueryParams(string dateTimeString, string credentialScope)
	{
		var credentials = $"{_awsAccessKey}/{credentialScope}";
		var result = new Dictionary<string, string>
		{
			{"X-Amz-Algorithm", "AWS4-HMAC-SHA256"},
			{"X-Amz-Credential", credentials},
			{"X-Amz-Date", dateTimeString},
			{"X-Amz-Expires", "30"},
			{"X-Amz-SignedHeaders", "host"},
			{"language-code", "en-US"},
			{"media-encoding", "pcm"},
			{"sample-rate", "44100"}
		};
		return string.Join("&", result.Select(x => $"{x.Key}={Uri.EscapeDataString(x.Value)}"));
	}

	private string GetSignature(string host, string dateString, string dateTimeString, string credentialScope)
	{
		var canonicalRequest = CanonicalizeRequest(Path, host, dateTimeString, credentialScope);
		var canonicalRequestHashBytes = ComputeHash(canonicalRequest);

		// construct the string to be signed
		var stringToSign = new StringBuilder();
		stringToSign.AppendFormat("{0}-{1}\n{2}\n{3}\n", Scheme, Algorithm, dateTimeString, credentialScope);
		stringToSign.Append(ToHexString(canonicalRequestHashBytes, true));

		var kha = KeyedHashAlgorithm.Create(HmacSha256);
		kha.Key = DeriveSigningKey(HmacSha256, _awsSecretKey, _region, dateString, Service);

		// compute the final signature for the request, place into the result and return to the 
		// user to be embedded in the request as needed
		var signature = kha.ComputeHash(Encoding.UTF8.GetBytes(stringToSign.ToString()));
		var signatureString = ToHexString(signature, true);
		return signatureString;
	}

	private string CanonicalizeRequest(string path, string host, string dateTimeString, string credentialScope)
	{
		var canonicalRequest = new StringBuilder();
		canonicalRequest.AppendFormat("{0}\n", "GET");
		canonicalRequest.AppendFormat("{0}\n", path);
		canonicalRequest.AppendFormat("{0}\n", GenerateQueryParams(dateTimeString, credentialScope));
		canonicalRequest.AppendFormat("{0}\n", $"host:{host}");
		canonicalRequest.AppendFormat("{0}\n", "");
		canonicalRequest.AppendFormat("{0}\n", "host");
		canonicalRequest.Append(ToHexString(ComputeHash(""), true));
		return canonicalRequest.ToString();
	}

	private static string ToHexString(byte[] data, bool lowercase)
	{
		var sb = new StringBuilder();
		for (var i = 0; i < data.Length; i++)
		{
			sb.Append(data[i].ToString(lowercase ? "x2" : "X2"));
		}
		return sb.ToString();
	}

	private static byte[] DeriveSigningKey(string algorithm, string awsSecretAccessKey, string region, string date, string service)
	{
		char[] ksecret = (Scheme + awsSecretAccessKey).ToCharArray();
		byte[] hashDate = ComputeKeyedHash(algorithm, Encoding.UTF8.GetBytes(ksecret), Encoding.UTF8.GetBytes(date));
		byte[] hashRegion = ComputeKeyedHash(algorithm, hashDate, Encoding.UTF8.GetBytes(region));
		byte[] hashService = ComputeKeyedHash(algorithm, hashRegion, Encoding.UTF8.GetBytes(service));
		return ComputeKeyedHash(algorithm, hashService, Encoding.UTF8.GetBytes(Terminator));
	}

	private static byte[] ComputeKeyedHash(string algorithm, byte[] key, byte[] data)
	{
		var kha = KeyedHashAlgorithm.Create(algorithm);
		kha.Key = key;
		return kha.ComputeHash(data);
	}

	private static byte[] ComputeHash(string data)
	{
		return HashAlgorithm.Create("SHA-256").ComputeHash(Encoding.UTF8.GetBytes(data));
	}
}
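
One detail of the signing process that is easy to miss is that the query string used in the canonical request has to list the parameters sorted by name, with values URI-encoded. The C# code above gets this for free because the dictionary entries are already written in sorted order. Below is a minimal TypeScript sketch of the same step; canonicalQueryString is a hypothetical helper name, the credential value is a placeholder, and encodeURIComponent is assumed to be good enough here because none of the values contain the few characters (such as ! or *) that it leaves unescaped.

```typescript
// Builds a Signature Version 4 style canonical query string:
// parameters sorted by name, names and values URI-encoded, joined with '&'.
function canonicalQueryString(params: { [name: string]: string }): string {
  return Object.keys(params)
    .sort() // default sort compares UTF-16 code units, so 'X-Amz-*' comes before lowercase names
    .map((name) => encodeURIComponent(name) + '=' + encodeURIComponent(params[name]))
    .join('&');
}

// Parameters deliberately given in the wrong order to show the sorting.
const exampleQuery = canonicalQueryString({
  'sample-rate': '44100',
  'media-encoding': 'pcm',
  'language-code': 'en-US',
  'X-Amz-SignedHeaders': 'host',
  'X-Amz-Expires': '30',
  'X-Amz-Date': '20200101T000000Z',
  // placeholder access key and credential scope, not real credentials
  'X-Amz-Credential': 'EXAMPLEKEY/20200101/us-east-1/transcribe/aws4_request',
  'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
});

console.log(exampleQuery);
```

If the parameters are signed in one order and sent in another, the signature computed by AWS will not match and the WebSocket connection is rejected.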

Use the pre-signed URL in a React application

One of the examples referenced above shows raw code for opening a WebSocket and using it. The code below shows how to do the same in React with TypeScript. Note that the WebSocket is closed automatically after 15 seconds with setTimeout(). It is important to have some kind of breaker because leaving the socket open can generate a significant AWS bill.

import React from 'react';
import { EventStreamMarshaller, Message } from '@aws-sdk/eventstream-marshaller';
import { toUtf8, fromUtf8 } from '@aws-sdk/util-utf8-node';
import mic from 'microphone-stream';
import Axios from 'axios';

const sampleRate = 44100;
const eventStreamMarshaller = new EventStreamMarshaller(toUtf8, fromUtf8);

export default () => {
  const [webSocket, setWebSocket] = React.useState<WebSocket>();
  const [inputSampleRate, setInputSampleRate] = React.useState<number>();

  const streamAudioToWebSocket = async (userMediaStream: any) => {
    const micStream = new mic();

    micStream.on('format', (data: any) => {
      setInputSampleRate(data.sampleRate);
    });

    micStream.setStream(userMediaStream);

    const url = await Axios.post<string>('http://localhost:3016/url');

    //open up our WebSocket connection
    const socket = new WebSocket(url.data);
    socket.binaryType = 'arraybuffer';

    socket.onopen = () => {
      micStream.on('data', (rawAudioChunk: any) => {
        // the audio stream is raw audio bytes. Transcribe expects PCM with additional metadata, encoded as binary
        const binary = convertAudioToBinaryMessage(rawAudioChunk);
        if (socket.readyState === socket.OPEN) {
          socket.send(binary);
        }
      });
    };

    socket.onmessage = (message: MessageEvent) => {
      const messageWrapper = eventStreamMarshaller.unmarshall(Buffer.from(message.data));
      const messageBody = JSON.parse(String.fromCharCode.apply(String, messageWrapper.body as any));
      if (messageWrapper.headers[':message-type'].value === 'event') {
        handleEventStreamMessage(messageBody);
      } else {
        console.error(messageBody.Message);
        stop(socket);
      }
    };

    socket.onerror = () => {
      stop(socket);
    };

    socket.onclose = () => {
      micStream.stop();
    };

    setWebSocket(socket);

    setTimeout(() => {
      stop(socket);
    }, 15000);

    console.log('Amazon started');
  };

  const convertAudioToBinaryMessage = (audioChunk: any): any => {
    const raw = mic.toRaw(audioChunk);

    if (raw == null) return;

    // downsample and convert the raw audio bytes to PCM
    const downsampledBuffer = downsampleBuffer(raw, inputSampleRate, sampleRate);
    const pcmEncodedBuffer = pcmEncode(downsampledBuffer);

    // add the right JSON headers and structure to the message
    const audioEventMessage = getAudioEventMessage(Buffer.from(pcmEncodedBuffer));

    //convert the JSON object + headers into a binary event stream message
    const binary = eventStreamMarshaller.marshall(audioEventMessage);

    return binary;
  };

  const getAudioEventMessage = (buffer: Buffer): Message => {
    // wrap the audio data in a JSON envelope
    return {
      headers: {
        ':message-type': {
          type: 'string',
          value: 'event',
        },
        ':event-type': {
          type: 'string',
          value: 'AudioEvent',
        },
      },
      body: buffer,
    };
  };

  const pcmEncode = (input: any) => {
    var offset = 0;
    var buffer = new ArrayBuffer(input.length * 2);
    var view = new DataView(buffer);
    for (var i = 0; i < input.length; i++, offset += 2) {
      var s = Math.max(-1, Math.min(1, input[i]));
      view.setInt16(offset, s < 0 ? s * 0x8000 : s * 0x7fff, true);
    }
    return buffer;
  };

  const downsampleBuffer = (buffer: any, inputSampleRate: number = 44100, outputSampleRate: number = 16000) => {
    if (outputSampleRate === inputSampleRate) {
      return buffer;
    }

    var sampleRateRatio = inputSampleRate / outputSampleRate;
    var newLength = Math.round(buffer.length / sampleRateRatio);
    var result = new Float32Array(newLength);
    var offsetResult = 0;
    var offsetBuffer = 0;

    while (offsetResult < result.length) {
      var nextOffsetBuffer = Math.round((offsetResult + 1) * sampleRateRatio);

      var accum = 0,
        count = 0;

      for (var i = offsetBuffer; i < nextOffsetBuffer && i < buffer.length; i++) {
        accum += buffer[i];
        count++;
      }

      result[offsetResult] = accum / count;
      offsetResult++;
      offsetBuffer = nextOffsetBuffer;
    }

    return result;
  };

  const handleEventStreamMessage = (messageJson: any) => {
    const results = messageJson.Transcript.Results;
    if (results.length > 0) {
      if (results[0].Alternatives.length > 0) {
        const transcript = decodeURIComponent(escape(results[0].Alternatives[0].Transcript));
        // if this transcript segment is final, add it to the overall transcription
        if (!results[0].IsPartial) {
          const text = transcript.toLowerCase().replace('.', '').replace('?', '').replace('!', '');
          console.log(text);
        }
      }
    }
  };

  const start = () => {
    // first we get the microphone input from the browser (as a promise)...
    window.navigator.mediaDevices
      .getUserMedia({
        video: false,
        audio: true,
      })
      // ...then we convert the mic stream to binary event stream messages when the promise resolves
      .then(streamAudioToWebSocket)
      .catch(() => {
        console.error('Please check that your microphone is working and try again.');
      });
  };

  const stop = (socket: WebSocket) => {
    if (socket) {
      socket.close();
      setWebSocket(undefined);
      console.log('Amazon stopped');
    }
  };

  return (
    <div className="App">
      <button onClick={() => (webSocket ? stop(webSocket) : start())}>{webSocket ? 'Stop' : 'Start'}</button>
    </div>
  );
};
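
To see what the PCM encoding step actually produces, here is a small standalone TypeScript sketch that applies the same conversion as pcmEncode above to a few sample values. The function names floatTo16BitPcm and readInt16Samples are hypothetical and exist only for this illustration.

```typescript
// Same conversion as pcmEncode above: float samples in [-1, 1] become
// 16-bit signed little-endian integers, two bytes per sample.
function floatTo16BitPcm(samples: number[]): ArrayBuffer {
  const buffer = new ArrayBuffer(samples.length * 2);
  const view = new DataView(buffer);
  for (let i = 0; i < samples.length; i++) {
    // clamp to [-1, 1], then scale negative and positive values to the int16 range
    const s = Math.max(-1, Math.min(1, samples[i]));
    view.setInt16(i * 2, s < 0 ? s * 0x8000 : s * 0x7fff, true);
  }
  return buffer;
}

// Reads the encoded buffer back as plain numbers for inspection.
function readInt16Samples(buffer: ArrayBuffer): number[] {
  const view = new DataView(buffer);
  const result: number[] = [];
  for (let offset = 0; offset < buffer.byteLength; offset += 2) {
    result.push(view.getInt16(offset, true));
  }
  return result;
}

const encodedSamples = floatTo16BitPcm([0, 0.5, 1, -1, 2]);
console.log(readInt16Samples(encodedSamples)); // [0, 16383, 32767, -32768, 32767]
```

The last input value, 2, is out of range and is clamped to 1 before scaling, which is why it ends up as 32767 as well.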

TypeScript declaration

The microphone-stream module does not ship TypeScript type declarations. In order to create one, a file named microphone-stream.d.ts with the content declare module 'microphone-stream'; is needed.

Conclusion

AWS Transcribe is a really easy-to-use service, which does not require a significant implementation effort. Compared with Google Speech-To-Text, I found that one much harder to get working.

AWS examples in C# – structured logging in .NET Core and AWS Lambda

Post summary: Code examples of how to do structured logging in .NET Core and C# AWS Lambda.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Structured logging

In the general case, a log file is one big text file with no structure in it. This makes it hard to search and analyze log files. The idea of structured logging is to write logs in a given format, such as JSON or XML, so they can later be processed by machines, for example for search or big data analysis. In the current post, I will describe how to log in JSON format. For example, one plain-text log entry is:

[12:17:16 INF] New Movie published with Die Hard, {"Title": "Die Hard", "Genre": "Action", "$type": "Movie"}

This is simple text with the Movie object appended as JSON. In order to process it, a parser has to be implemented that reads the data in the square brackets and extracts the time and the log level. Searching in the content itself is then hard because it has to be done with lots of regular expressions. With structured logging, the same entry looks like this:

{
	"@t": "2020-03-29T10:21:24.2907688Z",
	"@mt": "New Movie published with {Title}, {@Content}",
	"Title": "Die Hard",
	"Content": {
		"Title": "Die Hard",
		"Genre": "Action",
		"$type": "Movie"
	},
	"SourceContext": "SqsWriter.Controllers.PublishController",
	"ActionId": "20de3310-ebce-48d5-8f9c-28914e199937",
	"ActionName": "SqsWriter.Controllers.PublishController.PublishMovie (SqsWriter)",
	"RequestId": "0HLUJPBNDGPSJ:00000001",
	"RequestPath": "/api/publish/movie",
	"SpanId": "|7bab219a-415f5c121e1756f5.",
	"TraceId": "7bab219a-415f5c121e1756f5",
	"ParentId": "",
	"ConnectionId": "0HLUJPBNDGPSJ"
}

There is much more data in the latter and it is also in JSON format which makes it easy to search with JsonPath.
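
As a rough illustration of why the JSON form is easier to process, below is a minimal TypeScript sketch that parses such log lines and filters them by a nested field. The LogEvent type and parseLogLines helper are hypothetical, the entries are shortened, and the second entry (Alien) is made up for the example.

```typescript
// Two log lines in the compact JSON shape shown above (shortened for the example).
const logLines: string[] = [
  '{"@t":"2020-03-29T10:21:24.2907688Z","@mt":"New Movie published with {Title}, {@Content}","Title":"Die Hard","Content":{"Title":"Die Hard","Genre":"Action"}}',
  '{"@t":"2020-03-29T10:22:01.0000000Z","@mt":"New Movie published with {Title}, {@Content}","Title":"Alien","Content":{"Title":"Alien","Genre":"Horror"}}',
];

interface LogEvent {
  '@t': string;
  '@mt': string;
  Title?: string;
  Content?: { Title: string; Genre: string };
}

// Each line is a self-contained JSON document, so parsing needs no regular expressions.
function parseLogLines(lines: string[]): LogEvent[] {
  return lines.map((line) => JSON.parse(line) as LogEvent);
}

const actionMovies = parseLogLines(logLines).filter(
  (event) => event.Content !== undefined && event.Content.Genre === 'Action'
);

console.log(actionMovies.map((event) => event.Title));
```

Doing the same against the plain-text entry would require a custom parser for the prefix and another one for the embedded object.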

JSON Path

JsonPath is a way to navigate a JSON document, very similar to XPath for XML. With JSONPath Online Evaluator it is easy to try some expressions. Both $.Content.Title and $.Title resolve to Die Hard. So it is very easy for a tool that understands JsonPath to query logs where some JSON entity matches a given rule, e.g. $.Title equals Die Hard.
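
The two expressions can be reproduced with a tiny TypeScript sketch. Note that resolveSimplePath is a hypothetical helper that understands only the simplest dotted form ($.a.b.c); real JsonPath implementations also support filters, wildcards, and array slices, none of which is handled here.

```typescript
// Resolves a path of the form '$.a.b.c' against a parsed JSON document.
// Returns undefined when any segment along the way is missing.
function resolveSimplePath(doc: any, path: string): any {
  if (path !== '$' && path.indexOf('$.') !== 0) {
    throw new Error('Path must start with $');
  }
  const segments = path.split('.').slice(1); // drop the leading '$'
  let current = doc;
  for (const segment of segments) {
    if (current === null || typeof current !== 'object') {
      return undefined;
    }
    current = current[segment];
  }
  return current;
}

// Shortened version of the structured log entry shown earlier.
const logEntry = {
  Title: 'Die Hard',
  Content: { Title: 'Die Hard', Genre: 'Action', $type: 'Movie' },
};

console.log(resolveSimplePath(logEntry, '$.Content.Title')); // Die Hard
console.log(resolveSimplePath(logEntry, '$.Title')); // Die Hard
```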

AWS CloudWatch

CloudWatch provides different monitoring functions, one of which is logging. Many AWS services, Lambda included, send their logs to CloudWatch by default. CloudWatch provides a feature called Logs Insights, which is able to search JSON logs.

Serilog

Serilog is a .NET library that provides logging capabilities. Its main benefit is that it can log in JSON format. Serilog can be very easily integrated into any C# project. Two NuGet packages are needed: Serilog and Serilog.AspNetCore.

Configure Serilog for .NET Core application

In the case of a .NET Core microservice, Serilog is configured in Program.cs.

using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Serilog;
using Serilog.Events;
using Serilog.Formatting.Compact;

public static void Main(string[] args)
{
	Log.Logger = new LoggerConfiguration()
		.MinimumLevel.Debug()
		.MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
		.Enrich.FromLogContext()
		.WriteTo.Console(new CompactJsonFormatter())
		.CreateLogger();

	var webHost = WebHost.CreateDefaultBuilder(args)
		.UseStartup<Startup>()
		.UseSerilog()
		.UseUrls("http://*:5100")
		.Build();

	webHost.Run();
}

Configure Serilog for AWS C# Lambda function

In the case of a C# Lambda function, logging to the console is enough, since all logs are sent to CloudWatch. I have created a proxy class called Logger, which creates an instance of the Serilog logger. The custom logger is then used in the Lambda function itself.

Logger

using Serilog;
using Serilog.Formatting.Compact;

public interface ILogger
{
	void LogInformation(string messageTemplate, params object[] arguments);
}

public class Logger : ILogger
{
	private static Serilog.Core.Logger _logger;

	public Logger()
	{
		_logger = new LoggerConfiguration()
			.MinimumLevel.Debug()
			.WriteTo.Console(new CompactJsonFormatter())
			.CreateLogger();
	}

	public void LogInformation(string messageTemplate, params object[] arguments)
	{
		_logger.Information(messageTemplate, arguments);
	}
}

MoviesFunction

private readonly ISqsWriter _sqsWriter;
private readonly IDynamoDbWriter _dynamoDbWriter;
private readonly ILogger _logger;

public MoviesFunction() : this(null, null, null) { }

public MoviesFunction(ISqsWriter sqsWriter, IDynamoDbWriter dynamoDbWriter, ILogger logger)
{
	_sqsWriter = sqsWriter ?? new SqsWriter();
	_dynamoDbWriter = dynamoDbWriter ?? new DynamoDbWriter();
	_logger = logger ?? new Logger();
}

public async Task MoviesFunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
	foreach (var record in dynamoEvent.Records)
	{
		var title = record.Dynamodb.NewImage["Title"].S;
		var logEntry = new LogEntry
		{
			Message = $"Movie '{title}' processed by lambda",
			DateTime = DateTime.Now
		};
		_logger.LogInformation("MoviesFunctionHandler invoked with {Title}", title);

		await _sqsWriter.WriteLogEntryAsync(logEntry);
		await _dynamoDbWriter.PutLogEntryAsync(logEntry);
	}
}

GetMovie

public async Task<APIGatewayProxyResponse> GetMovie(APIGatewayProxyRequest request, ILambdaContext context)
{
	var title = WebUtility.UrlDecode(request.PathParameters["title"]);
	_logger.LogInformation("GetMovie invoked with {Title}", title);

	var document = await _dynamoDbReader.GetDocumentAsync(TableName, title);
	if (document == null)
	{
		_logger.LogInformation("GetMovie produced no results for {Title}", title);
		return new APIGatewayProxyResponse { StatusCode = (int)HttpStatusCode.NotFound };
	}

	var movie = new Movie
	{
		Title = document["Title"],
		Genre = (MovieGenre)int.Parse(document["Genre"])
	};
	_logger.LogInformation("GetMovie result is {Title}, {@Content}", movie.Title, movie);

	return new APIGatewayProxyResponse
	{
		StatusCode = (int)HttpStatusCode.OK,
		Body = _jsonConverter.SerializeObject(movie)
	};
}

AWS CloudWatch Insights

CloudWatch Logs Insights enables you to interactively search and analyze log data in Amazon CloudWatch Logs. Queries can be performed to help respond to operational issues more efficiently and effectively. Queries are written in a purpose-built query language with a few simple but powerful commands. A short example is to search for all logs in which the FirstName field equals Bruce. Before that, all log groups that have to be searched are selected above the query.

fields @@mt
| filter FirstName = 'Bruce'
| sort @timestamp desc
| limit 20

An extensive guide on query language can be found on CloudWatch Logs Insights Query Syntax page.

Conclusion

Structured logging can bring a lot of benefits to debugging an application and analyzing its behavior. It is easy to set up and use.

AWS examples in C# – AWS CLI commands

Post summary: Important AWS CLI commands used in AWS examples in C#.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Introduction

In AWS examples in C# – run the solution post I have described how to install/uninstall current examples. In the current post, I am going to show in detail the individual commands used. The configuration parameters in the commands below are written in capital letters and start with a dollar sign, e.g. $CONFIGURATION_PARAMETER. Each AWS command has its code representation in the SDK for the desired programming language.

AWS Command Line Interface

The AWS Command Line Interface (CLI) is a unified tool to manage AWS services. With just one tool to download and configure, multiple AWS services can be controlled from the command line and automated through scripts. The full list of services that can be controlled is given in the AWS Command Line Interface reference page. Each service has a subpage with a list of all available commands. All commands return JSON as a response. In a subsequent post, I will describe how to manage the JSON in the command line. All operations in the current post are done after AWS credentials are set as environment variables:

export AWS_ACCESS_KEY_ID=KIA57FV4.....
export AWS_SECRET_ACCESS_KEY=mSgsxOWVh...
export AWS_DEFAULT_REGION=us-east-1

SQS operations

The full list can be found in aws sqs CLI reference page. More information about SQS can be found in AWS examples in C# – create a service working with SQS post.

Create

Initially, all queues are listed with list-queues, in order to check if the queue already exists.

aws sqs list-queues

The queue is created with create-queue command, the result of the command returns the queue URL.

aws sqs create-queue --queue-name $QUEUE_NAME

After queues are created, the re-drive policy has to be set up. The ARN of the dead-letter queue can be obtained with get-queue-attributes command by providing the queue URL.

aws sqs get-queue-attributes \
	--queue-url $DEAD_LETTER_QUEUE_URL \
	--attribute-names QueueArn

The re-drive policy is set with set-queue-attributes command.

aws sqs set-queue-attributes \
	--queue-url $QUEUE_URL \
	--attributes "{\"RedrivePolicy\":\"{\\\"maxReceiveCount\\\":\\\"3\\\",\\\"deadLetterTargetArn\\\":\\\"$DEAD_LETTER_QUEUE_ARN\\\"}\",\"ReceiveMessageWaitTimeSeconds\":\"$LONG_POLLING_TIMEOUT\"}"
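
The heavy escaping in the command above comes from the fact that RedrivePolicy is itself a JSON document stored as a string inside the outer JSON object. A minimal TypeScript sketch of how such a value can be produced is shown below; buildQueueAttributes is a hypothetical helper name and the ARN is a placeholder.

```typescript
// Builds the value for --attributes. The inner policy is serialized first,
// then embedded as a plain string in the outer object, which escapes its quotes.
function buildQueueAttributes(deadLetterQueueArn: string, longPollingTimeoutSeconds: number): string {
  const redrivePolicy = JSON.stringify({
    maxReceiveCount: '3',
    deadLetterTargetArn: deadLetterQueueArn,
  });
  return JSON.stringify({
    RedrivePolicy: redrivePolicy,
    ReceiveMessageWaitTimeSeconds: String(longPollingTimeoutSeconds),
  });
}

// Placeholder ARN, not a real queue.
const exampleAttributes = buildQueueAttributes('arn:aws:sqs:us-east-1:123456789012:example-dlq', 20);
console.log(exampleAttributes);
```

The printed string has one level of escaping. The command line above needs one more level because the whole value is wrapped in double quotes for the shell.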

Delete

In order to delete the queue, its URL is needed. The URL is obtained with get-queue-url command.

aws sqs get-queue-url --queue-name $QUEUE_NAME

Deletion happens with delete-queue command.

aws sqs delete-queue --queue-url $QUEUE_URL

DynamoDB operations

The full list can be found in aws dynamodb CLI reference page. More information about DynamoDB can be found in AWS examples in C# – create a service working with DynamoDB post.

Create

The table data is obtained with describe-table command.

aws dynamodb describe-table --table-name $TABLE_NAME

If the table does not exist, it is created with create-table command. The table command has all the data needed. See more about table attributes in AWS examples in C# – create a service working with DynamoDB post.

aws dynamodb create-table \
	--table-name $TABLE_NAME \
	--attribute-definitions 'AttributeName=FirstName,AttributeType=S' 'AttributeName=LastName,AttributeType=S' \
	--key-schema 'AttributeName=FirstName,KeyType=HASH' 'AttributeName=LastName,KeyType=RANGE' \
	--provisioned-throughput 'ReadCapacityUnits=5,WriteCapacityUnits=5' \
	--stream-specification 'StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES'
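
The check and the creation can be combined in a script. The sketch below assumes that describe-table exits with a non-zero code when the table is missing; table_exists is a hypothetical helper name.

```shell
# Hypothetical helper: succeeds only when describe-table can find the table.
# Note that any other failure (wrong credentials, for example) also ends with a non-zero code.
table_exists() {
	aws dynamodb describe-table --table-name "$1" > /dev/null 2>&1
}

# Intended usage (needs AWS credentials, so it is left as a comment):
# table_exists "$TABLE_NAME" || aws dynamodb create-table --table-name "$TABLE_NAME" ...
```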

Delete

The table is deleted by name with delete-table command.

aws dynamodb delete-table --table-name $TABLE_NAME

IAM roles operations

The full list can be found in aws iam CLI reference page.

Create

Roles are listed with list-roles command to check if the role exists.

aws iam list-roles

The role is created with create-role command.

aws iam create-role \
	--role-name $ROLE_NAME \
	--assume-role-policy-document file://assume-role-policy-document.json

This is the only case in the current examples where an additional JSON document is needed alongside a command. The JSON is kept in a file instead of being passed inline, as is done with aws sqs set-queue-attributes command. It is the trust policy of the role: it lists the services that are allowed to assume this role.

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
				"Service": [
					"lambda.amazonaws.com",
					"ec2.amazonaws.com",
					"ecs.amazonaws.com",
					"ecs-tasks.amazonaws.com",
					"batch.amazonaws.com"
				]
			},
			"Action": "sts:AssumeRole"
		}
	]
}

Policies are listed with list-policies command in order to get the policy ARN. To make things easier, the existing AdministratorAccess policy is used.

aws iam list-policies
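
The full list of policies is long, so it is handy to filter it on the client side. This is a minimal sketch, assuming a hypothetical helper named managed_policy_arn. It uses the --scope, --query and --output options of the CLI.

```shell
# Hypothetical helper: prints the ARN of an AWS managed policy found by its name
managed_policy_arn() {
	aws iam list-policies \
		--scope AWS \
		--query "Policies[?PolicyName=='$1'].Arn" \
		--output text
}

# Intended usage (needs AWS credentials, so it is left as a comment):
# POLICY_ARN=$(managed_policy_arn AdministratorAccess)
```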

Attach the policy to the role with attach-role-policy command.

aws iam attach-role-policy \
	--role-name $ROLE_NAME \
	--policy-arn $POLICY_ARN

Delete

List policies with list-policies command to get the ARN, then detach the policy from the role.

aws iam detach-role-policy \
	--role-name $ROLE_NAME \
	--policy-arn $POLICY_ARN

After the policy is detached, the role is deleted with delete-role command.

aws iam delete-role --role-name $ROLE_NAME

AWS Lambda operations

The full list can be found in aws lambda CLI reference page.

Create

List functions with list-functions command to check if the function exists.

aws lambda list-functions

Creating a function is done with create-function command, which takes many arguments. Most of the parameters are self-explanatory. The timeout is important: the lambda function execution is stopped once the timeout passes. In the current examples, it is 30 seconds, because I found that a cold start could sometimes take up to 15 seconds. The lambda configurations are described in AWS examples in C# – create basic Lambda function post.

aws lambda create-function \
	--function-name $FUNCTION_NAME \
	--runtime dotnetcore2.1 \
	--role $ROLE_ARN \
	--handler $HANDLER_STRING_WITH_NAMESPACE_CLASS_METHOD \
	--environment "Variables={AWS_SQS_QUEUE_NAME=$QUEUE_NAME, AWS_SQS_IS_FIFO=$IS_QUEUE_FIFO}" \
	--timeout $FUNCTION_TIMEOUT \
	--zip-file fileb://$PATH_TO_ZIP_FILE

Once the function is created, it can be linked to an event source, such as DynamoDB. The link is made with the DynamoDB stream ARN. Once a record is inserted, updated or deleted in DynamoDB, the lambda function is called with this event.

aws lambda create-event-source-mapping \
	--function-name $FUNCTION_NAME \
	--event-source-arn $DYNAMODB_STREAM_ARN \
	--starting-position LATEST

In case the function already exists, but its code has to be updated, this is done with update-function-code command.

aws lambda update-function-code \
	--function-name $FUNCTION_NAME \
	--zip-file fileb://$PATH_TO_ZIP_FILE

Along with the code, function configuration can be updated as well with update-function-configuration command.

aws lambda update-function-configuration \
	--function-name $FUNCTION_NAME \
	--role $ROLE_ARN \
	--handler $HANDLER_STRING_WITH_NAMESPACE_CLASS_METHOD \
	--environment "Variables={AWS_SQS_QUEUE_NAME=$QUEUE_NAME, AWS_SQS_IS_FIFO=$IS_QUEUE_FIFO}" \
	--timeout $FUNCTION_TIMEOUT

Delete

In order to delete the function, the event source UUID has to be obtained first. This is done with list-event-source-mappings command.

aws lambda list-event-source-mappings --function-name $FUNCTION_NAME

Then the event source mapping is deleted with delete-event-source-mapping command.

aws lambda delete-event-source-mapping --uuid $EVENT_SOURCE_UUID

And finally, the function itself is deleted with delete-function command.

aws lambda delete-function --function-name $FUNCTION_NAME
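
The three delete steps can be chained in a script. Below is a sketch under the assumption that the UUIDs are extracted with --query 'EventSourceMappings[].UUID'; delete_function is a hypothetical helper name.

```shell
# Hypothetical helper: removes all event source mappings of a function, then the function itself
delete_function() {
	uuids=$(aws lambda list-event-source-mappings \
		--function-name "$1" \
		--query 'EventSourceMappings[].UUID' \
		--output text)
	for uuid in $uuids; do
		# Defensive guard: the text output shows None for a missing value
		[ "$uuid" = "None" ] && continue
		aws lambda delete-event-source-mapping --uuid "$uuid"
	done
	aws lambda delete-function --function-name "$1"
}

# Intended usage (needs AWS credentials, so it is left as a comment):
# delete_function "$FUNCTION_NAME"
```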

ECS (Elastic Container Service) operations

The full list can be found in aws ecs CLI reference page.

Create

Before doing anything with ECR, Docker has to be authenticated with AWS ECR. The docker login command is generated with get-login and, with eval function, it is directly executed. Note that get-login is available only in version 1 of the AWS CLI; version 2 replaces it with get-login-password.

eval $(aws ecr get-login --no-include-email)

Clusters are first listed, in order to evaluate if the application is already deployed.

aws ecs list-clusters

The cluster is created with create-cluster command. A cluster consists of services.

aws ecs create-cluster --cluster-name $CLUSTER_NAME

The existing task definition is described with describe-task-definition command, to evaluate whether it is already registered or not. Task definitions are Docker configurations.

aws ecs describe-task-definition --task-definition $TASK_DEFINITION_NAME

The task definition is created with register-task-definition command.

aws ecs register-task-definition \
	--family $TASK_DEFINITION_NAME \
	--execution-role-arn $ROLE_ARN \
	--network-mode awsvpc \
	--container-definitions $CONTAINER_DEFINITIONS \
	--requires-compatibilities "FARGATE" \
	--cpu "256" \
	--memory "512"

$CONTAINER_DEFINITIONS is a Docker configuration which defines the task definition:

name=$TASK_DEFINITION_NAME,\
image=$IMAGE_TAG,\
environment=[\
	{name=AwsQueueIsFifo,value=$IS_QUEUE_FIFO},\
	{name=AwsRegion,value=$REGION},\
	{name=AwsQueueName,value=$QUEUE_NAME},\
	{name=AwsAccessKey,value=$AWS_ACCESS_KEY},\
	{name=AwsSecretKey,value=$AWS_SECRET_KEY},\
	{name=AwsQueueAutomaticallyCreate,value=$AWS_QUEUE_AUTO_CREATE},\
	{name=AwsQueueLongPollTimeSeconds,value=$AWS_POLL_TIME_SECONDS}\
],\
logConfiguration={\
	logDriver=awslogs,\
	options={\
		awslogs-group=ecs/$SERVICE_NAME,\
		awslogs-region=$REGION,\
		awslogs-stream-prefix=ecs\
	}\
}

Before creating a service, the existing one is described with describe-services command. A service has one or more running instances of a task definition. This is how a service can scale.

aws ecs describe-services \
	--cluster $CLUSTER_NAME \
	--services $SERVICE_NAME

Creating a service is done with create-service command. $TASK_REVISION is the result of the register-task-definition command. $SUBNET_ID is returned by aws ec2 describe-subnets command.

aws ecs create-service --cluster $CLUSTER_NAME \
	--service-name $SERVICE_NAME \
	--task-definition "$TASK_DEFINITION_NAME:$TASK_REVISION" \
	--desired-count 1 \
	--launch-type "FARGATE" \
	--network-configuration "awsvpcConfiguration={subnets=[$SUBNET_ID],securityGroups=[$SECURITY_GROUP_ID],assignPublicIp=ENABLED}"

Updating the service is done with the very similar update-service command.

aws ecs update-service --cluster $CLUSTER_NAME \
	--service $SERVICE_NAME \
	--task-definition "$TASK_DEFINITION_NAME:$TASK_REVISION" \
	--desired-count 1 \
	--network-configuration "awsvpcConfiguration={subnets=[$SUBNET_ID],securityGroups=[$SECURITY_GROUP_ID],assignPublicIp=ENABLED}"

Delete

In order to delete task definitions, they should be first listed with list-task-definitions command, so the task definition version is available.

aws ecs list-task-definitions

Removing the task definition is done with deregister-task-definition command. Note that the command does what it says: it deregisters, it does not delete. The task definition is kept in history with status INACTIVE.

aws ecs deregister-task-definition --task-definition "$TASK_DEFINITION_VERSION"

Deleting the service is done with the delete-service command. The --force parameter also stops the running tasks.

aws ecs delete-service \
	--cluster $CLUSTER_NAME \
	--service $SERVICE_NAME \
	--force

In the end, the whole cluster is deleted with delete-cluster command.

aws ecs delete-cluster --cluster $CLUSTER_NAME

ECR (Elastic Container Registry) operations

The full list can be found in aws ecr CLI reference page.

Delete

The repository is created by Docker when the image is pushed to it. Repository and images inside are deleted with delete-repository command.

aws ecr delete-repository \
	--repository-name $REPOSITORY_NAME \
	--force

EC2 (Elastic Compute Cloud) operations

The full list can be found in aws ec2 CLI reference page.

Create

EC2 is responsible for security groups, which expose the service to the world by applying firewall rules. Before creating the group, its presence is first checked with describe-security-groups command.

aws ec2 describe-security-groups

The security group is created with create-security-group command.

aws ec2 create-security-group \
	--description "$SECURITY_GROUP_DESCRIPTION" \
	--group-name $SECURITY_GROUP_NAME

Inbound rules are defined with authorize-security-group-ingress command, where ip_permission is a bash function generating the JSON for better reuse.

aws ec2 authorize-security-group-ingress \
	--group-id $SECURITY_GROUP_ID \
	--ip-permissions "[$(ip_permission $SERVICE_PORT)]"

This is the function generating the firewall rule JSON. $1 is the argument given to the function, the port number.

function ip_permission() {
	echo "{\"IpProtocol\": \"tcp\", \"FromPort\": $1, \"ToPort\": $1, \"IpRanges\": [{\"CidrIp\": \"0.0.0.0/0\", \"Description\": \"Port $1\"}]}"
}
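
If several ports have to be opened, the same function can be reused to build the whole array. This is a sketch only: ip_permissions is a hypothetical helper, and ip_permission is copied from above so the example is self-contained.

```shell
# Copy of ip_permission from the text above, so that the sketch is self-contained
ip_permission() {
	echo "{\"IpProtocol\": \"tcp\", \"FromPort\": $1, \"ToPort\": $1, \"IpRanges\": [{\"CidrIp\": \"0.0.0.0/0\", \"Description\": \"Port $1\"}]}"
}

# Hypothetical helper: joins the rules for all given ports into one JSON array
ip_permissions() {
	rules=""
	for port in "$@"; do
		# Put a comma between the rules, but not before the first one
		rules="${rules:+$rules, }$(ip_permission "$port")"
	done
	echo "[$rules]"
}

# Intended usage (needs AWS credentials, so it is left as a comment):
# aws ec2 authorize-security-group-ingress \
# 	--group-id $SECURITY_GROUP_ID \
# 	--ip-permissions "$(ip_permissions 80 443)"
```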

Subnets are listed with describe-subnets command. Each subnet resides in a single availability zone.

aws ec2 describe-subnets

Finally, in order to report the IP of the deployed service, describe-network-interfaces command is used.

aws ec2 describe-network-interfaces --filters "Name=network-interface-id,Values=$networkInterfaceId"

Delete

A security group is deleted by name with delete-security-group command.

aws ec2 delete-security-group --group-name $SECURITY_GROUP

CloudWatch operations

The full list can be found in aws logs CLI reference page.

Delete

CloudWatch logs are created by default from the services. Deleting the logs is done with delete-log-group command. Note that I am using Git Bash on Windows and MSYS_NO_PATHCONV=1 is mandatory because the log group name starts with /.

MSYS_NO_PATHCONV=1 aws logs delete-log-group --log-group-name ecs/$SERVICE_NAME

Conclusion

AWS command-line interface provides tooling to handle all needed operations of the AWS services. It is the preferred way to manage services over the Web user interface.

AWS examples in C# – introduction to Serverless framework

Post summary: Introduction to Serverless framework and .NET code example of a lambda function with API Gateway.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

When speaking about Serverless there are two concepts and terms that need to be clarified.

Serverless architecture

Serverless architecture is an application architectural concept of the cloud that enables shifting more of your operational responsibilities to the cloud. In the current examples, AWS is used, but this is a valid concept for Azure and Google Cloud as well. Serverless allows building and running applications and services without thinking about servers.

Serverless framework

Serverless framework is a toolset that makes deployment of serverless applications to different cloud providers easy and streamlined. It supports the following cloud providers: AWS, Google Cloud, Azure, OpenWhisk, and Kubeless, and the following programming languages: Node.js, Go, Python, Swift, Java, PHP, and Ruby.

AWS Lambda

AWS Lambda allows easy ramp-up of service without all the hassle to manage servers and environments. The ready code is uploaded to Lambda and automatically run. More detailed information and design considerations about AWS Lambda can be found in AWS examples in C# – working with Lambda functions post.

CloudFormation

AWS CloudFormation provides infrastructure as code (IaC) capabilities. It defines a common language to model and provision AWS application resources. AWS resources and applications are described in YAML or JSON files, which are then provisioned by CloudFormation. This gives a single source of truth. The Serverless framework uses applications when creating the underlying AWS JSON CloudFormation templates. What the Serverless framework does is translate its custom YAML format to JSON CloudFormation templates, which can be found in the .serverless folder of DynamoDbServerless after the deployment to AWS is done.

Create a Serverless application

Before creating the application, Serverless needs to be installed. Installation is possible as a standalone binary or as a Node.js package. I prefer the latter because it is much simpler.

npm install -g serverless

Creating an empty application is done with the following command:

sls create --template aws-csharp --path MyService

The command outputs a nice message:

$ sls create --template aws-csharp --path MyService
Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "C:\MyService"
 _______                             __
|   _   .-----.----.--.--.-----.----|  .-----.-----.-----.
|   |___|  -__|   _|  |  |  -__|   _|  |  -__|__ --|__ --|
|____   |_____|__|  \___/|_____|__| |__|_____|_____|_____|
|   |   |             The Serverless Application Framework
|       |                           serverless.com, v1.61.3
 -------'

Serverless: Successfully generated boilerplate for template: "aws-csharp"

Unlike the default lambda project created by AWS tools (see AWS examples in C# – create basic Lambda function post for more details), the Serverless project is not split into src and test. It is a good idea to manually split the project in order to add tests. The configuration of Serverless projects is done in the serverless.yml file. The default one is very simple: it states the provider and the runtime, which are aws (Amazon) and dotnetcore2.1. The handler shows which method is called when this lambda is invoked. In the default example, the handler is CsharpHandlers::AwsDotnetCsharp.Handler::Hello, where CsharpHandlers is the assembly name, configured in the aws-csharp.csproj file, AwsDotnetCsharp is the namespace, Handler is the class name, and Hello is the method.

service: myservice

provider:
  name: aws
  runtime: dotnetcore2.1

package:
  individually: true

functions:
  hello:
    handler: CsharpHandlers::AwsDotnetCsharp.Handler::Hello

    package:
      artifact: bin/release/netcoreapp2.1/hello.zip
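
The structure of the handler string can be illustrated by splitting it into its parts. The function below is only a sketch for illustration: parse_handler is a hypothetical helper, it is not part of the Serverless framework.

```shell
# Hypothetical helper: splits "Assembly::Namespace.Class::Method" into its parts
parse_handler() {
	assembly=${1%%::*}              # text before the first "::"
	method=${1##*::}                # text after the last "::"
	full_type=${1#*::}              # drop the assembly...
	full_type=${full_type%::*}      # ...and the method, the full type name remains
	namespace=${full_type%.*}       # everything before the last dot
	class=${full_type##*.}          # everything after the last dot
	echo "assembly=$assembly namespace=$namespace class=$class method=$method"
}

parse_handler "CsharpHandlers::AwsDotnetCsharp.Handler::Hello"
# prints: assembly=CsharpHandlers namespace=AwsDotnetCsharp class=Handler method=Hello
```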

After the application is created, it has to be built with build.cmd or build.sh scripts.

Before deployment, AWS credentials should be set:

export AWS_ACCESS_KEY_ID=KIA57FV4.....
export AWS_SECRET_ACCESS_KEY=mSgsxOWVh...
export AWS_DEFAULT_REGION=us-east-1

Deployment happens with the sls deploy --region $AWS_DEFAULT_REGION command.

Testing of the function can be done with the sls invoke -f hello --region $AWS_DEFAULT_REGION command. The result of testing is:

{
	"Message": "Go Serverless v1.0! Your function executed successfully!",
	"Request": {
		"Key1": null,
		"Key2": null,
		"Key3": null
	}
}

Finally, the stack can be deleted with the sls remove --region $AWS_DEFAULT_REGION command.

Use API Gateway

The default Hello World example is pretty simple. In the current examples, I have elaborated a bit on Lambda usage with API Gateway in order to expose it as a RESTful service. Two lambda functions are defined inside the functions node, for movies and actors. Each has a handler node which defines the code that the lambda function executes. With the events/http node, an API Gateway endpoint is created. Each endpoint has a path and a method. Movies uses the GET method, Actors works via the POST method. Both functions’ handler methods receive an APIGatewayProxyRequest object and return an APIGatewayProxyResponse object. The payload is found in the Body string property of the APIGatewayProxyRequest object.

Actors function is querying the Actors table. More details on the query and how it is constructed in BuildQueryRequest method can be found in Querying using the low-level interface section in AWS examples in C# – basic DynamoDB operations post.

Movies lambda function is getting the JSON document from Movies table. More details on getting the data can be found in Get item using document interface section in AWS examples in C# – basic DynamoDB operations post.

Actors lambda function has two more security features defined. One is an API Key defined with private: true setting. The request should have an x-api-key header with the value which is returned by the deployment command, otherwise, an HTTP status code 403 (Forbidden) is returned by API Gateway. See Run the project in AWS section in AWS examples in C# – run the solution post on how to obtain the proper value of aws-examples-csharp-api-key API key. The example given here is not really scalable, more details on how to properly manage API keys can be found in Managing secrets, API keys and more with Serverless article.

The other security feature is a lambda authorizer configured with authorizer: authorizer setting. The authorizer lambda function is not really doing anything significant in the examples, it uses UserManager class to check if the Authorization header has the proper value. In real life, this would be a database call to get the user authorizations, not a dummy token like in the examples. The request should have an Authorization header with value Bearer validToken, otherwise, an HTTP status code 401 (Unauthorized) is returned by API Gateway.

serverless.yml

service: DynamoDbServerless

provider:
  name: aws
  runtime: dotnetcore2.1
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - dynamodb:Query
      Resource: ${env:actorsTableArn}
    - Effect: "Allow"
      Action:
        - dynamodb:DescribeTable
        - dynamodb:GetItem
      Resource: ${env:moviesTableArn}
  apiKeys:
    - aws-examples-csharp-api-key

package:
  individually: true
  artifact: bin/release/netcoreapp2.1/DynamoDbServerless.zip

functions:
  movies:
    handler: DynamoDbServerless::DynamoDbServerless.Handlers.MoviesHandler::GetMovie
    events:
      - http:
          path: movies/title/{title}
          method: get

  actors:
    handler: DynamoDbServerless::DynamoDbServerless.Handlers.ActorsHandler::QueryActors
    events:
      - http:
          path: actors/search
          method: post
          private: true
          authorizer: authorizer

  authorizer:
    handler: DynamoDbServerless::DynamoDbServerless.Handlers.AuthorizationHandler::Authorize

ActorsFunction

public async Task<APIGatewayProxyResponse> QueryActors(APIGatewayProxyRequest request, ILambdaContext context)
{
	context.Logger.LogLine($"Query request: {_jsonConverter.SerializeObject(request)}");

	var requestBody = _jsonConverter.DeserializeObject<ActorsSearchRequest>(request.Body);
	if (string.IsNullOrEmpty(requestBody.FirstName))
	{
		return new APIGatewayProxyResponse
		{
			StatusCode = (int)HttpStatusCode.BadRequest,
			Body = "FirstName is mandatory"
		};
	}
	var queryRequest = BuildQueryRequest(requestBody.FirstName, requestBody.LastName);

	var response = await _dynamoDbReader.QueryAsync(queryRequest);
	context.Logger.LogLine($"Query result: {_jsonConverter.SerializeObject(response)}");

	var queryResults = BuildActorsResponse(response);

	return new APIGatewayProxyResponse
	{
		StatusCode = (int)HttpStatusCode.OK,
		Body = _jsonConverter.SerializeObject(queryResults)
	};
}

MoviesFunction

public async Task<APIGatewayProxyResponse> GetMovie(APIGatewayProxyRequest request, ILambdaContext context)
{
	context.Logger.LogLine($"Query request: {_jsonConverter.SerializeObject(request)}");

	var title = WebUtility.UrlDecode(request.PathParameters["title"]);
	var document = await _dynamoDbReader.GetDocumentAsync(TableName, title);
	context.Logger.LogLine($"Query response: {_jsonConverter.SerializeObject(document)}");

	if (document == null)
	{
		return new APIGatewayProxyResponse { StatusCode = (int)HttpStatusCode.NotFound };
	}

	var movie = new Movie
	{
		Title = document["Title"],
		Genre = (MovieGenre)int.Parse(document["Genre"])
	};
	return new APIGatewayProxyResponse
	{
		StatusCode = (int)HttpStatusCode.OK,
		Body = _jsonConverter.SerializeObject(movie)
	};
}

AuthorizationHandler

public async Task<APIGatewayCustomAuthorizerResponse> Authorize(APIGatewayCustomAuthorizerRequest request, ILambdaContext context)
{
	context.Logger.LogLine($"Query request: {_jsonConverter.SerializeObject(request)}");

	var userInfo = await _userManager.Authorize(request.AuthorizationToken?.Replace("Bearer ", string.Empty));

	return new APIGatewayCustomAuthorizerResponse
	{
		PrincipalID = userInfo.UserId,
		PolicyDocument = new APIGatewayCustomAuthorizerPolicy
		{
			Version = "2012-10-17",
			Statement = new List<APIGatewayCustomAuthorizerPolicy.IAMPolicyStatement>
			{
				new APIGatewayCustomAuthorizerPolicy.IAMPolicyStatement
				{
					Action = new HashSet<string> {"execute-api:Invoke"},
					Effect = userInfo.Effect.ToString(),
					Resource = new HashSet<string> { request.MethodArn }
				}
			}
		}
	};
}

UserManager

public interface IUserManager
{
	Task<UserInfo> Authorize(string token);
}

public class UserManager : IUserManager
{
	private const string ValidToken = "validToken";
	private const string UserId = "usedId";

	// No asynchronous work is done here, so the result is wrapped with Task.FromResult
	public Task<UserInfo> Authorize(string token)
	{
		var userInfo = new UserInfo
		{
			UserId = UserId,
			Effect = token == ValidToken ? EffectType.Allow : EffectType.Deny
		};

		return Task.FromResult(userInfo);
	}
}
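
To try the secured actors endpoint from the command line, both headers have to be sent together with the JSON body. Below is a sketch using curl. search_actors is a hypothetical helper, and the base URL and the JSON property names in the usage comment are assumptions based on the code above.

```shell
# Hypothetical helper: calls the actors endpoint with both security headers.
# $1 = base URL of the deployed API, $2 = API key value
# $3 = bearer token, $4 = JSON request body
search_actors() {
	curl --silent --request POST "$1/actors/search" \
		--header "x-api-key: $2" \
		--header "Authorization: Bearer $3" \
		--header "Content-Type: application/json" \
		--data "$4"
}

# Intended usage (needs a deployed stack, so it is left as a comment):
# search_actors "$API_BASE_URL" "$API_KEY" validToken '{"FirstName":"..."}'
```

A request without the x-api-key header should end with 403, and a request with a wrong token should end with 401, as described above.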

Conclusion

The Serverless framework makes deployment and maintenance of lambdas very easy. It also supports different cloud providers.

AWS examples in C# – create basic Lambda function

Post summary: Code examples on how to create AWS Lambda function in .NET Core.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

AWS Lambda

AWS Lambda allows easy ramp-up of service without all the hassle to manage servers and environments. The ready code is uploaded to Lambda and automatically run. More detailed information and design considerations about AWS Lambda can be found in AWS examples in C# – working with Lambda functions post.

Create Lambda

Amazon provides Amazon.Lambda.Templates NuGet package which contains a lot of templates for Lambda functions. The NuGet package can be installed with dotnet new -i Amazon.Lambda.Templates. Once templates are installed, a new empty function is created with:

dotnet new lambda.EmptyFunction --name MyFunction

Two projects are created: src and test. The source project is with netcoreapp2.1 runtime and has reference to Amazon.Lambda.Core and Amazon.Lambda.Serialization.Json NuGet packages. It has only one simple function that takes a string input and converts it to uppercase. The test project has a unit test that tests this function.

Once the function is ready, it can be deployed to AWS Lambda and tested. In order to deploy, the Amazon lambda tools should be installed with: dotnet tool install -g Amazon.Lambda.Tools. Once the tool is installed, an IAM role should be created, named MyRole for example. Before deploying the lambda, environment variables with AWS access information should be set:

export AwsAccessKey=KIA57FV4.....
export AwsSecretKey=mSgsxOWVh...
export AwsRegion=us-east-1

Then lambda is deployed with:

dotnet lambda deploy-function MyFunction \
	--function-role MyRole \
	--project-location src/MyFunction \
	--region $AwsRegion \
	--aws-access-key-id $AwsAccessKey \
	--aws-secret-key $AwsSecretKey

The function can be tested with:

dotnet lambda invoke-function MyFunction \
	--payload "Just Testing the Payload" \
	--project-location src/MyFunction \
	--region $AwsRegion \
	--aws-access-key-id $AwsAccessKey \
	--aws-secret-key $AwsSecretKey

The result is shown:

Amazon Lambda Tools for .NET Core applications (3.3.1)
Project Home: https://github.com/aws/aws-extensions-for-dotnet-cli, https://github.com/aws/aws-lambda-dotnet

Payload:
"JUST TESTING THE PAYLOAD"

Log Tail:
START RequestId: 3f1844e0-7437-4219-a391-621a55dec0e9 Version: $LATEST
END RequestId: 3f1844e0-7437-4219-a391-621a55dec0e9
REPORT RequestId: 3f1844e0-7437-4219-a391-621a55dec0e9  Duration: 925.38 ms    Billed Duration: 1000 ms Memory Size: 256 MB     Max Memory Used: 62 MB  Init Duration: 196.85 ms

The duration in the example above is 925.38ms, the billed duration is 1000ms. This is the first run, the cold start as described in the previous section, which took almost a second for a very simple function. On the next run, the duration was 0.28ms.
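
The relation between the two numbers in the report can be sketched with a short calculation. It assumes that the billed duration is the actual duration rounded up to the next 100ms. This matches the output above, but it is my assumption about the billing granularity at the time of writing. billed_duration is a hypothetical helper name.

```shell
# Hypothetical helper: rounds a duration in milliseconds up to the next multiple of 100
billed_duration() {
	awk -v d="$1" 'BEGIN { b = int(d / 100) * 100; if (b < d) b += 100; print b }'
}

billed_duration 925.38   # prints 1000, the cold start from the report above
billed_duration 0.28     # prints 100, the warm run
```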

The lambda can also manually be deployed and tested, see how in Run a “Hello, World!” with AWS Lambda article.

Listen to DynamoDB events

In AWS examples in C# – create a service working with DynamoDB post, I have described DynamoDB in more detail. Its streams are very well integrated with AWS Lambda. In the current examples, the lambda functions are designed to process DynamoDB stream events. The DynamoDB stream ARN (Amazon Resource Name) is defined as an event source for the lambda. The lambda then receives a DynamoDBEvent object, which is defined in the Amazon.Lambda.DynamoDBEvents NuGet package. There is not much business logic in the lambda function: once the event object is received, it is read, logged to AWS CloudWatch with ILambdaContext, and its data is written to another DynamoDB table and to an SQS queue. For this purpose, references to the AWSSDK.DynamoDBv2 and AWSSDK.SQS NuGet packages are made.

In the example code, I have created SQS and DynamoDB proxies, DynamoDbWriter and SqsWriter, so functionalities are isolated and only what is needed is exposed. In both proxies, the configuration is done with Region, which is read from the AWS_REGION environment variable. This variable is always present in the AWS Lambda environment. In this case, there is no need for authentication with the AWSCredentials class, as everything happens inside AWS. The lambda function has two constructors. One receives instances of both proxies and instantiates any that is not passed. It is used by the unit tests to pass mocked objects, since automatic dependency injection is not used with lambda. The parameterless constructor is needed by AWS Lambda to instantiate the function class.

MoviesFunction

public class MoviesFunction
{
	private readonly ISqsWriter _sqsWriter;
	private readonly IDynamoDbWriter _dynamoDbWriter;

	public MoviesFunction() : this(null, null) { }

	public MoviesFunction(ISqsWriter sqsWriter, IDynamoDbWriter dynamoDbWriter)
	{
		_sqsWriter = sqsWriter ?? new SqsWriter();
		_dynamoDbWriter = dynamoDbWriter ?? new DynamoDbWriter();
	}

	public async Task FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
	{
		context.Logger.LogLine($"Beginning to process {dynamoEvent.Records.Count} records...");

		foreach (var record in dynamoEvent.Records)
		{
			context.Logger.LogLine($"Event ID: {record.EventID}");
			context.Logger.LogLine($"Event Name: {record.EventName}");

			var streamRecordJson = _dynamoDbWriter.SerializeStreamRecord(record.Dynamodb);
			// The serialized record is logged once, together with its label
			context.Logger.LogLine($"DynamoDB Record: {streamRecordJson}");

			var logEntry = new LogEntry
			{
				Message = $"Movie '{record.Dynamodb.NewImage["Title"].S}' processed by lambda",
				DateTime = DateTime.Now
			};
			await _sqsWriter.WriteLogEntryAsync(logEntry);
			await _dynamoDbWriter.PutLogEntryAsync(logEntry);
		}

		context.Logger.LogLine("Stream processing complete.");
	}
}

DynamoDbWriter

public interface IDynamoDbWriter
{
	Task PutLogEntryAsync(LogEntry logEntry);
	string SerializeStreamRecord(StreamRecord streamRecord);
}

public class DynamoDbWriter : IDynamoDbWriter
{
	private static readonly string Region = Environment.GetEnvironmentVariable("AWS_REGION") ?? "us-east-1";

	private readonly IAmazonDynamoDB _dynamoDbClient;
	private readonly JsonSerializer _jsonSerializer;

	public DynamoDbWriter()
	{
		var dynamoDbConfig = new AmazonDynamoDBConfig
		{
			RegionEndpoint = RegionEndpoint.GetBySystemName(Region)
		};
		_dynamoDbClient = new AmazonDynamoDBClient(dynamoDbConfig);
		_jsonSerializer = new JsonSerializer();
	}

	public async Task PutLogEntryAsync(LogEntry logEntry)
	{
		var request = new PutItemRequest
		{
			TableName = "LogEntries",
			Item = new Dictionary<string, AttributeValue>
			{
				{"Message", new AttributeValue {S = logEntry.Message}},
				// Store the timestamp of the entry, not the string form of the whole object
				{"DateTime", new AttributeValue {S = logEntry.DateTime.ToString()}}
			}
		};

		await _dynamoDbClient.PutItemAsync(request);
	}

	public string SerializeStreamRecord(StreamRecord streamRecord)
	{
		using (var writer = new StringWriter())
		{
			_jsonSerializer.Serialize(writer, streamRecord);
			return writer.ToString();
		}
	}
}

SqsWriter

public interface ISqsWriter
{
	Task WriteLogEntryAsync(LogEntry logEntry);
}

public class SqsWriter : ISqsWriter
{
	private static readonly string QueueName = Environment.GetEnvironmentVariable("AWS_SQS_QUEUE_NAME");
	private static readonly bool IsQueueFifo = bool.Parse(Environment.GetEnvironmentVariable("AWS_SQS_IS_FIFO") ?? "false");
	private static readonly string Region = Environment.GetEnvironmentVariable("AWS_REGION") ?? "us-east-1";

	private readonly IAmazonSQS _sqsClient;

	public SqsWriter()
	{
		var sqsConfig = new AmazonSQSConfig
		{
			RegionEndpoint = RegionEndpoint.GetBySystemName(Region)
		};
		_sqsClient = new AmazonSQSClient(sqsConfig);
	}

	public async Task WriteLogEntryAsync(LogEntry logEntry)
	{
		var queueUrl = await _sqsClient.GetQueueUrlAsync(QueueName);
		var sendMessageRequest = new SendMessageRequest
		{
			QueueUrl = queueUrl.QueueUrl,
			MessageBody = JsonConvert.SerializeObject(logEntry),
			MessageAttributes = SqsMessageTypeAttribute.CreateAttributes(typeof(LogEntry).Name)
		};
		if (IsQueueFifo)
		{
			sendMessageRequest.MessageGroupId = typeof(LogEntry).Name;
			sendMessageRequest.MessageDeduplicationId = Guid.NewGuid().ToString();
		}

		await _sqsClient.SendMessageAsync(sendMessageRequest);
	}
}

Conclusion

As shown in the current post, it is very easy to create a lambda function, deploy it and run it. The current post shows a lambda function that listens to DynamoDB events and processes those events by writing into another DynamoDB table and an SQS queue.

AWS examples in C# – working with Lambda functions

Post summary: Introduction to AWS Lambda functions.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

AWS Lambda

AWS Lambda allows easy ramp-up of service without all the hassle to manage servers and environments. The ready code is uploaded to Lambda and automatically run. AWS Lambda automatically scales applications by running code in response to each trigger. The code runs in parallel and processes each trigger individually, scaling precisely with the size of the workload.

Main concepts

There are several terms that need to be briefly explained to get some understanding of what AWS Lambda offers.

  • Function – Code written in a programming language, for a supported runtime, that does some computational work.
  • Runtime – Allows running of functions in different programming languages, supported languages are: Node.js, Python, Ruby, Java, Go, .NET.
  • Event – A JSON formatted document that contains data for a function to process, which is converted to an object by the runtime and passed to the function.
  • Concurrency – The number of requests that your function is serving at any given time. If a function is invoked while it is still processing another request, then another instance is provisioned, increasing the function’s concurrency.
  • Trigger – A resource or configuration that invokes a Lambda function. This includes AWS services, applications, and event source mappings.
  • Event source mapping – A resource in Lambda that reads items from a stream or queue and invokes a function.

AWS Lambda applications

Lambda is the actual name of serverless functions in AWS. Along with lambda functions, AWS also supports the concept of an application, which is a combination of Lambda functions, event sources, and other resources that work together to perform tasks. AWS CloudFormation is used to collect an application’s components into a single package that can be deployed and managed as one resource. Applications make Lambda projects portable.

CloudFormation

AWS CloudFormation provides infrastructure as code (IaC) capabilities. It defines a common language to model and provision AWS application resources. AWS resources and applications are described in YAML or JSON files, which are then provisioned by CloudFormation. This gives a single source of truth.

API Gateway

API Gateway is a fully managed service that makes it easy to create, publish, maintain, monitor, and secure APIs. APIs act as the “front door” for applications to access data, business logic, or functionality from backend services. It is very easy to create RESTful APIs and WebSocket APIs with API Gateway. It supports traffic management, CORS, authorization, access control, throttling, monitoring, and API version management.

API Keys

API keys are used together with usage plans, so APIs can be given to customers as a product offering with predefined request rates and quotas. A usage plan is created in AWS and it has a throttling limit, which is basically the request rate limit applied to each API key that is added to the usage plan. A quota is also configured on the usage plan and applied to its API keys. This is the maximum number of requests that can be submitted with a given API key within a specified time interval. API keys can be provided to API Gateway in the X-API-Key header, which is what the current examples show. Another way to work with API keys is a lambda authorizer function, which returns the API key as part of the authorization response. API keys can be created or imported from a file. It is important to note that API keys are not used to manage authentication and authorization.

Access control

Access control to a REST API in API Gateway can be done with several mechanisms:

  • Resource policies
  • Standard AWS IAM roles and policies
  • IAM tags can be used together with IAM policies to control access
  • Endpoint policies for interface VPC endpoints
  • Lambda authorizers
  • Amazon Cognito user pools

Lambda (custom) authorizers

In the examples given, a lambda (formerly known as custom) authorizer is used. API Gateway uses a dedicated Lambda function to do the authorization. More details on how to use authorizers can be found in AWS examples in C# – introduction to Serverless framework post.

CloudWatch

CloudWatch is a monitoring and observability service. CloudWatch collects monitoring and operational data in the form of logs, metrics, and events, providing a unified view of AWS resources, applications, and services. CloudWatch can be used to detect anomalous behavior, set alarms, visualize logs and metrics side by side, take automated actions, and troubleshoot issues. By default, AWS Lambda logs to CloudWatch. This makes it very easy to trace lambda function issues.

Design considerations

There are some specifics to take into consideration when using lambdas. One of the benefits of lambdas is cost-effectiveness. Users can select how much RAM to allocate to a lambda function when it is created. This is done with --memory-size in the aws lambda create-function command, see more in AWS examples in C# – deploy with AWS CLI commands post. The default value is 128MB and CPU is allocated proportionally. Setting the memory too low can lead to unexpected performance issues, so this should be monitored and optimized for the specific programming language and code. At the time of writing, lambdas are billed per 100ms of execution time, so this should also be taken into consideration when tweaking the memory setting. In terms of cost-effectiveness, adding more RAM in order to cut execution time from 100ms to 50ms only makes the function more expensive, because a full 100ms is still billed, now at the higher RAM price. Whether the speed-up is worth it for the end-users has to be analyzed. Another consideration is that API Gateway adds extra delay to the total request time. CloudWatch logs also cost money, so be aware of how much data a lambda function is logging. More pitfalls of using lambdas are described in detail in the Serverless Pitfalls: Issues With Running a Startup on AWS Lambda article.
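
The billing arithmetic above can be sketched in a few lines. This is only an illustration, assuming the 100ms billing granularity described in this post; LambdaBilling and its methods are hypothetical names and not part of any AWS SDK, and MB-milliseconds is used here as a simple stand-in for the memory-time unit that pricing is based on.

```csharp
using System;

public static class LambdaBilling
{
	// Rounds the measured duration up to the next multiple of the billing granularity.
	// The 100ms default is an assumption taken from the text, not read from AWS.
	public static int BilledMilliseconds(int durationMs, int granularityMs = 100)
	{
		if (durationMs <= 0)
		{
			return 0;
		}
		return (durationMs + granularityMs - 1) / granularityMs * granularityMs;
	}

	// Billed memory-time in MB-milliseconds: billed duration multiplied by configured memory.
	public static long BilledMbMs(int durationMs, int memoryMb, int granularityMs = 100)
	{
		return (long)BilledMilliseconds(durationMs, granularityMs) * memoryMb;
	}
}
```

Under these assumptions, a 100ms run at 128MB is billed as 12800 MB-ms, while a 50ms run at 256MB is billed as 25600 MB-ms, so the faster configuration costs twice as much.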

Still, the main consideration for lambda performance is the so-called cold start. If the function has not been run for a while, the first request needs some extra time to go through. I’ve seen up to 4 seconds when experimenting, although I have not really measured it. Theoretically, there is an option to ping your API at regular intervals to keep it “warm”. In practice, under heavy load, AWS runs parallel instances of the lambda in order to handle the traffic, and each new instance will have a cold start.

Create lambda

Practical examples of how to create AWS Lambda functions are available in the following posts:

Conclusion

AWS Lambda is a very convenient and easy way to create running applications with minimal overhead. There are certain design considerations, such as the lambda cold start, that have to be taken into account when deciding on lambda usage.

AWS examples in C# – create a service working with DynamoDB

Post summary: Introduction to NoSQL, introduction to DynamoDB and what are its basic features and capabilities.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository. In the current post, I give an overview of DynamoDB and what it can be used for.

NoSQL database

A NoSQL database provides a mechanism for storage and retrieval of data that is modeled by means other than the tabular relations used in relational databases (RDBMS). There are several types of NoSQL databases:

  • Key-value stores – every single item in the database is stored as an attribute name (or ‘key’), together with its value.
  • Document databases – pair each key with a complex data structure known as a document, usually, it is a JSON document. Documents can contain many different key-value pairs, or key-array pairs, or even nested documents.
  • Graph stores – used to store information about networks of data. Data is organized in the form of nodes and connections between the nodes.
  • Wide-column stores – store columns of data together, instead of rows. It can query large data volumes faster than conventional relational databases.

A very good article on the NoSQL topic is NoSQL Databases Explained.

AWS DynamoDB

Amazon DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It’s a fully managed, multi-region, multi-master, durable database with built-in security, backup and restore, and in-memory caching for internet-scale applications.

DynamoDB tables

DynamoDB stores data in tables. The data is represented as items, which have attributes. When a table is created, a primary key has to be provided along with the table name. The primary key can consist of only a partition key (HASH), which is mandatory. DynamoDB uses an internal hash function to evenly distribute data items across partitions, based on their partition key values. The primary key can also consist of a partition key and a sort key (RANGE), which complements the partition key. DynamoDB stores items with the same partition key physically close together, in sorted order by the sort key value.

Secondary indexes

DynamoDB offers the possibility to define so-called secondary indexes. There are two types: global and local. A global secondary index is one whose partition (HASH) key can be different from the HASH key of the table; each table has a limit of 20 global indexes. A local secondary index is one that has the same partition key as the table but a different sort key. Up to 5 local secondary indexes per table are allowed. Properly managing those indexes is the key to using DynamoDB efficiently as a storage unit.

Streams

DynamoDB Streams is an optional feature that captures data modification events in DynamoDB tables. The data about these events appears in the stream in near-real-time, and in the order in which the events occurred. Each addition, update, or deletion of an item is represented by a stream record. Stream records can be configured to hold the old and the new item, only one of them, or even only the keys. Stream records have a lifetime of 24 hours, after which they are automatically removed from the stream. Streams are used together with AWS Lambda to create trigger code that executes automatically whenever an event appears in a stream.

Read/Write Capacity Mode

Amazon DynamoDB has two read/write capacity modes for processing reads and writes on your tables: on-demand and provisioned. The read/write capacity mode controls how charges are applied to read and write throughput and how capacity is managed. The capacity mode is set when the table is created and it can be changed later. The provisioned mode is the default, free-tier eligible one, and it is recommended for known workloads. The on-demand mode is recommended for unpredictable or unknown workloads. In provisioned mode, DynamoDB provides auto-scaling capabilities, so the table’s provisioned capacity is adjusted automatically in response to traffic changes.

Understanding the concept of read and write capacity units is tricky. One write capacity unit represents one write per second of an item up to 1KB in size. If the write is done in a transaction, the capacity unit count doubles. For example, if 2KB of data is written per second, then the table definition needs 2 write capacity units; if the write is done in a transaction, 4 capacity units have to be defined. A read capacity unit is similar, with the difference that there are two flavors of reading: strongly consistent reads and eventually consistent reads. An eventually consistent read means that the data returned by DynamoDB might not be up to date, because a recent write operation might not have been reflected in it yet. If the data has to be guaranteed to be up to date, with all previous writes propagated, then a strongly consistent read is needed. One read capacity unit gives one strongly consistent read or two eventually consistent reads per second for data up to 4KB. Transactions double the count of read units needed, hence two units are required for a transactional read of data up to 4KB. For example, if the data to be read is 8KB, then 2 read capacity units are required to sustain one strongly consistent read per second, 1 read capacity unit in case of eventually consistent reads, or 4 read capacity units for a transactional read request.
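
The rules above can be written down as a small calculator. This is a minimal sketch that assumes one request per second and the 1KB and 4KB block sizes stated above; CapacityCalculator and ReadMode are hypothetical names introduced for illustration and are not part of the AWS SDK.

```csharp
using System;

public enum ReadMode
{
	EventuallyConsistent,
	StronglyConsistent,
	Transactional
}

public static class CapacityCalculator
{
	// Write units for one write per second: one unit per started 1KB, doubled in a transaction.
	public static int WriteUnits(double itemSizeKb, bool transactional = false)
	{
		var units = (int)Math.Ceiling(itemSizeKb / 1.0);
		return transactional ? units * 2 : units;
	}

	// Read units for one read per second: one unit per started 4KB for a strongly consistent read,
	// half of that for an eventually consistent read, double for a transactional read.
	public static double ReadUnits(double itemSizeKb, ReadMode mode)
	{
		var blocks = Math.Ceiling(itemSizeKb / 4.0);
		switch (mode)
		{
			case ReadMode.EventuallyConsistent:
				return blocks / 2;
			case ReadMode.Transactional:
				return blocks * 2;
			default:
				return blocks;
		}
	}
}
```

For the 8KB example, ReadUnits returns 2 for a strongly consistent read, 1 for an eventually consistent read, and 4 for a transactional read, and WriteUnits returns 2 for 2KB of data, or 4 when the write is transactional.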

In case the application exceeds the provisioned throughput capacity on a table or index, then it is subject to request throttling. Throttling prevents the application from consuming too many capacity units. When a request is throttled, it fails with an HTTP 400 code (Bad Request) and a ProvisionedThroughputExceededException. The AWS SDKs have built-in support for retrying throttled requests, so no custom logic is needed.

Different programmatic interfaces

Every AWS SDK provides one or more programmatic interfaces for working with Amazon DynamoDB. These interfaces range from simple low-level DynamoDB wrappers to object-oriented persistence layers. The available interfaces vary depending on the AWS SDK and programming language that you use. For C#, the available interfaces are the low-level interface, the document interface, and the object persistence interface. In AWS examples in C# – basic DynamoDB operations post I have given detailed code examples of all of them.

Low-level interface

The low-level interface lets the consumer manage all the details and do the data mapping. Data is mapped manually to its proper data type. Supported data types are:

  • B – binary value, a MemoryStream
  • BS – list of MemoryStream objects
  • S – string
  • SS – list of string objects
  • N – number converted into a string
  • NS – list of number strings
  • BOOL – boolean
  • L – list of AttributeValue objects
  • M – map, dictionary of AttributeValue objects
  • NULL – if set to true, then this is a null value

If the low-level interface is used for querying, then a KeyConditionExpression is used to query the data. It is called a query, but it is not actually a query in the RDBMS sense, as the HASH key can only be used with an equality operator. For the RANGE key, there is a variety of operators to be used, such as:

  • sortKeyName = :sortkeyval – true if the sort key value is equal to :sortkeyval
  • sortKeyName < :sortkeyval – true if the sort key value is less than :sortkeyval
  • sortKeyName <= :sortkeyval – true if the sort key value is less than or equal to :sortkeyval
  • sortKeyName > :sortkeyval – true if the sort key value is greater than :sortkeyval
  • sortKeyName >= :sortkeyval – true if the sort key value is greater than or equal to :sortkeyval
  • sortKeyName BETWEEN :sortkeyval1 AND :sortkeyval2 – true if the sort key value is greater than or equal to :sortkeyval1, and less than or equal to :sortkeyval2
  • begins_with ( sortKeyName, :sortkeyval ) – true if the sort key value begins with a particular operand. (You cannot use this function with a sort key that is of type Number.) Note that the function name begins_with is case-sensitive.
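
How these conditions select items can be illustrated with a toy in-memory model: an exact match on the partition key first, then a condition on the sort key within that partition. This is not the DynamoDB API. KeyConditionModel, its sample data, and its helper methods are hypothetical, and ordinal string comparison is assumed as an approximation of DynamoDB's ordering for plain ASCII strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class KeyConditionModel
{
	// Toy table: partition key -> sort keys kept in ordinal order, mimicking how items
	// with the same partition key are stored sorted by the sort key value.
	private static readonly Dictionary<string, SortedSet<string>> Table =
		new Dictionary<string, SortedSet<string>>
		{
			{ "Bruce", new SortedSet<string>(StringComparer.Ordinal) { "Lee", "Wayne", "Willis" } },
			{ "Sylvester", new SortedSet<string>(StringComparer.Ordinal) { "Stallone" } }
		};

	// The partition key is matched by equality only; the sort key condition is optional logic on top.
	public static List<string> Query(string partitionKey, Func<string, bool> sortKeyCondition)
	{
		if (!Table.TryGetValue(partitionKey, out var sortKeys))
		{
			return new List<string>();
		}
		return sortKeys.Where(sortKeyCondition).ToList();
	}

	// Models "sortKeyName BETWEEN :low AND :high", inclusive on both ends.
	public static Func<string, bool> Between(string low, string high)
	{
		return value => string.CompareOrdinal(value, low) >= 0
			&& string.CompareOrdinal(value, high) <= 0;
	}

	// Models "begins_with ( sortKeyName, :prefix )".
	public static Func<string, bool> BeginsWith(string prefix)
	{
		return value => value.StartsWith(prefix, StringComparison.Ordinal);
	}
}
```

In this model, Query("Bruce", BeginsWith("W")) yields Wayne and Willis, and Query("Bruce", Between("Lee", "Wayne")) yields Lee and Wayne. There is no way to ask for all actors named Willis without knowing the partition key, which is the main difference from an RDBMS query.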

Document interface

The document programming interface returns the full document by its unique HASH key. The document is actually JSON.

{
	"Title": {
		"Value": "Die Hard",
		"Type": 0
	},
	"Genre": {
		"Value": "0",
		"Type": 1
	}
}

Object persistence interface

With the object persistence interface, client classes are mapped to DynamoDB tables. There are several attributes that can be applied to database model classes, such as DynamoDBTable, DynamoDBHashKey, DynamoDBRangeKey, DynamoDBProperty, DynamoDBIgnore, etc. To save the client-side objects to the tables, the object persistence model provides the DynamoDBContext class, an entry point to DynamoDB. This class provides a connection to DynamoDB and enables you to access tables and perform various CRUD operations.

Architectural constraints

Understanding the nature of DynamoDB is important in order to design a service that works with it. It is important to define the table capacity cost-efficiently. If too little capacity is defined, consumers can get 400 responses; the other extreme is to generate way too much cost. Another aspect is reading the data. DynamoDB does not provide a way to search for data. In any case, the application that uses DynamoDB has to have a proper way to access the data by key.

Using DynamoDB in a service

DynamoDB can be used in a service in a straightforward way, as in the SqsReader service or the ActorsServerlessLambda and MoviesServerlessLambda functions; see the bigger picture in AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS post. An AmazonDynamoDBClient is instantiated and used with one of the programming interfaces described above.

Another important usage is to subscribe to and process stream events. This is done in both ActorsLambdaFunction and MoviessLambdaFunction. See more details about Lambda usage in AWS examples in C# – working with Lambda functions post.

More information on how to run the solution can be found in AWS examples in C# – run the solution post.

Conclusion

In the current post, I have given a basic overview of DynamoDB. It is important to understand its specifics in order to use it efficiently.

AWS examples in C# – basic DynamoDB operations

Post summary: Code examples with DynamoDB write and read operations.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository. In the current post, I give practical code examples of how to work with DynamoDB.

Instantiate Amazon DynamoDB client

In the current examples, in the SqsReader project, a configuration class called AppConfig is used. Its values are injected from the environment variables by the .NET Core framework in the Startup class. In order to work with DynamoDB, a client is needed. The DynamoDB client interface is called IAmazonDynamoDB and comes from the AWS C# SDK. The NuGet package is called AWSSDK.DynamoDBv2. The concrete AWS client implementation is AmazonDynamoDBClient; an object is instantiated in the DynamoDbClientFactory class and used as a singleton. RegionEndpoint is used to instantiate AmazonDynamoDBConfig. The AwsCredentials class extends the AWS’ abstract AWSCredentials and is used to manage the credentials.

DynamoDbClientFactory.cs

public static AmazonDynamoDBClient CreateClient(AppConfig appConfig)
{
	var dynamoDbConfig = new AmazonDynamoDBConfig
	{
		RegionEndpoint = RegionEndpoint.GetBySystemName(appConfig.AwsRegion)
	};
	var awsCredentials = new AwsCredentials(appConfig);
	return new AmazonDynamoDBClient(awsCredentials, dynamoDbConfig);
}

AwsCredentials.cs

public class AwsCredentials : AWSCredentials
{
	private readonly AppConfig _appConfig;

	public AwsCredentials(AppConfig appConfig)
	{
		_appConfig = appConfig;
	}

	public override ImmutableCredentials GetCredentials()
	{
		return new ImmutableCredentials(_appConfig.AwsAccessKey,
						_appConfig.AwsSecretKey, null);
	}
}

AppConfig.cs

public class AppConfig
{
	public string AwsRegion { get; set; }
	public string AwsAccessKey { get; set; }
	public string AwsSecretKey { get; set; }
}

Creating tables

A custom DatabaseClient class wraps IAmazonDynamoDB and exposes just a few of its methods. It checks whether the table exists and, if it does not, creates it. Afterward, it waits for the table to reach status ACTIVE. The Movies and Actors tables are created in separate classes with a CreateTableRequest, which needs the table name.

KeySchema specifies the attributes that build the primary key for a table or an index. The attributes must also be defined in the AttributeDefinitions list. KeyType has two possible values: HASH and RANGE. The Movies table has only a HASH key, which is always mandatory. Since it is the whole primary key here, it is unique: no two items can have the same partition key value, and a second insert with the same key overwrites the first one. The Actors table has, along with the partition key, a sort key with KeyType of RANGE, which is complementary to the HASH key.

I have not used secondary indexes in the current example, but DynamoDB provides this functionality. They can be defined with the GlobalSecondaryIndexes and LocalSecondaryIndexes elements of the CreateTableRequest. A stream is defined with the StreamSpecification element in the CreateTableRequest; its StreamViewType is NEW_AND_OLD_IMAGES. This means that in case of an add, update, or delete, the DynamoDBEvent, which is later used in a lambda, holds both the new values and the old values of the item. ProvisionedThroughput is used to set the read and write capacity. In the current example, it is 5 capacity units for reading and the same for writing. ProvisionedThroughput is needed because the default BillingMode is PROVISIONED. It can be changed to PAY_PER_REQUEST, and then ProvisionedThroughput should not be specified. A more detailed explanation of each parameter can be found in AWS examples in C# – create a service working with DynamoDB post.

MoviesRepository.cs

public async Task CreateTableAsync()
{
	var request = new CreateTableRequest
	{
		TableName = TableName,
		KeySchema = new List<KeySchemaElement>
		{
			new KeySchemaElement
			{
				AttributeName = "Title",
				KeyType = "HASH"
			}
		},
		AttributeDefinitions = new List<AttributeDefinition>
		{
			new AttributeDefinition
			{
				AttributeName = "Title",
				AttributeType = "S"
			}
		},
		ProvisionedThroughput = new ProvisionedThroughput
		{
			ReadCapacityUnits = 5,
			WriteCapacityUnits = 5
		},
		StreamSpecification = new StreamSpecification
		{
			StreamEnabled = true,
			StreamViewType = StreamViewType.NEW_AND_OLD_IMAGES
		}
	};

	await _client.CreateTableAsync(request);
}

ActorsRepository.cs

public async Task CreateTableAsync()
{
	var request = new CreateTableRequest
	{
		TableName = TableName,
		KeySchema = new List<KeySchemaElement>
		{
			new KeySchemaElement
			{
				AttributeName = "FirstName",
				KeyType = "HASH"
			},
			new KeySchemaElement
			{
				AttributeName = "LastName",
				KeyType = "RANGE"
			}
		},
		AttributeDefinitions = new List<AttributeDefinition>
		{
			new AttributeDefinition
			{
				AttributeName = "FirstName",
				AttributeType = "S"
			},
			new AttributeDefinition
			{
				AttributeName = "LastName",
				AttributeType = "S"
			}
		},
		ProvisionedThroughput = new ProvisionedThroughput
		{
			ReadCapacityUnits = 5,
			WriteCapacityUnits = 5
		},
		StreamSpecification = new StreamSpecification
		{
			StreamEnabled = true,
			StreamViewType = StreamViewType.NEW_AND_OLD_IMAGES
		}
	};

	await _client.CreateTableAsync(request);
}

DatabaseClient.cs

private const string StatusUnknown = "UNKNOWN";
private const string StatusActive = "ACTIVE";

private readonly IAmazonDynamoDB _client;

public DatabaseClient(IAmazonDynamoDB client)
{
	_client = client;
}

public async Task CreateTableAsync(CreateTableRequest createTableRequest)
{
	var status = await GetTableStatusAsync(createTableRequest.TableName);
	if (status != StatusUnknown)
	{
		return;
	}

	await _client.CreateTableAsync(createTableRequest);

	await WaitUntilTableReady(createTableRequest.TableName);
}

public async Task PutItemAsync(PutItemRequest putItemRequest)
{
	await _client.PutItemAsync(putItemRequest);
}

private async Task<string> GetTableStatusAsync(string tableName)
{
	try
	{
		var response = await _client.DescribeTableAsync(new DescribeTableRequest
		{
			TableName = tableName
		});
		return response?.Table.TableStatus;
	}
	catch (ResourceNotFoundException)
	{
		return StatusUnknown;
	}
}

private async Task WaitUntilTableReady(string tableName)
{
	var status = await GetTableStatusAsync(tableName);
	for (var i = 0; i < 10 && status != StatusActive; ++i)
	{
		await Task.Delay(500);
		status = await GetTableStatusAsync(tableName);
	}
}

Different programmatic interfaces

In AWS examples in C# – create a service working with DynamoDB post, I have explained in more detail the three different programmatic interfaces that DynamoDB offers: a low-level interface, a document interface, and an object persistence interface.

Writing using the low-level interface

The low-level interface lets the consumer manage all the details and do the data mapping. Here is an example of how to create an Actor using the low-level interface. Data is mapped manually to its proper data type. In this case, actor.FirstName and actor.LastName are assigned to the S property of the AttributeValue, which is a string type.

private readonly IDatabaseClient _client;
public async Task SaveActorAsync(Actor actor)
{
	var request = new PutItemRequest
	{
		TableName = TableName,
		Item = new Dictionary<string, AttributeValue>
		{
			{"FirstName", new AttributeValue {S = actor.FirstName}},
			{"LastName", new AttributeValue {S = actor.LastName}}
		}
	};
	await _client.PutItemAsync(request);
}

The full code is in ActorsRepository.cs.

Writing using the object persistence interface

With the object persistence interface, client classes are mapped to DynamoDB tables. The example given below comes from the original AWS documentation and shows explicit mapping. DynamoDBTable creates the mapping to the table, then DynamoDBHashKey and DynamoDBRangeKey annotate the keys. With DynamoDBProperty a specific name can be given, so the property name can differ from the table field name. Title is directly mapped to the Title field in the database table. The DynamoDBIgnore attribute excludes this particular property from being written to and read from the table.


[DynamoDBTable("ProductCatalog")]
public class Book
{
	[DynamoDBHashKey]
	public int Id { get; set; }

	public string Title { get; set; }

	[DynamoDBRangeKey]
	public int ISBN { get; set; }

	[DynamoDBProperty("Authors")]
	public List<string> BookAuthors { get; set; }

	[DynamoDBIgnore]
	public string CoverPage { get; set; }
}

To save the client-side objects to the tables, the object persistence model provides the DynamoDBContext class, an entry point to DynamoDB. This class provides a connection to DynamoDB and enables you to access tables and perform various CRUD operations. The current examples are slightly different: since the Movie model is very simple, there are no DynamoDB attributes on it, so DynamoDBContext uses its default mapping features to map its properties. The movie has two properties: Title, which is a string and is the HASH key in the table, and Genre, which is an enum, practically an integer.


public enum MovieGenre
{
	[EnumMember(Value = "Action Movie")]
	Action,
	[EnumMember(Value = "Drama Movie")]
	Drama
}

public class Movie
{
	public string Title { get; set; }

	[JsonConverter(typeof(StringEnumConverter))]
	public MovieGenre Genre { get; set; }
}

Since the model has no DynamoDBTable attribute, DynamoDBContext by default tries to map it to a table with the name Movie, but such a table does not exist. This is why DynamoDBOperationConfig is needed to map to the correct table name.


private readonly IDynamoDBContext _context;

public async Task SaveMovieAsync(Movie movie)
{
	var operationConfig = new DynamoDBOperationConfig
	{
		OverrideTableName = "Movies"
	};
	await _context.SaveAsync(movie, operationConfig);
}

The full code is in MoviesRepository.cs. Object persistence interface is a wide topic, full details can be found in .NET: Object Persistence Model page.

Querying using the low-level interface

An example is given of a query request for the Actors table, which has FirstName as the HASH key and LastName as the RANGE key. The important part here is KeyConditionExpression, which holds the actual query. It is called a query, but it is not actually a query in the RDBMS sense, as the HASH key can only be used with an equality operator. For the RANGE key, there is a variety of operators to be used; in the example given, the equality operator is used as well. To supply a value for a value placeholder, :FirstName in the example, ExpressionAttributeValues is used. The dictionary key is the placeholder, and the AttributeValue is the value mapped to a specific value type; in the example, it is S, for a string. It is also possible to use a placeholder for the table field name, such as #LastName, which is then replaced with the actual name from the ExpressionAttributeNames dictionary.

private static QueryRequest BuildQueryRequest(string firstName, string lastName)
{
	var request = new QueryRequest("Actors")
	{
		KeyConditionExpression = "FirstName = :FirstName"
	};
	request.ExpressionAttributeValues.Add(":FirstName", new AttributeValue
	{
		S = firstName
	});

	if (!string.IsNullOrEmpty(lastName))
	{
		request.KeyConditionExpression += " AND #LastName = :LastName";
		request.ExpressionAttributeNames.Add("#LastName", "LastName");
		request.ExpressionAttributeValues.Add(":LastName", new AttributeValue
		{
			S = lastName
		});
	}

	return request;
}

The full code is in ActorsHandler.cs. More about DynamoDB queries can be found in DynamoDB API_Query page.

Get item using document interface

The document programming interface returns the full document by its unique HASH key. The table is accessed with public static Table LoadTable(IAmazonDynamoDB ddbClient, TableConfig config) and then the document is loaded with public Task<Document> GetItemAsync(Primitive hashKey). In current examples, a proxy class is defined, which isolates the IAmazonDynamoDB operations:

GetDocumentAsync


public async Task<Document> GetDocumentAsync(string tableName, string documentKey)
{
	var table = Table.LoadTable(_dynamoDbClient, new TableConfig(tableName));
	return await table.GetItemAsync(new Primitive(documentKey));
}

GetDocumentAsync

var document = await _dynamoDbReader.GetDocumentAsync(TableName, title);

var movie = new Movie
{
	Title = document["Title"],
	Genre = (MovieGenre)int.Parse(document["Genre"])
};

The document is actually JSON.

{
	"Title": {
		"Value": "Die Hard",
		"Type": 0
	},
	"Genre": {
		"Value": "0",
		"Type": 1
	}
}

The full code is in MoviesHandler.cs.

Conclusion

In the current post, I have given practical code examples of how to do the basic DynamoDB operations in C#. This post is complementary to AWS examples in C# – create a service working with DynamoDB post.

AWS examples in C# – create a service working with SQS

Post summary: A basic overview of AWS SQS, how to write a message to it, and how to make a consumer that constantly polls the queue for new messages.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Event-driven architecture

I would like to briefly touch on the topic of event-driven architecture, since message service providers such as SQS or RabbitMQ are the basis of its implementation. This is a software architecture paradigm promoting the production, detection, consumption of, and reaction to events. An event is a significant change in the state of an object, in which someone might be interested. All communication happens asynchronously and systems are loosely coupled. An event-driven system typically consists of event emitters, event consumers, and event channels. Emitters have the responsibility to detect, gather, and transfer events. Emitters do not know the consumers of the events; they do not even know if a consumer exists. Consumers have the responsibility of reacting as soon as an event is presented in a dedicated channel. This leads to the pattern commonly known as eventual consistency, which pushes the complexity of consistency to the application tier; this is the biggest challenge to solve in an event-driven architecture.

Apart from SQS, there is an even more sophisticated service from AWS called EventBridge, which makes it easy to build event-driven applications because it takes care of event ingestion and delivery, security, authorization, and error handling. It is basically a serverless event bus that makes it easy to connect applications together using data from your own applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

AWS SQS

SQS stands for Simple Queue Service. It is a fully managed message queuing service that makes it possible to decouple and scale microservices, distributed systems, and serverless applications. SQS eliminates the complexity and overhead associated with managing and operating message-oriented middleware and empowers developers to focus on differentiating work.

Types of queues

SQS offers two types of message queues:

  • Standard queues – they offer maximum throughput, best-effort ordering, and at-least-once delivery. This means there is no guaranteed order and messages can be duplicated.
  • FIFO queues – they are designed to guarantee that messages are processed exactly once, in the exact order that they are sent.

Dead-letter queue

In addition to those, there is a special type of queue, called a dead-letter queue. Dead-letter queues are used mainly for debugging and failure-proofing applications. If a message from one of the source queues above cannot be successfully processed after several retries, it ends up in the dead-letter queue, where it can be analyzed and from which it can be returned to the source queue for reprocessing.

Message processing

It is important to know how SQS operates in order to make good architectural decisions. When a message is published to the queue, it becomes visible. When a consumer reads the message, it becomes invisible to other consumers but is still present in the queue; its status is now in-flight. The visibility timeout is 30 seconds by default, and the maximum value is 12 hours. If the message is not deleted before the visibility timeout passes, it becomes visible again and can be read by consumers. If there is no dead-letter queue, this process happens over and over until the message retention period is reached, after which the message is automatically deleted. The retention period is 4 days by default, and the maximum value is 14 days. If there is a dead-letter queue, a message that has been received more than the maximum receive count without being deleted goes to the dead-letter queue and stays there for the dead-letter queue’s message retention period. See more info on SQS on How Amazon SQS Works page.
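The lifecycle above can be sketched with a small in-memory model. This is only an illustration under my own assumptions: SimulatedQueue, Send, Receive, Delete, and DeadLetters are hypothetical names and not part of the AWS SDK, the clock is passed in explicitly, the retention period is left out, and messages are identified by their body instead of a receipt handle.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the visibility timeout and the maximum receive count.
// It is NOT the AWS SDK and only illustrates the lifecycle described in the text.
public class SimulatedQueue
{
	private class Entry
	{
		public string Body;
		public int ReceiveCount;
		public DateTime VisibleAt;
	}

	private readonly List<Entry> _entries = new List<Entry>();
	private readonly TimeSpan _visibilityTimeout;
	private readonly int _maxReceiveCount;

	public List<string> DeadLetters { get; } = new List<string>();

	public SimulatedQueue(TimeSpan visibilityTimeout, int maxReceiveCount)
	{
		_visibilityTimeout = visibilityTimeout;
		_maxReceiveCount = maxReceiveCount;
	}

	// A newly sent message is visible immediately
	public void Send(string body, DateTime now)
	{
		_entries.Add(new Entry { Body = body, VisibleAt = now });
	}

	// Returns the first visible message and hides it for the visibility timeout,
	// or null when nothing is visible at the given moment
	public string Receive(DateTime now)
	{
		// Visible messages that already used all their receives go to the dead-letter list
		var exhausted = _entries
			.Where(e => e.VisibleAt <= now && e.ReceiveCount >= _maxReceiveCount)
			.ToList();
		foreach (var entry in exhausted)
		{
			_entries.Remove(entry);
			DeadLetters.Add(entry.Body);
		}

		var next = _entries.FirstOrDefault(e => e.VisibleAt <= now);
		if (next == null)
		{
			return null;
		}

		next.ReceiveCount++;
		next.VisibleAt = now + _visibilityTimeout; // the message is now in-flight
		return next.Body;
	}

	// A successfully processed message is deleted by the consumer
	public bool Delete(string body)
	{
		var entry = _entries.FirstOrDefault(e => e.Body == body);
		return entry != null && _entries.Remove(entry);
	}
}
```

With a 30-second visibility timeout and a maximum receive count of 3, a message that is never deleted is returned by three Receive calls spaced 30 seconds apart, and the fourth attempt moves it to DeadLetters.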

Architectural approaches

One queue or many queues

Since many event emitters can write messages to the queue, it gets tricky to process the messages properly. One option is to have a separate queue for each type of message; another option is to put some metadata into the messages. I have decided to go for the solution with one queue, because I have just one consumer, which knows which message processor to call, and this simplifies the code. In the case of many SQS queues, there should be many consumers defined in the code, and it is better to split them into separate microservices, one for each SQS queue.

Dead-letter

I would say a dead-letter queue with the maximum retention period of 14 days is a good idea. In this case, messages can be quarantined, so they do not slow down normal queue operations. Without a dead-letter queue and with default timeouts, a message that cannot be processed will appear every 30 seconds for a period of 4 days, which makes 2880 times a day, or 11520 times in total. Now imagine there are thousands of messages like this one. I have decided to go for a dead-letter queue with the default retention period.
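The arithmetic above can be reproduced with a few lines of code. This is a minimal sketch: RedeliveryEstimate and CountDeliveries are hypothetical names of mine, and the estimate assumes the message is received again immediately every time its visibility timeout passes.

```csharp
using System;

// Hypothetical helper, not part of the AWS SDK
public static class RedeliveryEstimate
{
	// Estimates how many times an unprocessable message is delivered before it
	// leaves the queue, either because the retention period ends or because
	// the maximum receive count of a dead-letter queue setup is reached
	public static long CountDeliveries(TimeSpan visibilityTimeout, TimeSpan retentionPeriod, int? maxReceiveCount = null)
	{
		if (visibilityTimeout <= TimeSpan.Zero)
		{
			throw new ArgumentOutOfRangeException(nameof(visibilityTimeout));
		}

		var untilRetentionEnds = retentionPeriod.Ticks / visibilityTimeout.Ticks;
		return maxReceiveCount.HasValue
			? Math.Min(untilRetentionEnds, maxReceiveCount.Value)
			: untilRetentionEnds;
	}
}
```

For the default values, CountDeliveries(TimeSpan.FromSeconds(30), TimeSpan.FromDays(4)) gives 11520, while passing a maximum receive count of 3 caps the result at 3 deliveries.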

Long polling

Long polling is another aspect that has to be considered. It can be enabled in two ways. One is on the queue level, by setting ReceiveMessageWaitTimeSeconds when creating the queue; it can be from 1 to 20 seconds. The other way is to enable it when messages are read from the queue: there is a WaitTimeSeconds setting in the request, which can also be from 1 to 20 seconds. If both options are combined, WaitTimeSeconds takes precedence.
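The precedence rule can be written down as a tiny function. This is just a sketch: LongPolling, EffectiveWaitSeconds, and IsLongPolling are hypothetical helpers of mine, not SDK calls, and I assume that a value of 0 means short polling.

```csharp
using System;

// Hypothetical helper that mirrors the precedence rule, not part of the AWS SDK
public static class LongPolling
{
	public const int MaxWaitSeconds = 20;

	// The request-level WaitTimeSeconds, when present, wins over the
	// queue-level ReceiveMessageWaitTimeSeconds
	public static int EffectiveWaitSeconds(int queueWaitSeconds, int? requestWaitSeconds = null)
	{
		var effective = requestWaitSeconds ?? queueWaitSeconds;
		if (effective < 0 || effective > MaxWaitSeconds)
		{
			throw new ArgumentOutOfRangeException(nameof(requestWaitSeconds), "Wait time must be between 0 and 20 seconds.");
		}

		return effective;
	}

	// Assumption: a wait time of 0 means short polling, anything above is long polling
	public static bool IsLongPolling(int effectiveWaitSeconds)
	{
		return effectiveWaitSeconds > 0;
	}
}
```

For example, a queue configured with 20 seconds and a request asking for 5 seconds results in a 5-second wait, and a request asking for 0 seconds falls back to short polling even though the queue has long polling enabled.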

Unknown messages

Another architectural decision, in case there is only one queue, is what to do with unknown messages. If there is no dead-letter queue, it is good to delete such messages; otherwise, they will keep showing up for the queue’s retention period. I log an error, and after 3 unsuccessful attempts, which is the maximum receive count I have configured, the message goes to the dead-letter queue.

Standard vs. FIFO queues

SQS is able to handle a large number of messages, theoretically an unlimited number of messages per second. A standard SQS queue does not maintain any order of messages, and a message may be delivered more than once. For this reason, AWS offers FIFO (First-In-First-Out) queues, which preserve message order and ensure exactly-once processing. The limitation of a FIFO queue is its number of transactions per second, which is 300 messages per second, or 3000 if they are sent in batches.
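The two FIFO numbers are related through the batch size. A rough sketch of the relation is given below; FifoThroughput is a hypothetical helper of mine, and it assumes the limit of 300 applies to API calls per second and that a batch holds at most 10 messages.

```csharp
using System;

// Hypothetical helper, only illustrates how 300 and 3000 relate
public static class FifoThroughput
{
	public const int ApiCallsPerSecond = 300; // FIFO limit mentioned in the text
	public const int MaxBatchSize = 10;       // assumed maximum messages per batch call

	// Messages per second when every call carries batchSize messages
	public static int MessagesPerSecond(int batchSize)
	{
		if (batchSize < 1 || batchSize > MaxBatchSize)
		{
			throw new ArgumentOutOfRangeException(nameof(batchSize));
		}

		return ApiCallsPerSecond * batchSize;
	}

	// Number of batch calls needed to send messageCount messages
	public static int BatchCallsNeeded(int messageCount)
	{
		return (messageCount + MaxBatchSize - 1) / MaxBatchSize;
	}
}
```

MessagesPerSecond(1) gives 300 and MessagesPerSecond(10) gives 3000, which matches the limits quoted above.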

SQS queue operations at a glance

The operations listed below are described in more detail in AWS examples in C# – basic SQS queue operations post:

  • Create queue with dead-letter queue
  • Read messages from the queue
  • Write a message to the queue (comes in two flavors)
  • Delete messages from the queue
  • Move messages from dead-letter to source queue

Creating SQS message consumer

In order to read the messages, there should be a consumer that constantly polls the queue and processes the messages. ProcessMessageAsync uses the strategy design pattern to get the proper message processor based on the MessageType attribute. Processors are stored in _messageProcessors, which is an IEnumerable<IMessageProcessor> injected by .NET Core dependency injection. If a processor is found, it is invoked; if not, an error is written to the logs. This logic can be subject to change if unknown messages are tolerated in the queue. In the ProcessAsync method, there is a while loop which constantly reads messages through _sqsClient, an instance of the SqsClient class described in previous sections. SQS returns the response when there are some messages, or when the WaitTimeSeconds of the read request or the queue’s ReceiveMessageWaitTimeSeconds, both configured by the AwsQueueLongPollTimeSeconds environment variable, has expired. This while loop is a little tricky to unit test though, as it consumes the main thread, and the mocked object should be instructed to wait. Everything is controlled by a CancellationTokenSource; when it is canceled, consumption is stopped.

ProcessMessageAsync

private async Task ProcessMessageAsync(Message message)
{
	try
	{
		var messageType = message.MessageAttributes.GetMessageTypeAttributeValue();
		if (messageType == null)
		{
			throw new Exception($"No 'MessageType' attribute present in message {JsonConvert.SerializeObject(message)}");
		}

		var processor = _messageProcessors.SingleOrDefault(x => x.CanProcess(messageType));
		if (processor == null)
		{
			throw new Exception($"No processor found for message type '{messageType}'");
		}

		await processor.ProcessAsync(message);
		await _sqsClient.DeleteMessageAsync(message.ReceiptHandle);
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Cannot process message [id: {message.MessageId}, receiptHandle: {message.ReceiptHandle}, body: {message.Body}] from queue {_sqsClient.GetQueueName()}");
	}
}

ProcessAsync

private async void ProcessAsync()
{
	try
	{
		while (!_tokenSource.Token.IsCancellationRequested)
		{
			var messages = await _sqsClient.GetMessagesAsync(_tokenSource.Token);
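			// Await the whole batch instead of firing async void lambdas, so processing is observed before the next poll
			await Task.WhenAll(messages.Select(x => ProcessMessageAsync(x)));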
		}
	}
	catch (OperationCanceledException)
	{
		//operation has been canceled but it shouldn't be propagated
	}
}

StartConsuming

public void StartConsuming()
{
	if (!IsConsuming())
	{
		_tokenSource = new CancellationTokenSource();
		ProcessAsync();
	}
}

private bool IsConsuming()
{
	return _tokenSource != null && !_tokenSource.Token.IsCancellationRequested;
}
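Since the loop above is tricky to unit test, here is a minimal sketch of an alternative shape, not the code from the repository. PollingConsumer and its receive and process delegates are hypothetical names of mine that stand in for the SQS client and the message processors. The loop returns a Task and runs on the thread pool, so a test can start it, wait for a condition, and then await StopAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical consumer: the delegates stand in for the SQS client and the processors
public class PollingConsumer
{
	private readonly Func<CancellationToken, Task<List<string>>> _receive;
	private readonly Func<string, Task> _process;
	private CancellationTokenSource _tokenSource;
	private Task _loop = Task.CompletedTask;

	public PollingConsumer(Func<CancellationToken, Task<List<string>>> receive, Func<string, Task> process)
	{
		_receive = receive;
		_process = process;
	}

	public void Start()
	{
		if (_tokenSource != null && !_tokenSource.IsCancellationRequested)
		{
			return; // already consuming
		}

		_tokenSource = new CancellationTokenSource();
		var token = _tokenSource.Token;
		// Task.Run keeps the caller free even if the receive delegate completes synchronously
		_loop = Task.Run(() => RunAsync(token));
	}

	// Requests cancellation and waits until the loop has really finished
	public async Task StopAsync()
	{
		_tokenSource?.Cancel();
		await _loop;
	}

	private async Task RunAsync(CancellationToken token)
	{
		try
		{
			while (!token.IsCancellationRequested)
			{
				var messages = await _receive(token);
				foreach (var message in messages)
				{
					await _process(message);
				}
			}
		}
		catch (OperationCanceledException)
		{
			// cancellation is the normal way to stop, it is not propagated
		}
	}
}
```

In a test, the receive delegate can return a prepared batch once and then wait on Task.Delay(Timeout.Infinite, token), which imitates long polling and ends as soon as the token is canceled.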

Message processors

In the current example, I have taken the architectural design decision to have one queue with different types of messages in it. For each type of message, there is a relevant processor. With the strategy design pattern, the appropriate message processor is picked based on the MessageType attribute. Processors implement a very simple interface, IMessageProcessor. In the current example, they take the message body as a string, deserialize it into an object, and save this object to DynamoDB. A sample implementation is shown below:

IMessageProcessor

public interface IMessageProcessor
{
	bool CanProcess(string messageType);
	Task ProcessAsync(Message message);
}

ActorMessageProcessor

public bool CanProcess(string messageType)
{
	return messageType == typeof(Actor).Name;
}

public async Task ProcessAsync(Message message)
{
	var actor = JsonConvert.DeserializeObject<Actor>(message.Body);
	await _actorsRepository.SaveActorAsync(actor);
	_logger.LogInformation($"ActorMessageProcessor invoked with: {message.Body}");
}

AWS ECS and AWS ECR

ECS stands for Elastic Container Service. It is a fully managed container orchestration service. Containers can be run in clusters using AWS Fargate, which is a serverless compute engine for containers. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design.

ECR stands for Elastic Container Registry. It is a fully managed Docker container registry that makes it easy for developers to store, manage, and deploy Docker container images. ECR is integrated with ECS, eliminating the need to operate your own container repositories or worry about scaling the underlying infrastructure.

SqsWriter and SqsReader

SqsWriter is a .NET Core 3.0 application, that is dockerized and run in AWS ECS with Fargate, and its container images are stored in ECR. It exposes an API that can be used to publish Actor or Movie objects as messages with separate MessageType attributes in the SQS queue.

SqsReader is a .NET Core 3.0 application that is dockerized and run in AWS ECS with Fargate, and its container images are stored in ECR. It has a consumer that listens to the SQS queue and processes the messages by writing them into the appropriate AWS DynamoDB tables. It also exposes an API to stop or start processing, as well as to reprocess the dead-letter queue or simply get the queue status.

More information on how to run the solution can be found in AWS examples in C# – run the solution post.

Conclusion

In the current post, I have given some concepts of event-driven architecture and how SQS fits in it. Also, I have described some architectural considerations when using SQS queues, such as dead-letter queues, one queue with different message types versus several queues, etc. In the end, I have given practical code on how to make a consumer for the SQS queue.

AWS examples in C# – basic SQS queue operations

Post summary: Code examples of how to perform basic SQS queue operations like reading, writing, deleting, creating a queue, etc.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository. In the current post, I will put basic SQS operations into practice with examples; a more detailed description of their usage is available in AWS examples in C# – create a service working with SQS post.

Instantiate Amazon SQS client

In the current examples, I use a configuration class called AppConfig. Its values are injected from the environment variables by .NET Core framework in Startup class. In order to work with SQS, a client is needed. The SQS client interface is called IAmazonSQS and comes from AWS C# SDK. The NuGet package is called AWSSDK.SQS, which in the current example comes as a sub-reference from Automationrhapsody.Aws.Examples.Models NuGet package. The concrete AWS client implementation is AmazonSQSClient and a singleton object is instantiated in SqsClientFactory class, where RegionEndpoint is used to instantiate AmazonSQSConfig. I use the AwsCredentials class which extends the AWS’ abstract AWSCredentials in order to manage the credentials.

SqsClientFactory.cs

public static AmazonSQSClient CreateClient(AppConfig appConfig)
{
	var sqsConfig = new AmazonSQSConfig
	{
		RegionEndpoint = RegionEndpoint.GetBySystemName(appConfig.AwsRegion)
	};
	var awsCredentials = new AwsCredentials(appConfig);
	return new AmazonSQSClient(awsCredentials, sqsConfig);
}

AwsCredentials.cs

public class AwsCredentials : AWSCredentials
{
	private readonly AppConfig _appConfig;

	public AwsCredentials(AppConfig appConfig)
	{
		_appConfig = appConfig;
	}

	public override ImmutableCredentials GetCredentials()
	{
		return new ImmutableCredentials(_appConfig.AwsAccessKey,
						_appConfig.AwsSecretKey, null);
	}
}

AppConfig.cs

public class AppConfig
{
	private const string FifoSuffix = ".fifo";
	private string _queueName;

	public string AwsRegion { get; set; }
	public string AwsAccessKey { get; set; }
	public string AwsSecretKey { get; set; }
	public string AwsQueueName
	{
		get => AwsQueueIsFifo ? _queueName + FifoSuffix : _queueName;
		set => _queueName = value;
	}
	public string AwsDeadLetterQueueName
	{
		get
		{
			var deadLetter = _queueName + "-exceptions";
			return AwsQueueIsFifo ? deadLetter + FifoSuffix : deadLetter;
		}
	}

	public bool AwsQueueAutomaticallyCreate { get; set; }
	public bool AwsQueueIsFifo { get; set; }
	public int AwsQueueLongPollTimeSeconds { get; set; }
}

Startup.cs

public Startup()
{
	var configurationBuilder = new ConfigurationBuilder()
		.AddEnvironmentVariables();
}

Local SqsClient dependencies

This sample code shows what external dependencies the SqsClient class needs. They are injected into the constructor by .NET Core dependency injection.

private readonly AppConfig _appConfig;
private readonly IAmazonSQS _sqsClient;
private readonly ILogger<SqsClient> _logger;
private readonly ConcurrentDictionary<string, string> _queueUrlCache;

public SqsClient(IOptions<AppConfig> awsConfig, 
	IAmazonSQS sqsClient, ILogger<SqsClient> logger)
{
	_appConfig = awsConfig.Value;
	_sqsClient = sqsClient;
	_logger = logger;
	_queueUrlCache = new ConcurrentDictionary<string, string>();
}

Create SQS queue and dead-letter queue

Queues can be created programmatically, something that will be described in the current post. Another option is to create them from the AWS CLI, see more information in AWS examples in C# – deploy with AWS CLI commands post.

Once the client is in place, the queue and dead-letter queue are created with the code below. The code snippet also enables long polling for the queue, which reduces costs while allowing consumers to receive messages as soon as they arrive in the queue. Basically, SQS waits until a message is available in the queue, or until the wait time expires, before sending a response.

public async Task CreateQueueAsync()
{
	const string arnAttribute = "QueueArn";

	try
	{
		var createQueueRequest = new CreateQueueRequest();
		if (_appConfig.AwsQueueIsFifo)
		{
			createQueueRequest.Attributes.Add("FifoQueue", "true");
		}

		createQueueRequest.QueueName = _appConfig.AwsQueueName;
		var createQueueResponse = await _sqsClient.CreateQueueAsync(createQueueRequest);
		createQueueRequest.QueueName = _appConfig.AwsDeadLetterQueueName;
		var createDeadLetterQueueResponse = await _sqsClient.CreateQueueAsync(createQueueRequest);

		// Get the ARN of the dead-letter queue and configure the main queue to deliver messages to it
		var attributes = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
		{
			QueueUrl = createDeadLetterQueueResponse.QueueUrl,
			AttributeNames = new List<string> { arnAttribute }
		});
		var deadLetterQueueArn = attributes.Attributes[arnAttribute];

		// RedrivePolicy on main queue to deliver messages to dead letter queue if they fail processing after 3 times
		var redrivePolicy = new
		{
			maxReceiveCount = "3",
			deadLetterTargetArn = deadLetterQueueArn
		};
		await _sqsClient.SetQueueAttributesAsync(new SetQueueAttributesRequest
		{
			QueueUrl = createQueueResponse.QueueUrl,
			Attributes = new Dictionary<string, string>
			{
				{"RedrivePolicy", JsonConvert.SerializeObject(redrivePolicy)},
				// Enable Long polling
				{"ReceiveMessageWaitTimeSeconds", _appConfig.AwsQueueLongPollTimeSeconds.ToString()}
			}
		});
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Error when creating SQS queue {_appConfig.AwsQueueName} and {_appConfig.AwsDeadLetterQueueName}");
	}
}

Read messages from the SQS queue

Reading is done with the given code, where _queueUrlCache is ConcurrentDictionary<string, string>. Queue URL is cached for better performance in GetQueueUrl method.

GetMessagesAsync

public async Task<List<Message>> GetMessagesAsync(string queueName, CancellationToken cancellationToken = default)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
		{
			QueueUrl = queueUrl,
			WaitTimeSeconds = _appConfig.AwsQueueLongPollTimeSeconds,
			AttributeNames = new List<string> { "ApproximateReceiveCount" },
			MessageAttributeNames = new List<string> { "All" }
		}, cancellationToken);

		if (response.HttpStatusCode != HttpStatusCode.OK)
		{
			throw new AmazonSQSException($"Failed to GetMessagesAsync for queue {queueName}. Response: {response.HttpStatusCode}");
		}

		return response.Messages;
	}
	catch (TaskCanceledException)
	{
		_logger.LogWarning($"Failed to GetMessagesAsync for queue {queueName} because the task was canceled");
		return new List<Message>();
	}
	catch (Exception)
	{
		_logger.LogError($"Failed to GetMessagesAsync for queue {queueName}");
		throw;
	}
}

GetQueueUrl

private async Task<string> GetQueueUrl(string queueName)
{
	if (string.IsNullOrEmpty(queueName))
	{
		throw new ArgumentException("Queue name should not be blank.");
	}

	if (_queueUrlCache.TryGetValue(queueName, out var result))
	{
		return result;
	}

	try
	{
		var response = await _sqsClient.GetQueueUrlAsync(queueName);
		return _queueUrlCache.AddOrUpdate(queueName, response.QueueUrl, (q, url) => url);
	}
	catch (QueueDoesNotExistException ex)
	{
		throw new InvalidOperationException($"Could not retrieve the URL for the queue '{queueName}' as it does not exist or you do not have access to it.", ex);
	}
}

Write a message to the SQS queue

The current example writes a single message to the queue. The AWS SDK offers a method called SendMessageBatchAsync, which can send a group of messages, but because of the nature of the example application, it is not needed. Writing comes in two flavors: a generic method accepting an object instance, and a method accepting the message text and the message type.

In the case of a FIFO queue, there are two more values to be set. One is the MessageGroupId, so messages from the same group are always processed one by one. In the current example, messages are grouped by type. The other mandatory value, unless content-based deduplication is enabled on the queue, is MessageDeduplicationId, which is used by SQS for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren’t delivered during the 5-minute deduplication interval.
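The methods below set MessageDeduplicationId to a new GUID, so every send is unique and a repeated send of the same content is not deduplicated. If deduplication by content is desired, one option is to derive the ID from the message body. The sketch below is an illustration only: DeduplicationId and FromBody are hypothetical names of mine, and the choice of a SHA-256 hex digest is an assumption that mirrors what content-based deduplication does on the SQS side.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of the examples repository
public static class DeduplicationId
{
	// Returns the SHA-256 hash of the body as 64 lowercase hex characters,
	// so identical bodies always produce the same deduplication ID
	public static string FromBody(string messageBody)
	{
		if (messageBody == null)
		{
			throw new ArgumentNullException(nameof(messageBody));
		}

		using (var sha256 = SHA256.Create())
		{
			var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(messageBody));
			var builder = new StringBuilder(hash.Length * 2);
			foreach (var b in hash)
			{
				builder.Append(b.ToString("x2"));
			}

			return builder.ToString();
		}
	}
}
```

With such a helper, the line that assigns Guid.NewGuid().ToString() could instead assign DeduplicationId.FromBody(messageBody). The trade-off is that two intentionally identical messages sent within the deduplication interval would be treated as one.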

PostMessageAsync<T>

public async Task PostMessageAsync<T>(string queueName, T message)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var sendMessageRequest = new SendMessageRequest
		{
			QueueUrl = queueUrl,
			MessageBody = JsonConvert.SerializeObject(message),
			MessageAttributes = SqsMessageTypeAttribute.CreateAttributes<T>()
		};
		if (_appConfig.AwsQueueIsFifo)
		{
			sendMessageRequest.MessageGroupId = typeof(T).Name;
			sendMessageRequest.MessageDeduplicationId = Guid.NewGuid().ToString();
		}

		await _sqsClient.SendMessageAsync(sendMessageRequest);
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Failed to PostMessagesAsync to queue '{queueName}'. Exception: {ex.Message}");
		throw;
	}
}

PostMessageAsync

public async Task PostMessageAsync(string queueName, string messageBody, string messageType)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var sendMessageRequest = new SendMessageRequest
		{
			QueueUrl = queueUrl,
			MessageBody = messageBody,
			MessageAttributes = SqsMessageTypeAttribute.CreateAttributes(messageType)
		};
		if (_appConfig.AwsQueueIsFifo)
		{
			sendMessageRequest.MessageGroupId = messageType;
			sendMessageRequest.MessageDeduplicationId = Guid.NewGuid().ToString();
		}

		await _sqsClient.SendMessageAsync(sendMessageRequest);
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, $"Failed to PostMessagesAsync to queue '{queueName}'. Exception: {ex.Message}");
		throw;
	}
}

Distinguishing messages in the queue

Since many event emitters can write messages to the queue, it gets tricky to process the messages properly. One option is to have a separate queue for each type of message; another option is to put some metadata into the messages. I have decided to go for the solution with one queue because I have just one consumer, which knows which message processor to call. In the case of many consumers, it is recommended to have several SQS queues, so that a consumer does not need to read and disregard messages, which is not optimal.

Additional MessageAttributes are added to every message. In the example above, this is done with the SqsMessageTypeAttribute.CreateAttributes(messageType) static helper method, available in the Automationrhapsody.Aws.Examples.Models NuGet package, which is also part of the examples code and is located in the Models project. This method adds a MessageType string attribute; in the generic overload, the value is typeof(T).Name.

public static class SqsMessageTypeAttribute
{
	private const string AttributeName = "MessageType";

	public static string GetMessageTypeAttributeValue(this Dictionary<string, MessageAttributeValue> attributes)
	{
		return attributes.SingleOrDefault(x => x.Key == AttributeName).Value?.StringValue;
	}

	public static Dictionary<string, MessageAttributeValue> CreateAttributes<T>()
	{
		return CreateAttributes(typeof(T).Name);
	}

	public static Dictionary<string, MessageAttributeValue> CreateAttributes(string messageType)
	{
		return new Dictionary<string, MessageAttributeValue>
		{
			{
				AttributeName, new MessageAttributeValue
				{
					DataType = nameof(String),
					StringValue = messageType
				}
			}
		};
	}
}

Delete message from the queue

Once the message is processed, it should be removed from the queue. This is done with the following method:

public async Task DeleteMessageAsync(string queueName, string receiptHandle)
{
	var queueUrl = await GetQueueUrl(queueName);

	try
	{
		var response = await _sqsClient.DeleteMessageAsync(queueUrl, receiptHandle);

		if (response.HttpStatusCode != HttpStatusCode.OK)
		{
			throw new AmazonSQSException($"Failed to DeleteMessageAsync for [{receiptHandle}] from queue '{queueName}'. Response: {response.HttpStatusCode}");
		}
	}
	catch (Exception)
	{
		_logger.LogError($"Failed to DeleteMessageAsync from queue {queueName}");
		throw;
	}
}

Reprocess messages from dead-letter queue

If there is a problem with processing a message, it is moved to the dead-letter queue. There might be a specific bug in the consumer application for this particular type of message. Once this bug is fixed and a new version is deployed, all those messages should be reprocessed. Moving from dead-letter to source queue is done with the following code:

public async Task RestoreFromDeadLetterQueueAsync(CancellationToken cancellationToken = default)
{
	var deadLetterQueueName = _appConfig.AwsDeadLetterQueueName;

	try
	{
		var token = new CancellationTokenSource();
		while (!token.Token.IsCancellationRequested)
		{
			var messages = await GetMessagesAsync(deadLetterQueueName, cancellationToken);
			if (!messages.Any())
			{
				token.Cancel();
				continue;
			}

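			// Await every move, so the loop does not poll again before the batch is handled
			// and exceptions reach the catch block instead of escaping from async void lambdas
			await Task.WhenAll(messages.Select(async message =>
			{
				var messageType = message.MessageAttributes.GetMessageTypeAttributeValue();
				if (messageType != null)
				{
					await PostMessageAsync(message.Body, messageType);
					await DeleteMessageAsync(deadLetterQueueName, message.ReceiptHandle);
				}
			}));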
		}
	}
	catch (Exception)
	{
		_logger.LogError($"Failed to ReprocessMessages from queue {deadLetterQueueName}");
		throw;
	}
}

SQS queue operations at a glance

All operations described above can be seen in SqsReader SqsClient class and SqsWriter SqsClient class.

Conclusion

In the current post, I have given code examples of how to perform basic SQS queue operations.

AWS examples in C# – run the solution

Post summary: Explanation of how to install and use the solution in AWS examples in C# blog post series.

This post is part of AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository. In the current post, I give information on how to install and run the project.

Disclaimer

The solution has commands to deploy to the cloud as well as to clean resources. Note that not all resources are cleaned; read more in the Cleanup section. In order to run the solution in AWS, a valid account is needed. I am not going to describe how to create an account. If an account is present, I assume there is also knowledge and awareness of how to use it.

Important: current examples generate costs on AWS account. Use cautiously at your own risk!

Restrictions

The project was tested to be working on Linux and Windows. For Windows, it is working only with Git Bash. The project requires a valid AWS account.

Required installations

In order to fully run and enjoy the project following needs to be installed:

Configurations

AWS CLI has to be configured in order to run properly. This is done with aws configure. If there is no AWS account, this is not an issue: put some values for the access and secret keys and put a correct region, like us-east-1.

Import Postman collection, in order to be able to try the examples. Postman collection is in aws.examples.csharp.postman_collection.json file in the code. This is an optional step, below there are cURL examples also.

Run the project in AWS

Running on AWS requires the setting of environment variables:

export AwsAccessKey=KIA57FV4.....
export AwsSecretKey=mSgsxOWVh...
export AwsRegion=us-east-1

Then the solution is deployed to AWS with ./solution-deploy.sh script. Note that the output of the command gives the API Gateway URL and API key, as well as the SqsWriter and SqsReader endpoints. See image below:

Run the project partly in AWS

The most expensive part of the current setup is the running of the docker containers in EC2 (Elastic Compute Cloud). I have prepared a script called ./solution-deploy-hybrid.sh, which runs the containers locally and all other things are in AWS. Still, environment variables from the previous section are mandatory to be set. I believe this is the optimal variant if you want to test and run the code in a “production-like” environment.

Run the project in LocalStack

LocalStack provides an easy-to-use test/mocking framework for developing Cloud applications. It spins up a testing environment on the local machine that provides the same functionality and APIs as the real AWS cloud environment. I have experimented with LocalStack; there is a localstack branch in GitHub. The solution can be run with solution-deploy-localstack.sh command. I cannot really estimate if this is a good alternative, because I am running the free tier in AWS and the most expensive part is ECS, which I skip by running the containers locally, instead of in AWS. See the previous section on how to run a hybrid.

Usage

There is a Postman collection which allows easy firing of the requests. Another option is to use cURL, examples of all requests with their Postman names are available below.

SqsWriter

SqsWriter is a .NET Core 3.0 application that is dockerized and run in AWS ECS (Elastic Container Service). It exposes an API that can be used to publish Actor or Movie objects. There is also a health check request. After AWS deployment, the proper endpoint is needed instead of localhost. The endpoint can be found in the output of the deployment scripts. See the image above.

PublishActor

curl --location --request POST 'http://localhost:5100/api/publish/actor' \
--header 'Content-Type: application/json' \
--data-raw '{
	"FirstName": "Bruce",
	"LastName": "Willis"
}'

PublishMovie

curl --location --request POST 'http://localhost:5100/api/publish/movie' \
--header 'Content-Type: application/json' \
--data-raw '{
	"Title": "Die Hard",
	"Genre": "Action Movie"
}'

When Actor or Movie is published, it goes to the SQS queue, SqsReader picks it up from there and processes it. What is visible in the logs is that both LogEntryMessageProcessor and ActorMessageProcessor are invoked. See the screenshot:

SqsWriterHealthCheck

curl --location --request GET 'http://localhost:5100/health'

SqsReader

SqsReader is a .NET Core 3.0 application that is dockerized and run in AWS ECS. It has a consumer that listens to the SQS queue and processes the messages by writing them into the appropriate AWS DynamoDB tables. It also exposes an API to stop or start processing, as well as to reprocess the dead-letter queue or simply get the queue status. After AWS deployment, the proper endpoint is needed instead of localhost. The endpoint can be found in the output of the deployment scripts. See the image above.

ConsumerStart

curl --location --request POST 'http://localhost:5200/api/consumer/start' \
--header 'Content-Type: application/json' \
--data-raw ''

ConsumerStop

curl --location --request POST 'http://localhost:5200/api/consumer/stop' \
--header 'Content-Type: application/json' \
--data-raw ''

ConsumerStatus

curl --location --request GET 'http://localhost:5200/api/consumer/status'

ConsumerReprocess

If this one is invoked with no messages in the dead-letter queue, then it takes 20 seconds to finish, because it actually waits for the long polling timeout.

curl --location --request POST 'http://localhost:5200/api/consumer/reprocess' \
--header 'Content-Type: application/json' \
--data-raw ''

SqsReaderHealthCheck

curl --location --request GET 'http://localhost:5200/health'

ActorsServerlessLambda

This lambda is managed by the Serverless framework. It is exposed as REST API via AWS API Gateway. It also has a custom authorizer as well as API Key attached. Those are described in a further post.

ServerlessActors

In the case of AWS, the API Key and URL are needed, those can be obtained from deployment command logs. See the screenshot above. Put correct values to CHANGE_ME and REGION placeholders. Request is:

curl --location --request POST 'https://CHANGE_ME.execute-api.REGION.amazonaws.com/dev/actors/search' \
--header 'Content-Type: application/json' \
--header 'x-api-key: CHANGE_ME' \
--header 'Authorization: Bearer validToken' \
--data-raw '{
    "FirstName": "Bruce",
    "LastName": "Willis"
}'

MoviesServerlessLambda

ServerlessMovies

Put correct values to CHANGE_ME and REGION placeholders. Request is:

curl --location --request GET 'https://CHANGE_ME.execute-api.REGION.amazonaws.com/dev/movies/title/Die%20Hard'

Cleanup

Nota bene: This is a very important step, as leaving the solution running in AWS will accumulate costs.

In order to stop and clean up all AWS resources run ./solution-delete.sh script.

Nota bene: There is a resource that is not automatically deleted by the scripts. This is a Route53 resource created by AWS Cloud Map. It has to be deleted with the following commands. Note that the id in the delete command comes from the result of the list-namespaces command.

aws servicediscovery list-namespaces
aws servicediscovery delete-namespace --id ns-kneie4niu6pwwela

Verify cleanup

In order to be sure there are no leftovers from the examples, the following AWS services have to be checked:

  • SQS
  • DynamoDB
  • IAM -> Roles
  • EC2 -> Security Groups
  • ECS -> Clusters
  • ECS -> Task Definitions
  • ECR -> Repositories
  • Lambda -> Functions
  • Lambda -> Applications
  • CloudFormation -> Stacks
  • S3
  • CloudWatch -> Log Groups
  • Route 53
  • AWS Cloud Map

On top of it, Billing should be regularly monitored to ensure no costs are applied.

Conclusion

This post describes how to run and try the solution described in AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS series.

AWS examples in C# – working with SQS, DynamoDB, Lambda, ECS

Post summary: Overview of the AWS examples in C# series.

In several blog posts, I give some practical examples of how to use AWS SQS, DynamoDB, Lambda with C# code. The code used for this series of blog posts is located in aws.examples.csharp GitHub repository.

Introduction

AWS stands for Amazon Web Services. It is a subsidiary of Amazon that provides on-demand cloud computing platforms and APIs to individuals, companies, and governments, on a metered pay-as-you-go basis. AWS is one of the big cloud service providers. The others are Microsoft Azure and Google Cloud. All three cloud service providers have functions that are semantically common but differ in practical implementation. Also, every one of them has its own flavors. I have chosen to use AWS for these examples as it is something I have used before and I am most comfortable with it.

Architectural overview

In order to get a full understanding of the architecture, I have prepared this very basic diagram. It illustrates what services are there and how they communicate.

SqsReader and SqsWriter

Both are .NET Core 3.0 microservices running in Docker containers. The images are uploaded to AWS ECR (Elastic Container Registry) and the containers are run in AWS ECS (Elastic Container Service). SqsWriter has a REST endpoint, by which an Actor or Movie can be posted. Both are pushed as a message to AWS SQS (Simple Queue Service). SqsReader listens to the SQS queue and processes each message that arrives. If the message is of type Actor or Movie, then SqsReader saves it to the respective AWS DynamoDB table. If the message is of type LogEntry, then it is only output into the SqsReader logs.

ActorsLambdaFunction and MoviesLambdaFunction

Both are .NET Core 2.1 lambda functions run in AWS Lambda. They listen for changes to the Actors and Movies DynamoDB tables and, in case of new entries, write to the LogEntries DynamoDB table. They also write SQS messages of type LogEntry, which are then read by SqsReader.

ActorsServerlessLambda and MoviesServerlessLambda

Those are again lambda functions, but they are fully managed by the Serverless framework. They have a lambda application defined as well as CloudFormation templates. They expose a REST API through AWS API Gateway, by which the Actors table can be queried or a movie can be retrieved from the Movies table.

Posts in the series

This is a long series of posts describing in detail all the pieces of the architectural diagram above. Also, every aspect of the code in the repository is explained in detail in subsequent blog posts. It was a very interesting learning opportunity for me, which I would like to share. Here are the posts in the series:

Future plans

There are several topics I would like to go into as well, but there is no code for them yet in the GitHub repository. Those are:

  • AWS examples in C# – manage with Terraform
  • AWS examples in C# – use AWS Cognito for API Gateway authorizer

Conclusion

This series of posts is intended to give a basic overview of important AWS services and how to use them in C# code.

Git clone with predefined user email and user name

Post summary: Small bash script to clone a Git repository and set user.email and user.name.

Use case

There are cases when committing with a different user to a different Git repository is needed. Git offers a very easy command to change user.email and user.name, as long as you remember to do so.

git config user.name "Firstname Lastname"
git config user.email "Firstname.LastnameDoe@somemailhost.com"

I always forget to do it, so I made up a small script that I use to clone a repository and it does it for me.

Git also offers a command to set user.name and user.email globally. The global values apply to every repository on the machine that does not define its own local values. If the use case is to work with one name and email only, then this is probably the best option.

git config --global user.name "Firstname Lastname"
git config --global user.email "Firstname.LastnameDoe@somemailhost.com"
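
To see which identity is actually in effect for a given repository, the values can be read back with git config. The helper below is just a small sketch; the name show_git_identity is hypothetical. Values set inside the repository take precedence over the global ones.

```shell
#!/bin/bash

# Hypothetical helper: print the identity Git would use for commits in a repository.
# The first argument is the repository folder; it defaults to the current folder.
show_git_identity() {
  local repoDir="${1:-.}"
  echo "$(git -C "$repoDir" config user.name) <$(git -C "$repoDir" config user.email)>"
}
```

Running show_git_identity inside a cloned repository prints the name and email in the form Name <email>.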

Script

#!/bin/bash

if [ -z "$1" ]
then
  echo "Please provide the Git repo as argument"
  exit 1
fi

if [ -z "$2" ]
then
  echo "Please provide the user.name as argument"
  exit 1
fi

if [ -z "$3" ]
then
  echo "Please provide the user.email as argument"
  exit 1
fi

IFS='/' read -r -a urlParts <<< "$1"
urlPartsLast=${urlParts[${#urlParts[@]}-1]}

IFS="." read -r -a repoParts <<< "$urlPartsLast"
repoPartsLast=${repoParts[${#repoParts[@]}-1]}
if [ "$repoPartsLast" == "git" ]
then
  unset 'repoParts[${#repoParts[@]}-1]'
fi
repoName=$(printf ".%s" "${repoParts[@]}")
repoName=${repoName:1}

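# Stop if the clone fails, otherwise the config below would be applied in the wrong folder
git clone "$1" || exit 1

# Quote the folder name and stop if it cannot be entered
cd "$repoName" || exit 1
git config user.name "$2"
git config user.email "$3"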

The script file should be made executable with chmod +x git-clone.sh and then the script can be invoked with the following command:

./git-clone.sh https://github.com/llatinov/aws.examples.csharp.git "Firstname Lastname" Firstname.LastnameDoe@somemailhost.com

Script insights

The script checks for empty arguments and exits with an error if any of them is missing. Note that user.name and user.email can be hardcoded into the script itself, which makes it easier to invoke. Then the script splits the Git URL by slash (/) into different parts. It takes the last part, which is supposed to be the repository name. The last part is additionally split by dot (.) and the git suffix is ignored. Finally, the script clones the repository and navigates to its folder, where it sets user.name and user.email.
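
The repository name can also be derived with shell parameter expansion instead of splitting into arrays. This is an alternative sketch and not what the script above does; the function name repo_name_from_url is hypothetical.

```shell
#!/bin/bash

# Hypothetical helper: derive the repository folder name from a Git URL.
repo_name_from_url() {
  local url="${1%/}"        # drop a trailing slash, if any
  local name="${url##*/}"   # keep everything after the last slash
  echo "${name%.git}"       # drop the .git suffix, if present
}
```

For example, repo_name_from_url https://github.com/llatinov/aws.examples.csharp.git prints aws.examples.csharp, and dots inside the repository name are left untouched.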

Conclusion

This script makes sure a Git repository is always cloned with the correct user.name and user.email, so there is nothing to forget.
