WebFlux 下的 ReactiveElasticSearch 高级查询
非 Reactive 的SpringData-ElasticSearch 可利用ESRepository达到高级查询的目的,但 Reative因支持较晚
,目前缺乏 search( criteria ) 等高级查询实现,暂时无法利用 Reacitve-JPA 的 Repository 实现高级查询,很尴尬的鸡肋存在
不过好在ReactiveElasticsearchTemplate支持相对较为"全面"
目的:在reactive 的 webflux 体系下实现 es 查询多字段并高亮查询命中结果
了解到 JPA 后遂拉入响应式的依赖,调用接口实现,一顿操作猛如虎...然后开心的在IDEA 敲下. 体验 JPA 的魔法
多次google,百度相关资料全网无果,被逼无奈那就自己走源码实现吧👩🦯
1. 解决构造查询丢失条件问题
查阅 JPA 相关资料了解到了Template这个相对底层的封装,发现ReactiveElasticsearchTemplate提供了传入查询构造器的 find () 方法
通过debug观察其运行流程发现了查询请求前置hook 方法prepareSearchRequest,看到这个 hook 后,问题已经解决了80%
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   |  class CustomReactiveESTemplate(client: ReactiveElasticsearchClient?, converter: ElasticsearchConverter?, resultsMapper: ResultsMapper?) : 		   ReactiveElasticsearchTemplate(client, converter, resultsMapper) {          override fun prepareSearchRequest(request: SearchRequest): SearchRequest {         if (request.preference().isNotEmpty()) {             val sourceQuery = request.source()              						val fields = request.preference().split("#")                          val highlightBuilder = HighlightBuilder()                     .preTags("<em>").postTags("</em>")                          fields.forEach {                 highlightBuilder.field(it)             }             sourceQuery.highlighter(highlightBuilder)                          request.preference("")         }         return super.prepareSearchRequest(request)     } }
 
  | 
你也许会说既然允许自己构造查询了,何必大费周章还带有破坏性地写hook 函数,直接构造高级查询传入不行吗?
我一开始也这样想的且这样做的,不过行不通,位于ReactiveElasticsearchTemplate的find() 会调用到buildSearchRequest(),该函数只读取特定查询,高亮查询并不在此列,只能自行通过预置在buildSearchRequest()之后的 hook 函数,加入高级查询
2. 向 SpringBoot注入自定义hook的ReactiveESTemplate
在注入后并不意味着结束,还会有映射问题,因返回高亮结果不能和 pojo 自动映射,故还需自行扩展 DefaultResultMapper()修改映射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
   | @Configuration(proxyBeanMethods = false) class ReactiveESTemplateConfiguration() {     @Bean     fun reactiveElasticsearchTemplate(client: ReactiveElasticsearchClient?, converter: ElasticsearchConverter?): CustomReactiveESTemplate? {                  val resultsMapper = object : DefaultResultMapper() {             override fun <T : Any?> mapSearchHit(searchHit: SearchHit?, type: Class<T>?): T? {                 if (!searchHit!!.hasSource())                     return null
                  val source = searchHit.sourceAsMap                 if (!source.containsKey("id") || source["id"] == null)                     source["id"] = searchHit.id                 val highlightFields = searchHit.highlightFields 								                  								                 source["q"] = highlightFields.get("q")                         ?.fragments                         ?.get(0)?.toString()                         ?: source["a"]                 source["a"] =                         highlightFields.get("a")                                 ?.fragments                                 ?.get(0)?.toString()                                 ?: source["a"]               	                 val mappedResult: Any = entityMapper.readObject(source, type) as Any                 return if (type!!.isInterface)                     projectionFactory.createProjection(type, mappedResult)                 else                     type.cast(mappedResult)             }         }
                   val template = CustomReactiveESTemplate(client, converter,                 resultsMapper)         template.setIndicesOptions(IndicesOptions.strictExpandOpenAndForbidClosed())         template.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)         return template     } }
   | 
3. 在 WebFlux 调用查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
   | @Component class ItemsESRepository {     @Autowired     private lateinit var customReactiveESTemplate: CustomReactiveESTemplate
      
 
      private val pageRequest = PageRequest.of(0, 5)
      
 
      private val minSearchLength = 5
      
 
      suspend fun searchByQuestionAndOptions(questions: String): Flux<TestEntity> {         
          val boolBuilder = QueryBuilders.boolQuery()                  val matchQueryBuilder = QueryBuilders.multiMatchQuery(questions, "q", "a")         boolBuilder.must(matchQueryBuilder)         val query = NativeSearchQueryBuilder()                 .withQuery(boolBuilder)                 .withPageable(pageRequest)                 .build()         query.preference = "q#a"
          return customReactiveESTemplate                 .find(query, TestEntity::class.java)     } }
   | 
结语
至此,在webflux 中进行es的高亮多字段查询完成
对了 SpringBoot 版本为 2.2.6.RELEASE, ES 版本为6.8
Spring WebFlux 目前生态算不上太好,总的来说Rective还是大势所趋的,期待loom的早日完工