ES理解

ES就是一套搜索引擎技术,它是elastic技术栈中的一部分。完整的技术栈包括:

  • Elasticsearch:用于数据存储、计算和搜索
  • Logstash/Beats:用于数据收集
  • Kibana:用于数据可视化

ES为什么快?他的搜索逻辑是什么?为什么不用mysql搜索?

如果使用mysql进行搜索,比如我要查询“手机”这个关键字,通常会使用like %手机%,这个操作会导致mysql索引失效(%前缀索引失效),就会进行全表扫描,逐条搜索判断。ES是怎么做的呢,他是使用倒排索引技术,他内置有一个分词算法,会将数据进行分词管理,比如有一个字段为“中国华为手机牛”,就会分成"中国"“华为”“手机”“牛”,然后将分词以后的词条当做索引,再创建文档id列,这样搜索某个关键词就会去文档ID找,因为词条有索引,所以很快,再通过文档id,查询文档。这样因为都使用到了索引所以非常快。

image-20250815100244629

分词器

通常使用ik分词器,支持自定义分词和停止词。这里我使用的时候出现了点小问题,我是在线安装的,查看日志docker logs es | grep -i "dictionary"发现我的字典加载位置和文档不一样。/usr/share/elasticsearch/config/analysis-ik/IKAnalyzer.cfg.xml我的配置类在这里,所以要修改这里的配置文件并添加ext.dic。

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict">ext.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!-- <entry key="remote_ext_dict">words_location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

AOP通过MQ异步同步数据

当我们通过REST接口对商品数据进行增修删的时候要保证ES同步更新,就和redis保持一致性一样的道理,这里因为我们之前代码都开发好了,所以想到了AOP进行业务解耦,使用AOP监控Controller层,传参情况就两种raw(json)数据和路径传参。json在java中已经按照dto封装好了,路径传参也都会有itemId,那我要做的事情就是拿到商品id就好了,我使用@AfterReturning后置返回通知,这样会在controller层执行完以后再执行拦截类的代码。这样数据都已经到数据库了,然后可以发itemId到MQ,当MQ监听到使用了哪些方法以后,消费者进行处理,可以通过id拿新的数据增加到es,修改或者删除就不多说了。看实现逻辑吧。

先自定义注解。标记需要发送消息的方法,标识那些执行后需要向消息队列发送消息的业务方法

1
2
3
4
5
6
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SeedMessageToMq {
String exchange(); //发送交换机
String routingKey(); //发送路由键
}

这里我写了一个命名类,比较优雅。(前期)将es索引库名和MQ收到的工厂和队列以及Routing key进行命名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MqConstants {
//es 索引库名
public static final String SEARCH_ITEM_INDEX = "search_item_index";
//MQ工厂
public static final String SEARCH_ITEM_DIRCT_EXCHANGE = "search-item-dirct-exchange";
//routingKey && queue
//增
public static final String SEARCH_ITEM_INSERT_KEY = "search-item-insert-key";
public static final String SEARCH_ITEM_INSERT_QUEUE = "search-item-insert-queue";
//删
public static final String SEARCH_ITEM_DELETE_KEY = "search-item-delete-key";
public static final String SEARCH_ITEM_DELETE_QUEUE = "search-item-delete-queue";
//改
public static final String SEARCH_ITEM_UPDATE_KEY = "search-item-update-key";
public static final String SEARCH_ITEM_UPDATE_QUEUE = "search-item-update-queue";
//改状态
public static final String SEARCH_ITEM_UPDATE_STATUS_KEY = "search-item-update-status-key";
public static final String SEARCH_ITEM_UPDATE_STATUS_QUEUE = "search-item-update-status-queue";
}

对controller层需要的代码进行拦截

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_INSERT_KEY)
@ApiOperation("新增商品")
@PostMapping
public void saveItem(@RequestBody ItemDTO item) {
// 新增
itemService.save(BeanUtils.copyBean(item,Item.class));
}
@SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_UPDATE_STATUS_KEY)
@ApiOperation("更新商品状态")
@PutMapping("/status/{id}/{status}")
public void updateItemStatus(@PathVariable("id") Long id, @PathVariable("status") Integer status){
Item item = new Item();
item.setId(id);
item.setStatus(status);
itemService.updateById(item);
}
@SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_UPDATE_KEY)
@ApiOperation("更新商品")
@PutMapping
public void updateItem(@RequestBody ItemDTO item) {
// 不允许修改商品状态,所以强制设置为null,更新时,就会忽略该字段
item.setStatus(null);
// 更新
itemService.updateById(BeanUtils.copyBean(item, Item.class));
}
@SeedMessageToMq(exchange = SEARCH_ITEM_DIRCT_EXCHANGE, routingKey = SEARCH_ITEM_DELETE_KEY)
@ApiOperation("根据id删除商品")
@DeleteMapping("{id}")
public void deleteItemById(@PathVariable("id") Long id) {
itemService.removeById(id);
}

