Elasticsearch REST Client in Java: Complete Guide

The Elasticsearch REST Client provides a powerful way to interact with Elasticsearch from Java applications. It offers high-level abstraction while maintaining flexibility and performance.

Setup and Dependencies

Maven Dependencies

<properties>
<elasticsearch.version>8.11.0</elasticsearch.version>
</properties>
<dependencies>
<!-- Elasticsearch Java Client -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Alternatively, use the legacy client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.17.0</version>
</dependency>
<!-- Spring Boot Starter Data Elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
</dependencies>

Configuration and Setup

Basic REST Client Configuration

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
@Configuration
public class ElasticsearchConfig {
@Value("${elasticsearch.host:localhost}")
private String host;
@Value("${elasticsearch.port:9200}")
private int port;
@Value("${elasticsearch.username:}")
private String username;
@Value("${elasticsearch.password:}")
private String password;
@Bean
public RestClient restClient() {
HttpHost httpHost = new HttpHost(host, port);
RestClient.Builder builder = RestClient.builder(httpHost);
// Configure authentication if provided
if (!username.isEmpty() && !password.isEmpty()) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(username, password)
);
builder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
);
}
// Configure default headers
builder.setDefaultHeaders(new Header[]{
new BasicHeader("Content-Type", "application/json"),
new BasicHeader("User-Agent", "Java-ES-Client/1.0")
});
// Configure connection timeout and retries
builder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000)
);
return builder.build();
}
@Bean
public ElasticsearchTransport elasticsearchTransport() {
return new RestClientTransport(restClient(), new JacksonJsonpMapper());
}
@Bean
public ElasticsearchClient elasticsearchClient() {
return new ElasticsearchClient(elasticsearchTransport());
}
}

Advanced Configuration with Multiple Nodes

@Configuration
public class AdvancedElasticsearchConfig {
@Bean
public RestClient elasticsearchRestClient() {
return RestClient.builder(
new HttpHost("es-node1", 9200, "http"),
new HttpHost("es-node2", 9200, "http"),
new HttpHost("es-node3", 9200, "http")
)
.setHttpClientConfigCallback(httpClientBuilder -> {
// Configure connection pooling
httpClientBuilder.setMaxConnTotal(100);
httpClientBuilder.setMaxConnPerRoute(50);
// Configure keep-alive strategy
httpClientBuilder.setKeepAliveStrategy((response, context) -> 60000);
return httpClientBuilder;
})
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(30000)
.setConnectionRequestTimeout(1000)
)
.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(HttpHost host) {
System.err.println("Connection failed to host: " + host);
}
})
.build();
}
@Bean
public ElasticsearchClient elasticsearchClient() {
ElasticsearchTransport transport = new RestClientTransport(
elasticsearchRestClient(),
new JacksonJsonpMapper()
);
return new ElasticsearchClient(transport);
}
}

Document Model Classes

// Product document
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Product {
private String id;
private String name;
private String description;
private Double price;
private String category;
private List<String> tags;
private Integer stock;
private Boolean active;
private LocalDateTime createdAt;
private Map<String, Object> attributes;
}
// User document
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class User {
private String id;
private String username;
private String email;
private String firstName;
private String lastName;
private Integer age;
private Address address;
private List<String> roles;
private LocalDateTime registeredAt;
private Boolean enabled;
}
// Address embedded object
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Address {
private String street;
private String city;
private String state;
private String zipCode;
private String country;
}
// Search response wrapper
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SearchResponse<T> {
private List<T> hits;
private long totalHits;
private long took;
private boolean timedOut;
}

CRUD Operations

Basic CRUD Service

