SpringBoot 配置 ElasticSearch

default

部署ES

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
version: '3'

services:
  elasticsearch:
    image: elasticsearch:8.1.0
    container_name: elasticsearch
    restart: unless-stopped
    # volumes:
    #   - "./elasticsearch/data:/usr/share/elasticsearch/data"
    #   - "./elasticsearch/logs:/usr/share/elasticsearch/logs"
    #   - "./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
    environment: 
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      discovery.type: single-node
      ES_JAVA_OPTS: "-Xmx512m -Xms512m"
      ELASTIC_PASSWORD: "123456"
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - es

  # kibana:
  #   image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kibana:7.14.1       # 原镜像`kibana:7.14.1`
  #   container_name: kibana
  #   restart: unless-stopped
  #   volumes:
  #     - ./elasticsearch/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml
  #   ports:
  #     - "5601:5601"
  #   depends_on:
  #     - elasticsearch
  #   links:
  #     - elasticsearch
  #   networks:
  #     - es

networks:
  es:
    driver: bridge
1
docker-compose up -d

配置依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependencies>

    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>8.13.1</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>8.13.1</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.17.0</version>
    </dependency>

    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>2.0.1</version>
    </dependency>

</dependencies>

配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
server:
  port: 9010

spring:
  elasticsearch:
    uris: https://192.168.20.152:9200
#    password: b3M*r6f3bs+qAmsRTlAu
    username: elastic
    password: 123456
  #    password: buWFYsXinLs0ElvZ47UB
  servlet:
    multipart:
      max-file-size: 10000MB
      max-request-size: 10000MB

客户端连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
@Component
public class Config {

    @Autowired
    ElasticsearchProperties elasticsearchProperties;

    @Bean
    public ElasticsearchClient getRestClientByPassword() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        // URL and API key
        String serverUrl = elasticsearchProperties.getUris().get(0);
        String username = elasticsearchProperties.getUsername();
        String password = elasticsearchProperties.getPassword();

        final CredentialsProvider credentialsProvider =
                new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (arg0, arg1) -> true).build();
        RestClientBuilder builder = RestClient.builder(HttpHost.create(serverUrl))
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.setSSLContext(sslContext)
                            .setSSLHostnameVerifier((s, sslSession) -> true)
                            .setDefaultCredentialsProvider(credentialsProvider);
                    return httpClientBuilder;
                });
        try {
            RestClient restClient = builder.build();
            ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
            ElasticsearchClient client = new ElasticsearchClient(transport);
            if (client.ping().value()) {
                log.info("-----------------初始化 es 客户端成功!-----------------");
                return client;
            }
        } catch (Exception e) {
            throw new RuntimeException("初始化 es 客户端失败!", e);
        }
        return null;
    }
}

使用

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
package com.gear.poi.search.es.dao;

