日志平台构建

​ 对于初具规模的公司来讲,日志系统都是属于不可或缺的一部分,本文通过应用日志作为示例,讲解日志采集、日志加工、日志展示相关流程。


1.日志采集

目前比较成熟且使用广泛的日志采集工具有以下两种

  1. Flume

    目前来讲使用最广泛的日志采集框架,作为apache项目之一,维护和更新不必担心。且其功能支持相比于filebeat来讲会更多,但是使用起来也会相对复杂一点。

  2. Filebeat

    Filebeat属于elastic旗下beats的一种,很轻量级的日志采集框架,只能用于抓取文件使用,相比于flume来讲扩展性更低,但是更易部署。

我们目前两种方式都有用到,但是考虑到之前的应用日志抓取使用的就是ELK技术栈,所以为了使改动最小,本次对于日志采集使用的为filebeat。以下是filebeat日志采集的配置文件,供大家参考,https://www.elastic.co/cn/downloads/past-releases/这个地址可以下载elasic的任意版本全家桶。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
filebeat.inputs:
- type: log
enabled: true
# 采集目录
paths:
- /Users/lvqiushi/idea/engine/logs/test.log

# 多行日志合并
multiline.pattern: ^[0-9][0-9][0-9][0-9]
multiline.negate: true
multiline.match: after
multiline.timeout: 10s
# 额外增加的字段 可以用于指定应用的名称和ip
fields:
serverip: 127.0.0.1
appname: engine

filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
setup.kibana:

output.kafka:
enabled: true
# kafka集群地址
hosts: [xxx","xxx","xxx"]
topic: 'xx'
partition.hash:
reachable_only: true
compression: gzip

processors:
- drop_fields:
fields:
- beat
- host
- input
- source
- offset
- prospector
- tags
- agent
- ecs
- log

考虑到目前线上应用日志量已经比较庞大了,所以直接使用的kafka,kafka最为当今开源mq中吞吐量最大的消息中间件,性能毋庸置疑。比方说现在线上每日千万级日志新增,三台2G的kafka集群都显得毫无压力。

2.日志加工

当采集端配置好以后,接下来就是日志处理工作。之前使用的是ELK技术栈,这次之所以更换,是因为我觉得,在logstash中想要进行复杂的日志处理太困难了,而且调试比较麻烦。在采集端既然已经将日志写入了kafka,不妨我们直接用java写一个消费端来处理日志,这样的话,多么复杂的功能也会变的简单一点。

其中对于kafka的配置如下,最主要的就是设置了批量抓取,来提升写入性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 线上集群分区数是1
factory.setConcurrency(1);
// 批量监听
factory.setBatchListener(true);

return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx,xx,xx");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

至于日志怎么进行加工的,我这里就不阐述了,每个人的需求和想法不同,反正都是java代码(logstash那个鬼语法真的受不了),日志的存储端使用的则为elasicsearch,毕竟上亿数据的快速查询,目前也就es能抗一抗了。

es为查询速度快付出太大的代价,导致写入性能会比较差一点,为了减少es集群的负担,在日志写入的时候,同样使用批量写入的方法进行优化。在es自身提供的高级api中为我们封装好了批量写入的处理—-BulkProcessor。

以下为BulkProcessor的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Bean
public BulkProcessor bulkProcessor(RestHighLevelClient restHighLevelClient) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {

}

@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) {
String errorMsg = response.buildFailureMessage();
log.error("es 写入数据失败" + errorMsg);
}

log.info("es bulk 写入数据成功{}条", request.numberOfActions());
}

@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
log.error("{} data bulk failed , reason :{}", request.numberOfActions(), failure);
}
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).setFlushInterval(TimeValue.timeValueSeconds(3)).build();
return bulkProcessor;
}

BulkProcessor只有两个关键参数,用于控制批操作何时被触发,flushInterval(超时时间)和bulkActions(最大数量),结合你们的实际情况调整这两个参数,来保证写入性能和读取实时性能得到最优的权衡。

配置了BulkProcessor以后,kafka每来一次消息,无脑往里面扔就可以了

1
2
3
4
5
 IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.source(logMap);
// 按天routing
indexRequest.routing(log.getTime().substring(0, 10).replace("-", ""));
bulkProcessor.add(indexRequest);

并且,为了能让es查询速度更快一点,我使用了es的routing功能对日志进行了按天分区,这是从日志查询的实际情况考虑触发的。如果不使用routing操作,那么即使是es对于上亿的数据也会显得力不从心,最简单的查询也要耗时十几秒(我们的es集群配置太差了,每台就给了两个G = =),加上了routing以后,直接变成了秒级查询,性能提升非常明显,建议大家多多使用。

3.日志展示

相比于前面来讲,日志展示反而是最简单的~ 直接根据条件在es里面查询就好了,其中最困难的就是,我们开发平时查询日志最多的就是grep操作,但是es会对你的日志内容进行分词的,会导致你有些时候根据关键字会查不出来,并且es中keyword不分词的字符串存储是有上限的(65536)。

所以我能想到的办法是,对于日志内容,使用两个字段分别存储,一个分词用于日常查询,一个不分词(截取日志的前1000个字符)用于精准查询需求。这样的话,可能80%的需求都能通过分词查询满足,剩下的20%精准查询需求,查日志1000个字符也基本上能满足了。

最终页面展示
showlogpage

查询优化建议

  1. 对于日志来讲,按照应用来分索引,避免单索引中日志量太庞大
  2. es索引结构避免使用数字类型,能使用keyword就使用keyword
  3. 使用MatchPhrase短语查询提升精度,来尽量避免使用wildcard模糊匹配来查询,如果同一时刻大家都在使用模糊查询搜日志,es集群可能会直接宕掉。
  4. 控制日期范围查询的时间段,对于日志查询来讲,很难会有需要查询数十天内所有日志的情况。