然后定义切面和拦截后执行的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Aspect
@RequiredArgsConstructor
@Component
@Slf4j
public class RabbitMessageAOP {


@Autowired
private RabbitMqHelper rabbitMqHelper;
//定义切面
@Pointcut("execution(* com.hmall.item.controller.ItemController.*(..)) && @annotation(com.hmall.item.annotation.SeedMessageToMq)")
public void seedMessageToMqPointcut() {}

// 后置返回通知:在目标方法成功执行后触发
@AfterReturning(pointcut = "seedMessageToMqPointcut()")
public void seedMessage(JoinPoint joinPoint) {
//获取目标方法的签名
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
//获取目标方法对象
Method method = signature.getMethod();
//获取方法上的@SeedMessageToMq注解
SeedMessageToMq annotation = method.getAnnotation(SeedMessageToMq.class);
//从注解中获取配置信息
String exchange = annotation.exchange();
String routingKey = annotation.routingKey();
//获取方法参数(如果需要发送参数作为消息内容)
Object[] args = joinPoint.getArgs();
System.out.println("参数列表:"+ Arrays.toString(args));
if (args.length == 0){
log.info("没收到任何消息");
return;
}
//使用rabbitTemplate发送消息到指定的exchange和routingKey
Long itemId = null;
ObjectMapper objectMapper = new ObjectMapper();
if (routingKey.equals(SEARCH_ITEM_INSERT_KEY) || routingKey.equals(SEARCH_ITEM_UPDATE_KEY)){
ItemDTO itemDTO = objectMapper.convertValue(args[0], ItemDTO.class);
itemId = itemDTO.getId();
}else if (routingKey.equals(SEARCH_ITEM_UPDATE_STATUS_KEY) || routingKey.equals(SEARCH_ITEM_DELETE_KEY)){
//更新商品状态
itemId = objectMapper.convertValue(args[0], Long.class);
}
log.info("{}商品————准备发送商品ID:{}到MQ", routingKey, itemId);
rabbitMqHelper.sendMessage(exchange,routingKey,itemId);
}
}

上面已经对MQ发送了消息,现在进行监听并处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@Slf4j
@Component
//监听商品变化的消息 并修改对应的es -> 数据同步
public class ListenerItemMqToEs {

@Autowired
private IItemService itemService;

private final RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.219.128:9200")
));

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = SEARCH_ITEM_INSERT_QUEUE, durable = "true"),
exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE),
key = SEARCH_ITEM_INSERT_KEY)
)
public void listenerInsertItem(Long itemId) {
log.info("收到商品ID{},准备添加", itemId);
//获取商品信息
Item item = itemService.getById(itemId);
//转成es所需json
ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class);
String doc = JSONUtil.toJsonStr(itemDoc);
//准备request对象发送es
IndexRequest indexRequest = new IndexRequest(SEARCH_ITEM_INDEX).id(itemDoc.getId()).source(doc, XContentType.JSON);
try {
client.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = SEARCH_ITEM_DELETE_QUEUE, durable = "true"),
exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE),
key = SEARCH_ITEM_DELETE_KEY)
)
public void listenerDeleteItem(Long itemId) {
log.info("收到商品ID{},准备删除", itemId);
//获取商品信息
DeleteRequest deleteRequest = new DeleteRequest(SEARCH_ITEM_INDEX).id(String.valueOf(itemId));
try {
client.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = SEARCH_ITEM_UPDATE_STATUS_QUEUE, durable = "true"),
exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE),
key = SEARCH_ITEM_UPDATE_STATUS_KEY)
)
public void listenerUpdateStatusItem(Long itemId) {
//获取商品信息
Item item = itemService.getById(itemId);
//状态是3的删除
if (item.getStatus() == 3) {
log.info("收到商品状态为3:ID{},准备删除", itemId);
listenerDeleteItem(itemId);
return;
}
log.info("收到商品ID{},准备更新状态", itemId);
//准备request对象发送es
UpdateRequest updateRequest = new UpdateRequest(SEARCH_ITEM_INDEX, String.valueOf(itemId));
updateRequest.doc("status", item.getStatus());
try {
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = SEARCH_ITEM_UPDATE_QUEUE, durable = "true"),
exchange = @Exchange(value = SEARCH_ITEM_DIRCT_EXCHANGE),
key = SEARCH_ITEM_UPDATE_KEY)
)
public void listenerUpdateItem(Long itemId) {
//获取商品信息
Item item = itemService.getById(itemId);
log.info("收到商品ID{},准备更新", itemId);
//转成es所需json
ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class);
String doc = JSONUtil.toJsonStr(itemDoc);
//准备request对象发送es
UpdateRequest updateRequest = new UpdateRequest(SEARCH_ITEM_INDEX, String.valueOf(itemId));
updateRequest.upsert(doc, XContentType.JSON);
try {
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

RestClient查询

这里我直接用es处理项目中搜索框接口了,es数据提前做好了预热

/search/list

1
2
3
4
5
@ApiOperation("搜索商品")
@GetMapping("/list")
public PageDTO<ItemDTO> search(ItemPageQuery query) {
return searchService.searchItemFilterEs(query);
}

实现层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Override
public PageDTO<ItemDTO> searchItemFilterEs(ItemPageQuery query) {
SearchRequest request = new SearchRequest("items_new");
SearchSourceBuilder source = buildSearchRequest(request,query);
Integer pageNo = query.getPageNo();
Integer pageSize = query.getPageSize();
//分页参数
source.from((pageNo - 1) * pageSize).size(pageSize);
SearchResponse response;
// 3.发送请求
try {
response = client.search(request, RequestOptions.DEFAULT);
}catch (Exception e){
throw new RuntimeException(e);
}
return handleResponse(response);
}
//组织query参数
private SearchSourceBuilder buildSearchRequest(SearchRequest request,ItemPageQuery query) {
//搜索条件参数
BoolQueryBuilder bool = QueryBuilders.boolQuery();
if (StrUtil.isNotEmpty(query.getKey())){
bool.must(QueryBuilders.matchQuery("name", query.getKey()));
}
if (StrUtil.isNotEmpty(query.getCategory())){
bool.filter(QueryBuilders.termQuery("category.keyword", query.getCategory()));
}
if (StrUtil.isNotEmpty(query.getBrand())){
bool.filter(QueryBuilders.termQuery("brand.keyword", query.getBrand()));
}
bool.filter(QueryBuilders.rangeQuery("price").gt(query.getMinPrice()).lt(query.getMaxPrice()));
SearchSourceBuilder source = request.source();
source.query(bool);
//排序
String sortBy = query.getSortBy();
if (StrUtil.isNotEmpty(sortBy)) {
if (query.getIsAsc()) source.sort(sortBy, SortOrder.ASC);
else source.sort(sortBy, SortOrder.DESC);
}
//高亮
source.highlighter(
SearchSourceBuilder.highlight()
.field("name")
);
return source;
}
//处理SearchResponse转PageDTO
private PageDTO<ItemDTO> handleResponse(SearchResponse search) {
Page<ItemDTO> page = new Page<>();
SearchHits searchHits = search.getHits();
assert searchHits.getTotalHits() != null;
long total = searchHits.getTotalHits().value;
page.setTotal(total);
SearchHit[] hits = searchHits.getHits();
List<ItemDTO> records = new ArrayList<>();
for (SearchHit hit : hits){
String source = hit.getSourceAsString();
ItemDTO itemDTO = JSONUtil.toBean(source, ItemDTO.class);
records.add(itemDTO);
// 5.获取高亮结果
Map<String, HighlightField> hfs = hit.getHighlightFields();
if (CollUtils.isNotEmpty(hfs)) {
// 5.1.有高亮结果,获取name的高亮结果
HighlightField hf = hfs.get("name");
if (hf != null) {
//获取第一个高亮结果片段,就是商品名称的高亮值
String hfName = hf.getFragments()[0].toString();
itemDTO.setName(hfName);
}
}
}
page.setRecords(records);
page.setSize(pageSize);
return PageDTO.of(page);
}

数据聚合

这里主要是一个动态的过滤条件搜索。目前搜索的商品有什么过滤条件就有啥。比如选了品牌是小米,生效的分类就只有手机 拉杆箱 和 电视

image-20250821140415010

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
//聚合参数
public JSONObject filter(ItemPageQuery query) {
SearchRequest request = new SearchRequest("items_new");
SearchSourceBuilder source = buildSearchRequest(request,query);
source.size(0);
source.aggregation(AggregationBuilders.terms("brand_agg").field("brand.keyword").size(20))
.aggregation(AggregationBuilders.terms("category_agg").field("category.keyword").size(20));
//发送请求
SearchResponse response;
try {
response = client.search(request, RequestOptions.DEFAULT);
}catch (Exception e){
throw new RuntimeException(e);
}
//解析聚合结果
Aggregations aggregations = response.getAggregations();
if (aggregations == null) {
return new JSONObject();
}
ParsedTerms brandAgg = aggregations.get("brand_agg");
ParsedTerms categoryAgg = aggregations.get("category_agg");
JSONObject result = new JSONObject();
result.putByPath("brand", getBuckets(brandAgg));
result.putByPath("category",getBuckets(categoryAgg));
return result;
}
//ParsedTerms转集合
private List<String> getBuckets(ParsedTerms terms) {
if (terms == null) {
return new ArrayList<>();
}
List<String> list = new ArrayList<>();
for (Terms.Bucket bucket : terms.getBuckets()) {
list.add(bucket.getKeyAsString());
}
return list;
}