微服务实战——ElasticSearch(保存)

📅 2025-11-17 03:00:11 ✍️ admin 👁️ 7002 ❤️ 509
微服务实战——ElasticSearch(保存)

商品上架——ElasticSearch(保存)

0.商城架构图

1.商品Mapping

分析:商品上架在 es 中是存 sku 还是 spu ?

检索的时候输入名字,是需要按照 sku 的 title 进行全文检索的检索使用商品规格,规格是 spu 的公共属性,每个 spu 是一样的按照分类 id 进去的都是直接列出 spu 的,还可以切换。我们如果将 sku 的全量信息保存到 es 中(包括 spu 属性)就太多量字段了。我们如果将 spu 以及他包含的 sku 信息保存到 es 中,也可以方便检索。但是 sku 属于spu 的级联对象,在 es 中需要 nested 模型,这种性能差点。但是存储与检索我们必须性能折中。如果我们分拆存储,spu 和 attr 一个索引,sku 单独一个索引可能涉及的问题。

检索商品的名字,如“手机”,对应的 spu 有很多,我们要分析出这些 spu 的所有关联属性,再做一次查询,就必须将所有 spu_id 都发出去。假设有 1 万个数据,数据传输一次就10000*4=4MB;并发情况下假设 1000 检索请求,那就是 4GB 的数据,传输阻塞时间会很长,业务更加无法继续。所以,我们如下设计,这样才是文档区别于关系型数据库的地方,宽表设计,不能去考虑数据库范式。向ES添加商品属性映射

向ES添加商品属性映射

PUT product

{

"mappings":{

"properties": {

"skuId":{

"type": "long"

},

"spuId":{

"type": "keyword"

},

"skuTitle": {

"type": "text",

"analyzer": "ik_smart"

},

"skuPrice": {

"type": "keyword"

},

"skuImg":{

"type": "keyword",

"index": false,

"doc_values": false

},

"saleCount":{

"type":"long"

},

"hasStock": {

"type": "boolean"

},

"hotScore": {

"type": "long"

},

"brandId": {

"type": "long"

},

"catalogId": {

"type": "long"

},

"brandName": {

"type": "keyword",

"index": false,

"doc_values": false

},

"brandImg":{

"type": "keyword",

"index": false,

"doc_values": false

},

"catalogName": {

"type": "keyword",

"index": false,

"doc_values": false

},

"attrs": {

"type": "nested",

"properties": {

"attrId": {

"type": "long"

},

"attrName": {

"type": "keyword",

"index": false,

"doc_values": false

},

"attrValue": {

"type": "keyword"

}

}

}

}

}

}

index :

默认 true ,如果为 false ,表示该字段不会被索引,但是检索结果里面有,但字段本身不能

当做检索条件。

doc_values :

默认 true ,设置为 false ,表示不可以做排序、聚合以及脚本操作,这样更节省磁盘空间。

还可以通过设定 doc_values 为 true , index 为 false 来让字段不能被搜索但可以用于排序、聚合以及脚本操作:

spu在es中的存储模型分析总结

如果每个sku都存储规格参数,会有冗余存储,因为每个spu对应的sku的规格参数都一样。但是如果将规格参数单独建立索引会出现检索时出现大量数据传输的问题,会阻塞网络因此我们选用第一种存储模型,以空间换时间。

2.上架细节

上架是将后台的商品放在 es 中可以提供检索和查询功能:

hasStock:代表是否有库存。默认上架的商品都有库存。如果库存无货的时候才需要更新一下 es库存补上以后,也需要重新更新一下 eshotScore 是热度值,我们只模拟使用点击率更新热度。点击率增加到一定程度才更新热度值。下架就是从 es 中移除检索项,以及修改 mysql 状态

商品上架步骤:

先在 es 中按照之前的 mapping 信息,建立 product 索引。点击上架,查询出所有 sku 的信息,保存到 es 中es 保存成功返回,更新数据库的上架状态信息

3.数据一致性

商品无库存的时候需要更新 es 的库存信息商品有库存也要更新 es 的信息

4.ES中的数组扁平化

关于“nested”,Nested datatype | Elasticsearch Guide [7.6] | Elastic

ES中数组的扁平化处理:

对象数组的扁平化:

