【实战】 elasticsearch 写入速度提升的案例分享
- 文章首发投稿至InfoQ,【侠梦的开发笔记】公众号,欢迎关注
基本配置
- 基本配置,5台配置为 24C 125G 17T 的主机,每台主机上搭建了一个elasticsearch节点。
采用的elasticsearch集群版本为7.1.1。管理工具包括kibana和cerebro。
应用案例
- 数据来源为kafka的三个topic,主要用于实时日志数据的存储和检索,由于实时性要求,所以需要将数据快速的写入到es中。
这里就分别称它们为TopicA、TopicB、TopicC吧。由于是调优写入,所以对源数据的一些基本的指标需要作出一个详细的梳理,便于后续分析。以下为三个topic的数据产生情况:
规划
Topic | 报文大小 | 数据量预估(天) | es索引分片数规划 |
---|---|---|---|
topicA | 900b | 2T | 30个主分片 |
topicB | 800b | 400G | 10个主分片 |
topicC | 750b | 300G | 10个主分片 |
问题重现
- 未做任何配置的情况下,分别使用java和logstash进行数据抽取,发现效率都不高,具体问题表现在:
- 1、kafka数据积压严重,消费跟不上生产的速度。
- 2、elasticsearch集群负载很高,大量写入被拒绝。
- 3、java程序频繁抛出RejectionException异常。
- 4、主机cpu异常的高。
操作系统层面及JVM的配置调整这里不再阐述,有很多关于此类的文章可以参考。
我们分模块对各个部分进行调整,具体细节如下:
写入程序优化
从定数到定量
- 使用的java程序中,我们将固定条数插入改为固定大小插入,由于使用的es版本较高,直接替换成了官方推荐的BulkProcessor方式。具体指定属性如下:
# 每2w条执行一次bulk插入
bulkActions: 20000
# 数据量达到15M后执行bulk插入
bulkSizeMb: 15
# 无论数据量多少,间隔20s执行一次bulk
flushInterval: 20
# 允许并发的bulk请求数
concurrentRequests: 10
- 这里的具体配置值,可以根据观察集群状态,来逐步增加。对于高版本的es,可以通过x-pack的监控页面观察索引速度进行相应调整,如果es版本较低,可以使用推荐的rest api进行逻辑封装。
在低版本的es中,统计写入速度的思路是:写一个程序定时检查索引的数据量,来计算。如果使用python,就两行代码就能获取索引的数据总量。
call_list = es.indices.stats(index=index)
total = call_list['indices'][index]['total']['indexing']['index_total']。
也可以隔几分钟用CURL来粗略统计单个索引的数据量大小。命令如下:
查询索引文档总量
curl -XGET -uname:pwd
'http://esip:port/_cat/count/index-name?v&format=json&pretty'
###### 启动多个进程
- 由于Bulkprocess是线程安全的,所以我们可以使用多线程的方式来共享一个批处理器。更好的消费方式是,启动多个消费程序进程,将其部署在不同的主机上,让多个进程中开启的多线程总数和topic的分区数相等,并且将他们设置为同一个消费组。每一个进程包含一个bulkprocess,可以提高消费和批量写入能力。同时可以避免单点问题,假如一个消费者进程挂掉,则kafka集群会重新平衡分区的消费者。少了消费者只是会影响消费速度,并不影响数据的处理。
###### “压测”,提升批量插入条数
- 通过对各个监控指标的观察,来判断是否能继续提高写入条数或增加线程数,从而达到最大吞吐量。
###### 一、观察集群负载Load Average值
- 负载值,一定程度上代表了CPU的繁忙程度,那我们如何来解读elasticsearch 监控页面的的负载值呢?如下是一个三个节点的集群,从左侧cerebo提供的界面来看,load值标红,表明es的负载可能有点高了,那么这个具体达到什么值会显示红色呢,让我们一起来研究研究。
- 先从主机层面说起,linux下提供了一个uptime命令来观察主机的负载
- 其中load average的三个值,分别代表主机在1分钟、5分钟、15分钟内的一个负载情况。有人可能会疑惑,26.01是代表主机的负载在26%的意思吗,从我们跑的es集群情况来看,这显然不是负载很低的表现。其实啊,在单个cpu的情况下,这个值是可以看做一个百分比的,比如负载为0.05,表明目前系统的负荷为5%。但我们的服务器一般都是多个处理器,每个处理器内部会包含多个cpu核心,所以这里负载显示的值,是和cpu的核心数有关的,如果非要用百分比来表示系统负荷的话,可以用具体的负载值 除以 服务器的总核心数,观察是否大于1。总核心数查看的命令为:
cat /proc/cpuinfo |grep -c 'model name'
- 这台主机显示为24,从26的负载来看,目前处理的任务需要排队了,这就是为什么负载标红的原因。同时,这里列举一下,如何查看CPU情况
总逻辑CPU数 = 物理CPU个数 X 每颗物理CPU的核数 X 超线程数
# 物理CPU个数(我们的服务器是2个)
cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l
# 查看每个物理CPU中core的个数(就是核数)(6核)
cat /proc/cpuinfo| grep "cpu cores"| uniq
# 查看逻辑CPU的个数
cat /proc/cpuinfo| grep "processor"| wc -l
(显示24,不等于上面的cpu个数 * 每个cpu的核数,说明是开启了超线程)
###### 二、观察集群在 “忙什么”
- 通过tasks api可以直观的 观察到集群在忙什么?,包括父级任务,任务的持续时间等指标。命令如下:
curl -u username:pwd ip:port/_cat/tasks/?v | more
- 上面是我把副本设置为0后截的图。理论上还应该有一个bulk[s][r] 操作。可以看到目前写入很耗时,正常情况一批bulk操作应该是毫秒级的,这也从侧面说明了es的负载很高。
从task_id、parent_task_id可以看出,一个bulk操作下面分为写主分片的动作 和写副本的动作。其中:
indices:data/write/bulk[s][p]:s表示分片,p表示主分片。
indices:data/write/bulk[s][r]:s表示分片,r表示副本。
简易的写入流程
- 如下是bulk请求的简易写入流程,我们知道客户端会选择一个节点发送请求,这个节点被之称为协调节点,也叫客户端节点,但是在执行之前,如果定义了预处理的pipline操作(比如写入前将key值转换,或者增加时间戳等),则此写操作会被拦截并进行对应逻辑处理。
- 从图中可以看出,写入操作会现根据路由出来的规则,决定发送数据到那个分片上去,默认情况下,是通过数据的文档id来进行路由的,这能保证数据平均分配到各个节点上去,也可以自定义路由规则,具体定义方式我们在下面会讲到。
- 接着,请求发送到了主分片上,主分片执行成功后,会将请求再转发给相应的副本分片,在副本分片上执行成功后,这个请求才算是执行完毕,然后将执行结果返回给客户端。
- 可以看出多副本在写多读少的场景下,十分的消耗性能,近似的,多了几个副本就相当于重复写了几份数据。如果不考虑数据容灾,则可以适当的降低副本数量,或者去掉副本,提高写入速度。
在我们的集群里面并没有用到ingest角色类型的节点,这里提出来说也是为了便于大家更好理解各个节点的角色分工。
- 通过ES提供的API观察各个节点的热线程,api结果会显示出占用cpu高的线程,这也是我们可以优化的地方。大量写入场景下,这里一般大多数会显示:Lucene Merge Thread 或者[write],查询命令为:
GET /_nodes/hot_threads
三、观察集群线程池状态
- 避免大量写入被拒绝,可以通过观察elasticsearch后台日志或是通过使用Thread pool Api来观察内部线程池的使用情况,以及相应使用的队列大小,判断是否还可以继续调整写入配置参数。
curl -uusername:pwd-XGET "http://esip:port/_cat/thread_pool?v" | grep write
写入负载高的情况下,可能会出现大量拒绝,如下:
node-name | name | active | queue |
---|---|---|---|
node-3 | write | 4 | 0 |
node-1 | write | 3 | 2 |
node-2 | write | 9 | 1 |
主机部分
每个目录挂载不同的磁盘
- 在data目录下,我们分出了10个子目录,分别挂载到不同的硬盘上去。这相当于做了raid0。能大大的提高写入速度。
###### 配置多个path.data
- 由于在前面我们将10个目录分别挂载到不同的硬盘上去,所以在elasticsearch.yml的path.data属性中,我们配置多个路径,让数据能高效的写入不同的目录(硬盘),需要注意的是,如果只有一个索引,它的分片在某个节点的存储目录是固定的。所以这个特性,也只有在存在多个索引的情况下,能发挥出它的作用。
###### 一个主机启动两个节点
- es实例分配内存不会超过32G,对于主机数量固定的我们,如果125G的机器只放一个es节点,实属有点浪费,所以考虑在主机上启动两个es节点实例。
配置上需要注意关注以下几点:
– 1、http的端口、节点间通信的trasport端口设置。
– 2、节点的角色分配。
– 3、脑裂配置对应修改。
– 4、path.data属性修改(重要)
– 5、path.logs属性修改。
###### 修改path.data配置,使同一主机两个节点均分硬盘
– 这里着重说一下第4点,同一个主机启动两个实例后,我们将path.data配置从原来的10个目录改为了各自配置5个不同目录。
path.data: /data01/esdata,/data02/esdata,/data03/esdata
,/data04/esdata,/data05/esdata
- 一方面是 能够控制分片的分配,避免太多分片分配到一台主机上的其中一个节点上。另一方面是避免两个es进程对同一磁盘进行写入。随机写造成的磁头非常频繁的大面积移动肯定比单进程的顺序写入慢,这也是我们提高写入速度的初衷。
###### 更换ssd
– ssd能成倍的提高写入速度,如果使用ssd,可能就不会折腾这篇文章出来了。
elasticsearch部分
节点角色的设置
- elasticsearch提供几种类型的节点角色设置,需要在elasticsearch.yml配置中指定。
类型。
###### 指定索引模板
可以根据需要修改,具体配置含义不再细说。
{
"order": 0,
"index_patterns": [
"topicA*"
],
"settings": {
"index": {
"refresh_interval": "40s",
"number_of_shards": "30",
"translog": {
"flush_threshold_size": "1024mb",
"sync_interval": "120s",
"durability": "async"
},
"number_of_replicas": "0",
"merge": {
"scheduler": {
"max_thread_count": "1"
}
}
}
},
"mappings": {
},
"aliases": {}
}
计算分片数
- 需要注意分片数量最好设置为节点数的整数倍,保证每一个主机的负载是差不多一样的,特别如果是一个主机部署多个实例的情况,更要注意这一点,否则可能遇到其他主机负载正常,就某个主机负载特别高的情况。
一般我们根据每天的数据量来计算分片,保持每个分片的大小在50G以下比较合理。如果还不能满足要求,那么可能需要在索引层面通过拆分更多的索引或者通过别名+按小时 创建索引的方式来实现了。
控制分片均分在各个主机上
- 以TopicA数据的一个索引为例,共30个分片,在10个节点上分配,应该每个节点分配3个分片,一个主机上一共有6个分片才算是均衡。如果分配不是这样,可以使用cerebo或者通过命令行进行分片迁移。
curl -X POST "localhost:9200/_cluster/reroute?pretty" -H 'Content-Type: application/json' -d'
{
"commands" : [
{
"move" : {
"index" : "test", "shard" : 0,
"from_node" : "node1", "to_node" : "node2"
}
}
]
}
配置索引缓冲区
- 即是指定 indices.memory.index_buffer_size的大小,这个是一个静态变量,需要修改配置文件,重启后才能生效。
参考的计算公式:indices.memory.index_buffer_size / shards_count > 512MB(超过这个值索引性能并不会有太明显提高)
shards_count为一个节点上面的分片数量,可以配置具体指或者一个占用Es内存总值的百分比。这里我们修改成了20%(默认10%)。
路由分片
- 可以使用elasticsearch提供的routing特性,将数据按一定规则计算后(内部采用hash算法,把相同hash值的文档放入同一个分片中),默认情况下是使用DocId来计算,写入到分片,查询时指定routing查询,则可以提高查询速度,避免了扫描过多的分片带来的性能开销。首先,在创建索引模板的时候,需要在mappings中增加配置,要求匹配到此索引模板的索引,必须配置routing:
"_routing": {
"required": true
}
第二步、为BulkPorcess创建IndexRequest时,通过routing(java.lang.String routing) 方法指定参与计算hash的值。
注意这里是具体的值,而不是字段名称。
经过如上的调优配置,三个Topic数据都能正常写入,集群文档总数在170亿,33个索引,每个索引保留4天,242个分片,整体负载不高。
踩过的坑
1、节点角色的设置方面
- 如果集群中节点数量不多,并且不需要对数据进行预处理,那么其实可以放弃使用Ingest类型的节点。默认情况下所有的节点的默认设置都为true。所以我们手动将主节点和数据节点做如下设置
node.ingest: false
但是需要注意一点,x-pack监控用到了这种类型的节点。会如下错误:
failed to flush export bulks 、no ingest node
解决办法是,打开这个属性配置,或者elasticsearch.yml中指定:
xpack.monitoring.exporters.my_local: type:
xpack.monitoring.exporters.local use_ingest: false
2、elasticsearch 线程池相关配置参数改变
- 从5.0版本以后,禁止了修改各个模块线程池的类型,线程池相关配置的前缀从threadpool 变成了thread_pool.并且线程池相关配置级别上升至节点级配置,禁止通过使用API修改,因为场景是写多读少,所以我们只是增加了写队列的大小,配置为: thread_pool.write.queue_size: 1000。
只能通过修改配置文件的方式修改。
3、单台主机负载过高
- 同一个主机两个节点都是数据节点,并且分片分配不均匀,导致这个主机CPU使用率在98%左右,后面通过迁移分片的方式将负载降低。
4、使用自定义的routing规则后带来的写热点问题
- 比如按省份分的数据, 省份为北京的数据过多,西藏的数据很少,可能会带来写热点问题。所以合理的路由分配同样很重要。
5、数据时区差8小时问题
- 将业务时间转换为带时区的字符串。yyyy-MM-dd’T’HH:mm:ss.SSSZ
参考文章:
http://kane-xie.github.io/2017/09/09/2017-09-09_Elasticsearch%E5%86%99%E5%85%A5%E9%80%9F%E5%BA%A6%E4%BC%98%E5%8C%96/
https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_settings_changes.html
https://elasticsearch.cn/question/1915
https://juejin.im/entry/5d0f17cce51d454d544abf7f
欢迎来公众号【侠梦的开发笔记】 一起交流进步