VictoriaLogs: a Grafana dashboard for AWS VPC Flow Logs — migrating from Grafana Loki


In the previous post — AWS: VPC Flow Logs — logs to S3 and Grafana dashboard with Loki, we created a Grafana dashboard that displays NAT Gateway traffic usage statistics.

What we were interested in there was which Kubernetes Pods use the most bytes, because it directly affects our AWS Costs.

And everything seems fine with this board, except for one thing: Loki cannot process the raw logs and build graphs for a range longer than 30 minutes, an hour at most, and even then some of the visualizations do not load, although I did try to tune it; see Grafana Loki: performance optimization with Recording Rules, caching, and parallel queries.

So I decided to try the same approach (S3 for VPC Flow Logs, Lambda, and Promtail), but with VictoriaLogs, especially since the VictoriaLogs Grafana data source got much better query support in version 0.8.0, and you can now build visualizations without Grafana Transformations.

So, what are we going to do?

  • quickly go over the Terraform code that creates the S3 bucket for VPC Flow Logs and the AWS Lambda with Promtail, which sends the data to VictoriaLogs
  • create a new Grafana dashboard with the VictoriaLogs datasource, and migrate the queries from Loki and its LogQL to VictoriaLogs and LogsQL

Let me remind you from the previous post what we have in our setup:

  • we know the CIDR of the private subnets for Kubernetes Pods: in my current project, we use only one network in the us-east-1a Availability Zone, 10.0.32.0/20
  • we know the Elastic Network Interface ID of our NAT Gateway — we have one, so everything is simple here
  • in the logs we have the pkt_src_addr and pkt_dst_addr fields, by which we can select traffic only from/to Kubernetes Pods

Also worth checking out other posts on this topic:

Terraform

S3 and Promtail Lambda

I won’t go into much detail here, because the code has comments describing each resource, and this is just one example of how it can be done. The first version of the module was described in Terraform: creating a module for collecting AWS ALB logs in Grafana Loki; here is a slightly modified version that can be configured for both Loki and VictoriaLogs, and for VPC Flow Logs as well as ALB logs.

So, here’s how I implemented it:

  • the atlas-tf-modules repository: Terraform modules, including the code that creates the S3 buckets, the Lambda, notifications, and permissions
  • the atlas-monitoring repository: the Terraform code and the Helm chart of our monitoring stack, where the necessary resources are created (RDS, various additional S3 buckets, AWS ACM certificates), and where the module from atlas-tf-modules/alb-s3-logs is called to configure the collection of logs from the S3 buckets

Let’s start with the alb-s3-logs module for S3 and Lambda. I wrote about Terraform and modules in Terraform: modules, Outputs, and Variables.

File structure in alb-s3-logs:

$ tree alb-s3-logs/
alb-s3-logs/
|-- README.md
|-- lambda.tf
|-- outputs.tf
|-- s3.tf
`-- variables.tf
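
The variables.tf file itself is not shown below, but its shape can be inferred from how the variables are used in s3.tf and lambda.tf. A minimal sketch (the types and comments are my assumptions, not the actual file):

# variables.tf: a minimal sketch, inferred from usage in s3.tf and lambda.tf
variable "aws_env" { type = string }                 # 'ops'
variable "eks_version" { type = string }             # '1-30'
variable "component" { type = string }               # 'devops', 'backend'
variable "application" { type = string }             # 'vpc', 'ingress'
variable "app_environments" { type = list(string) }  # ['ops']
variable "aws_service" { type = string }             # 'alb' or 'flow'
variable "logger_type" { type = string }             # 'loki' or 'vmlogs'
variable "aws_account_id" { type = string }
variable "elb_account_id" { type = string }          # used only by the ALB bucket policy
variable "vpc_id" { type = string }
variable "vpc_private_subnets_cidrs" { type = list(string) }
variable "vpc_private_subnets_ids" { type = list(string) }
variable "promtail_image" { type = string }          # URI of the Promtail Lambda container image
variable "logger_write_address" { type = string }    # Loki or VictoriaLogs push endpoint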

Creating S3 buckets

The s3.tf file - creation of buckets:

# define S3 bucket names from parameters passed from a calling/root module in the 'atlas-monitoring' repository
locals {

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
  logs_bucket_names = { for env in var.app_environments : env => "${var.aws_env}-${var.eks_version}-${var.component}-${var.application}-${env}-${var.aws_service}-${var.logger_type}-logs" }
}

resource "aws_s3_bucket" "s3_logs" {
  for_each = local.logs_bucket_names

  bucket = each.value

  # to drop a bucket, set to `true` first
  # run `terraform apply`
  # then remove the block
  # and run `terraform apply` again
  force_destroy = true
}

# remove logs older than 30 days
resource "aws_s3_bucket_lifecycle_configuration" "bucket_config" {
  for_each = aws_s3_bucket.s3_logs

  bucket = each.value.id

  rule {
    id = "logs"
    status = "Enabled"

    expiration {
      days = 30
    }
  }
}

# block S3 bucket public access
resource "aws_s3_bucket_public_access_block" "s3_logs_backend_acl" {
  for_each = aws_s3_bucket.s3_logs

  bucket = each.value.id

  block_public_acls = true
  block_public_policy = true
  ignore_public_acls = true
  restrict_public_buckets = true
}

# using the 'var.aws_service == "alb"', attach the S3 bucket Policy to buckets for ALB Logs only
resource "aws_s3_bucket_policy" "s3_logs_alb" {
  for_each = {
    for key, bucket_name in aws_s3_bucket.s3_logs :
    key => bucket_name if var.aws_service == "alb"
  }

  bucket = each.value.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "RegionELBLogsWrite"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${var.elb_account_id}:root"
        }
        Action = "s3:PutObject"
        Resource = "arn:aws:s3:::${each.value.id}/AWSLogs/${var.aws_account_id}/*"
      },
      {
        Sid = "PromtailLambdaLogsGet"
        Effect = "Allow"
        Principal = {
          AWS = module.logs_promtail_lambda[each.key].lambda_role_arn
        }
        Action = "s3:GetObject"
        Resource = "arn:aws:s3:::${each.value.id}/*"
      }
    ]
  })
}

# using the 'var.aws_service == "flow"', attach the S3 bucket Policy to buckets for VPC Flow Logs only
resource "aws_s3_bucket_policy" "s3_logs_flow" {
  for_each = {
    for key, bucket_name in aws_s3_bucket.s3_logs :
    key => bucket_name if var.aws_service == "flow"
  }

  bucket = each.value.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "VPCFlowLogsDeliveryWrite",
        Effect = "Allow",
        Principal = {
          Service = "delivery.logs.amazonaws.com"
        },
        Action = "s3:PutObject",
        Resource = "arn:aws:s3:::${each.value.id}/AWSLogs/${var.aws_account_id}/*",
        Condition = {
          StringEquals = {
            "aws:SourceAccount": "${var.aws_account_id}",
            "s3:x-amz-acl": "bucket-owner-full-control"
          },
          ArnLike = {
            "aws:SourceArn": "arn:aws:logs:us-east-1:${var.aws_account_id}:*"
          }
        }
      },
      {
        Sid = "VPCFlowLogsAclCheck",
        Effect = "Allow",
        Principal = {
          Service = "delivery.logs.amazonaws.com"
        },
        Action = "s3:GetBucketAcl",
        Resource = "arn:aws:s3:::${each.value.id}",
        Condition = {
          StringEquals = {
            "aws:SourceAccount": "${var.aws_account_id}"
          },
          ArnLike = {
            "aws:SourceArn": "arn:aws:logs:us-east-1:${var.aws_account_id}:*"
          }
        }
      },
      {
        Sid = "PromtailLambdaLogsGet"
        Effect = "Allow"
        Principal = {
          AWS = module.logs_promtail_lambda[each.key].lambda_role_arn
        }
        Action = "s3:GetObject"
        Resource = "arn:aws:s3:::${each.value.id}/*"
      }      
    ]
  })
}

# send notifications to a Lambda function with Promtail when a new object is created in the S3 bucket
resource "aws_s3_bucket_notification" "s3_logs_lambda_notification" {
  for_each = aws_s3_bucket.s3_logs

  bucket = each.value.id

  lambda_function {
    lambda_function_arn = module.logs_promtail_lambda[each.key].lambda_function_arn
    events = ["s3:ObjectCreated:*"]
    filter_prefix = "AWSLogs/${var.aws_account_id}/"
  }
}

Creating Lambda functions with Promtail

The lambda.tf file:

# to allow network connections from S3 buckets IP range
data "aws_prefix_list" "s3" {
  filter {
    name = "prefix-list-name"
    values = ["com.amazonaws.us-east-1.s3"]
  }
}

# allow connections from S3 and from/to VPC Private Subnets to access Loki and VictoriaLogs
module "logs_security_group_lambda" {
  source = "terraform-aws-modules/security-group/aws"
  version = "~> 5.2.0"

  # 'ops-1-30-lambda-loki-sg'
  name = "${var.aws_env}-${var.eks_version}-lambda-${var.logger_type}-sg"
  description = "Security Group for Lambda Egress"

  vpc_id = var.vpc_id

  egress_cidr_blocks = var.vpc_private_subnets_cidrs
  egress_ipv6_cidr_blocks = []
  egress_prefix_list_ids = [data.aws_prefix_list.s3.id]

  ingress_cidr_blocks = var.vpc_private_subnets_cidrs
  ingress_ipv6_cidr_blocks = []

  egress_rules = ["https-443-tcp"]
  ingress_rules = ["https-443-tcp"]
}

# S3 buckets names:

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

module "logs_promtail_lambda" {
  source = "terraform-aws-modules/lambda/aws"
  version = "~> 7.16.0"
  # key: 'ops'
  # value: 'ops-1-30-devops-vpc-ops-flow-loki-logs'
  for_each = aws_s3_bucket.s3_logs

  # build Lambda function name like 'ops-1-30-devops-vpc-ops-flow-loki-logs-logger'
  function_name = "${each.value.id}-${var.logger_type}-logger"
  description = "Promtail instance to collect logs from S3"

  create_package = false
  # https://github.com/terraform-aws-modules/terraform-aws-lambda/issues/36
  publish = true

  # an error when sending logs from Flow Logs S3:
  # 'Task timed out after 3.05 seconds'
  timeout = 60

  image_uri = var.promtail_image
  package_type = "Image"
  architectures = ["x86_64"]

  # component=devops, logtype=alb, environment=ops, logger_type=loki
  # component=devops, logtype=flow, environment=ops, logger_type=loki
  environment_variables = {
    EXTRA_LABELS = "component,${var.component},logtype,${var.aws_service},environment,${each.key},logger_type,${var.logger_type}"
    KEEP_STREAM = "true"
    OMIT_EXTRA_LABELS_PREFIX = "true"
    PRINT_LOG_LINE = "true"
    WRITE_ADDRESS = var.logger_write_address
  }

  vpc_subnet_ids = var.vpc_private_subnets_ids
  vpc_security_group_ids = [module.logs_security_group_lambda.security_group_id]
  attach_network_policy = true

  # writing too many logs
  # see in CloudWatch Metrics by the 'IncomingBytes' metric
  # to save CloudWatch Logs costs, decrease the logs number
  # set to 'INFO' for debugging
  logging_application_log_level = "FATAL"
  logging_system_log_level = "WARN"
  logging_log_format = "JSON"

  # allow calling the Lambda from an S3 bucket
  # bucket name: ops-1-28-backend-api-dev-alb-logs
  allowed_triggers = {
    S3 = {
      principal = "s3.amazonaws.com"
      source_arn = "arn:aws:s3:::${each.value.id}"
    }
  }
}
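
The module’s outputs.tf is not shown in this post. A minimal sketch of what it could expose to the calling code (an assumption, not the actual file):

# outputs.tf: a sketch; the real file may differ
output "logs_bucket_ids" {
  description = "app_environment => S3 bucket name"
  value       = { for key, bucket in aws_s3_bucket.s3_logs : key => bucket.id }
}

output "lambda_function_arns" {
  description = "app_environment => Promtail Lambda function ARN"
  value       = { for key, lambda in module.logs_promtail_lambda : key => lambda.lambda_function_arn }
}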

Calling the atlas-tf-modules module from the monitoring code

Next, we describe the resources in the Terraform code in the atlas-monitoring repository - the logs.tf file.

Three modules are called here:

  • a module for the Load Balancers Logs for Loki
  • a module for the VPC Flow Logs for Loki
  • a module for the VPC Flow Logs for VictoriaLogs

/*

Collect ALB Logs to Loki module

S3:

- will create an aws_s3_bucket for each app_environments[]:
  # bucket names:
  # '<eks_env>-<eks_version>-<component>-<application>-<app_env>-alb-logs'
  # i.e:
  # 'ops-1-28-backend-api-dev-alb-logs'
- will create an aws_s3_bucket_policy with Allow for each Lambda
- will create an aws_s3_bucket_notification with Push event on each s3:ObjectCreated to each Lambda

Lambda:

- will create a security_group_lambda with Allow 443 from VPC CIDR
- will create a Lambda with Promtail for each aws_s3_bucket

*/

module "vpc_flow_logs_loki" {
  # create the module for each EKS cluster by its version
  # for_each = var.eks_versions
  for_each = toset(["1-30"])
  source = "git@github.com:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
  # for local development
  # source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # 'ops'
  aws_env = var.aws_environment
  # '1-30'
  eks_version = each.value
  # by team: 'backend', 'devops'
  component = "devops"
  application = "vpc"
  app_environments = ["ops"]
  aws_service = "flow"
  logger_type = "loki"

  vpc_id = local.vpc_out.vpc_id
  vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
  vpc_private_subnets_ids = local.vpc_out.vpc_private_subnets_ids
  # 'https://loki.monitoring.1-30.ops.example.co:443/loki/api/v1/push'
  logger_write_address = "https://loki.monitoring.${each.value}.ops.example.com:443/loki/api/v1/push"
  aws_account_id = data.aws_caller_identity.current.account_id
}

module "vpc_flow_logs_vmlogs" {
  # create the module for each EKS cluster by its version
  # for_each = var.eks_versions
  for_each = toset(["1-30"])
  source = "git@github.com:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
  # for local development
  # source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # 'ops'
  aws_env = var.aws_environment
  # '1-30'
  eks_version = each.value
  # by team: 'backend', 'devops'
  component = "devops"
  application = "vpc"
  app_environments = ["ops"]
  aws_service = "flow"
  logger_type = "vmlogs"

  vpc_id = local.vpc_out.vpc_id
  vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
  vpc_private_subnets_ids = local.vpc_out.vpc_private_subnets_ids
  # create log streams by the 'logtype,environment,logger_type' fields
  # see https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields
  logger_write_address = "https://vmlogs.monitoring.${each.value}.ops.example.com:443/insert/loki/api/v1/push?_stream_fields=logtype,environment,logger_type"
  aws_account_id = data.aws_caller_identity.current.account_id
}

# ../../atlas-load-balancers/helm/templates/external-ingress-alb.yaml:    
# alb.ingress.kubernetes.io/load-balancer-attributes: access_logs.s3.enabled=true,access_logs.s3.bucket=ops-1-30-devops-ingress-ops-alb-logs
# two ALBs use these buckets for their logs - the External one, 'ops-external-ingress', and the Internal one, 'ops-internal-ingress'
# both are in the 'ops-common-alb-ns' Namespace
module "single_ingress_alb_logs_loki" {
  # create the module for each EKS cluster by its version
  # for_each = var.eks_versions
  for_each = toset(["1-30"])
  source = "git@github.com:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
  # for local development
  # source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # 'ops'
  aws_env = var.aws_environment
  # '1-30'
  eks_version = each.value
  component = "devops"
  application = "ingress"
  app_environments = ["ops"]
  aws_service = "alb"
  logger_type = "loki"

  vpc_id = local.vpc_out.vpc_id
  vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
  vpc_private_subnets_ids = local.vpc_out.vpc_private_subnets_ids
  # 'https://loki.monitoring.1-30.ops.example.co:443/loki/api/v1/push'
  logger_write_address = "https://loki.monitoring.${each.value}.ops.example.com:443/loki/api/v1/push"
  aws_account_id = data.aws_caller_identity.current.account_id
}

That’s all there is to it.

A VPC Flow Logs module

The terraform-aws-modules/vpc/aws module supports VPC Flow Logs, but you can set only one flow_log_destination_arn, which in my case currently points at Grafana Loki - the ops-1-30-devops-vpc-ops-flow-loki-logs S3 bucket:

module "vpc" {
  source = "terraform-aws-modules/vpc/aws"
  version = "~> 5.16.0"

  ...

  enable_flow_log = var.vpc_params.enable_flow_log

  # Default: "cloud-watch-logs"
  flow_log_destination_type = "s3"

  # disable to use S3
  create_flow_log_cloudwatch_log_group = false
  create_flow_log_cloudwatch_iam_role = false

  # ARN of the CloudWatch log group or S3 bucket
  # disable if use 'create_flow_log_cloudwatch_log_group' and the default 'flow_log_destination_type' value (cloud-watch-logs)
  flow_log_destination_arn = "arn:aws:s3:::ops-1-30-devops-vpc-ops-flow-loki-logs"

  flow_log_cloudwatch_log_group_name_prefix = "/aws/${local.env_name}-flow-logs/"
  flow_log_log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"

  vpc_flow_log_tags = {
    "Name" = "flow-logs-s3-to-loki"
  }
}

To write to two S3 buckets at once, just add another aws_flow_log resource.

VPC Flow Logs are written in a custom format:

resource "aws_flow_log" "vpc_flow_vmlogs" {
  vpc_id = module.vpc.vpc_id
  log_destination = "arn:aws:s3:::ops-1-30-devops-vpc-ops-flow-vmlogs-logs"
  log_destination_type = "s3"
  traffic_type = "ALL"
  log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"
  tags = {
    "Name" = "flow-logs-s3-to-vmlogs"
  }
}
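
With this log_format, each record is a single space-separated line. A purely hypothetical example (all the addresses and numbers are made up) just to show which position each field occupies:

us-east-1 vpc-0a1b2c3d use1-az1 subnet-0e5f6a7b - eni-0352f8c82da6aa229 ingress 10.0.32.15 52.1.2.3 48234 443 10.0.32.15 52.1.2.3 - - 8 12 34567 ACCEPT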

In addition, I manually created a Flow Log with a CloudWatch Logs destination, to validate the query results in Loki and VictoriaLogs.
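
For reference, a Terraform equivalent of that manual setup might look like this (just a sketch: the log group name and the IAM role variable are assumptions, I created mine by hand):

# a sketch: the same VPC Flow Logs, but with a CloudWatch Logs destination for validation
resource "aws_cloudwatch_log_group" "vpc_flow_validation" {
  name              = "/aws/vpc-flow-logs-validation"
  retention_in_days = 7
}

resource "aws_flow_log" "vpc_flow_cloudwatch" {
  vpc_id               = module.vpc.vpc_id
  # hypothetical: a role that allows logs:CreateLogStream and logs:PutLogEvents
  iam_role_arn         = var.flow_logs_cloudwatch_role_arn
  log_destination      = aws_cloudwatch_log_group.vpc_flow_validation.arn
  log_destination_type = "cloud-watch-logs"
  traffic_type         = "ALL"
  # reuse the same custom format, so the fields match Loki and VictoriaLogs
  log_format           = aws_flow_log.vpc_flow_vmlogs.log_format
}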

Creating a Grafana dashboard

NAT Gateway Total processed

First, we display general statistics on how much traffic passed through the NAT Gateway — both from Kubernetes Pods to the Internet, and from the Internet to Kubernetes Pods.

A Loki query

In Loki, the query looks like this:

sum (
    sum_over_time(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | pkt_src_addr=ip("10.0.32.0/20") OR pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__ =""
        )[$__range]
    )
)

Here:

  • calculate sum_over_time() for the period selected in the Grafana dashboard - $__range
  • count the traffic either from Kubernetes Pods — pkt_src_addr=ip("10.0.32.0/20"), or vice versa to Kubernetes Pods - pkt_dst_addr=ip("10.0.32.0/20")
  • count by the bytes field - unwrap bytes

With this query, we have the following data:

kubernetes_pod_ip and remote_svc_ip are variables in the Grafana dashboard to be able to check data for specific addresses:

A VictoriaLogs query

Now, we need to translate this query into LogsQL format for VictoriaLogs.

It will look like this:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      (pkt_src_addr:ipv4_range("10.0.32.0/20") OR pkt_dst_addr:ipv4_range("10.0.32.0/20"))
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"   
  | stats sum(bytes) bytes_total

Support for the Grafana $__range variable was added only recently, in VictoriaLogs datasource version 0.9.0, so make sure to update the plugin.

Here:

  • select data for _time:$__range
  • in the {logtype=flow, environment=ops, logger_type=vmlogs} we use the log stream selector with labels that are set in Lambda Promtail when writing logs - /insert/loki/api/v1/push?_stream_fields=logtype,environment,logger_type
  • the seq("eni-0352f8c82da6aa229", "ACCEPT") - use the Sequence filter to select only the records from the NAT Gateway interface with ACCEPT, to speed up query execution (see the comment from Olexandr Valialkin)
  • with the extract we form fields from log entries
  • with the filter we select the NAT Gateway interface and ACCEPT, and, as in the Loki query, filter traffic either from Kubernetes Pods with the IPv4 range filter - pkt_src_addr:ipv4_range("10.0.32.0/20") - or, vice versa, to Kubernetes Pods - pkt_dst_addr:ipv4_range("10.0.32.0/20") (note that the OR conditions are enclosed in parentheses)
  • and at the end, with the stats we calculate the sum of the bytes field and write the result to the bytes_total field
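
By the way, before putting a LogsQL query into a Grafana panel, it can be sanity-checked directly against the VictoriaLogs HTTP API with curl (a sketch, using this post's example hostname):

$ curl -s 'https://vmlogs.monitoring.1-30.ops.example.com/select/logsql/query' \
    --data-urlencode 'query={logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT") | stats count() rows'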

Verification with CloudWatch Logs Insights

To be able to cross-check the data in Loki and VictoriaLogs, the VPC now also writes Flow Logs to CloudWatch Logs.

Let’s run the following query in Logs Insights:

parse @message "* * * * * * * * * * * * * * * * * * *"
  as region, vpc_id, az_id, subnet_id, instance_id, interface_id,
     flow_direction, srcaddr, dstaddr, srcport, dstport,
     pkt_srcaddr, pkt_dstaddr, pkt_src_aws_service, pkt_dst_aws_service,
     traffic_path, packets, bytes, action
| filter interface_id="eni-0352f8c82da6aa229" and (isIpv4InSubnet(pkt_srcaddr,"10.0.32.0/20") or isIpv4InSubnet(pkt_dstaddr,"10.0.32.0/20"))
| stats sum(bytes) as bytes_total

As a result, we have 8547192734 bytes:

Which in SI units (see Binary prefixes) gives us 8.54 gigabytes; let’s check with a calculator:

$ bc
scale=2
8547192734/1000/1000/1000
8.54

In Loki we had 7.56 GiB, and in VictoriaLogs 8.66 GiB.

The results in Loki, VictoriaLogs, and CloudWatch can differ a bit, especially when sampling only 30 minutes, because the Flow Logs themselves are delivered with a lag of a few minutes.

For example, in the Loki bucket, the last object was created at 13:06:50:

But in the VMLogs bucket, at 13:05:29:

Validating with the Cost Explorer

You can also check the data usage with the Cost Explorer.

Select the Service == EC2-Other, and Usage type == NatGateway-Bytes (GB):

Over the past day, we have 129 gigabytes of traffic processed by the NAT Gateway.

If we make a range of 24 hours in Grafana (we can finally do this because we now have VictoriaLogs), we will see 135 gigabytes in the “NAT Gateway Total processed” panel:

Give or take, it’s the same, because Cost Explorer counts not the last 24 hours like Grafana, but the previous calendar day, and it uses the UTC (+00:00) time zone.

NAT Gateway Total OUT and IN processed

Next, we want to see the direction of the traffic — from Kubernetes Pods to the Internet, and from the Internet to Kubernetes Pods.

Let’s recall what we have in the records for packets passing through the NAT Gateway — as we analyzed in the Traffic from the Pod to the External Server through the NAT Gateway, and VPC Flow Logs:

  • by the interface_id field, we filter only those records that were made from the NAT Gateway interface
  • if the packet goes from a Kubernetes Pod to the Internet, then the pkt_src_addr field will contain the IP of this Pod
  • if the packet goes from the Internet to a Kubernetes Pod, then the pkt_dst_addr field will contain the IP of this Pod

Loki queries

Therefore, to count bytes going from the Internet to Kubernetes Pods, we can make the following query in Loki: sum_over_time() with $__range to select data for the last 30 minutes, and pkt_dst_addr=ip("10.0.32.0/20") to select only the IPs from the VPC Private Subnet used for Kubernetes Pods:

sum (
    sum_over_time(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__ =""
        )[$__range]
    )
)

To speed up the query, you can set Type == Instant at the bottom of the Options.

And similarly, but counting the traffic from Kubernetes Pods to the Internet:

sum (
    sum_over_time(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | flow_direction="ingress"
        | pkt_src_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__ =""
        )[$__range]
    )
)

VictoriaLogs queries

Queries for VictoriaLogs will look like this. From the Internet to Kubernetes Pods:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"        
  | stats sum(bytes) bytes_total

From Kubernetes Pods to the Internet:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_src_addr:ipv4_range("10.0.32.0/20")
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"  
  | stats sum(bytes) bytes_total

And all three panels together:

NAT Gateway Total processed bytes/sec

In addition to the Stat panels, I would like to see a historical picture of how traffic has changed over time.

Loki query

In Loki, everything is simple — we just use the rate() function:

sum (
    rate(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__ =""
        )[$__interval]
    )
)

In the Options and in the rate() we use a 5-minute interval, set Standard Options > Unit to "bytes/sec(SI)", and as a result we have 7.25 MB/s at 12:20:

VictoriaLogs query

With VictoriaLogs it’s a bit more interesting, because VictoriaLogs doesn’t yet have a rate() function out of the box (but the developers promise to add one soon).

Also, there is one more nuance:

  • Loki counts data “backwards”: a point on the graph at 12:25 with rate() covers the previous 5 minutes, the [5m] from the Options that is passed in $__interval
  • in VictoriaLogs, the point is plotted at the time of the query execution

To calculate the per-second rate of our bytes, we can use the math pipe.

So, the query will be as follows:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"        
  | stats sum(bytes) sum_bytes
  | math (sum_bytes / ($__interval/1s)) as bytes_total

Here:

  • in stats sum(bytes) we calculate the sum of bytes for the interval specified in the Options (5 minutes) and save the result as sum_bytes
  • then we use math to divide sum_bytes by the number of seconds in the selected $__interval, giving a per-second rate for each point on the graph

Here we have 8.30 MB/s at 12:20, which looks similar to Loki’s result. You could go all out with the checking and calculate it manually from the logs, but pinpoint accuracy is not critical here; we are only interested in the overall trend, so for me it’s OK.
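
As a quick sanity check of the math pipe: with a 5-minute $__interval, the 8.30 MB/s point corresponds to a sum_bytes of about 8,300,000 * 300 = 2,490,000,000 bytes collected during that interval.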

In general, when building graphs you don’t need to write _time:$__range, because VictoriaLogs applies it “under the hood”, but I’m adding it for clarity.

Kubernetes Pods IN From IP

Next, let’s display the top Kubernetes Pods IPs by traffic received from the Internet.

Loki query

For Loki, we use sum_over_time() over the $__range, which in our dashboard is 30 minutes:

topk(5,
  sum by (pkt_src_addr) (
    sum_over_time(
      (
        {logtype="flow", logger_type="loki"}
        | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | action="ACCEPT"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${remote_svc_ip}"
        | pkt_dst_addr=~"${kubernetes_pod_ip}"
        | unwrap bytes
        | __error__ =""
      )[$__range]
    )
  )
)

See Grafana Data links, a very useful Grafana feature.

VictoriaLogs query

And a similar query for VictoriaLogs will look like this:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter 
    interface_id:="eni-0352f8c82da6aa229"
    action:=ACCEPT
    pkt_dst_addr:ipv4_range("10.0.32.0/20")
    pkt_dst_addr:~"${kubernetes_pod_ip}"
    pkt_src_addr:~"${remote_svc_ip}"    
  | stats by (pkt_src_addr) sum(bytes) sum_bytes
  | sort by (sum_bytes) desc limit 10

VictoriaLogs does not yet support the Legend options and returns the result as raw JSON.

Therefore, to make everything look nice and without any unnecessary data, we can add a Transformations > “Rename fields by regex” step, in which we “cut out” just the IP addresses with the regular expression .*addr="(.*)".*:
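
For example, a series name that comes back as something like sum_bytes{pkt_src_addr="20.150.90.164"} (the exact form depends on the datasource response) is reduced by this regex to just 20.150.90.164.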

And what do we have here:

  • in Loki, the top IP is 20.150.90.164 with 954 MB
  • in VictoriaLogs, the same 20.150.90.164 IP is at the top, also with 954 MB

In general, the data is similar, although the sorting differs slightly in Loki, again due to the small delivery delay. And topk() in Loki works a bit oddly; I tried to dig into it once, but gave up. In VictoriaLogs, limit works better (although there is a bug there too, as we’ll see below).

Let’s check the 20.150.90.164 IP in the CloudWatch Logs Insights with the following query:

parse @message "* * * * * * * * * * * * * * * * * * *"
  as region, vpc_id, az_id, subnet_id, instance_id, interface_id,
     flow_direction, srcaddr, dstaddr, srcport, dstport,
     pkt_srcaddr, pkt_dstaddr, pkt_src_aws_service, pkt_dst_aws_service,
     traffic_path, packets, bytes, action
| filter interface_id="eni-0352f8c82da6aa229" and isIpv4InSubnet(pkt_dstaddr,"10.0.32.0/20")
| stats sum(bytes) as bytes_total by pkt_srcaddr
| sort bytes_total desc

The data in VictoriaLogs is closer to the truth, but in general both systems display the data more or less correctly.

Again, if we take a longer period of time (which we can’t do with Loki, but can with VictoriaLogs), the CloudWatch Logs and VictoriaLogs numbers converge even further.

Kubernetes Pods IN From IP bytes/sec

It’s the same as we did for the “NAT Gateway Total IN processed” panel — to get a historical picture of traffic.

Loki query

topk(5,
  sum by (pkt_src_addr) (
    rate(
      (
        {logtype="flow", logger_type="loki"}
        | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | action="ACCEPT"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${remote_svc_ip}"
        | pkt_dst_addr=~"${kubernetes_pod_ip}"
        | unwrap bytes
        | __error__ =""
      )[$__interval]
    )
  )
)

VictoriaLogs query

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_dst_addr:~"${kubernetes_pod_ip}"
      pkt_src_addr:~"${remote_svc_ip}"
  | stats by (pkt_src_addr) sum(bytes) bytes_total
  | sort by (bytes_total) desc
  | math (bytes_total / ($__interval/1s)) as bytes_total

The data is also similar.

But here again, there is a problem with topk() in Loki: the limit is set to the top 5 results, but Loki displays 11.

VictoriaLogs also has a problem with limit. For example, set | sort by (bytes_total) desc limit 5:

As a result, we get not the top 5 IPs, but just 5 points on the chart.

I talked to the VictoriaMetrics developers, and they say it looks like a bug, so I opened a GitHub Issue for them, and we’ll see what happens in the next bugfix releases.

UPD: this has already been fixed.

Kubernetes Pods IN by IP and Port

It remains to display information by IP and port: this can be useful when determining which service generates the traffic; see pkt_src_aws_service, pkt_dst_aws_service, and finding a service in the previous post.

Loki query

Use the Table visualization type and the following query:

topk(10,
  sum by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) (
    sum_over_time(
      (
        {logtype="flow", logger_type="loki"}
        | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | action="ACCEPT"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${remote_svc_ip}"
        | pkt_dst_addr=~"${kubernetes_pod_ip}"
        | unwrap bytes
        | __error__ =""
      )[$__range]
    )
  )
)

In the Fields override, set the Unit of the Value field to “bytes(SI)”, and set a separate Data link for each column.

We can change the column headers and hide the Time field in Transformations:

VictoriaLogs query

The query here will be like this:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_dst_addr:~"${kubernetes_pod_ip}"
      pkt_src_addr:~"${remote_svc_ip}"
  | stats by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) sum(bytes) bytes_total
  | sort by (bytes_total) desc limit 10

Since VictoriaLogs returns the results as JSON (so far), we’ll add the “Extract fields” Transformation.

In the “Filter fields by name”, just like for Loki, we remove the “Time” column.

And in the “Organize fields by name”, we change the column headings and sorting of the columns:

The final Grafana dashboard result, and performance of Loki vs VictoriaLogs

The result with VictoriaLogs for a 12 (!) hour range:

And the CPU/memory resources used (kk here is an alias for kubectl):

$ kk top pod atlas-victoriametrics-victoria-logs-single-server-0 
NAME CPU(cores) MEMORY(bytes)   
atlas-victoriametrics-victoria-logs-single-server-0 2754m 34Mi

The result with Loki for only a 30-minute range:

And CPU/memory resources used:

$ kk top pod -l app.kubernetes.io/name=loki,app.kubernetes.io/component=read
NAME CPU(cores) MEMORY(bytes)   
loki-read-89fbb8f5c-874xq 683m 402Mi           
loki-read-89fbb8f5c-8qpdw 952m 399Mi           
loki-read-89fbb8f5c-qh8dg 848m 413Mi

And that’s it for now.

It remains to wait for further minor updates to the datasource and to VictoriaLogs itself.

What’s next?

Next, I’d still like to have the Kubernetes Pod IPs and external resource addresses as dedicated log fields, rather than parsing them out in the dashboard itself.

Then it will be possible to query several days, or maybe even weeks, of data.

To achieve this, we can try vector.dev: use it to collect the logs from S3, perform transformations to add the fields, and then write the logs to VictoriaLogs.
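
A rough sketch of what such a pipeline could look like in vector.toml (untested; the SQS queue, the field positions, and the added labels are my assumptions, and VictoriaLogs is written to via its Elasticsearch-compatible endpoint, as described in the VictoriaLogs data ingestion docs):

# sketch only: an S3 -> transform -> VictoriaLogs pipeline
[sources.flow_logs_s3]
type = "aws_s3"
region = "us-east-1"
# hypothetical queue fed by the S3 bucket notifications
sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/111111111111/flow-logs-events"

[transforms.parse_flow]
type = "remap"
inputs = ["flow_logs_s3"]
source = '''
# split the space-separated Flow Log record and keep the interesting fields
fields = split(string!(.message), " ")
.pkt_src_addr = fields[11]
.pkt_dst_addr = fields[12]
.bytes = to_int!(fields[17])
# the same stream labels the Promtail Lambda was adding
.logtype = "flow"
.environment = "ops"
.logger_type = "vector"
'''

[sinks.vmlogs]
type = "elasticsearch"
inputs = ["parse_flow"]
endpoints = ["https://vmlogs.monitoring.1-30.ops.example.com/insert/elasticsearch/"]
api_version = "v8"
compression = "gzip"

[sinks.vmlogs.query]
_msg_field = "message"
_time_field = "timestamp"
_stream_fields = "logtype,environment,logger_type"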

I’ll probably try it when I have time, because it looks like a very interesting solution.

Originally published at RTFM: Linux, DevOps, and system administration.