@Service
@Slf4j
public class ElasticsearchService {
private final ElasticsearchClient client;
private final ObjectMapper objectMapper;
public ElasticsearchService(ElasticsearchClient client, ObjectMapper objectMapper) {
this.client = client;
this.objectMapper = objectMapper;
}
// Index (Create/Update) a document
public <T> String index(String indexName, T document) throws IOException {
IndexResponse response = client.index(i -> i
.index(indexName)
.document(document)
);
log.info("Document indexed: {}", response.id());
return response.id();
}
// Index with custom ID
public <T> String indexWithId(String indexName, String id, T document) throws IOException {
IndexResponse response = client.index(i -> i
.index(indexName)
.id(id)
.document(document)
);
log.info("Document indexed with ID: {}", response.id());
return response.id();
}
// Get document by ID
public <T> Optional<T> getById(String indexName, String id, Class<T> clazz) throws IOException {
GetResponse<T> response = client.get(g -> g
.index(indexName)
.id(id),
clazz
);
if (response.found()) {
return Optional.of(response.source());
}
return Optional.empty();
}
// Check if document exists
public boolean exists(String indexName, String id) throws IOException {
return client.exists(e -> e
.index(indexName)
.id(id)
).value();
}
// Update document
public <T> String update(String indexName, String id, T document) throws IOException {
UpdateResponse<T> response = client.update(u -> u
.index(indexName)
.id(id)
.doc(document),
clazz
);
log.info("Document updated: {}", response.id());
return response.id();
}
// Partial update using script
public String partialUpdate(String indexName, String id, Map<String, Object> fields) throws IOException {
UpdateResponse<Object> response = client.update(u -> u
.index(indexName)
.id(id)
.doc(fields),
Object.class
);
return response.id();
}
// Delete document
public boolean delete(String indexName, String id) throws IOException {
DeleteResponse response = client.delete(d -> d
.index(indexName)
.id(id)
);
log.info("Document deleted: {}", response.id());
return response.result().jsonValue().equals("deleted");
}
// Bulk operations
public BulkResponse bulkIndex(String indexName, List<Product> products) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (Product product : products) {
br.operations(op -> op
.index(idx -> idx
.index(indexName)
.id(product.getId())
.document(product)
)
);
}
return client.bulk(br.build());
}
}

Search Operations

Advanced Search Service

