日志平台构建
对于初具规模的公司来讲,日志系统都是属于不可或缺的一部分,本文通过应用日志作为示例,讲解日志采集、日志加工、日志展示相关流程。
1.日志采集
目前比较成熟且使用广泛的日志采集工具有以下两种
Flume
目前来讲使用最广泛的日志采集框架,作为apache项目之一,维护和更新不必担心。且其功能支持相比于filebeat来讲会更多,但是使用起来也会相对复杂一点。
Filebeat
Filebeat属于elastic旗下beats的一种,很轻量级的日志采集框架,只能用于抓取文件使用,相比于flume来讲扩展性更低,但是更易部署。
我们目前两种方式都有用到,但是考虑到之前的应用日志抓取使用的就是ELK技术栈,所以为了使改动最小,本次对于日志采集使用的为filebeat。以下是filebeat日志采集的配置文件,供大家参考,https://www.elastic.co/cn/downloads/past-releases/这个地址可以下载elasic的任意版本全家桶。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| filebeat.inputs: - type: log enabled: true paths: - /Users/lvqiushi/idea/engine/logs/test.log
multiline.pattern: ^[0-9][0-9][0-9][0-9] multiline.negate: true multiline.match: after multiline.timeout: 10s fields: serverip: 127.0.0.1 appname: engine
filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 setup.kibana:
output.kafka: enabled: true hosts: [xxx","xxx","xxx"] topic: 'xx' partition.hash: reachable_only: true compression: gzip
processors: - drop_fields: fields: - beat - host - input - source - offset - prospector - tags - agent - ecs - log
|
考虑到目前线上应用日志量已经比较庞大了,所以直接使用的kafka,kafka最为当今开源mq中吞吐量最大的消息中间件,性能毋庸置疑。比方说现在线上每日千万级日志新增,三台2G的kafka集群都显得毫无压力。
2.日志加工
当采集端配置好以后,接下来就是日志处理工作。之前使用的是ELK技术栈,这次之所以更换,是因为我觉得,在logstash中想要进行复杂的日志处理太困难了,而且调试比较麻烦。在采集端既然已经将日志写入了kafka,不妨我们直接用java写一个消费端来处理日志,这样的话,多么复杂的功能也会变的简单一点。
其中对于kafka的配置如下,最主要的就是设置了批量抓取,来提升写入性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(1); factory.setBatchListener(true); return factory; }
@Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(16); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx,xx,xx"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }
|
至于日志怎么进行加工的,我这里就不阐述了,每个人的需求和想法不同,反正都是java代码(logstash那个鬼语法真的受不了),日志的存储端使用的则为elasicsearch,毕竟上亿数据的快速查询,目前也就es能抗一抗了。
es为查询速度快付出太大的代价,导致写入性能会比较差一点,为了减少es集群的负担,在日志写入的时候,同样使用批量写入的方法进行优化。在es自身提供的高级api中为我们封装好了批量写入的处理—-BulkProcessor。
以下为BulkProcessor的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Bean public BulkProcessor bulkProcessor(RestHighLevelClient restHighLevelClient) { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {
}
@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { String errorMsg = response.buildFailureMessage(); log.error("es 写入数据失败" + errorMsg); }
log.info("es bulk 写入数据成功{}条", request.numberOfActions()); }
@Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { log.error("{} data bulk failed , reason :{}", request.numberOfActions(), failure); } }; BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener).setFlushInterval(TimeValue.timeValueSeconds(3)).build(); return bulkProcessor; }
|
BulkProcessor只有两个关键参数,用于控制批操作何时被触发,flushInterval(超时时间)和bulkActions(最大数量),结合你们的实际情况调整这两个参数,来保证写入性能和读取实时性能得到最优的权衡。
配置了BulkProcessor以后,kafka每来一次消息,无脑往里面扔就可以了
1 2 3 4 5
| IndexRequest indexRequest = new IndexRequest(indexName); indexRequest.source(logMap); // 按天routing indexRequest.routing(log.getTime().substring(0, 10).replace("-", "")); bulkProcessor.add(indexRequest);
|
并且,为了能让es查询速度更快一点,我使用了es的routing功能对日志进行了按天分区,这是从日志查询的实际情况考虑触发的。如果不使用routing操作,那么即使是es对于上亿的数据也会显得力不从心,最简单的查询也要耗时十几秒(我们的es集群配置太差了,每台就给了两个G = =),加上了routing以后,直接变成了秒级查询,性能提升非常明显,建议大家多多使用。
3.日志展示
相比于前面来讲,日志展示反而是最简单的~ 直接根据条件在es里面查询就好了,其中最困难的就是,我们开发平时查询日志最多的就是grep操作,但是es会对你的日志内容进行分词的,会导致你有些时候根据关键字会查不出来,并且es中keyword不分词的字符串存储是有上限的(65536)。
所以我能想到的办法是,对于日志内容,使用两个字段分别存储,一个分词用于日常查询,一个不分词(截取日志的前1000个字符)用于精准查询需求。这样的话,可能80%的需求都能通过分词查询满足,剩下的20%精准查询需求,查日志1000个字符也基本上能满足了。
最终页面展示
查询优化建议
- 对于日志来讲,按照应用来分索引,避免单索引中日志量太庞大
- es索引结构避免使用数字类型,能使用keyword就使用keyword
- 使用MatchPhrase短语查询提升精度,来尽量避免使用wildcard模糊匹配来查询,如果同一时刻大家都在使用模糊查询搜日志,es集群可能会直接宕掉。
- 控制日期范围查询的时间段,对于日志查询来讲,很难会有需要查询数十天内所有日志的情况。