ES理解
ES就是一套搜索引擎技术,它是elastic技术栈中的一部分。完整的技术栈包括:
- Elasticsearch:用于数据存储、计算和搜索
- Logstash/Beats:用于数据收集
- Kibana:用于数据可视化
ES为什么快?他的搜索逻辑是什么?为什么不用mysql搜索?
如果使用mysql进行搜索,比如我要查询“手机”这个关键字,通常会使用like %手机%,这个操作会导致mysql索引失效(%前缀索引失效),就会进行全表扫描,逐条搜索判断。ES是怎么做的呢,他是使用倒排索引技术,他内置有一个分词算法,会将数据进行分词管理,比如有一个字段为“中国华为手机牛”,就会分成"中国"“华为”“手机”“牛”,然后将分词以后的词条当做索引,再创建文档id列,这样搜索某个关键词就会去文档ID找,因为词条有索引,所以很快,再通过文档id,查询文档。这样因为都使用到了索引所以非常快。

分词器
通常使用ik分词器,支持自定义分词和停止词。这里我使用的时候出现了点小问题,我是在线安装的,查看日志docker logs es | grep -i "dictionary"发现我的字典加载位置和文档不一样。/usr/share/elasticsearch/config/analysis-ik/IKAnalyzer.cfg.xml我的配置类在这里,所以要修改这里的配置文件并添加ext.dic。
1 2 3 4 5 6 7 8 9 10 11 12 13
| <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <entry key="ext_dict">ext.dic</entry> <entry key="ext_stopwords"></entry> </properties>
|
AOP通过MQ异步同步数据
当我们通过REST接口对商品数据进行增修删的时候要保证ES同步更新,就和redis保持一致性一样的道理,这里因为我们之前代码都开发好了,所以想到了AOP进行业务解耦,使用AOP监控Controller层,传参情况就两种raw(json)数据和路径传参。json在java中已经按照dto封装好了,路径传参也都会有itemId,那我要做的事情就是拿到商品id就好了,我使用@AfterReturning后置返回通知,这样会在controller层执行完以后再执行拦截类的代码。这样数据都已经到数据库了,然后可以发itemId到MQ,当MQ监听到使用了哪些方法以后,消费者进行处理,可以通过id拿新的数据增加到es,修改或者删除就不多说了。看实现逻辑吧。
先自定义注解。标记需要发送消息的方法,标识那些执行后需要向消息队列发送消息的业务方法
1 2 3 4 5 6
| @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SeedMessageToMq { String exchange(); String routingKey(); }
|
这里我写了一个命名类,比较优雅。(前期)将es索引库名和MQ收到的工厂和队列以及Routing key进行命名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class MqConstants { public static final String SEARCH_ITEM_INDEX = "search_item_index"; public static final String SEARCH_ITEM_DIRCT_EXCHANGE = "search-item-dirct-exchange"; public static final String SEARCH_ITEM_INSERT_KEY = "search-item-insert-key"; public static final String SEARCH_ITEM_INSERT_QUEUE = "search-item-insert-queue"; public static final String SEARCH_ITEM_DELETE_KEY = "search-item-delete-key"; public static final String SEARCH_ITEM_DELETE_QUEUE = "search-item-delete-queue"; public static final String SEARCH_ITEM_UPDATE_KEY = "search-item-update-key"; public static final String SEARCH_ITEM_UPDATE_QUEUE = "search-item-update-queue"; public static final String SEARCH_ITEM_UPDATE_STATUS_KEY = "search-item-update-status-key"; public static final String SEARCH_ITEM_UPDATE_STATUS_QUEUE = "search-item-update-status-queue"; }
|
对controller层需要的代码进行拦截
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_INSERT_KEY) @ApiOperation("新增商品") @PostMapping public void saveItem(@RequestBody ItemDTO item) { itemService.save(BeanUtils.copyBean(item,Item.class)); } @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_UPDATE_STATUS_KEY) @ApiOperation("更新商品状态") @PutMapping("/status/{id}/{status}") public void updateItemStatus(@PathVariable("id") Long id, @PathVariable("status") Integer status){ Item item = new Item(); item.setId(id); item.setStatus(status); itemService.updateById(item); } @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_UPDATE_KEY) @ApiOperation("更新商品") @PutMapping public void updateItem(@RequestBody ItemDTO item) { item.setStatus(null); itemService.updateById(BeanUtils.copyBean(item, Item.class)); } @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_DELETE_KEY) @ApiOperation("根据id删除商品") @DeleteMapping("{id}") public void deleteItemById(@PathVariable("id") Long id) { itemService.removeById(id); }
|
然后定义切面和拦截后执行的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Aspect @RequiredArgsConstructor @Component @Slf4j public class RabbitMessageAOP {
@Autowired private RabbitMqHelper rabbitMqHelper; @Pointcut("execution(* com.hmall.item.controller.ItemController.*(..)) && @annotation(com.hmall.item.annotation.SeedMessageToMq)") public void seedMessageToMqPointcut() {} @AfterReturning(pointcut = "seedMessageToMqPointcut()") public void seedMessage(JoinPoint joinPoint) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); SeedMessageToMq annotation = method.getAnnotation(SeedMessageToMq.class); String exchange = annotation.exchange(); String routingKey = annotation.routingKey(); Object[] args = joinPoint.getArgs(); System.out.println("参数列表:"+ Arrays.toString(args)); if (args.length == 0){ log.info("没收到任何消息"); return; } Long itemId = null; ObjectMapper objectMapper = new ObjectMapper(); if (routingKey.equals(SEARCH_ITEM_INSERT_KEY) || routingKey.equals(SEARCH_ITEM_UPDATE_KEY)){ ItemDTO itemDTO = objectMapper.convertValue(args[0], ItemDTO.class); itemId = itemDTO.getId(); }else if (routingKey.equals(SEARCH_ITEM_UPDATE_STATUS_KEY) || routingKey.equals(SEARCH_ITEM_DELETE_KEY)){ itemId = objectMapper.convertValue(args[0], Long.class); } log.info("{}商品————准备发送商品ID:{}到MQ", routingKey, itemId); rabbitMqHelper.sendMessage(exchange,routingKey,itemId); } }
|
上面已经对MQ发送了消息,现在进行监听并处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| @Slf4j @Component
public class ListenerItemMqToEs {
@Autowired private IItemService itemService;
private final RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.219.128:9200") ));
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_INSERT_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_INSERT_KEY) ) public void listenerInsertItem(Long itemId) { log.info("收到商品ID{},准备添加", itemId); Item item = itemService.getById(itemId); ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class); String doc = JSONUtil.toJsonStr(itemDoc); IndexRequest indexRequest = new IndexRequest(SEARCH_ITEM_INDEX).id(itemDoc.getId()).source(doc, XContentType.JSON); try { client.index(indexRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_DELETE_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_DELETE_KEY) ) public void listenerDeleteItem(Long itemId) { log.info("收到商品ID{},准备删除", itemId); DeleteRequest deleteRequest = new DeleteRequest(SEARCH_ITEM_INDEX).id(String.valueOf(itemId)); try { client.delete(deleteRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_UPDATE_STATUS_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_UPDATE_STATUS_KEY) ) public void listenerUpdateStatusItem(Long itemId) { Item item = itemService.getById(itemId); if (item.getStatus() == 3) { log.info("收到商品状态为3:ID{},准备删除", itemId); listenerDeleteItem(itemId); return; } log.info("收到商品ID{},准备更新状态", itemId); UpdateRequest updateRequest = new UpdateRequest(SEARCH_ITEM_INDEX, String.valueOf(itemId)); updateRequest.doc("status", item.getStatus()); try { client.update(updateRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_UPDATE_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_UPDATE_KEY) ) public void listenerUpdateItem(Long itemId) { Item item = itemService.getById(itemId); log.info("收到商品ID{},准备更新", itemId); ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class); String doc = JSONUtil.toJsonStr(itemDoc); UpdateRequest updateRequest = new UpdateRequest(SEARCH_ITEM_INDEX, String.valueOf(itemId)); updateRequest.upsert(doc, XContentType.JSON); try { client.update(updateRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } }
|
RestClient查询
这里我直接用es处理项目中搜索框接口了,es数据提前做好了预热
/search/list
1 2 3 4 5
| @ApiOperation("搜索商品") @GetMapping("/list") public PageDTO<ItemDTO> search(ItemPageQuery query) { return searchService.searchItemFilterEs(query); }
|
实现层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| @Override public PageDTO<ItemDTO> searchItemFilterEs(ItemPageQuery query) { SearchRequest request = new SearchRequest("items_new"); SearchSourceBuilder source = buildSearchRequest(request,query); Integer pageNo = query.getPageNo(); Integer pageSize = query.getPageSize(); source.from((pageNo - 1) * pageSize).size(pageSize); SearchResponse response; try { response = client.search(request, RequestOptions.DEFAULT); }catch (Exception e){ throw new RuntimeException(e); } return handleResponse(response); }
private SearchSourceBuilder buildSearchRequest(SearchRequest request,ItemPageQuery query) { BoolQueryBuilder bool = QueryBuilders.boolQuery(); if (StrUtil.isNotEmpty(query.getKey())){ bool.must(QueryBuilders.matchQuery("name", query.getKey())); } if (StrUtil.isNotEmpty(query.getCategory())){ bool.filter(QueryBuilders.termQuery("category.keyword", query.getCategory())); } if (StrUtil.isNotEmpty(query.getBrand())){ bool.filter(QueryBuilders.termQuery("brand.keyword", query.getBrand())); } bool.filter(QueryBuilders.rangeQuery("price").gt(query.getMinPrice()).lt(query.getMaxPrice())); SearchSourceBuilder source = request.source(); source.query(bool); String sortBy = query.getSortBy(); if (StrUtil.isNotEmpty(sortBy)) { if (query.getIsAsc()) source.sort(sortBy, SortOrder.ASC); else source.sort(sortBy, SortOrder.DESC); } source.highlighter( SearchSourceBuilder.highlight() .field("name") ); return source; }
private PageDTO<ItemDTO> handleResponse(SearchResponse search) { Page<ItemDTO> page = new Page<>(); SearchHits searchHits = search.getHits(); assert searchHits.getTotalHits() != null; long total = searchHits.getTotalHits().value; page.setTotal(total); SearchHit[] hits = searchHits.getHits(); List<ItemDTO> records = new ArrayList<>(); for (SearchHit hit : hits){ String source = hit.getSourceAsString(); ItemDTO itemDTO = JSONUtil.toBean(source, ItemDTO.class); records.add(itemDTO); Map<String, HighlightField> hfs = hit.getHighlightFields(); if (CollUtils.isNotEmpty(hfs)) { HighlightField hf = hfs.get("name"); if (hf != null) { String hfName = hf.getFragments()[0].toString(); itemDTO.setName(hfName); } } } page.setRecords(records); page.setSize(pageSize); return PageDTO.of(page); }
|
数据聚合
这里主要是一个动态的过滤条件搜索。目前搜索的商品有什么过滤条件就有啥。比如选了品牌是小米,生效的分类就只有手机 拉杆箱 和 电视

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Override
public JSONObject filter(ItemPageQuery query) { SearchRequest request = new SearchRequest("items_new"); SearchSourceBuilder source = buildSearchRequest(request,query); source.size(0); source.aggregation(AggregationBuilders.terms("brand_agg").field("brand.keyword").size(20)) .aggregation(AggregationBuilders.terms("category_agg").field("category.keyword").size(20)); SearchResponse response; try { response = client.search(request, RequestOptions.DEFAULT); }catch (Exception e){ throw new RuntimeException(e); } Aggregations aggregations = response.getAggregations(); if (aggregations == null) { return new JSONObject(); } ParsedTerms brandAgg = aggregations.get("brand_agg"); ParsedTerms categoryAgg = aggregations.get("category_agg"); JSONObject result = new JSONObject(); result.putByPath("brand", getBuckets(brandAgg)); result.putByPath("category",getBuckets(categoryAgg)); return result; }
private List<String> getBuckets(ParsedTerms terms) { if (terms == null) { return new ArrayList<>(); } List<String> list = new ArrayList<>(); for (Terms.Bucket bucket : terms.getBuckets()) { list.add(bucket.getKeyAsString()); } return list; }
|
一、核心结论
ES(Elasticsearch)的倒排索引(Inverted Index)是全文检索的核心数据结构,本质是 “词条(Term)→ 文档(Document) 的映射关系”—— 先对文档内容分词,再以词条为 key,记录包含该词条的所有文档信息,从而实现 “输入关键词快速定位文档”,彻底摆脱传统数据库 “逐文档扫描” 的低效模式。
简单说:传统数据库是 “找文档→看内容”(正排索引),ES 倒排索引是 “找关键词→找文档”,这也是 ES 全文检索快的根本原因。
二、倒排索引的核心结构(3 个关键部分)
倒排索引由 “词典 + 倒排列表 + 元数据” 组成,三者协同实现高效检索:
1. 词典(Dictionary/Term Dictionary)
- 核心定义:所有文档分词后,去重得到的唯一词条集合(相当于 “关键词字典”)。
- 存储内容:比如文档中出现的 “苹果”“手机”“性价比” 等去重后的词条。
- 关键优化:词典会按字母 / 拼音排序(如字典序),并通过哈希表、跳表等结构加速词条查找,确保 “输入关键词→快速找到对应词条”。
2. 倒排列表(Posting List)
- 核心定义:词典中每个词条对应的 “文档匹配信息列表”,是倒排索引的核心数据载体。
- 存储内容(每条记录称为 “倒排项 / Posting”):
- 文档 ID(Document ID):包含该词条的文档唯一标识;
- 词频(TF):该词条在当前文档中出现的次数(影响相关性打分,出现越多越相关);
- 位置(Position):词条在文档中的具体位置(如第 3 段第 5 个词),支持短语查询(如 “苹果 手机” 必须连续出现);
- 偏移量(Offset):词条在文档中的字符起始 / 结束位置(支持高亮显示查询结果)。
- 示例:词条 “手机” 的倒排列表可能是:
[(doc1, TF=3, positions=[5,12,28]), (doc3, TF=2, positions=[8,15]), ...]
- 核心定义:辅助检索和排序的补充信息,存储在词典或倒排列表中。
- 关键信息:
- 逆文档频率(IDF):包含该词条的文档总数的倒数(影响相关性打分,越稀有词条权重越高);
- 字段信息:词条来自文档的哪个字段(如 title、content,支持按字段检索);
- 文档长度:用于归一化词频(避免长文档因词条多而打分偏高)。
三、倒排索引的构建流程(从文档到索引)
ES 创建倒排索引的过程的核心是 “分词→建映射”,步骤如下:
- 文档预处理:获取原始文档(如一篇文章、一条商品信息),提取需要检索的字段(如 title、content);
- 分词(Tokenization):对每个字段的文本内容进行分词,比如 “苹果手机性价比高” 会拆分为 “苹果”“手机”“性价比”“高”(分词器可自定义,如 IK 分词器支持中文分词,Standard 分词器按空格 / 标点拆分);
- 词条标准化(Normalization):对分词结果去重、统一格式,比如:
- 大小写统一(“Apple”→“apple”);
- 去除停用词(“的”“是”“and” 等无意义词汇);
- 词干提取(“running”→“run”,“购物”→“购”);
- 构建词典和倒排列表:
- 将标准化后的词条加入词典(去重、排序);
- 为每个词条创建倒排列表,记录其在所有文档中的位置、词频等信息;
- 优化存储:对词典(如跳表优化查找)和倒排列表(如压缩存储文档 ID,减少内存占用)进行优化,提升检索速度。
四、倒排索引的查询流程(从关键词到结果)
用户输入查询词后,ES 通过倒排索引快速匹配文档,步骤如下:
- 查询预处理:对用户输入的查询词(如 “高性价比 苹果手机”)进行分词、标准化(和索引构建时逻辑一致);
- 词典查找:在词典中快速定位每个查询词条,获取对应的倒排列表;
- 倒排列表合并:根据查询逻辑(AND/OR)合并多个词条的倒排列表:
- AND 逻辑(如 “苹果 AND 手机”):取多个倒排列表的交集(仅保留同时包含所有词条的文档);
- OR 逻辑(如 “苹果 OR 手机”):取多个倒排列表的并集(保留包含任意一个词条的文档);
- 相关性打分(TF-IDF/BM25):对合并后的文档按 “词频(TF)× 逆文档频率(IDF)” 等算法打分,分数越高相关性越强;
- 返回结果:按打分排序,返回 Top N 文档,并根据偏移量高亮显示查询词条。
五、倒排索引 vs 正排索引(核心区别)
| 对比维度 |
倒排索引(ES 核心) |
正排索引(传统数据库) |
| 映射关系 |
词条 → 文档(关键词找文档) |
文档 → 词条(文档找内容) |
| 适用场景 |
全文检索(如关键词查询、短语查询) |
按主键 / 条件查询文档(如 SELECT * FROM table WHERE id=1) |
| 检索速度 |
关键词检索极快(O (1) 定位词条,合并列表即可) |
全文检索极慢(需逐文档扫描内容) |
| 存储开销 |
较高(需存储词条、倒排列表等信息) |
较低(仅存储原始文档) |
六、ES 对倒排索引的关键优化(提升性能)
- 词典优化:用 “有限状态机(FST)” 存储词典,兼顾查询速度和内存占用(FST 能将多个词条压缩存储,且查询时间与词条长度无关);
- 倒排列表压缩:对文档 ID 采用 “差值编码 + 变长整数” 压缩(如连续文档 ID 存储差值,减少存储量),对位置信息采用 “间隙编码” 压缩;
- 分片与副本:倒排索引按分片(Shard)拆分存储,每个分片是独立的索引,支持并行检索;副本(Replica)提供冗余备份,同时分担查询压力;
- 分段(Segment)存储:索引创建时先写入内存,定期刷盘生成 “分段(Segment)”,多个分段通过 “提交点(Commit Point)” 管理,避免单分段过大影响性能;
- 缓存策略:将高频词条的词典和倒排列表缓存到内存(如 ES 的 FileCache),减少磁盘 IO。
七、总结
ES 倒排索引的核心是 “以词条为中心,建立词条到文档的映射”,通过分词、标准化、压缩等优化,实现高效的全文检索。其优势在于:
- 关键词检索速度极快,无需扫描所有文档;
- 支持相关性打分,返回结果按匹配度排序;
- 支持短语查询、字段检索、高亮显示等复杂需求。
这也是 ES 成为全文检索引擎首选的根本原因,适用于日志分析、商品搜索、文档检索等场景。
举例子
你可以把倒排索引想象成 图书馆的 “关键词索引卡片” —— 不用一本本翻书找内容,直接按关键词就能快速定位到所有相关的书。
举个生活例子:
假设图书馆有 3 本书(对应 ES 里的 “文档”):
- 书 1:《苹果手机使用指南》(内容:苹果手机的性价比很高,拍照功能强)
- 书 2:《性价比数码选购》(内容:选数码产品,性价比是核心,苹果和华为都值得看)
- 书 3:《华为笔记本测评》(内容:华为笔记本轻薄便携,适合办公)
传统找书方式(正排索引):
你想找 “性价比” 相关的书,得一本本翻开看内容,直到找到包含 “性价比” 的书 —— 效率极低,就像传统数据库逐行扫描。
倒排索引的找书方式:
图书馆管理员提前做了一套 “关键词索引卡片”(对应倒排索引):
- 卡片 1(关键词:苹果):书 1、书 2
- 卡片 2(关键词:性价比):书 1、书 2
- 卡片 3(关键词:手机):书 1
- 卡片 4(关键词:华为):书 2、书 3
- 卡片 5(关键词:笔记本):书 3
现在你要找 “性价比 + 苹果” 的书,管理员直接:
- 拿 “性价比” 卡片,找到书 1、书 2;
- 拿 “苹果” 卡片,找到书 1、书 2;
- 取两者的交集(书 1、书 2),直接把这两本书给你 —— 不用翻任何一本书的内容,速度飞快!
对应到 ES 里:
- 每本书 = 一条文档(比如一条商品信息、一篇文章);
- 关键词 = 文档内容分词后的词语(比如 “苹果”“性价比”);
- 索引卡片 = 倒排索引(关键词→包含该词的所有文档)。
核心一句话:
正排索引是 “找文档→看内容”(慢),倒排索引是 “找关键词→找文档”(快),这就是 ES 搜关键词比数据库快的根本原因!