ES理解
ES就是一套搜索引擎技术,它是elastic技术栈中的一部分。完整的技术栈包括:
- Elasticsearch:用于数据存储、计算和搜索
- Logstash/Beats:用于数据收集
- Kibana:用于数据可视化
ES为什么快?他的搜索逻辑是什么?为什么不用mysql搜索?
如果使用mysql进行搜索,比如我要查询“手机”这个关键字,通常会使用like %手机%,这个操作会导致mysql索引失效(%前缀索引失效),就会进行全表扫描,逐条搜索判断。ES是怎么做的呢,他是使用倒排索引技术,他内置有一个分词算法,会将数据进行分词管理,比如有一个字段为“中国华为手机牛”,就会分成"中国"“华为”“手机”“牛”,然后将分词以后的词条当做索引,再创建文档id列,这样搜索某个关键词就会去文档ID找,因为词条有索引,所以很快,再通过文档id,查询文档。这样因为都使用到了索引所以非常快。

分词器
通常使用ik分词器,支持自定义分词和停止词。这里我使用的时候出现了点小问题,我是在线安装的,查看日志docker logs es | grep -i "dictionary"发现我的字典加载位置和文档不一样。/usr/share/elasticsearch/config/analysis-ik/IKAnalyzer.cfg.xml我的配置类在这里,所以要修改这里的配置文件并添加ext.dic。
1 2 3 4 5 6 7 8 9 10 11 12 13
| <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <entry key="ext_dict">ext.dic</entry> <entry key="ext_stopwords"></entry> </properties>
|
AOP通过MQ异步同步数据
当我们通过REST接口对商品数据进行增修删的时候要保证ES同步更新,就和redis保持一致性一样的道理,这里因为我们之前代码都开发好了,所以想到了AOP进行业务解耦,使用AOP监控Controller层,传参情况就两种raw(json)数据和路径传参。json在java中已经按照dto封装好了,路径传参也都会有itemId,那我要做的事情就是拿到商品id就好了,我使用@AfterReturning后置返回通知,这样会在controller层执行完以后再执行拦截类的代码。这样数据都已经到数据库了,然后可以发itemId到MQ,当MQ监听到使用了哪些方法以后,消费者进行处理,可以通过id拿新的数据增加到es,修改或者删除就不多说了。看实现逻辑吧。
先自定义注解。标记需要发送消息的方法,标识那些执行后需要向消息队列发送消息的业务方法
1 2 3 4 5 6
| @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SeedMessageToMq { String exchange(); String routingKey(); }
|
这里我写了一个命名类,比较优雅。(前期)将es索引库名和MQ收到的工厂和队列以及Routing key进行命名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class MqConstants { public static final String SEARCH_ITEM_INDEX = "search_item_index"; public static final String SEARCH_ITEM_DIRCT_EXCHANGE = "search-item-dirct-exchange"; public static final String SEARCH_ITEM_INSERT_KEY = "search-item-insert-key"; public static final String SEARCH_ITEM_INSERT_QUEUE = "search-item-insert-queue"; public static final String SEARCH_ITEM_DELETE_KEY = "search-item-delete-key"; public static final String SEARCH_ITEM_DELETE_QUEUE = "search-item-delete-queue"; public static final String SEARCH_ITEM_UPDATE_KEY = "search-item-update-key"; public static final String SEARCH_ITEM_UPDATE_QUEUE = "search-item-update-queue"; public static final String SEARCH_ITEM_UPDATE_STATUS_KEY = "search-item-update-status-key"; public static final String SEARCH_ITEM_UPDATE_STATUS_QUEUE = "search-item-update-status-queue"; }
|
对controller层需要的代码进行拦截
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_INSERT_KEY) @ApiOperation("新增商品") @PostMapping public void saveItem(@RequestBody ItemDTO item) { itemService.save(BeanUtils.copyBean(item,Item.class)); } @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_UPDATE_STATUS_KEY) @ApiOperation("更新商品状态") @PutMapping("/status/{id}/{status}") public void updateItemStatus(@PathVariable("id") Long id, @PathVariable("status") Integer status){ Item item = new Item(); item.setId(id); item.setStatus(status); itemService.updateById(item); } @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_UPDATE_KEY) @ApiOperation("更新商品") @PutMapping public void updateItem(@RequestBody ItemDTO item) { item.setStatus(null); itemService.updateById(BeanUtils.copyBean(item, Item.class)); } @SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_DELETE_KEY) @ApiOperation("根据id删除商品") @DeleteMapping("{id}") public void deleteItemById(@PathVariable("id") Long id) { itemService.removeById(id); }
|
然后定义切面和拦截后执行的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Aspect @RequiredArgsConstructor @Component @Slf4j public class RabbitMessageAOP {
@Autowired private RabbitMqHelper rabbitMqHelper; @Pointcut("execution(* com.hmall.item.controller.ItemController.*(..)) && @annotation(com.hmall.item.annotation.SeedMessageToMq)") public void seedMessageToMqPointcut() {} @AfterReturning(pointcut = "seedMessageToMqPointcut()") public void seedMessage(JoinPoint joinPoint) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); SeedMessageToMq annotation = method.getAnnotation(SeedMessageToMq.class); String exchange = annotation.exchange(); String routingKey = annotation.routingKey(); Object[] args = joinPoint.getArgs(); System.out.println("参数列表:"+ Arrays.toString(args)); if (args.length == 0){ log.info("没收到任何消息"); return; } Long itemId = null; ObjectMapper objectMapper = new ObjectMapper(); if (routingKey.equals(SEARCH_ITEM_INSERT_KEY) || routingKey.equals(SEARCH_ITEM_UPDATE_KEY)){ ItemDTO itemDTO = objectMapper.convertValue(args[0], ItemDTO.class); itemId = itemDTO.getId(); }else if (routingKey.equals(SEARCH_ITEM_UPDATE_STATUS_KEY) || routingKey.equals(SEARCH_ITEM_DELETE_KEY)){ itemId = objectMapper.convertValue(args[0], Long.class); } log.info("{}商品————准备发送商品ID:{}到MQ", routingKey, itemId); rabbitMqHelper.sendMessage(exchange,routingKey,itemId); } }
|
上面已经对MQ发送了消息,现在进行监听并处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| @Slf4j @Component
public class ListenerItemMqToEs {
@Autowired private IItemService itemService;
private final RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.219.128:9200") ));
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_INSERT_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_INSERT_KEY) ) public void listenerInsertItem(Long itemId) { log.info("收到商品ID{},准备添加", itemId); Item item = itemService.getById(itemId); ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class); String doc = JSONUtil.toJsonStr(itemDoc); IndexRequest indexRequest = new IndexRequest(SEARCH_ITEM_INDEX).id(itemDoc.getId()).source(doc, XContentType.JSON); try { client.index(indexRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_DELETE_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_DELETE_KEY) ) public void listenerDeleteItem(Long itemId) { log.info("收到商品ID{},准备删除", itemId); DeleteRequest deleteRequest = new DeleteRequest(SEARCH_ITEM_INDEX).id(String.valueOf(itemId)); try { client.delete(deleteRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_UPDATE_STATUS_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_UPDATE_STATUS_KEY) ) public void listenerUpdateStatusItem(Long itemId) { Item item = itemService.getById(itemId); if (item.getStatus() == 3) { log.info("收到商品状态为3:ID{},准备删除", itemId); listenerDeleteItem(itemId); return; } log.info("收到商品ID{},准备更新状态", itemId); UpdateRequest updateRequest = new UpdateRequest(SEARCH_ITEM_INDEX, String.valueOf(itemId)); updateRequest.doc("status", item.getStatus()); try { client.update(updateRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = SEARCH_ITEM_UPDATE_QUEUE, durable = "true"), exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE), key = SEARCH_ITEM_UPDATE_KEY) ) public void listenerUpdateItem(Long itemId) { Item item = itemService.getById(itemId); log.info("收到商品ID{},准备更新", itemId); ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class); String doc = JSONUtil.toJsonStr(itemDoc); UpdateRequest updateRequest = new UpdateRequest(SEARCH_ITEM_INDEX, String.valueOf(itemId)); updateRequest.upsert(doc, XContentType.JSON); try { client.update(updateRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } }
|
RestClient查询
这里我直接用es处理项目中搜索框接口了,es数据提前做好了预热
/search/list
1 2 3 4 5
| @ApiOperation("搜索商品") @GetMapping("/list") public PageDTO<ItemDTO> search(ItemPageQuery query) { return searchService.searchItemFilterEs(query); }
|
实现层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| @Override public PageDTO<ItemDTO> searchItemFilterEs(ItemPageQuery query) { SearchRequest request = new SearchRequest("items_new"); SearchSourceBuilder source = buildSearchRequest(request,query); Integer pageNo = query.getPageNo(); Integer pageSize = query.getPageSize(); source.from((pageNo - 1) * pageSize).size(pageSize); SearchResponse response; try { response = client.search(request, RequestOptions.DEFAULT); }catch (Exception e){ throw new RuntimeException(e); } return handleResponse(response); }
private SearchSourceBuilder buildSearchRequest(SearchRequest request,ItemPageQuery query) { BoolQueryBuilder bool = QueryBuilders.boolQuery(); if (StrUtil.isNotEmpty(query.getKey())){ bool.must(QueryBuilders.matchQuery("name", query.getKey())); } if (StrUtil.isNotEmpty(query.getCategory())){ bool.filter(QueryBuilders.termQuery("category.keyword", query.getCategory())); } if (StrUtil.isNotEmpty(query.getBrand())){ bool.filter(QueryBuilders.termQuery("brand.keyword", query.getBrand())); } bool.filter(QueryBuilders.rangeQuery("price").gt(query.getMinPrice()).lt(query.getMaxPrice())); SearchSourceBuilder source = request.source(); source.query(bool); String sortBy = query.getSortBy(); if (StrUtil.isNotEmpty(sortBy)) { if (query.getIsAsc()) source.sort(sortBy, SortOrder.ASC); else source.sort(sortBy, SortOrder.DESC); } source.highlighter( SearchSourceBuilder.highlight() .field("name") ); return source; }
private PageDTO<ItemDTO> handleResponse(SearchResponse search) { Page<ItemDTO> page = new Page<>(); SearchHits searchHits = search.getHits(); assert searchHits.getTotalHits() != null; long total = searchHits.getTotalHits().value; page.setTotal(total); SearchHit[] hits = searchHits.getHits(); List<ItemDTO> records = new ArrayList<>(); for (SearchHit hit : hits){ String source = hit.getSourceAsString(); ItemDTO itemDTO = JSONUtil.toBean(source, ItemDTO.class); records.add(itemDTO); Map<String, HighlightField> hfs = hit.getHighlightFields(); if (CollUtils.isNotEmpty(hfs)) { HighlightField hf = hfs.get("name"); if (hf != null) { String hfName = hf.getFragments()[0].toString(); itemDTO.setName(hfName); } } } page.setRecords(records); page.setSize(pageSize); return PageDTO.of(page); }
|
数据聚合
这里主要是一个动态的过滤条件搜索。目前搜索的商品有什么过滤条件就有啥。比如选了品牌是小米,生效的分类就只有手机 拉杆箱 和 电视

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Override
public JSONObject filter(ItemPageQuery query) { SearchRequest request = new SearchRequest("items_new"); SearchSourceBuilder source = buildSearchRequest(request,query); source.size(0); source.aggregation(AggregationBuilders.terms("brand_agg").field("brand.keyword").size(20)) .aggregation(AggregationBuilders.terms("category_agg").field("category.keyword").size(20)); SearchResponse response; try { response = client.search(request, RequestOptions.DEFAULT); }catch (Exception e){ throw new RuntimeException(e); } Aggregations aggregations = response.getAggregations(); if (aggregations == null) { return new JSONObject(); } ParsedTerms brandAgg = aggregations.get("brand_agg"); ParsedTerms categoryAgg = aggregations.get("category_agg"); JSONObject result = new JSONObject(); result.putByPath("brand", getBuckets(brandAgg)); result.putByPath("category",getBuckets(categoryAgg)); return result; }
private List<String> getBuckets(ParsedTerms terms) { if (terms == null) { return new ArrayList<>(); } List<String> list = new ArrayList<>(); for (Terms.Bucket bucket : terms.getBuckets()) { list.add(bucket.getKeyAsString()); } return list; }
|