内部对象字段数组的工作方式与您预期的不同。Lucene没有内部对象的概念,所以Elasticsearch将对象层次结构简化为字段名和值的简单列表。例如,以下文件:

PUT my_index/_doc/1

{

"group" : "fans",

"user" : [

{

"first" : "John",

"last" : "Smith"

},

{

"first" : "Alice",

"last" : "White"

}

]

}

在内部将转换成一个文档,看起来是这样的:

{

"group" : "fans",

"user.first" : [ "alice", "john" ],

"user.last" : [ "smith", "white" ]

}

查询my_index的映射

GET my_index/_mapping

{

"my_index" : {

"mappings" : {

"properties" : {

"group" : {

"type" : "text",

"fields" : {

"keyword" : {

"type" : "keyword",

"ignore_above" : 256

}

}

},

"user" : {

"properties" : {

"first" : {

"type" : "text",

"fields" : {

"keyword" : {

"type" : "keyword",

"ignore_above" : 256

}

}

},

"last" : {

"type" : "text",

"fields" : {

"keyword" : {

"type" : "keyword",

"ignore_above" : 256

}

}

}

}

}

}

}

}

}

user.first和user.last字段被平铺成多值字段,alice和white之间的关联也丢失了。在查询alice和smith时,这个文档将将发生错误的匹配

GET my_index/_search

{

"query": {

"bool": {

"must": [

{ "match": { "user.first": "Alice" }},

{ "match": { "user.last": "Smith" }}

]

}

}

}

所想要的只是user.first="Alice",user.last="Smith",本身是查询不到的,但是却查询出来了两条结果:

{

"took" : 49,

"timed_out" : false,

"_shards" : {

"total" : 1,

"successful" : 1,

"skipped" : 0,

"failed" : 0

},

"hits" : {

"total" : {

"value" : 1,

"relation" : "eq"

},

"max_score" : 0.5753642,

"hits" : [

{

"_index" : "my_index",

"_type" : "_doc",

"_id" : "1",

"_score" : 0.5753642,

"_source" : {

"group" : "fans",

"user" : [

{

"first" : "John",

"last" : "Smith"

},

{

"first" : "Alice",

"last" : "White"

}

]

}

}

]

}

}

删除“my_index”索引

DELETE my_index

重新创建my_index索引

PUT my_index

{

"mappings": {

"properties": {

"user": {

"type": "nested"

}

}

}

}

重新插入数据

PUT my_index/_doc/1

{

"group" : "fans",

"user" : [

{

"first" : "John",

"last" : "Smith"

},

{

"first" : "Alice",

"last" : "White"

}

]

}

再次查询user.first="Alice",user.last="Smith"时,查询不到数据

GET my_index/_search

{

"query": {

"bool": {

"must": [

{ "match": { "user.first": "Alice" }},

{ "match": { "user.last": "Smith" }}

]

}

}

}

查询结果:

{

"took" : 1,

"timed_out" : false,

"_shards" : {

"total" : 1,

"successful" : 1,

"skipped" : 0,

"failed" : 0

},

"hits" : {

"total" : {

"value" : 0,

"relation" : "eq"

},

"max_score" : null,

"hits" : [ ]

}

}

5.商品上架接口实现

商品上架需要在es中保存spu信息并更新spu的状态信息,由于SpuInfoEntity与索引的数据模型并不对应,所以我们要建立专门的vo进行数据传输

1、商品上架接口

接口文档:商品系统 - 20、商品上架

POST /product/spuinfo/{spuId}/up

请求参数

分页数据

响应数据

{ "msg": "success", "code": 0 }

功能效果

新增“com.cwh.common.to.es.SkuEsModel”类,代码如下:

package com.cwh.common.to.es;

import lombok.Data;

import java.math.BigDecimal;

import java.util.List;

@Data

public class SkuEsModel {

private Long skuId;

private Long spuId;

private String skuTitle;

private BigDecimal skuPrice;

private String skuImg;

private Long saleCount;

private boolean hasStock;

private Long hotScore;

private Long brandId;

private Long catalogId;

private String brandName;

private String brandImg;

private String catalogName;

private List attrs;

@Data

public static class Attr{

private Long attrId;

private String attrName;

private String attrValue;

}

}

编写商品上架的接口

修改“com.cwh.gulimall.product.controller.SpuInfoController”类,代码如下:

@PostMapping("spuinfo/{spuId}/up")

public R spuUp(@PathVariable("spuId") Long spuId){

spuInfoService.up(spuId);

return R.ok();

}

修改“com.cwh.gulimall.product.service.SpuInfoService”类,代码如下:

/**

* 商品上架

*

* @param spuId

*/

void up(Long spuId);

由于每个spu对应的各个sku的规格参数相同,因此我们要将查询规格参数提前,只查询一次

修改“com.cwh.gulimall.product.service.impl.SpuInfoServiceImpl”类,代码如下:

@Override

public void up(Long spuId) {

// 1、查出当前spuId对应的sku信息,品牌名字

List skus = skuInfoService.getSkuBySpuId(spuId);

List skuIdList = skus.stream().map(SkuInfoEntity::getSkuId).collect(Collectors.toList());

// 2.1、发送远程调用,库存系统查询是否有库存

Map stockMap = null;

try {

R r = wareFeignService.getSkusHasStock(skuIdList);

TypeReference> typeReference = new TypeReference>() {

};

stockMap = r.getData(typeReference).stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));

} catch (Exception e) {

log.error("库存服务查询异常,原因:", e);

}

// 2.4、查询当前sku的所有可以被用来检索的规格属性

List baseAttrs = productAttrValueService.baseAttrListForSpu(spuId);

List attrIds = baseAttrs.stream().map(attr -> attr.getAttrId()).collect(Collectors.toList());

List searchAttrIds = attrService.selectSearchAttrs(attrIds);

Set idSet = new HashSet<>(searchAttrIds);

List attrsList = baseAttrs.stream().filter(item -> idSet.contains(item.getAttrId())).map(item -> {

SkuEsModel.Attrs attrs1 = new SkuEsModel.Attrs();

BeanUtils.copyProperties(item, attrs1);

return attrs1;

}).collect(Collectors.toList());

// 2、封装每个sku的信息

Map finalStockMap = stockMap;

List upProducts = skus.stream().map(sku -> {

// 组装需要的数据

SkuEsModel esModel = new SkuEsModel();

BeanUtils.copyProperties(sku, esModel);

esModel.setSkuPrice(sku.getPrice());

esModel.setSkuImg(sku.getSkuDefaultImg());

// 2.1、是否有库存 hasStock,hotScore

if (finalStockMap == null) {

esModel.setHasStock(true);

} else {

esModel.setHasStock(finalStockMap.get(sku.getSkuId()));

}

// 2.2、热度评分。0

esModel.setHotScore(0L);

// 2.3、查询品牌和分类的名字信息

BrandEntity brand = brandService.getById(esModel.getBrandId());

esModel.setBrandName(brand.getName());

esModel.setBrandImg(brand.getLogo());

CategoryEntity category = categoryService.getById(esModel.getCatalogId());

esModel.setCatalogName(category.getName());

// 2.4、设置检索属性

esModel.setAttrs(attrsList);

System.out.println("======================esModel" + esModel);

return esModel;

}).collect(Collectors.toList());

// 3、将数据发送给es进行保存

R r = searchFeignService.productStatusUp(upProducts);

System.out.println("=========================" + r);

if (r.getCode() == 0) {

//远程调用成功

// 3.1、修改当前spu的状态

System.out.println("修改当前spu的状态");

baseMapper.updateSpuStatus(spuId, ProductConstant.StatusEnum.SPU_UP.getCode());

} else {

// 远程调用失败

// TODO 3.2、重复调用?接口幂等性;重试机制

/**

* Feign调用流程:

* 1、构造请求数据,将对象转为json

* RequestTemplate template = buildTemplateFromArgs.create(argv);

* 2、发送请求进行执行(执行成功会解码响应数据)

* executeAndDecode(template)

* 3、执行请求会有重试机制

* while(true){

* try{

* executeAndDecode(template);

* }catch(){

* retryer.continueOrPropagate(e);

* throw ex;

* continue;

* }

* }

*/

}

}

2、查出当前spuId对应的sku信息,品牌名字

修改“com.cwh.gulimall.product.service.SkuInfoService”类,代码如下:

/**

* 查出当前spuId对应的sku信息

*

* @param spuId

* @return

*/