@Service
@Slf4j
public class ProductSearchService {
private final ElasticsearchClient client;
private static final String PRODUCT_INDEX = "products";
public ProductSearchService(ElasticsearchClient client) {
this.client = client;
}
// Simple search
public SearchResponse<Product> searchProducts(String query) throws IOException {
SearchResponse<Product> response = client.search(s -> s
.index(PRODUCT_INDEX)
.query(q -> q
.multiMatch(t -> t
.fields("name", "description", "category")
.query(query)
)
),
Product.class
);
List<Product> products = response.hits().hits().stream()
.map(hit -> hit.source())
.collect(Collectors.toList());
return new SearchResponse<>(
products,
response.hits().total().value(),
response.took(),
response.timedOut()
);
}
// Filtered search with range and term filters
public SearchResponse<Product> searchWithFilters(ProductSearchRequest searchRequest) throws IOException {
List<Query> mustQueries = new ArrayList<>();
List<Query> shouldQueries = new ArrayList<>();
List<Query> filterQueries = new ArrayList<>();
// Text search
if (searchRequest.getQuery() != null && !searchRequest.getQuery().isEmpty()) {
mustQueries.add(Query.of(q -> q
.multiMatch(m -> m
.fields("name^2", "description", "category")
.query(searchRequest.getQuery())
.fuzziness("AUTO")
)
));
}
// Category filter
if (searchRequest.getCategory() != null) {
filterQueries.add(Query.of(q -> q
.term(t -> t
.field("category")
.value(searchRequest.getCategory())
)
));
}
// Price range filter
if (searchRequest.getMinPrice() != null || searchRequest.getMaxPrice() != null) {
RangeQuery.Builder rangeQuery = new RangeQuery.Builder();
rangeQuery.field("price");
if (searchRequest.getMinPrice() != null) {
rangeQuery.gte(JsonData.of(searchRequest.getMinPrice()));
}
if (searchRequest.getMaxPrice() != null) {
rangeQuery.lte(JsonData.of(searchRequest.getMaxPrice()));
}
filterQueries.add(Query.of(q -> q.range(rangeQuery.build())));
}
// Active products only
filterQueries.add(Query.of(q -> q
.term(t -> t
.field("active")
.value(true)
)
));
// Tags search (should query for better scoring)
if (searchRequest.getTags() != null && !searchRequest.getTags().isEmpty()) {
for (String tag : searchRequest.getTags()) {
shouldQueries.add(Query.of(q -> q
.term(t -> t
.field("tags")
.value(tag)
)
));
}
}
// Build the final query
Query finalQuery = Query.of(q -> q
.bool(b -> b
.must(mustQueries)
.should(shouldQueries)
.filter(filterQueries)
.minimumShouldMatch(searchRequest.getTags() != null ? "1" : "0")
)
);
SearchResponse<Product> response = client.search(s -> s
.index(PRODUCT_INDEX)
.query(finalQuery)
.from(searchRequest.getFrom())
.size(searchRequest.getSize())
.sort(so -> so
.field(f -> f
.field("_score")
.order(SortOrder.Desc)
)
)
.highlight(h -> h
.fields("name", f -> f
.type(HighlighterType.Unified)
)
.fields("description", f -> f
.type(HighlighterType.Unified)
)
),
Product.class
);
List<Product> products = response.hits().hits().stream()
.map(hit -> {
Product product = hit.source();
// You can process highlights here
return product;
})
.collect(Collectors.toList());
return new SearchResponse<>(
products,
response.hits().total().value(),
response.took(),
response.timedOut()
);
}
// Aggregation search
public Map<String, Object> getProductAggregations() throws IOException {
SearchResponse<Product> response = client.search(s -> s
.index(PRODUCT_INDEX)
.size(0)
.aggregations("categories", a -> a
.terms(t -> t.field("category"))
)
.aggregations("price_stats", a -> a
.stats(st -> st.field("price"))
)
.aggregations("price_histogram", a -> a
.histogram(h -> h
.field("price")
.interval(50.0)
)
),
Product.class
);
Map<String, Aggregate> aggs = response.aggregations();
Map<String, Object> result = new HashMap<>();
// Process category aggregation
if (aggs.containsKey("categories")) {
List<? extends StringTermsBucket> buckets = aggs.get("categories").sterms().buckets().array();
Map<String, Long> categories = buckets.stream()
.collect(Collectors.toMap(
StringTermsBucket::key,
StringTermsBucket::docCount
));
result.put("categories", categories);
}
// Process price stats
if (aggs.containsKey("price_stats")) {
StatsAggregate stats = aggs.get("price_stats").stats();
result.put("price_stats", Map.of(
"count", stats.count(),
"min", stats.min(),
"max", stats.max(),
"avg", stats.avg(),
"sum", stats.sum()
));
}
return result;
}
// Auto-complete/suggest search
public List<String> suggestProductNames(String prefix) throws IOException {
SearchResponse<Product> response = client.search(s -> s
.index(PRODUCT_INDEX)
.suggest(su -> su
.suggesters("product_suggest", sug -> sug
.prefix(prefix)
.completion(c -> c
.field("name.suggest")
.skipDuplicates(true)
.size(5)
)
)
),
Product.class
);
return response.suggest().get("product_suggest").stream()
.flatMap(suggestion -> suggestion.completion().options().stream())
.map(option -> option.text())
.collect(Collectors.toList());
}
// More Like This search
public SearchResponse<Product> findSimilarProducts(String productId) throws IOException {
SearchResponse<Product> response = client.search(s -> s
.index(PRODUCT_INDEX)
.query(q -> q
.moreLikeThis(m -> m
.like(l -> l
.document(d -> d
.index(PRODUCT_INDEX)
.id(productId)
)
)
.fields("name", "description", "category", "tags")
.minTermFreq(1)
.maxQueryTerms(12)
)
)
.size(5),
Product.class
);
List<Product> products = response.hits().hits().stream()
.map(hit -> hit.source())
.collect(Collectors.toList());
return new SearchResponse<>(
products,
response.hits().total().value(),
response.took(),
response.timedOut()
);
}
}
// Search request DTO
@Data
class ProductSearchRequest {
private String query;
private String category;
private Double minPrice;
private Double maxPrice;
private List<String> tags;
private Integer from = 0;
private Integer size = 10;
}

Index Management

Index Operations Service

