Vert.x 踩坑记:在Vert.x中使用ElasticSearch进行高亮等高级查询协程化

1. 测试环境

  • Vert.x : 4.0.0-milestone4

  • kotlin-coroutines :1.3.20

  • jackson-databind : 2.11.0

    Vert.x 内置 jackson未提供映射到POJO实体的ObjectMapper实现

  • ElasticSearch : 6.8

  • Elasticsearch-client (for vert.x) : 0.2 - ec 6.7.0

👇贴出 Gradle(kotlin) 的关键依赖,Maven同理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//.... 

/**
* 仅贴出所需部分依赖,余下需自行在官网利用 starter 生成
*/

ext {
vertxVersion = '4.0.0-milestone4'
}


dependencies {
//objectMapper实现
implementation "com.fasterxml.jackson.core:jackson-databind:2.11.0"
//es vert.x 第三方兼容客户端
implementation "io.reactiverse:elasticsearch-client:0.2-ec6.7.0"
//kotlin
implementation "io.vertx:vertx-lang-kotlin-coroutines:$vertxVersion"
implementation "io.vertx:vertx-lang-kotlin:$vertxVersion"

// .....
}

//.....

2. 初始化Es客户端

io.reactiverse:elasticsearch-client 在 RestHighLevelClient 基础上将异步响应接口简单封装了一层 Vert.x 范的AsyncResult,故而迁移 RestHighLevelClient下代码仅需将请求发送方法更改为elasticsearch-client封装的相对应方法即可

  • io.reactiverse:elasticsearch-client 的封装实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    /**
    * io.reactiverse.elasticsearch.client.RestHighLevelClient
    */

    @Override()
    public void searchAsync(SearchRequest searchRequest, RequestOptions options, Handler<AsyncResult<SearchResponse>> handler) {
    Context context = vertx.getOrCreateContext();
    delegate.searchAsync(searchRequest, options, new ActionListener<SearchResponse>() {

    @Override
    public void onResponse(SearchResponse value) {
    //不阻塞地将异步响应注册到指定Verticle的运行时上下文
    context.runOnContext(v -> handler.handle(Future.succeededFuture(value)));
    }

    @Override
    public void onFailure(Exception e) {
    context.runOnContext(v -> handler.handle(Future.failedFuture(e)));
    }
    });
    }
  • 在 Verticle初始化ES客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class ElasticSearchVerticle : CoroutineVerticle() {

    private lateinit var client: RestHighLevelClient
    /**
    * Verticle 实例启动时的回调方法
    */
    override suspend fun start() {
    client = RestHighLevelClient.create(
    // vertx即当前 Verticle 实例
    vertx, RestClient
    .builder(HttpHost("127.0.0.1", 9200, "http"))
    //所写项目功能单一,故指定唯一 index
    .setPathPrefix("/items/")
    )
    }
    }

4. 构造查询语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//koltin 中实现单例模式的语法糖  
companion object {
private val logger = LoggerFactory.getLogger(ElasticSearchVerticle::class.java)
//高亮
private val highlightBuilder: HighlightBuilder = HighlightBuilder()
.preTags("<em>").postTags("</em>")
.field("q")
.field("o")
}

private suspend fun searchByQuestionAndOptionsAwait(qStr: String): List<TestEntity> {
if (qStr.length <= 5)
throw SecurityException("字数过短")
val boolBuilder = QueryBuilders.boolQuery()
//多个字段匹配 属性值 must query
val matchQueryBuilder =
QueryBuilders.multiMatchQuery(qStr, "q", "o")
boolBuilder.must(matchQueryBuilder)
val searchSourceBuilder = SearchSourceBuilder()
.query(boolBuilder)
.size(3)
//30分为界
.minScore(30.0f)
.highlighter(highlightBuilder)
// 协程化
return awaitResult {
return@awaitResult mapperHandle(TestEntity::class.java, searchSourceBuilder)
}
}

5. 高亮查询结果映射处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//koltin 中实现单例模式的语法糖  
companion object {
// 字段映射器
private val mapper = ObjectMapper()
//允许忽略不存在的字段
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}

/**
* 映射处理,高亮字段等,映射到实体等
*/
private fun <T> mapperHandle(
clazz: Class<T>,
sourceBuilder: SearchSourceBuilder
) {
//发起查询请求
return client.searchAsync(SearchRequest().source(sourceBuilder), RequestOptions.DEFAULT) {
if (it.succeeded()) {
val searchHits = it.result()?.hits
val list = ArrayList<T>()
//遍历搜索命中结果
searchHits?.forEach { searchHit ->
val source = searchHit.sourceAsMap
// fragments 因所写项目功能单一故处理不是很恰当
// 将原本与高亮域对应的原生字段替换为高亮后的结果
source["q"] =
searchHit.highlightFields["q"]?.fragments?.get(0)?.toString() ?: source["q"]
source["o"] =
searchHit.highlightFields["o"]?.fragments?.get(0)?.toString() ?: source["o"]
//映射到 pojo
list.add(
mapper.readValue(
mapper.writeValueAsString(source), clazz
)
)
}
}
}
}

6.执行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 回调化(未贴出代码)
searchByQuestionAndOptionsAwait("HelloVert.x") {
if (it.succeeded())
//...成功
it.result()
else
//...失败
it.cause()
}
// 协程同步化
try{
val result= searchByQuestionAndOptionsAwait("HelloVert.x")
//...成功
}catch(e){
//...失败
}

然而协程在vert.x中解决Callback Hell问题暂时也并非’银弹’,比如在 eventbus 中充当消费者时会不可避免的出现以下状况

1
2
3
4
5
6
7
8
vertx.eventBus().localConsumer<String>(this.javaClass.name) { msg ->
launch {
val x = withContext(Dispatchers.Default) {
searchByQuestionAndOptionsAwait("HelloVert.x")
}
msg.reply(x)
}
}

层层回调,和普通异步回调相比似乎并没有减轻多少压力,且增加了更多心智负担(协程调度器),当然也可以尝试自行协程化封装,期待Vert.x 官方能在后续更新加强对kotlin的支持