List getSkuBySpuId(Long spuId);

修改“com.cwh.gulimall.product.service.impl.SkuInfoServiceImpl”类,代码如下:

@Override

public List getSkuBySpuId(Long spuId) {

List list = this.list(new QueryWrapper().eq("spu_id", spuId));

return list;

}

3、封装每个sku的信息

3.1、发送远程调用,库存系统查询是否有库存

修改“com.cwh.gulimall.product.feign.WareFeignService”类,代码如下:

package com.cwh.gulimall.product.feign;

import com.cwh.common.utils.R;

import com.cwh.gulimall.product.vo.SkuHasStockVo;

import org.springframework.cloud.openfeign.FeignClient;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@FeignClient("gulimall-ware")

public interface WareFeignService {

/**

* 1、R设计的时候可以加上泛型

* 2、直接返回我们想要的结果

* 3、自己封装返回结果

* @param skuIds

* @return

*/

@PostMapping("/ware/waresku/hasStock")

R getSkusHasStock(@RequestBody List skuIds);

}

修改”com.cwh.gulimall.ware.controller.WareSkuController”,代码如下:

/**

* 查询sku是否有库存

*/

@PostMapping("hasStock")

public R getSkusHasStock(@RequestBody List skuIds){

// sku_id, stock

List vos = wareSkuService.getSkusHasStock(skuIds);

return R.ok().setData(vos);

}

修改”com.cwh.gulimall.ware.service.WareSkuService”类,代码如下:

List getSkusHasStock(List skuIds);

修改”com.cwh.gulimall.ware.service.WareSkuService”类,代码如下:

@Override

public List getSkusHasStock(List skuIds) {

List collect = skuIds.stream().map(skuId -> {

SkuHasStockVo vo = new SkuHasStockVo();

// 查询sku的总库存量

Long count = baseMapper.getSkuStock(skuId);

vo.setSkuId(skuId);

vo.setHasStock(count == null ? false : count > 0);

return vo;

}).collect(Collectors.toList());

return collect;

}

修改“com.cwh.gulimall.ware.dao.WareSkuDao”类,代码如下

Long getSkuStock(Long skuId);

修改“com.cwh.gulimall.ware.dao.WareSkuDao.xml”类,代码如下

2.2、查询当前sku的所有可以被用来检索的规格属性

修改“com.cwh.gulimall.product.service.AttrService”类,代码如下:

/**

* 在指定的所有属性集合里面,挑出检索属性

*

* @param attrIds

* @return

*/

List selectSearchAttrs(List attrIds);

修改“com.cwh.gulimall.product.service.impl.AttrServiceImpl”类,代码如下:

@Override

public List selectSearchAttrs(List attrIds) {

return baseMapper.selectSearchAttrIds(attrIds);

}

4、将数据发送给es进行保存

修改“com.cwh.gulimall.product.feign.SearchFeignService”类,代码如下:

package com.cwh.gulimall.product.feign;

import com.cwh.common.to.es.SkuEsModel;

import com.cwh.common.utils.R;

import org.springframework.cloud.openfeign.FeignClient;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@FeignClient("gulimall-search")

public interface SearchFeignService {

@PostMapping("search/save/product")

public R productStatusUp(@RequestBody List skuEsModels);

}

4.1、创建gulimall-search

1、添加pom

org.springframework.boot

spring-boot-starter-parent

2.3.5.RELEASE

com.cwh.gulimall

gulimall-search

0.0.1-SNAPSHOT

gulimall-search

ElasticSearch检索服务

1.8

7.4.2

org.elasticsearch.client

elasticsearch-rest-high-level-client

7.4.2

com.auguigu.gulimall

gulimall-commom

0.0.1-SNAPSHOT

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

org.junit.vintage

junit-vintage-engine

org.springframework.boot

spring-boot-maven-plugin

2、修改yml

spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848

spring.application.name=gulimall-search

server.port=12000

3、添加主配置类

package com.cwh.gulimall.search;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@EnableDiscoveryClient

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)

public class GulimallSearchApplication {

public static void main(String[] args) {

SpringApplication.run(GulimallSearchApplication.class, args);

}

}

4、配置ElaseaticSearch