@Service
@Slf4j
public class IndexManagementService {
private final ElasticsearchClient client;
public IndexManagementService(ElasticsearchClient client) {
this.client = client;
}
// Create index with mapping
public boolean createProductIndex() throws IOException {
CreateIndexResponse response = client.indices().create(c -> c
.index("products")
.mappings(m -> m
.properties("id", p -> p.keyword(k -> k))
.properties("name", p -> p
.text(t -> t
.analyzer("standard")
.fields("suggest", f -> f.completion(cp -> cp))
)
)
.properties("description", p -> p.text(t -> t.analyzer("standard")))
.properties("category", p -> p.keyword(k -> k))
.properties("price", p -> p.double_(d -> d))
.properties("tags", p -> p.keyword(k -> k))
.properties("stock", p -> p.integer(i -> i))
.properties("active", p -> p.boolean_(b -> b))
.properties("createdAt", p -> p.date(d -> d.format("strict_date_optional_time||epoch_millis")))
)
.settings(s -> s
.numberOfShards("2")
.numberOfReplicas("1")
)
);
log.info("Index created: {}", response.index());
return response.acknowledged();
}
// Check if index exists
public boolean indexExists(String indexName) throws IOException {
return client.indices().exists(e -> e.index(indexName)).value();
}
// Delete index
public boolean deleteIndex(String indexName) throws IOException {
DeleteIndexResponse response = client.indices().delete(d -> d.index(indexName));
return response.acknowledged();
}
// Get index mapping
public Map<String, Property> getIndexMapping(String indexName) throws IOException {
GetMappingResponse response = client.indices().getMapping(m -> m.index(indexName));
return response.get("products").mappings().properties();
}
// Update index settings
public boolean updateIndexSettings(String indexName) throws IOException {
UpdateIndicesSettingsResponse response = client.indices().putSettings(s -> s
.index(indexName)
.settings(settings -> settings
.numberOfReplicas("2")
.refreshInterval(i -> i.time("30s"))
)
);
return response.acknowledged();
}
// Create alias
public boolean createAlias(String indexName, String aliasName) throws IOException {
UpdateAliasesResponse response = client.indices().updateAliases(u -> u
.actions(a -> a
.add(add -> add
.index(indexName)
.alias(aliasName)
)
)
);
return response.acknowledged();
}
// Refresh index
public boolean refreshIndex(String indexName) throws IOException {
RefreshResponse response = client.indices().refresh(r -> r.index(indexName));
return response.shards().total().intValue() > 0;
}
}

Advanced Features

Bulk Operations Service

@Service
@Slf4j
public class BulkOperationsService {
private final ElasticsearchClient client;
public BulkOperationsService(ElasticsearchClient client) {
this.client = client;
}
// Bulk index with error handling
public BulkResponse bulkIndexProducts(List<Product> products) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (Product product : products) {
br.operations(op -> op
.index(idx -> idx
.index("products")
.id(product.getId())
.document(product)
)
);
}
BulkResponse response = client.bulk(br.build());
// Check for errors
if (response.errors()) {
log.error("Bulk operation had errors:");
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
log.error("Error for item {}: {}", item.id(), item.error().reason());
}
}
}
return response;
}
// Bulk update
public BulkResponse bulkUpdateProducts(Map<String, Map<String, Object>> updates) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (Map.Entry<String, Map<String, Object>> entry : updates.entrySet()) {
br.operations(op -> op
.update(u -> u
.index("products")
.id(entry.getKey())
.action(a -> a
.doc(entry.getValue())
)
)
);
}
return client.bulk(br.build());
}
// Bulk delete
public BulkResponse bulkDeleteProducts(List<String> productIds) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (String productId : productIds) {
br.operations(op -> op
.delete(d -> d
.index("products")
.id(productId)
)
);
}
return client.bulk(br.build());
}
}

Async Operations Service

@Service
@Slf4j
public class AsyncElasticsearchService {
private final ElasticsearchClient client;
private final ExecutorService executorService;
public AsyncElasticsearchService(ElasticsearchClient client) {
this.client = client;
this.executorService = Executors.newFixedThreadPool(10);
}
// Async search
public CompletableFuture<SearchResponse<Product>> searchAsync(String query) {
return CompletableFuture.supplyAsync(() -> {
try {
return client.search(s -> s
.index("products")
.query(q -> q
.match(m -> m
.field("name")
.query(query)
)
),
Product.class
);
} catch (IOException e) {
throw new CompletionException(e);
}
}, executorService);
}
// Async bulk operations
public CompletableFuture<BulkResponse> bulkIndexAsync(List<Product> products) {
return CompletableFuture.supplyAsync(() -> {
try {
BulkRequest.Builder br = new BulkRequest.Builder();
for (Product product : products) {
br.operations(op -> op
.index(idx -> idx
.index("products")
.id(product.getId())
.document(product)
)
);
}
return client.bulk(br.build());
} catch (IOException e) {
throw new CompletionException(e);
}
}, executorService);
}
// Multiple async searches
public CompletableFuture<List<SearchResponse<Product>>> multiSearchAsync(List<String> queries) {
List<CompletableFuture<SearchResponse<Product>>> futures = queries.stream()
.map(this::searchAsync)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
}

Error Handling and Retry Mechanism

@Service
@Slf4j
public class ResilientElasticsearchService {
private final ElasticsearchClient client;
private final RetryTemplate retryTemplate;
public ResilientElasticsearchService(ElasticsearchClient client) {
this.client = client;
this.retryTemplate = createRetryTemplate();
}
private RetryTemplate createRetryTemplate() {
RetryTemplate template = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000L); // 1 second
template.setRetryPolicy(retryPolicy);
template.setBackOffPolicy(backOffPolicy);
return template;
}
public <T> T executeWithRetry(ElasticsearchOperation<T> operation) {
return retryTemplate.execute(context -> {
try {
return operation.execute();
} catch (ElasticsearchException e) {
log.warn("Elasticsearch operation failed, retry attempt: {}", context.getRetryCount());
throw e;
} catch (IOException e) {
log.error("IO error during Elasticsearch operation", e);
throw new RuntimeException(e);
}
});
}
// Search with retry
public SearchResponse<Product> searchWithRetry(String query) {
return executeWithRetry(() -> 
client.search(s -> s
.index("products")
.query(q -> q.match(m -> m.field("name").query(query))),
Product.class
)
);
}
// Index with retry
public String indexWithRetry(Product product) {
return executeWithRetry(() -> 
client.index(i -> i
.index("products")
.id(product.getId())
.document(product)
).id()
);
}
@FunctionalInterface
public interface ElasticsearchOperation<T> {
T execute() throws IOException;
}
}

