使用Logstash同步数据至Elasticsearch,Spring Boot中集成Elasticsearch实现搜索
安装logstash、同步数据至ElasticSearch
为什么使用logstash来同步?CSDN上有一篇文章简要地分析了几种同步工具的优缺点:https://blog.csdn.net/laoyang360/article/details/51694519。
下面开始实践:
1. 下载Logstash 安装包,需要注意版本与elasticsearch保持一致,windows系统下直接解压即可。
2.添加同步mysql数据库的配置,并将mysql连接驱动jar包放在指定的配置目录
注: 目前版本的logstash已经集成了logstash-input-jdbc插件,不需要再单独安装这个插件,
配置文件需要使用UTF-8编码。我在配置过程中新建的文件默认是GBK编码,结果启动logstash读取配置文件时报了编码错误。
我的配置目录:
配置文件mysql.conf
# Logstash pipeline: polls a MySQL table through the JDBC input and indexes
# every row into Elasticsearch, echoing each event to stdout as well.
input {
    stdin {
    }
    jdbc {
      # JDBC connection string of the MySQL database
      jdbc_connection_string => "jdbc:mysql://localhost/blog?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
      # MySQL user name and password
      jdbc_user => "root"
      jdbc_password => "123456"
      # Location of the MySQL JDBC driver jar
      jdbc_driver_library => "C:/logstash-6.4.0/mysqletc/mysql-connector-java-6.0.5.jar"
      # Driver class name
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # File containing the SQL statement to execute
      statement_filepath => "C:/logstash-6.4.0/mysqletc/blog.sql"
      # Cron-style polling schedule. The five fields are:
      # minute, hour, day of month, month, day of week.
      # All "*" means the statement is run every minute.
      schedule => "* * * * *"
      # Value stored in each event's "type" field
      type => "blog"
    }
}

filter {
    # Parse the "message" field as JSON and drop the raw field afterwards.
    # NOTE(review): "message" is presumably only set by the stdin input - confirm.
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {

    elasticsearch {
        # Elasticsearch server
        hosts => ["localhost:9200"]
        # Target index name
        index => "sl_blog"
        # Use the row's "id" value as the document id, so a re-synced row
        # is written under the same id instead of a newly generated one
        document_id => "%{id}"
    }

    # Also print every event as one JSON document per line
    stdout {
        codec => json_lines
    }
}
Blog.sql文件:
-- Selects every column of every row in the blog table.
SELECT * FROM blog
如果需要同步多个mysql表,可以修改配置文件mysql.conf,在input和output中分别添加其他表的配置。
3. 启动logstash,正常的话将会同步数据至elasticsearch,根据上面的配置,logstash每分钟去数据库读取一次最新数据
logstash -f ../mysqletc/mysql.conf
Elasticsearch-head插件中查看:
SpringBoot中集成Elasticsearch
下面以开源博客系统new-star-blog(https://github.com/waylau/new-star-blog)为例,简单实现这个功能。
环境:Springboot 2.0.4 +ES 6.4.0
1. 添加Spring Data Elasticsearch依赖:
<!-- Spring Boot uses the Spring Data Elasticsearch module by default to work with Elasticsearch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>
Spring Data Elasticsearch是Spring官方提供的访问Elasticsearch的方式,相对于直接通过REST访问,它提供了完善的封装,并且遵循Spring Data规范。另外它屏蔽了Elasticsearch REST接口的复杂性,由Spring内部实现对Elasticsearch接口的封装。
2. 添加Elasticsearch服务器配置:
# Enable Elasticsearch repositories (default: true)
spring.data.elasticsearch.repositories.enabled=true
# 9300 is the default port used by the Java client; 9200 is the RESTful HTTP port
spring.data.elasticsearch.cluster-nodes = 127.0.0.1:9300
# Elasticsearch connection timeout (left disabled here)
#spring.data.elasticsearch.properties.transport.tcp.connect_timeout=120s
3. 实现代码:
package com.sl.blog.domain;

import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.annotation.Id;
import java.io.Serializable;
import java.util.Date;

/**
 * Search-side view of a blog post, mapped to the {@code sl_blog} index and the
 * {@code blog} document type in Elasticsearch.
 *
 * <p>Field and accessor names intentionally use snake_case: they define the
 * property names used for the document mapping and for sorting, so renaming
 * them would change the mapping.
 * NOTE(review): the names presumably mirror the MySQL column names - confirm.
 */
@Document(indexName = "sl_blog", type = "blog")
public class EsBlog implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Document identifier. */
    @Id
    private String id;

    private String title;
    private String summary;
    private String content;
    private String tags;
    private String username;

    private Date create_time;

    private Long user_id;
    private Long catalog_id;

    // Counters; these are the fields the search service sorts on by default.
    private Long read_size;
    private Long comment_size;
    private Long like_size;

    /** No-argument constructor, restricted to the package and subclasses. */
    protected EsBlog() {
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return this.title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Date getCreate_time() {
        return this.create_time;
    }

    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }

    public Long getUser_id() {
        return this.user_id;
    }

    public void setUser_id(Long user_id) {
        this.user_id = user_id;
    }

    public String getTags() {
        return this.tags;
    }

    public void setTags(String tags) {
        this.tags = tags;
    }

    public Long getRead_size() {
        return this.read_size;
    }

    public void setRead_size(Long read_size) {
        this.read_size = read_size;
    }

    public Long getCatalog_id() {
        return this.catalog_id;
    }

    public void setCatalog_id(Long catalog_id) {
        this.catalog_id = catalog_id;
    }

    public String getSummary() {
        return this.summary;
    }

    public void setSummary(String summary) {
        this.summary = summary;
    }

    public Long getComment_size() {
        return this.comment_size;
    }

    public void setComment_size(Long comment_size) {
        this.comment_size = comment_size;
    }

    public Long getLike_size() {
        return this.like_size;
    }

    public void setLike_size(Long like_size) {
        this.like_size = like_size;
    }

    public String getContent() {
        return this.content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

}
View Code
1 package com.sl.blog.repository; 2 3 import com.sl.blog.domain.EsBlog; 4 import org.springframework.data.domain.Page; 5 import org.springframework.data.domain.Pageable; 6 import org.springframework.data.elasticsearch.repository.ElasticsearchCrudRepository; 7 import org.springframework.stereotype.Component; 8 9 @Component 10 public interface IEsBlogRepository extends ElasticsearchCrudRepository<EsBlog,String> { 11 12 Page<EsBlog> findDistinctEsBlogByTitleContainingOrSummaryContainingOrContentContainingOrTagsContaining(String title, String Summary, String content, String tags, Pageable pageable); 13 14 }
View Code
1 package com.sl.blog.service; 2 3 import com.sl.blog.domain.EsBlog; 4 import org.springframework.data.domain.Page; 5 import org.springframework.data.domain.Pageable; 6 7 public interface IEsBlogService { 8 9 Page<EsBlog> getEsBlogByKeys(String keyword, Pageable pageable); 10 }
View Code
1 package com.sl.blog.service.impl; 2 3 import com.sl.blog.domain.EsBlog; 4 import com.sl.blog.repository.IEsBlogRepository; 5 import com.sl.blog.service.IEsBlogService; 6 import joptsimple.internal.Strings; 7 import org.elasticsearch.index.query.BoolQueryBuilder; 8 import org.elasticsearch.index.query.QueryBuilders; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.data.domain.Page; 11 import org.springframework.data.domain.PageRequest; 12 import org.springframework.data.domain.Pageable; 13 import org.springframework.data.domain.Sort; 14 import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; 15 import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; 16 import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; 17 import org.springframework.stereotype.Service; 18 19 @Service 20 public class EsBlogServiceImpl implements IEsBlogService { 21 22 23 @Autowired 24 private ElasticsearchTemplate elasticsearchTemplate; 25 26 @Autowired 27 private IEsBlogRepository esBlogRepository; 28 29 /** 30 * 通过关键字搜索 31 * @param keyword 32 * @param pageable 33 * @return 34 */ 35 @Override 36 public Page<EsBlog> getEsBlogByKeys(String keyword, Pageable pageable){ 37 Sort sort = new Sort(Sort.Direction.DESC,"read_size","comment_size","like_size"); 38 if (pageable.getSort() == null) { 39 pageable = new PageRequest(pageable.getPageNumber(), pageable.getPageSize(), sort); 40 } 41 if(Strings.isNullOrEmpty(keyword)){ 42 return esBlogRepository.findAll(pageable); 43 } 44 //keyword 含有空格时抛异常 45 //return esBlogRepository.findDistinctEsBlogByTitleContainingOrSummaryContainingOrContentContainingOrTagsContaining(keyword, keyword, keyword, keyword, pageable); 46 47 //使用 Elasticsearch API QueryBuilder 48 NativeSearchQueryBuilder aNativeSearchQueryBuilder = new NativeSearchQueryBuilder(); 49 aNativeSearchQueryBuilder.withIndices("sl_blog").withTypes("blog"); 50 final BoolQueryBuilder aQuery = new 
BoolQueryBuilder(); 51 //builder下有的must、should、mustNot 相当于逻辑运算and、or、not 52 aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("title")); 53 aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("summary")); 54 aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("tags")); 55 aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("content")); 56 57 NativeSearchQuery nativeSearchQuery = aNativeSearchQueryBuilder.withQuery(aQuery).build(); 58 Page<EsBlog> plist = elasticsearchTemplate.queryForPage(nativeSearchQuery,EsBlog.class); 59 return plist; 60 61 } 62 63 }
View Code
1 @Controller 2 @RequestMapping("/blogs") 3 public class BlogController { 4 5 @Autowired 6 private IEsBlogService esBlogService; 7 8 @GetMapping 9 public String listBlogs(@RequestParam(value="order",required=false,defaultValue="new") String order, 10 @RequestParam(value="keyword",required=false,defaultValue="" ) String keyword, 11 @RequestParam(value="async",required=false) boolean async, 12 @RequestParam(value="pageIndex",required=false,defaultValue="0") int pageIndex, 13 @RequestParam(value="pageSize",required=false,defaultValue="5") int pageSize, 14 Model model) { 15 Pageable pageable = new PageRequest(pageIndex,pageSize); 16 Page<EsBlog> page = esBlogService.getEsBlogByKeys(keyword,pageable); 17 List<EsBlog> list = page.getContent(); 18 model.addAttribute("order", order); 19 model.addAttribute("keyword", keyword); 20 model.addAttribute("page", page); 21 model.addAttribute("blogList", list); 22 return (async==true?"/index :: #mainContainerRepleace":"/index"); 23 } 24 }
View Code
演示效果: