Best Practices for Ingesting WAF Data into Tencent Cloud ES (Part 1)

Note

The problems and solutions described in this article apply to Tencent Cloud Elasticsearch Service (ES).

Tencent Cloud Logstash is also used.

Due to length, this article is split into two parts; for the second half, see: Best Practices for Ingesting WAF Data into Tencent Cloud ES (Part 2).

I. Background

WAF is short for Web Application Firewall, a system for protecting web applications, also known as a website application-level intrusion prevention system.

The customer's WAF logs, spread across different cloud vendors, needed to be ingested into a single platform for centralized management; the customer ultimately chose Tencent Cloud ES.

II. Data Ingestion Pipeline

Problem encountered in the pipeline:

Syslog can only push to a single node, while Tencent Cloud Logstash is a multi-node Logstash cluster. As a result, syslog could not spread its data across multiple Logstash nodes, leaving resources wasted.

Solution:

Syslog → Logstash VIP → Logstash RS → ES

  1. Create a private link (VIP) pointing to all RS (real server) nodes under the Logstash instance, and bind it to a port above 1024, e.g. 8888;
  2. Start port 8888 on the Logstash instance to receive data;
  3. Expose vip:8888 to the customer, so that their syslog pushes data to the VIP;
  4. The Logstash RS nodes take turns receiving the syslog pushes and write the data to ES (a minimal pipeline sketch follows the list below).
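A minimal sketch of the Logstash pipeline in steps 2-4, assuming the syslog input plugin listens on the VIP-bound port 8888; the ES endpoint, index name, and credentials are placeholders:

input {
  syslog {
    port => 8888                          # port bound to the VIP
  }
}
output {
  elasticsearch {
    hosts => ["http://es-cluster:9200"]   # placeholder ES endpoint
    index => "waf-log-%{+YYYY.MM.dd}"     # placeholder daily index
    user => "elastic"                     # placeholder credentials
    password => "xxxxx"
  }
}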

WAF log ingestion pipeline diagram:

About SLS:

SLS is Alibaba Cloud's Log Service, a data pipeline product that provides log collection, processing, and related operations.

III. Introduction to Logstash

Logstash is a product from Elastic for transporting, processing, and managing application logs and events.

We can use Logstash to migrate data across ES clusters (a sketch follows below), or to ingest and synchronize data from many kinds of sources; Xiaohongshu's WAF logs are ingested through Logstash.
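As a side note on the cross-cluster migration use case, here is a minimal sketch that reads from one cluster and writes to another. The hostnames and index pattern are placeholders, and it assumes the classic docinfo metadata layout, where docinfo => true carries each document's original index and id across:

input {
  elasticsearch {
    hosts => ["http://source-es:9200"]    # placeholder source cluster
    index => "waf-log-*"
    docinfo => true                       # keep _index/_id in [@metadata]
  }
}
output {
  elasticsearch {
    hosts => ["http://target-es:9200"]    # placeholder target cluster
    index => "%{[@metadata][_index]}"     # reuse the source index name
    document_id => "%{[@metadata][_id]}"  # reuse the source document id
  }
}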

1. Installing Logstash

Note: Tencent Cloud ES offers Logstash as a managed product, so no installation or setup is needed. For completeness, here is how to install Logstash yourself.

1) Download and install the public signing key

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

2) Configure the yum repository, e.g. in /etc/yum.repos.d/logstash.repo

[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

3) Install

yum install logstash

2. Logstash plugins

Logstash works on a plugin model; its plugins fall into three types: input, filter, and output.

INPUT PLUGIN   # collects data
FILTER PLUGIN  # cleans and transforms data
OUTPUT PLUGIN  # emits data

Every pipeline combines the three, as in the skeleton shown below.
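A minimal pipeline skeleton combining the three sections; the plugin choices here are placeholders:

input {             # where events come from
  stdin { }
}
filter {            # how events are transformed
  mutate { add_field => { "pipeline" => "demo" } }
}
output {            # where events go
  stdout { }
}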

2.1 Input configuration

  • Beats: collect from Beats agents
input {
  beats {
    port => 5044
  }
}
  • Syslog: collect via the syslog protocol
input {
  syslog {
    port => 5144
    syslog_field => "syslog"
  }
}
  • File: collect from log files
input {
  file {
    path => "/var/log/*/*.log"
  }
}
  • Elasticsearch: collect from an ES cluster
input {
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": {"match": { "statuscode": 200 } }, "sort": ["_doc" ] }'
  }
}
  • Kafka: consume from Kafka
input {
  kafka {
    bootstrap_servers => "xxxxx:9992"
    topics => ["test"]
    codec => "json"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    jaas_path => "/etc/logstash/kafka-client-jaas.conf"
    consumer_threads => 2
    group_id => "logstash-bigdata"
    auto_offset_reset => "latest"
    max_poll_records => "500"
    max_poll_interval_ms => "30000"
    session_timeout_ms => "30000"
    request_timeout_ms => "60000"
    auto_commit_interval_ms => "5000"
    check_crcs => "false"
    heartbeat_interval_ms => "9000"
    partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
  }
}
  • Jdbc: collect from a database
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * FROM songs WHERE artist = :favorite_artist"
  }
}
  • S3: collect from S3-compatible object storage (here, Tencent Cloud COS)
input {
  s3 {
    access_key_id => "xxx"
    secret_access_key => "xxxxx"
    endpoint => "https://cos.ap-guangzhou.myqcloud.com"
    bucket => "my-bucket"
    region => "ap-guangzhou"
  }
}

2.2 Output configuration

  • Elasticsearch: output to an ES cluster
output {
  if [log_type] == "tomcat" and ([department] == "zxbi" or [department] == "jfbi") {
    elasticsearch {
      hosts => ["https://rjjd-node01:9200"]
      index => "jfbi.prd-%{+YYYY.MM}"
      user => "beatuser"
      password => "xxxxx"
      http_compression => true
      sniffing => false
      ssl => true
      ssl_certificate_verification => true
      cacert => "/etc/ssl/xxx_ca.crt"
      id => "zxbi"
    }
  }
}
  • File: output to a file
output {
  file {
    path => "/tmp/stdout.log"
    codec => line {
      format => "customformat: %{message}"
    }
  }
}
  • Stdout: output to the console
output {
  stdout { codec => json }
}
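For debugging, the rubydebug codec is often more readable than json, as it pretty-prints every field of each event:

output {
  stdout { codec => rubydebug }
}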

2.3 Filter configuration

Filter plugins are the most involved of the three types, and a large number of them are available:

  • aggregate: Aggregates information from several events originating with a single task (logstash-filter-aggregate)
  • alter: Performs general alterations to fields that the mutate filter does not handle (logstash-filter-alter)
  • bytes: Parses string representations of computer storage sizes, such as "123 MB" or "5.6gb", into their numeric value in bytes (logstash-filter-bytes)
  • cidr: Checks IP addresses against a list of network blocks (logstash-filter-cidr)
  • cipher: Applies or removes a cipher to an event (logstash-filter-cipher)
  • clone: Duplicates events (logstash-filter-clone)
  • csv: Parses comma-separated value data into individual fields (logstash-filter-csv)
  • date: Parses dates from fields to use as the Logstash timestamp for an event (logstash-filter-date)
  • de_dot: Computationally expensive filter that removes dots from a field name (logstash-filter-de_dot)
  • dissect: Extracts unstructured event data into fields using delimiters (logstash-filter-dissect)
  • dns: Performs a standard or reverse DNS lookup (logstash-filter-dns)
  • drop: Drops all events (logstash-filter-drop)
  • elapsed: Calculates the elapsed time between a pair of events (logstash-filter-elapsed)
  • elasticsearch: Copies fields from previous log events in Elasticsearch to current events (logstash-filter-elasticsearch)
  • environment: Stores environment variables as metadata sub-fields (logstash-filter-environment)
  • extractnumbers: Extracts numbers from a string (logstash-filter-extractnumbers)
  • fingerprint: Fingerprints fields by replacing values with a consistent hash (logstash-filter-fingerprint)
  • geoip: Adds geographical information about an IP address (logstash-filter-geoip)
  • grok: Parses unstructured event data into fields (logstash-filter-grok)
  • http: Provides integration with external web services/REST APIs (logstash-filter-http)
  • i18n: Removes special characters from a field (logstash-filter-i18n)
  • java_uuid: Generates a UUID and adds it to each processed event (core plugin)
  • jdbc_static: Enriches events with data pre-loaded from a remote database (logstash-integration-jdbc)
  • jdbc_streaming: Enriches events with your database data (logstash-integration-jdbc)
  • json: Parses JSON events (logstash-filter-json)
  • json_encode: Serializes a field to JSON (logstash-filter-json_encode)
  • kv: Parses key-value pairs (logstash-filter-kv)
  • memcached: Provides integration with external data in Memcached (logstash-filter-memcached)
  • metricize: Takes complex events containing a number of metrics and splits these up into multiple events, each holding a single metric (logstash-filter-metricize)
  • metrics: Aggregates metrics (logstash-filter-metrics)
  • mutate: Performs mutations on fields (logstash-filter-mutate)
  • prune: Prunes event data based on a list of fields to blacklist or whitelist (logstash-filter-prune)
  • range: Checks that specified fields stay within given size or length limits (logstash-filter-range)
  • ruby: Executes arbitrary Ruby code (logstash-filter-ruby)
  • sleep: Sleeps for a specified time span (logstash-filter-sleep)
  • split: Splits multi-line messages, strings, or arrays into distinct events (logstash-filter-split)
  • syslog_pri: Parses the PRI (priority) field of a syslog message (logstash-filter-syslog_pri)
  • threats_classifier: Enriches security logs with information about the attacker's intent (logstash-filter-threats_classifier)
  • throttle: Throttles the number of events (logstash-filter-throttle)
  • tld: Extracts the top-level domain (TLD) from a field (logstash-filter-tld)
  • translate: Replaces field contents based on a hash or YAML file (logstash-filter-translate)
  • truncate: Truncates fields longer than a given length (logstash-filter-truncate)
  • urldecode: Decodes URL-encoded fields (logstash-filter-urldecode)
  • useragent: Parses user agent strings into fields (logstash-filter-useragent)
  • uuid: Adds a UUID to events (logstash-filter-uuid)
  • wurfl_device_detection: Enriches logs with device information such as brand, model, OS (logstash-filter-wurfl_device_detection)
  • xml: Parses XML into fields (logstash-filter-xml)

For detailed usage, see: https://www.elastic.co/guide/en/logstash/7.14/filter-plugins.html

Commonly used Logstash plugins

Among these, the plugins we use most often are grok, date, drop, geoip, json, kv, ruby, and mutate. Brief introductions follow, starting with a quick grok sketch, since grok has no example of its own below.
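Grok parses unstructured text into fields using named patterns. A minimal sketch, assuming web-access-style lines in the message field and the COMBINEDAPACHELOG pattern set that ships with Logstash:

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }   # built-in pattern set
  }
}

Events that fail to match are tagged _grokparsefailure and can be handled with a conditional.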

  • Date

The date filter parses dates from fields, then uses that date or timestamp as the Logstash timestamp of the event.

filter {
  date {
      match => ["[creatime]",                 # the field holding the time
          "yyyy-MM-dd HH:mm:ss",
          "yyyy-MM-dd HH:mm:ss Z",
          "MMM dd, yyyy HH:mm:ss aa",         # e.g. Oct 16, 2020 11:59:53 PM
          "yyyy-MM-dd HH:mm:ss.SSS",
          "ISO8601",                          # e.g. 2018-06-17T03:44:01.103Z (the Z may also be an offset such as +08:00)
          "UNIX",                             # Unix timestamp: seconds since 1970
          "UNIX_MS"                           # milliseconds since 1970
      ]
      target => "@timestamp"                  # target field
      timezone => "Asia/Shanghai"             # time zone of the source timestamps
  }
}
  • Drop

When certain data is not needed, the drop plugin can discard it. For example:

filter {
  if [loglevel] == "debug" {
    drop { }
  }
}
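Besides dropping everything that matches, drop can also sample, via its percentage option; a sketch that keeps roughly 60% of debug events:

filter {
  if [loglevel] == "debug" {
    drop { percentage => 40 }   # drop about 40% of matching events
  }
}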
  • Geoip

The geoip filter adds information about the geographic location of an IP address, based on the MaxMind GeoLite2 database. Configuration:

filter {
  geoip {
    database => "/usr/local/GeoLite2-City/GeoLite2-City.mmdb"
    fields => ["ip", "longitude", "city_name"]
    source => "ip"
    target => "geoip"
  }
}

The database file must be downloaded manually from https://www.maxmind.com/en/accounts/436070/geoip/downloads; the site requires registration and login.

  • Json

By default, the parsed JSON is placed at the root (top level) of the Logstash event, but the filter can be configured to put it into an arbitrary event field via the target option.

This plugin has fallback behavior for when things go wrong during parsing. If JSON parsing fails on the data, the event is left untouched and tagged with _jsonparsefailure; you can then use conditionals to clean up the data. The tag can be changed with the tag_on_failure option.

If the parsed data contains a @timestamp field, the plugin tries to use it as the event's @timestamp; if that parsing fails, the field is renamed to _@timestamp and the event is tagged with _timestampparsefailure.

filter {
  json {
    source => "message"
  }
}
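The same filter with the options discussed above spelled out; the target field name here is illustrative:

filter {
  json {
    source => "message"
    target => "payload"                       # parsed JSON goes under [payload] instead of the event root
    tag_on_failure => ["_jsonparsefailure"]   # the default tag, shown explicitly
  }
}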
  • Kv

This filter helps automatically parse messages (or a specific event field) of the foo=bar variety.

For example, a log message containing ip=1.2.3.4 error=REFUSED can be parsed into fields automatically. A configuration showing some of the common options:

filter {
  kv {
    default_keys => { "host" => "unknown", "ip" => "unknown" }
    exclude_keys => ["times"]
    field_split => "&?"
  }
}
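For the ip=1.2.3.4 error=REFUSED message mentioned above, the defaults are already enough, since kv splits fields on whitespace and key/value pairs on = out of the box:

filter {
  kv { }    # yields an ip field and an error field
}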

Supported parameters:

https://www.elastic.co/guide/en/logstash/6.8/plugins-filters-kv.html

  • Ruby

Executes Ruby code. This filter accepts either inline Ruby code or a path to a Ruby file. The two options are mutually exclusive and work in slightly different ways.

For example:

filter {
  ruby {
    code => "event.set('eventHappenedAt',event.get('createdDateTime').to_i*1000)"
  }
  ruby {
    id => "135453"
    path => "/usr/local/rubyCode/rubyLogParse.rb"
  }
}

The Ruby script referenced above:

def filter(event)
  # extract the log message
  logdata = event.get('message')
  if logdata.include?('reqMessage')
    # parse the log level and request time and store them on the event
    loglevel = logdata[(logdata.index('[') + 1)...logdata.index(']')]
    event.set('loglevel', loglevel)
    event.set('requestTime', logdata[0, logdata.index(' - [')])
    # REST JSON log vs. XML log
    if logdata.include?('restJson')
      parseJsonLog(event, logdata)
    else
      parseXmlLog(event, logdata)   # (parseXmlLog not shown here)
    end
  end
  return [event]
end

# parse a JSON-format log line
def parseJsonLog(event, logdata)
  # extract the log body
  loginfo = logdata[(logdata.index('] ') + 2)...(logdata.rindex('}') + 1)]
  # convert it to a JSON object
  logInfoJson = JSON.parse(loginfo)
  # transaction ID and channel
  event.set('transactionID', logInfoJson['transactionID'])
  event.set('channel', logInfoJson['reqMessage']['channel'])
  # response message
  event.set('respMessage', logInfoJson['respMessage'].to_json)
  # request message
  event.set('reqMessage', logInfoJson['reqMessage'].to_json)

  # ..................... (remaining code omitted)

  # set the status tag
  event.set('tags', 'log_success')
end
  • Mutate

The powerful mutate filter can add, remove, and modify fields. It supports many operations and is efficient.

In order of execution:

  • coerce: set a default value for a null field
  • rename: rename a field
  • update: update a field's value (only if the field already exists)
  • replace: replace a field's value
  • convert: convert a field's type
  • gsub: replace matched characters in a string
  • uppercase: convert a string to uppercase
  • capitalize: capitalize the first character of a string
  • lowercase: convert a string to lowercase
  • strip: strip leading and trailing whitespace
  • remove: remove a field
  • split: split a field into an array
  • join: join an array into a string
  • merge: merge arrays or hashes
  • copy: copy a field

For example:

filter {
  mutate {
    split => ["hostname", "."]
    add_field => { "shortHostname" => "%{[hostname][0]}" }
  }
  mutate {
    rename => { "shortHostname" => "hostname" }
  }
}
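One more sketch combining a few of the operations listed above; the field names are illustrative:

filter {
  mutate {
    convert => { "statuscode" => "integer" }   # cast string to integer
    gsub => ["message", "\t", " "]             # replace tabs with spaces
    lowercase => ["hostname"]                  # normalize case
  }
}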