Spring Data Elasticsearch Integration

// Spring Data Repository
@Repository
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
List<Product> findByName(String name);
List<Product> findByCategoryAndPriceBetween(String category, Double minPrice, Double maxPrice);
List<Product> findByActiveTrue();
@Query("{\"match\": {\"name\": \"?0\"}}")
List<Product> findByNameCustom(String name);
Page<Product> findByCategory(String category, Pageable pageable);
List<Product> findByTagsIn(List<String> tags);
Long countByCategory(String category);
}
// Custom repository implementation
public interface CustomProductRepository {
SearchResponse<Product> searchComplex(ProductSearchRequest request);
}
@Repository
@RequiredArgsConstructor
class CustomProductRepositoryImpl implements CustomProductRepository {
private final ElasticsearchClient client;
@Override
public SearchResponse<Product> searchComplex(ProductSearchRequest request) {
// Implement complex search logic using the native client
// This allows you to use features not available in Spring Data
return null; // Implementation similar to previous examples
}
}

Monitoring and Metrics

@Component
@Slf4j
public class ElasticsearchMetrics {
private final ElasticsearchClient client;
private final MeterRegistry meterRegistry;
private final Counter requestCounter;
private final Timer requestTimer;
public ElasticsearchMetrics(ElasticsearchClient client, MeterRegistry meterRegistry) {
this.client = client;
this.meterRegistry = meterRegistry;
this.requestCounter = Counter.builder("elasticsearch.requests")
.description("Number of Elasticsearch requests")
.register(meterRegistry);
this.requestTimer = Timer.builder("elasticsearch.request.duration")
.description("Duration of Elasticsearch requests")
.register(meterRegistry);
}
public <T> T executeWithMetrics(String operation, Supplier<T> operationSupplier) {
requestCounter.increment();
return requestTimer.record(() -> {
try {
return operationSupplier.get();
} catch (Exception e) {
meterRegistry.counter("elasticsearch.errors", "operation", operation).increment();
throw e;
}
});
}
// Get cluster health
public ClusterHealthResponse getClusterHealth() throws IOException {
return executeWithMetrics("cluster_health", 
() -> client.cluster().health());
}
// Get node stats
public NodesStatsResponse getNodeStats() throws IOException {
return executeWithMetrics("node_stats",
() -> client.nodes().stats(n -> n));
}
// Get index stats
public IndicesStatsResponse getIndexStats(String indexName) throws IOException {
return executeWithMetrics("index_stats",
() -> client.indices().stats(s -> s.index(indexName)));
}
}

Best Practices

1. Connection Management

  • Use connection pooling
  • Set appropriate timeouts
  • Monitor connection health

2. Error Handling

  • Implement retry mechanisms
  • Handle specific Elasticsearch exceptions
  • Log errors appropriately

3. Performance

  • Use bulk operations for multiple documents
  • Implement async operations for non-blocking calls
  • Monitor query performance

4. Security

  • Use HTTPS for connections
  • Implement authentication
  • Secure sensitive data

This comprehensive Elasticsearch REST Client implementation provides a solid foundation for building robust search functionality in Java applications, with features covering CRUD operations, advanced search, bulk processing, error handling, and monitoring.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper