
Vector setup for Cloudflare log ingest

Cloudflare logs are rich in data and can provide real value. Here's how to collect them.


Using the Elasticsearch API, our customers can use the tools they already know and love while keeping more of their data at a lower cost.

Why collect Cloudflare logs?

Too often when we use a CDN, we set it and forget it. Wouldn't it be great to watch the traffic coming to your website through the CDN? Maybe you want to understand what the CDN is blocking, watch for errors at your origin, or look for speed issues all the way from your customers to the origin. Whatever your reason, we have a solution for you.

Why use EraSearch for Cloudflare Logs?

We at Era.co believe that consolidating logs across the enterprise, whether on-prem, in the public cloud, or both, lets you gain more knowledge from them. We believe logging should be easy and should reduce your management costs. We provide both a SaaS offering and the ability to run EraSearch and EraDB in your own Kubernetes cluster.

We have developed EraDB to allow any number of APIs to be built on top of it, and our first API is "an Elasticsearch-compatible interface that is built from the ground up to be optimized for ingesting, indexing, and storing logs while also leveraging the best properties of a cloud-native architecture. We've built EraSearch to realize that dream."

Logical view of the design

cloudflare-logical.png

To set up Cloudflare, please see our "Cloudflare setup" guide.
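If you drive Logpush through the Cloudflare API rather than the dashboard, the rough Python sketch below shows the shape of the request: it creates an http_requests Logpush job whose Splunk-style destination points at the Vector splunk_hec endpoint defined later in this post. The zone ID, API token, channel ID, field list, and destination string are all placeholder examples; check Cloudflare's Logpush documentation for the exact destination_conf format.

import requests

# Placeholders; substitute your own zone ID and API token.
ZONE_ID = "your-zone-id"
API_TOKEN = "your-cloudflare-api-token"

# Splunk-style destination pointing at the Vector splunk_hec source shown below.
# The channel value must be one of the channel IDs Vector's filter accepts.
destination = (
    "splunk://vector.eradb.com:8088/services/collector/raw"
    "?channel=11111111-1111-1111-1111-111111111111"
    "&insecure-skip-verify=false"
    "&sourcetype=cloudflare:json"
    "&header_Authorization=Splunk%20placeholder-hec-token"
)

job = {
    "name": "vector-http-requests",
    "dataset": "http_requests",
    "enabled": True,
    # Pick whichever fields you care about; EdgeEndTimestamp feeds _ts/_lid later.
    "logpull_options": "fields=ClientIP,ClientRequestHost,ClientRequestURI,"
                       "EdgeEndTimestamp,EdgeResponseStatus,OriginResponseStatus"
                       "&timestamps=rfc3339",
    "destination_conf": destination,
}

resp = requests.post(
    f"https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/logpush/jobs",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json=job,
)
print(resp.status_code, resp.json())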

To set up Vector to accept the Cloudflare logs, we use the example below. It's commented inline so it's easier to understand. We use this config as the base of our Vector Docker builds, and we also use it to run multiple Vector instances on one system for POCs and demos. A Kubernetes install behind an ingress controller would look similar, and you can also run Vector as a DaemonSet, which we will publish more about soon. The ${...} placeholders are environment variables that Vector fills in at startup (example values are shown in the sketch below).

There are many ways to utilize this config, which you can read all about on Vector's website.
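Before the full config, here is a minimal launch sketch in Python showing the environment variables the config interpolates (VECPORT, channelID, channelID2, channelID3, host, uname, passwd). In practice we set these in our Docker builds; every value below is a made-up example, and the config path is an assumption.

import os
import subprocess

# All values below are placeholders; substitute your own. These are the
# environment variables the Vector config interpolates at startup.
env = {
    **os.environ,
    "VECPORT": "8088",                                    # port the splunk_hec source listens on
    "channelID": "11111111-1111-1111-1111-111111111111",  # Splunk channel IDs allowed by the filter
    "channelID2": "22222222-2222-2222-2222-222222222222",
    "channelID3": "33333333-3333-3333-3333-333333333333",
    "host": "erasearch.example.com",                      # EraSearch endpoint used by the sink
    "uname": "ingest-user",
    "passwd": "ingest-password",
}

# Point Vector at the config below (path is an example) and run it.
subprocess.run(["vector", "--config", "/etc/vector/vector.toml"], env=env, check=True)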

#                                    __   __  __
#                                    \ \ / / / /
#                                     \ V / / /
#                                      \_/  \/
#
#                                    V E C T O R
#                                   Configuration
#
# ------------------------------------------------------------------------------
# Website: https://vector.dev
# Docs: https://vector.dev/docs/
# ------------------------------------------------------------------------------

# Note: A full configuration reference is located at https://vector.dev/docs/ and examples
#       in the ./examples folder.

data_dir = "/var/lib/vector"

#Sets up the Splunk HEC ingest port and TLS encryption.
[sources.cloudflare]
  type = "splunk_hec"
  address = "0.0.0.0:${VECPORT}"
  tls.enabled = true
  tls.crt_file = "vector.eradb.com.crt"
  tls.key_file = "vector.eradb.com.key"

#This transform makes sure we really want the CF message: the .splunk_channel
#value (set from the Splunk channel header) must be one of our allowed channel IDs.
[transforms.splunk_cf]
  type = "filter" # required
    #the below tells Vector to use the sources.cloudflare source above as the input
  inputs = ["cloudflare"] # required
    #the condition checks that the .splunk_channel value from the HTTP header is a channel ID we want
  condition = 'includes(["${channelID}","${channelID2}","${channelID3}"], .splunk_channel)'

#This transform is an interesting one.  CF sends newline-separated JSON, line by line, per request.
#So you may have 5000 lines in a single request.  If you need to add anything to each line,
#you have to explode, or separate, the single request into individual lines.
#This first strips off the trailing newline and then splits the message on newlines.
[transforms.explode_cf]
type = "remap"
inputs = ["splunk_cf"]
source = '''
# strip the trailing newline from the raw payload
.message, err = strip_whitespace(.message)
# if the payload still contains newlines, split it into an array of lines;
# assigning an array to the event root makes Vector emit one event per element
if match(.message, r'\n') ?? false {
  . = split!(.message, r'\n')
}
'''

# Parse the Cloudflare logs
# See the Vector Remap Language reference for more info: https://vrl.dev
# This transform adds a _lid and _ts to each event. They are currently required to insert into
# EraSearch.  We are working to remove this requirement in the near future.
# It first turns each line into a JSON object and then looks for a timestamp
# to be used for _ts and _lid
[transforms.parse_logs_cf]
type = "remap"
inputs = ["explode_cf"]
source = '''
#If you want to see the message, you can use log() to write the line at error level
#log(.message, level: "error")
. = parse_json!(.message)
if exists(.EdgeEndTimestamp) {
    ._lid = to_unix_timestamp(to_timestamp!(.EdgeEndTimestamp), unit: "nanoseconds")
    ._ts = to_unix_timestamp(to_timestamp!(.EdgeEndTimestamp), unit: "milliseconds")
} else if exists(.ConnectTimestamp) {
    ._lid = to_unix_timestamp(to_timestamp!(.ConnectTimestamp), unit: "nanoseconds")
    ._ts = to_unix_timestamp(to_timestamp!(.ConnectTimestamp), unit: "milliseconds")
} else {
    ._lid = to_unix_timestamp(now(), unit: "nanoseconds")
    ._ts = to_unix_timestamp(now(), unit: "milliseconds")
}
'''

# This sends the events to EraSearch using Vector's elasticsearch sink type
#Uses the above transform as its input
[sinks.es2_cloudflare]
  type = "elasticsearch"
  inputs = ["parse_logs_cf"]
  endpoint="https://${host}:443"
  index = "logs-cf"

  auth.user="${uname}"
  auth.password="${passwd}"
  auth.strategy="basic"
  healthcheck.enabled = false
  batch.max_bytes=5000000
  request.concurrency = "adaptive"
  request.headers.Content-Type = "application/json"
  buffer.max_events = 5000
  buffer.type = "memory"
  buffer.when_full = "block"

# The below turns on Prometheus metrics and exports them
# This is useful to understand how the adaptive concurrency is working.
[sources.internal]
type = "internal_metrics"

[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["internal"]
address = "0.0.0.0:9598"
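Once logs are flowing, the Elasticsearch-compatible API mentioned at the top is how you get them back out. Below is a minimal query sketch, assuming the standard _search endpoint on the logs-cf index and the same host and basic-auth credentials the sink uses; the field names depend on which Cloudflare fields you chose to push, and all values shown are placeholders.

import requests

# Placeholders; use the same ${host}, ${uname}, and ${passwd} values the sink uses.
HOST = "erasearch.example.com"
AUTH = ("ingest-user", "ingest-password")

# Find recent requests where Cloudflare served a 5xx, using the standard
# Elasticsearch _search API against the logs-cf index.
query = {
    "size": 5,
    "query": {"range": {"EdgeResponseStatus": {"gte": 500}}},
}

resp = requests.post(f"https://{HOST}:443/logs-cf/_search", auth=AUTH, json=query)
resp.raise_for_status()

for hit in resp.json()["hits"]["hits"]:
    src = hit["_source"]
    print(src.get("EdgeResponseStatus"), src.get("ClientRequestHost"), src.get("ClientRequestURI"))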