怎么跟数据商聊天 老板再也不用为
相信负责过“搜索服务”的伙伴,最害怕的一句话就是:“数据怎么又搜索不出来了!!!”。每当收到这句话,都会心中一颤,因为面对几千万甚至几亿的索引数据,我真的无从下手,不知道业务要搜索什么,也不知道是哪些数据出了问题….
1.1. 背景目前,“搜索”已经成为后端管理平台的必备功能,在这个业务场景中,很多人都会基于 elasticsearch 强大的检索能力构建自己的搜索服务。但实际开发中,Elasticsearch 的引入是非常小的一部分,往往大头是索引模型的数据管理,在整个过程中,我们
- 需要根据业务需求构建检索模型和ES存储模型;
- 需要从多个数据源中获取数据,并填充到检索模型;
- 需要关注所有数据源的数据变化,并对变更数据进行索引重建;
- 需要对不一致的数据进行识别和处理…
如此繁琐的事情,哪一环出现问题都会收到业务的投诉。
1.2. 目标对搜索场景中的最佳实践进行封装,从而:
- 降低开发成本,开发人员将精力放在模型构建上,抛开繁琐的技术细节;
- 对数据索引、关联数据更新有很好的支持;
- 引入数据实时巡检能力,对于数据不一致的情况进行自动修复;
- 引入天级对账机制,保障数据的一致性;
首先,增加对 spring data elasticsearch 的支持,具体 maven 坐标如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
在 application.yml 中添加 es 的配置信息,具体如下:
spring:
elasticsearch:
uris:http://localhost:9200
connection-timeout:10s
socket-timeout:30s
新建 SpringESConfiguration 配置信息,指定 ES Repository 的包信息,居然如下:
@Configuration
@EnableElasticsearchRepositories(basePackages="com.geekhalo.lego.wide.es")
publicclassSpringESConfiguration{
}
最后,引入 lego-starter,具体如下:
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>0.1.14-wide-SNAPSHOT</version>
至此,就完成了项目的准备工具,可以着手构建索引模型。
2.2. 构建模型构造模型之前,需要构建一个 Enum 用以管理模型中所有关联数据,具体如下:
publicenumWideOrderTypeimplementsWideItemType<WideOrderType>{
ORDER,//订单主数据
USER,//用户数据
ADDRESS,//用户地址数据
PRODUCT//购买商品数据
}
WideOrderType 枚举实现 WideItemType 接口,用于与框架进行集成。
接下来,构建待索引的宽表模型,具体如下:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName="wide_order")
publicclassWideOrderextendsBindFromBasedWide<Long,WideOrderType>{
@org.springframework.data.annotation.Id
privateLongid;
@BindFrom(sourceClass=Order.class,field="userId")
privateLonguserId;
@BindFrom(sourceClass=Order.class,field="addressId")
privateLongaddressId;
@BindFrom(sourceClass=Order.class,field="productId")
privateLongproductId;
@BindFrom(sourceClass=Order.class,field="descr")
privateStringorderDescr;
@BindFrom(sourceClass=User.class,field="name")
privateStringuserName;
@BindFrom(sourceClass=Address.class,field="detail")
privateStringaddressDetail;
@BindFrom(sourceClass=Product.class,field="name")
privateStringproductName;
@BindFrom(sourceClass=Product.class,field="price")
privateIntegerproductPrice;
publicWideOrder(LongorderId){
setId(orderId);
}
@Override
publicLonggetId(){
returnid;
}
@Override
publicbooleanisValidate(){
returnuserId!=null&&addressId!=null&&productId!=null;
}
@Override
publicList<WideItemKey>getItemsKeyByType(WideOrderTypewideOrderType){
switch(wideOrderType){
caseORDER:
returnCollections.singletonList(newWideItemKey(wideOrderType,getId()));
caseUSER:
returnCollections.singletonList(newWideItemKey(wideOrderType,getUserId()));
caseADDRESS:
returnCollections.singletonList(newWideItemKey(wideOrderType,getAddressId()));
casePRODUCT:
returnCollections.singletonList(newWideItemKey(wideOrderType,getProductId()));
}
returnCollections.emptyList();
}
}
该模型有如下几个特点:
- 存在很多属性,是由多个表数据共同组成的“宽表”;
- 除 id 属性外,其他属性上都有 @BindFrom 注解,用于标明该字段的数据是来自于哪个实体的那个字段;
- 继承自 BindFromBasedWide<Long, WideOrderType>,其中 Long 为模型主键,WideOrderType 为刚建的枚举,BindFromBasedWide 将根据字段上的 @BindFrom 注解自动完成 数据更新 和 数据比对;
- Long getId() 方法返回模型的主键信息;
- boolean isValidate() 用于对数据的有效性进行验证,无效数据将不会进行持久化处理
- List<WideItemKey> getItemsKeyByType(WideOrderType wideOrderType) 根据关联数据类型(WideOrderType)返回不同键信息,以进行数据组装;
至此,模型就建立完毕。
2.3. 数据提供器有了模型后,我们需要构建一些组件用于为“宽表”提供数据,这就是 WideItemDataProvider 体系。
我们以 OrderProvider 为例,具体如下:
@Component @org.springframework.core.annotation.Order(value=Ordered.HIGHEST_PRECEDENCE) publicclassOrderProviderimplementsWideItemDataProvider<WideOrderType,Long,Order>{ @Autowired privateOrderDaoorderDao; @Override publicList<Order>apply(List<Long>key){ returnorderDao.findAllById(key); } @Override publicWideOrderTypegetSupportType(){ returnWideOrderType.ORDER; } }
该类有如下特点:
- 实现 WideItemDataProvider 接口,其中 WideOrderType 为刚刚定义的枚举,Long 为 Order 模型的关联键类型,Order 为要提供的数据;
- List<Order> apply(List<Long> key),根据 key 获得对应的数据;
- WideOrderType getSupportType(),获取该组件所支持的 关联类型;
- @Component 标记该类为 Spring 的托管 Bean;
- @Order(value = Ordered.HIGHEST_PRECEDENCE) 指定组件的顺序,由于为 WideOrder 提供主数据,优先级调到最高;
每一类关联数据都会提供自己的数据提供器,简单看下 UserProvider 实现,具体如下:
@Component publicclassUserProviderimplementsWideItemDataProvider<WideOrderType,Long,User>{ @Autowired privateUserDaouserDao; @Override publicList<User>apply(List<Long>key){ returnuserDao.findAllById(key); } @Override publicWideOrderTypegetSupportType(){ returnWideOrderType.USER; } }
和 OrderProvider 没有本质区别,当然,demo 中还提供了多种实现,如:
2.4. 构建宽表仓库
- OrderProvider,提供订单主数据;
- UserProvider,提供用户信息;
- AddressProvider,提供用户地址信息;
- ProductProvider,提供商品信息;
数据都准备好了,需要将 “宽表” 进行持久化,将其放入最合适的存储引擎,以便更好的处理查询请求。
基于 ElasticsearchRepository 的 WideOrderRepository 具体如下:
@Repository publicclassWideOrderRepositoryimplementsWideCommandRepository<Long,WideOrderType,WideOrder>{ @Autowired privateWideOrderESDaowideOrderDao; @Override publicvoidsave(List<WideOrder>wides){ wideOrderDao.saveAll(wides); } @Override publicList<WideOrder>findByIds(List<Long>masterIds){ returnLists.newArrayList(wideOrderDao.findAllById(masterIds)); } @Override public<KEY>voidconsumeByItem(WideOrderTypewideOrderType,KEYkey,Consumer<WideOrder>wideConsumer){ switch(wideOrderType){ casePRODUCT: this.wideOrderDao.findByProductId((Long)key).forEach(wideConsumer); caseADDRESS: this.wideOrderDao.findByAddressId((Long)key).forEach(wideConsumer); caseORDER: this.wideOrderDao.findById((Long)key).ifPresent(wideConsumer); caseUSER: this.wideOrderDao.findByUserId((Long)key).forEach(wideConsumer); } } @Override publicbooleansupportUpdateFor(WideOrderTypewideOrderType){ returnfalse; } @Override public<KEY>voidupdateByItem(WideOrderTypewideOrderType,KEYkey,Consumer<WideOrder>wideConsumer){ Consumer<WideOrder>updateAndSave=wideConsumer.andThen(wideOrder->wideOrderDao.save(wideOrder)); switch(wideOrderType){ casePRODUCT: this.wideOrderDao.findByProductId((Long)key).forEach(updateAndSave); caseADDRESS: this.wideOrderDao.findByAddressId((Long)key).forEach(updateAndSave); caseORDER: this.wideOrderDao.findById((Long)key).ifPresent(updateAndSave); caseUSER: this.wideOrderDao.findByUserId((Long)key).forEach(updateAndSave); } } @Override public<KEY>voidupdateByItem(WideOrderTypewideOrderType,KEYkey,WideItemData<WideOrderType,?>item){ } }
仓库具有如下特征:
- 实现 WideCommandRepository<Long, WideOrderType, WideOrder> 接口,其中 Long 是模型主键(也是宽表主键),WideOrderType 是之前定义的枚举,WideOrder 是宽表;
- void save(List<WideOrder> wides) 提供批量保存方法;
- List<WideOrder> findByIds(List<Long> masterIds) 提供根据主键批量查询方法;
- void consumeByItem(WideOrderType wideOrderType, KEY key, Consumer<WideOrder> wideConsumer),该方法主要用于数据巡检,根据类型 和 键信息 从底层引擎中获取数据,并进行部分比对,用于发现数据不一致情况;
- boolean supportUpdateFor(WideOrderType wideOrderType),该实现用于判断是否支持特定类型的批量更新,及依赖引擎能力批量对数据进行更新操作;
- void updateByItem(WideOrderType wideOrderType, KEY key, WideItemData<WideOrderType, ?> item),supportUpdateFor 返回为 true 时,调用该方法,使用引擎的更新能力批量对数据进行更新;
- void updateByItem(WideOrderType wideOrderType, KEY key, Consumer<WideOrder> wideConsumer),supportUpdateFor 返回为 false 时,调用该方法,根据 类型 和 键信息 依次查询所有数据,在内存中完成更新,并写回存储引擎;
所依赖的 WideOrderESDao 基于 ElasticsearchRepository 实现,具体如下:
2.5. 配置&整合
publicinterfaceWideOrderESDaoextendsElasticsearchRepository<WideOrder,Long>{ List<WideOrder>findByProductId(LongproductId); List<WideOrder>findByAddressId(LongaddressId); List<WideOrder>findByUserId(LonguserId); }
所有组件都已准备好,现在需要将他们整合在一起。
@Configuration publicclassWideOrderConfigurationextendsWideConfigurationSupport<Long,WideOrderType,WideOrder>{ @Autowired privateWideOrderRepositorywideOrderRepository; @Autowired privateList<WideItemDataProvider<WideOrderType,?,?extendsWideItemData<WideOrderType,?>>>wideItemDataProviders; @Bean publicWideIndexService<Long,WideOrderType>createWideIndexService(){ returnsuper.createWideIndexService(); } @Bean publicWideOrderPatrolServicewideOrderPatrolService(){ returnnewWideOrderPatrolService(createWidePatrolService()); } @Bean protectedWideService<Long,WideOrderType>createWideService( WideIndexService<Long,WideOrderType>wideIndexService, WideOrderPatrolServicewideOrderPatrolService){ returnsuper.createWideService(wideIndexService,wideOrderPatrolService); } @Override protectedWideFactory<Long,WideOrder>getWideFactory(){ returnWideOrder::new; } @Override protectedWideCommandRepository<Long,WideOrderType,WideOrder>getWideCommandRepository(){ returnthis.wideOrderRepository; } @Override protectedList<WideItemDataProvider<WideOrderType,?,?extendsWideItemData<WideOrderType,?>>>getWideItemProviders(){ returnthis.wideItemDataProviders; } }
WideOrderConfiguration 具有如下特点:
- 继承自 WideConfigurationSupport<Long, WideOrderType, WideOrder>,父类中存在大量的 createXXX 方法,可以大幅简单代码量;
- 使用 WideOrderRepository 作为宽表的仓库;
- 直接使用 Spring 容器中的所有 WideItemDataProvider 实现;
- 使用定制的 WideOrderPatrolService,为巡检增加延时支持;
其中自定义巡检 WideOrderPatrolService 代码如下:
publicclassWideOrderPatrolServiceimplementsWidePatrolService<Long,WideOrderType>{ privatefinalWidePatrolService<Long,WideOrderType>widePatrolService; publicWideOrderPatrolService(WidePatrolService<Long,WideOrderType>widePatrolService){ this.widePatrolService=widePatrolService; } @Override @DelayBasedRocketMQ(topic="wide_order_patrol",tag="SingleIndex",consumerGroup="order_patrol_group",delayLevel=2) publicvoidindex(LongaLong){ this.widePatrolService.index(aLong); } @Override publicvoidindex(List<Long>longs){ WideOrderPatrolServicewideOrderPatrolService=((WideOrderPatrolService)AopContext.currentProxy()); longs.forEach(id->wideOrderPatrolService.index(id)); } @Override public<KEY>voidupdateItem(WideOrderTypewideOrderType,KEYkey){ ((WideOrderPatrolService)AopContext.currentProxy()).updateItem(wideOrderType,(Long)key); } @DelayBasedRocketMQ(topic="wide_order_patrol",tag="UpdateByItem",consumerGroup="order_patrol_group",delayLevel=2) publicvoidupdateItem(WideOrderTypewideOrderType,Longid){ this.widePatrolService.updateItem(wideOrderType,id); } @Override publicvoidsetReindexConsumer(Consumer<List<Long>>consumer){ this.widePatrolService.setReindexConsumer(consumer); } }
WideOrderPatrolService 具体实现如下:
2.6. 实现效果
- 将大部分请求直接转发给内部的 widePatrolService 实例;
- 在索引和更新方法上增加了 @DelayBasedRocketMQ 注解,该注解使的方法拥有延时执行的能力,如果对该注解感兴趣可以翻找下之前的文章;
- 使用 AopContext 在类内获取 Proxy 对象并调用其方法,由于 AOP 实现的限制,在类中直接调用本类中的其他方法,不会触发拦截器;
万事具备只欠东风,写个测试用例测试下功能。
2.6.1. 数据索引首先,对数据进行索引,示例如下:
//保存User this.user=newUser(); this.user.setName("测试"); this.userDao.save(this.user); //保存Address this.address=newAddress(); this.address.setDetail("详细地址"); this.address.setUserId(this.user.getId()); this.addressDao.save(this.address); //保存Product this.product=newProduct(); this.product.setName("商品名称"); this.product.setPrice(100); this.productDao.save(this.product); //保存Order this.order=newOrder(); this.order.setUserId(this.user.getId()); this.order.setAddressId(this.address.getId()); this.order.setProductId(this.product.getId()); this.order.setDescr("我的订单"); this.orderDao.save(this.order); //进行索引 this.wideOrderService.index(this.order.getId()); //比对数据 Optional<WideOrder>optional=wideOrderDao.findById(this.order.getId()); Assertions.assertTrue(optional.isPresent()); WideOrderwideOrder=optional.get(); Assertions.assertEquals(order.getId(),wideOrder.getId()); Assertions.assertEquals(order.getAddressId(),wideOrder.getAddressId()); Assertions.assertEquals(order.getProductId(),wideOrder.getProductId()); Assertions.assertEquals(order.getUserId(),wideOrder.getUserId()); Assertions.assertEquals(order.getDescr(),wideOrder.getOrderDescr()); Assertions.assertEquals(user.getName(),wideOrder.getUserName()); Assertions.assertEquals(address.getDetail(),wideOrder.getAddressDetail()); Assertions.assertEquals(product.getName(),wideOrder.getProductName()); Assertions.assertEquals(product.getPrice(),wideOrder.getProductPrice());
单测成功运行后,数据已经成功写入到 ES,具体如下:
image
2.6.2. 数据更新更新操作,具体单测如下:
//更新订单描述 this.order.setDescr("订单详情"); this.orderDao.save(this.order); //触发索引更新 this.wideOrderService.updateOrder(this.order.getId()); //验证更新结果 Optional<WideOrder>optional=wideOrderDao.findById(this.order.getId()); Assertions.assertTrue(optional.isPresent()); WideOrderwideOrder=optional.get(); Assertions.assertEquals(order.getId(),wideOrder.getId()); Assertions.assertEquals(order.getDescr(),wideOrder.getOrderDescr());
单测成功运行后,数据已经完成更新,ES 数据具体如下:
image
2.6.3. 数据巡检仔细观察日志,会发现存在一组 Delay Task 日志,具体如下:
[main]c.g.l.core.delay.DelayMethodInterceptor:successtosentDelayTasktoRocketMQfor[126] [MessageThread_2]c.g.l.c.w.s.SimpleWidePatrolService:id126issame
3. 设计&扩展3.1. 核心设计
- 第一条日志是在提交索引时由主线程打印,向 RocketMQ 提交一个延时任务,用于对 id 为 126 的数据进行校验;
- 第二条是时间达到后由 Message Consumer 线程打印,表明 DB 与 ES 中的数据是相同的;
- 如果巡检时发现数据不同,将会自动对 126 进行索引,从而保障两者的一致性;
整体架构设计如下:
image
从功能角度,整体可分为如下几部分:
3.2. 功能扩展
- Index 索引部分。内部可以看成是一个基于 检索模型 的 Pipeline,从众多数据提供器中获取数据,并写入 检索模型,最终将填充完数据的检索模型写入的 ES 进行持久化存储;
- Query 查询部分。直接使用 ES 的 api 对成功索引的数据进行查询。
- 巡检部分。在数据变更时,会自动增加一个延时任务用于数据比较,巡检任务获取变更数据后与ES存储记录进行比较,如果数据不一致则向 Index 模块重新提交索引任务,对问题数据进行再次索引,从而对数据进行恢复;
wide 为宽表提供了索引和巡检能力支持,但在实际业务中需要处理多种情况,常见如下:
4. 项目信息
- 自动触发,这是系统核心流程之一,数据发生变化后,向 Index 提交新的索引任务。常见的实现策略有:
- 基于领域事件的索引。监听应用程序发出的领域事件,从而触发新数据的索引;
- 基于 binlog 的索引。MySQL 的变化全部记录在 binlog 中,可以通过 canal 等框架将 binlog 进行导出,用于触发数据索引;
- 手工回溯,手工触发索引流程,常见的场景有:
- 由于业务需要 ES 检索模型发生变更,需要重跑历史数据;
- 系统故障导致数据不一致,通过手工触发的方式对问题数据进行修复;
- 天级数据重建。每天凌晨对前一天的数据进行索引重建,主要目的为:
- 避免错误在 ES 进行累计,也就是在索引和巡检两个机制都不生效的情况下,对问题数据进行修复;
- 索引优化,在数据完成重建后,可以调用 ES 索引优化接口,对索引进行合并,从而提升系统查询性能;
项目仓库地址:https://gitee.com/litao851025/lego
项目文档地址:https://gitee.com/litao851025/lego/wikis/support/Wide 宽表
,
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com