import cn.hutool.core.lang.Snowflake;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.GeoDistanceType;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.gear.poi.search.es.dto.BaseDTO;
import com.gear.poi.search.es.dto.PoiDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.Point;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class EsPoiRepository {

    private final ElasticsearchClient esClient;

    /**
     * 雪花ID
     */
    private final Snowflake snowflake = new Snowflake(1, 1);


    /**
     * 创建索引
     *
     * @param index 指数
     * @return {@link Boolean}
     */
    public Boolean createIndexWithPointField(String index, String pointFieldName) {
        try {
            CreateIndexResponse response = esClient.indices().create(c -> c
                    .index(index)
                    .mappings(m -> m.properties(pointFieldName, p -> p.geoPoint(g -> g.ignoreZValue(true))))
            );
        } catch (Exception e) {
            log.warn("创建索引失败", e);
            return false;
        }
        return true;
    }

    /**
     * 删除索引
     *
     * @param index 指数
     * @return {@link Boolean}
     */
    public Boolean deleteIndex(String index) {
        try {
            esClient.indices().delete(d -> d
                    .index(index)
            );
        } catch (Exception e) {
            log.warn("删除索引失败", e);
            return false;
        }
        return true;
    }

    /**
     * 存在索引
     *
     * @param index 指数
     * @return {@link Boolean}
     */
    public Boolean existsIndex(String index) {
        try {
            BooleanResponse exists = esClient.indices().exists(d -> d
                    .index(index)
            );
            return exists.value();
        } catch (Exception e) {
            log.warn("查询索引失败", e);
            return false;
        }
    }

    /**
     * 插入文档
     *
     * @param t t
     * @return long
     */
    public <T extends BaseDTO> long insertDoc(String index, T t) {
        IndexResponse response = null;
        if (t.getId() == null) {
            t.setId(snowflake.nextIdStr());
        }
        try {
            response = esClient.index(i -> i
                    .index(index)
                    .id(t.getId())
                    .document(t)
            );
        } catch (Exception e) {
            log.warn("插入文档失败", e);
        }
        return response.version();
    }

    /**
     * 获取文档
     *
     * @param index  指数
     * @param id     身份证件
     * @param tClass t类
     * @return {@link T}
     */
    public <T extends BaseDTO> T getDoc(String index, String id, Class<T> tClass) {
        try {
            GetResponse<T> response = esClient.get(g -> g
                            .index(index)
                            .id(id),
                    tClass
            );
            if (response.found()) {
                return response.source();
            } else {
                log.warn("未查询到文档");
            }
        } catch (Exception e) {
            log.warn("查询文档失败", e);
        }
        return null;
    }

    /**
     * 搜索文档
     *
     * @param index      指数
     * @param fieldName  字段名称
     * @param searchText 搜索文本
     * @param tClass     t类
     * @return {@link List}<{@link T}>
     */
    public <T> List<T> searchDocs(String index, String fieldName, String searchText, Class<T> tClass) {
        SearchResponse<T> response = null;
        try {
            response = esClient.search(s -> s
                            .index(index)
                            .query(q -> q
                                    .match(t -> t
                                            .field(fieldName)
                                            .query(searchText)
                                    )
                            ),
                    tClass
            );
            return response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
        } catch (Exception e) {
            log.warn("查询文档失败", e);
        }
        return new ArrayList<>();
    }

    /**
     * 更新文档
     *
     * @param index  指数
     * @param t      t
     * @param tClass t类
     * @return boolean
     */
    public <T extends BaseDTO> boolean updateDoc(String index, T t, Class<T> tClass) {
        try {
            esClient.update(u -> u
                            .index(index)
                            .id(t.getId())
                            .upsert(t),
                    tClass
            );
            return true;
        } catch (Exception e) {
            log.warn("更新文档失败", e);
            return false;
        }
    }

    /**
     * 删除单据
     *
     * @param index 指数
     * @param id    身份证件
     * @return boolean
     */
    public <T extends BaseDTO> boolean deleteDoc(String index, String id) {
        try {
            esClient.delete(d -> d
                    .index(index)
                    .id(id)
            );
            return true;
        } catch (Exception e) {
            log.warn("删除文档失败", e);
            return false;
        }
    }

    public <T> List<T> searchByDistance(String index, Point pos, Double distance, Class<T> tClass) {
        try {
            SearchResponse<T> response = esClient.search(s -> s
                            .index(index)
                            .query(q -> q
                                    .bool(b -> b
                                            .must(m -> m.matchAll(m2 -> m2))
                                            .filter(f -> f.geoDistance(
                                                    g -> g.distance(String.valueOf(distance))
                                                            .distanceType(GeoDistanceType.Arc)
                                                            .field("location")
                                                            .location(l -> l
                                                                    .latlon(l2 -> l2
                                                                            .lat(pos.getY())
                                                                            .lon(pos.getX())
                                                                    )
                                                            )
                                            ))
                                    )
                            ),
                    tClass
            );
            return response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
        } catch (Exception e) {
            log.warn("查询文档失败", e);
        }
        return new ArrayList<>();
    }

    public boolean saveAllAndFlush(String index, List<PoiDTO> poiEntityList) {
        if (CollectionUtils.isEmpty(poiEntityList)) {
            return false;
        }
        List<BulkOperation> bulkOperations = new ArrayList<>();
        for (PoiDTO poiDTO : poiEntityList) {
            bulkOperations.add(BulkOperation.of(b -> b
                            .index(i -> i
                                    .index(index)
                                    .document(poiDTO)
                                    .id(poiDTO.getId())
                            )
                    )
            );
        }
        try {
            esClient.bulk(b -> b.operations(bulkOperations));
            return true;
        } catch (IOException e) {
            log.warn("批量插入文档失败", e);
            return false;
        }
    }
}
Gear(夕照)的博客。记录开发、生活,以及一些不足为道的思考……