修改“com.cwh.gulimall.search.config.GulimallElasticSearchConfig”类,代表如下:

package com.cwh.gulimall.search.config;

import org.apache.http.HttpHost;

import org.elasticsearch.client.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.client.RestOperations;

/**

* 1、导入依赖

* 2、编写配置,给容器中注入一个RestHighLevelClient

* 3、参照API操作

*/

@Configuration

public class GulimallElasticSearchConfig {

public static final RequestOptions COMMON_OPTIONS;

static {

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

// builder.addHeader("Authorization", "Bearer " + TOKEN);

// builder.setHttpAsyncResponseConsumerFactory(

// new HttpAsyncResponseConsumerFactory

// .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));

COMMON_OPTIONS = builder.build();

}

@Bean

public RestHighLevelClient restHighLevelClient() {

RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.125", 9200, "http"));

return new RestHighLevelClient(builder);

}

}

修改“com.cwh.gulimall.search.controller.ElasticSaveController”类,代表如下:

package com.cwh.gulimall.search.controller;

import com.cwh.common.constant.ProductConstant;

import com.cwh.common.exception.BizCodeEnume;

import com.cwh.common.to.es.SkuEsModel;

import com.cwh.common.utils.R;

import com.cwh.gulimall.search.service.ProductSaveService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RequestMapping("/search/save")

@RestController

@Slf4j

public class ElasticSaveController {

@Autowired

ProductSaveService productSaveService;

/**

* 上架商品

*/

@PostMapping("/product")

public R productStatusUp(@RequestBody List skuEsModels) {

boolean b;

try {

b = productSaveService.productStatusUp(skuEsModels);

} catch (Exception e) {

log.error("ElasticSaveController商品上架错误:{}", e);

return R.error(BizCodeEnume.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnume.PRODUCT_UP_EXCEPTION.getMsg());

}

if (!b) {

return R.ok();

} else {

return R.error(BizCodeEnume.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnume.PRODUCT_UP_EXCEPTION.getMsg());

}

}

}

修改“com.cwh.gulimall.search.service.ProductSaveService”类,代表如下:

public class EsConstant {

public static final String PRODUCT_INDEX = "product"; //sku数据在es中的索引

}

修改“com.cwh.gulimall.search.service.impl.ProductSaveServiceImpl”类,代表如下:

package com.cwh.gulimall.search.service.impl;

import com.alibaba.fastjson.JSON;

import com.cwh.common.to.es.SkuEsModel;

import com.cwh.gulimall.search.config.GulimallElasticSearchConfig;

import com.cwh.gulimall.search.constant.EsConstant;

import com.cwh.gulimall.search.service.ProductSaveService;

import lombok.extern.slf4j.Slf4j;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.common.xcontent.XContentType;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.io.IOException;

import java.util.Arrays;

import java.util.List;

import java.util.stream.Collectors;

@Slf4j

@Service

public class ProductSaveServiceImpl implements ProductSaveService {

@Autowired

RestHighLevelClient restHighLevelClient;

@Override

public boolean productStatusUp(List skuEsModels) throws IOException {

// 保存到es

// 1、给es中建立索引。product,建立好映射关系

// 2、给es中保存这些数据

// BulkRequest bulkRequest, RequestOptions options

BulkRequest bulkRequest = new BulkRequest();

for (SkuEsModel model : skuEsModels) {

// 1、构造保存请求

IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);

indexRequest.id(model.getSkuId().toString());

String jsonString = JSON.toJSONString(model);

indexRequest.source(jsonString, XContentType.JSON);

bulkRequest.add(indexRequest);

}

BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);

// TODO 如果批量错误

boolean b = bulk.hasFailures();

List collect = Arrays.stream(bulk.getItems()).map(item -> item.getId()).collect(Collectors.toList());

log.info("商品上架完成:{},返回数据:{}", collect, bulk.toString());

return b;

}

}

4.2、修改当前spu的状态

修改"com.cwh.gulimall.product.dao.SpuInfoDao"类,代码如下:

void updateSpuStatus(@Param("spuId") Long spuId,@Param("code") int code);

修改"com.cwh.gulimall.product.dao.SpuInfoDao.xml"类,代码如下:

update pms_spu_info set publish_status=#{code},update_time=NOW() where id =#{spuId}