gRPC with Istio in Java

Overview

gRPC is a high-performance RPC framework, and Istio is a service mesh that provides traffic management, security, and observability. Combining them enables robust microservices communication with advanced features like load balancing, circuit breaking, and mutual TLS.

Setup and Dependencies

1. Maven Configuration

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the gRPC + Istio demo.

  * Compiles for Java 11.
  * Generates protobuf messages and gRPC stubs from the .proto files via
    protobuf-maven-plugin; os-maven-plugin supplies ${os.detected.classifier}
    so the matching native protoc / protoc-gen-grpc-java binaries are resolved.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>grpc-istio-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <grpc.version>1.59.0</grpc.version>
        <protobuf.version>3.25.1</protobuf.version>
        <istio-api.version>1.19.0</istio-api.version>
    </properties>

    <dependencies>
        <!-- gRPC -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <!-- Provides HealthStatusManager and the standard grpc.health.v1 classes -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-services</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <!--
          The generated gRPC stubs are annotated with javax.annotation.Generated,
          which is no longer part of the JDK on Java 9+. Compile-time only.
        -->
        <dependency>
            <groupId>org.apache.tomcat</groupId>
            <artifactId>annotations-api</artifactId>
            <version>6.0.53</version>
            <scope>provided</scope>
        </dependency>

        <!-- Protobuf -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <!-- Istio API -->
        <!-- NOTE(review): none of the Java sources shown import this artifact;
             confirm these coordinates resolve in your repository or drop the dependency. -->
        <dependency>
            <groupId>io.istio</groupId>
            <artifactId>istio-api</artifactId>
            <version>${istio-api.version}</version>
        </dependency>

        <!-- Observability -->
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-api</artifactId>
            <version>1.34.1</version>
        </dependency>

        <!-- Utilities -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>
        <!-- Logging backend: only needed when running, never compiled against -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <!-- Detects the build platform and exposes ${os.detected.classifier} -->
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.7.1</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <!-- compile: protobuf messages; compile-custom: gRPC service stubs -->
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Protocol Buffer Definitions

1. Core Service Definitions

// protos/user_service.proto
//
// Schema for the demo's two gRPC services (UserService and OrderService) and
// the messages they exchange. Java classes are generated into
// com.example.grpc, one top-level class per message (java_multiple_files).
syntax = "proto3";
package com.example.grpc;
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
option java_package = "com.example.grpc";
option java_multiple_files = true;
// User service definition
// CRUD-style unary calls plus one server-streaming call (StreamUsers).
service UserService {
rpc GetUser(GetUserRequest) returns (User) {};
rpc CreateUser(CreateUserRequest) returns (User) {};
rpc UpdateUser(UpdateUserRequest) returns (User) {};
// Returns google.protobuf.Empty: success carries no payload.
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty) {};
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse) {};
// Server streaming: the response is a stream of individual User messages.
rpc StreamUsers(StreamUsersRequest) returns (stream User) {};
}
// Order service definition
service OrderService {
rpc CreateOrder(CreateOrderRequest) returns (Order) {};
rpc GetOrder(GetOrderRequest) returns (Order) {};
// Server streaming: the response is a stream of individual Order messages.
rpc StreamOrders(StreamOrdersRequest) returns (stream Order) {};
}
// Messages
// Looks up a single user by id.
message GetUserRequest {
string user_id = 1;
}
// Fields for a new user; the id and timestamps are not part of the request.
message CreateUserRequest {
string name = 1;
string email = 2;
string phone = 3;
map<string, string> metadata = 4;
}
// Identifies the user to change (user_id) and carries the new field values.
message UpdateUserRequest {
string user_id = 1;
string name = 2;
string email = 3;
string phone = 4;
map<string, string> metadata = 5;
}
message DeleteUserRequest {
string user_id = 1;
}
// Paged listing request; page_token is the value returned as
// next_page_token by a previous ListUsersResponse (empty for the first page).
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}
// One page of users. next_page_token is empty when there are no more pages.
message ListUsersResponse {
repeated User users = 1;
string next_page_token = 2;
int32 total_size = 3;
}
message StreamUsersRequest {
int32 batch_size = 1;
string filter = 2;
}
// A user record as returned by every UserService call.
message User {
string user_id = 1;
string name = 2;
string email = 3;
string phone = 4;
google.protobuf.Timestamp created_at = 5;
google.protobuf.Timestamp updated_at = 6;
map<string, string> metadata = 7;
UserStatus status = 8;
}
// Lifecycle state of a user. UNKNOWN (0) is the proto3 default for an unset field.
enum UserStatus {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
SUSPENDED = 3;
}
// Order messages
// Items to order on behalf of user_id; the order total is not supplied by the client.
message CreateOrderRequest {
string user_id = 1;
repeated OrderItem items = 2;
string currency = 3;
}
message GetOrderRequest {
string order_id = 1;
}
message StreamOrdersRequest {
string user_id = 1;
int32 batch_size = 2;
}
// An order as returned by every OrderService call.
message Order {
string order_id = 1;
string user_id = 2;
repeated OrderItem items = 3;
// NOTE(review): monetary amounts are modelled as double; binary floating
// point cannot represent most decimal fractions exactly.
double total_amount = 4;
string currency = 5;
OrderStatus status = 6;
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
}
// A single line of an order.
message OrderItem {
string product_id = 1;
string name = 2;
int32 quantity = 3;
double price = 4;
}
// Lifecycle state of an order. PENDING (0) is the proto3 default for an unset field.
enum OrderStatus {
PENDING = 0;
CONFIRMED = 1;
SHIPPED = 2;
DELIVERED = 3;
CANCELLED = 4;
}

2. Health Check Protocol

// protos/health.proto
//
// Health-check service: a unary Check and a server-streaming Watch, both keyed
// by a service name. Java classes are generated into com.example.grpc.health.
//
// NOTE(review): this file declares the package grpc.health.v1, so the service's
// full name is grpc.health.v1.Health. The grpc-services dependency ships a
// service under that same name (used via HealthStatusManager) - confirm that
// defining it a second time here is intended.
syntax = "proto3";
package grpc.health.v1;
option java_package = "com.example.grpc.health";
option java_multiple_files = true;
// Names the service whose health is being queried.
message HealthCheckRequest {
string service = 1;
}
// Health of the requested service.
message HealthCheckResponse {
// UNKNOWN (0) is the proto3 default for an unset status.
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}
service Health {
// One-shot health query.
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
// Server streaming: the response is a stream of HealthCheckResponse messages.
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

gRPC Server Implementation

1. User Service Implementation

package com.example.grpc.server;
import com.example.grpc.*;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory implementation of the {@code UserService} gRPC API.
 *
 * <p>Users are kept in a {@link ConcurrentHashMap} keyed by user id; nothing is
 * persisted. Every RPC reports failures through
 * {@link StreamObserver#onError(Throwable)} with a gRPC {@link Status}:
 * {@code NOT_FOUND}, {@code INVALID_ARGUMENT}, {@code ALREADY_EXISTS}, or
 * {@code INTERNAL} for anything unexpected.
 */
public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

    /** Page size used by {@code ListUsers} when the request does not specify a positive one. */
    private static final int DEFAULT_PAGE_SIZE = 10;
    /** Batch size used by {@code StreamUsers} when the request does not specify a positive one. */
    private static final int DEFAULT_BATCH_SIZE = 10;
    /** Pause inserted between streamed batches, in milliseconds. */
    private static final long BATCH_DELAY_MILLIS = 100L;

    private final Map<String, User> users = new ConcurrentHashMap<>();
    private final Map<String, String> userSessions = new ConcurrentHashMap<>();

    /**
     * Returns the user with the requested id, or {@code NOT_FOUND} if there is none.
     */
    @Override
    public void getUser(GetUserRequest request, StreamObserver<User> responseObserver) {
        String userId = request.getUserId();
        logger.info("Getting user: {}", userId);
        try {
            User user = users.get(userId);
            if (user == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("User not found: " + userId)
                        .asRuntimeException());
                return;
            }
            responseObserver.onNext(user);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error getting user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Internal server error")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Creates a user with a random UUID as id and status {@code ACTIVE}.
     *
     * <p>Fails with {@code INVALID_ARGUMENT} when name or email is empty and with
     * {@code ALREADY_EXISTS} when another stored user has the same email.
     */
    @Override
    public void createUser(CreateUserRequest request, StreamObserver<User> responseObserver) {
        logger.info("Creating user: {}", request.getEmail());
        try {
            // Validate request
            if (request.getName().isEmpty() || request.getEmail().isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                        .withDescription("Name and email are required")
                        .asRuntimeException());
                return;
            }
            // Check if user already exists.
            // NOTE(review): this check and the put below are not one atomic step, so two
            // concurrent requests with the same email can both pass it.
            boolean userExists = users.values().stream()
                    .anyMatch(user -> user.getEmail().equals(request.getEmail()));
            if (userExists) {
                responseObserver.onError(Status.ALREADY_EXISTS
                        .withDescription("User with email already exists: " + request.getEmail())
                        .asRuntimeException());
                return;
            }
            // Create user; one timestamp so created_at and updated_at are guaranteed equal
            com.google.protobuf.Timestamp now = currentTimestamp();
            String userId = UUID.randomUUID().toString();
            User user = User.newBuilder()
                    .setUserId(userId)
                    .setName(request.getName())
                    .setEmail(request.getEmail())
                    .setPhone(request.getPhone())
                    .putAllMetadata(request.getMetadataMap())
                    .setCreatedAt(now)
                    .setUpdatedAt(now)
                    .setStatus(UserStatus.ACTIVE)
                    .build();
            users.put(userId, user);
            logger.info("User created successfully: {}", userId);
            responseObserver.onNext(user);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error creating user", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to create user")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Replaces name, email, phone and metadata of an existing user and refreshes
     * {@code updated_at}. Fails with {@code NOT_FOUND} when the id is unknown.
     */
    @Override
    public void updateUser(UpdateUserRequest request, StreamObserver<User> responseObserver) {
        String userId = request.getUserId();
        logger.info("Updating user: {}", userId);
        try {
            User existingUser = users.get(userId);
            if (existingUser == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("User not found: " + userId)
                        .asRuntimeException());
                return;
            }
            // Start from the stored user so id, created_at and status are carried over
            User updatedUser = User.newBuilder(existingUser)
                    .setName(request.getName())
                    .setEmail(request.getEmail())
                    .setPhone(request.getPhone())
                    .setUpdatedAt(currentTimestamp())
                    // Metadata is replaced wholesale, not merged
                    .clearMetadata()
                    .putAllMetadata(request.getMetadataMap())
                    .build();
            users.put(userId, updatedUser);
            logger.info("User updated successfully: {}", userId);
            responseObserver.onNext(updatedUser);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error updating user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to update user")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Deletes a user and any session entries pointing at it. Fails with
     * {@code NOT_FOUND} when the id is unknown.
     */
    @Override
    public void deleteUser(DeleteUserRequest request, StreamObserver<com.google.protobuf.Empty> responseObserver) {
        String userId = request.getUserId();
        logger.info("Deleting user: {}", userId);
        try {
            // remove() is a single atomic step: exactly one of several concurrent
            // deletes of the same id sees a non-null result.
            if (users.remove(userId) == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("User not found: " + userId)
                        .asRuntimeException());
                return;
            }
            userSessions.values().removeIf(sessionUserId -> sessionUserId.equals(userId));
            logger.info("User deleted successfully: {}", userId);
            responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance());
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error deleting user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to delete user")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Returns one page of users, optionally filtered by name/email substring.
     *
     * <p>The page token is the decimal offset of the first element of the page.
     * A token that is not a non-negative integer yields {@code INVALID_ARGUMENT};
     * an offset past the end yields an empty page.
     */
    @Override
    public void listUsers(ListUsersRequest request, StreamObserver<ListUsersResponse> responseObserver) {
        logger.info("Listing users, page size: {}", request.getPageSize());
        try {
            List<User> userList = new ArrayList<>(users.values());
            // Apply filtering if provided
            if (!request.getFilter().isEmpty()) {
                userList.removeIf(user -> !matchesFilter(user, request.getFilter()));
            }
            // ConcurrentHashMap iteration order is unspecified; sort so that an offset
            // token refers to the same position from one page request to the next.
            userList.sort(Comparator.comparing(User::getUserId));

            // Pagination
            int pageSize = request.getPageSize() > 0 ? request.getPageSize() : DEFAULT_PAGE_SIZE;
            int fromIndex = 0;
            if (!request.getPageToken().isEmpty()) {
                try {
                    fromIndex = Integer.parseInt(request.getPageToken());
                } catch (NumberFormatException e) {
                    fromIndex = -1;
                }
                if (fromIndex < 0) {
                    responseObserver.onError(Status.INVALID_ARGUMENT
                            .withDescription("Invalid page token")
                            .asRuntimeException());
                    return;
                }
                // Users may have been deleted since the token was issued
                fromIndex = Math.min(fromIndex, userList.size());
            }
            // Add in long: fromIndex + pageSize can exceed Integer.MAX_VALUE
            int toIndex = (int) Math.min((long) fromIndex + pageSize, userList.size());
            List<User> pageUsers = userList.subList(fromIndex, toIndex);
            String nextPageToken = toIndex < userList.size() ? String.valueOf(toIndex) : "";
            ListUsersResponse response = ListUsersResponse.newBuilder()
                    .addAllUsers(pageUsers)
                    .setNextPageToken(nextPageToken)
                    .setTotalSize(userList.size())
                    .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error listing users", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to list users")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Streams all (optionally filtered) users one message at a time, pausing
     * briefly after every {@code batch_size} messages. Stops silently once the
     * client has cancelled the call.
     */
    @Override
    public void streamUsers(StreamUsersRequest request, StreamObserver<User> responseObserver) {
        logger.info("Streaming users, batch size: {}", request.getBatchSize());
        try {
            List<User> userList = new ArrayList<>(users.values());
            // Apply filtering if provided
            if (!request.getFilter().isEmpty()) {
                userList.removeIf(user -> !matchesFilter(user, request.getFilter()));
            }
            int batchSize = request.getBatchSize() > 0 ? request.getBatchSize() : DEFAULT_BATCH_SIZE;
            for (int i = 0; i < userList.size(); i++) {
                if (i > 0 && i % batchSize == 0) {
                    // Small delay between batches to simulate processing
                    Thread.sleep(BATCH_DELAY_MILLIS);
                }
                // Check before sending: writing to a cancelled call is wasted work
                if (isCancelled(responseObserver)) {
                    logger.info("Client cancelled user stream");
                    return;
                }
                responseObserver.onNext(userList.get(i));
            }
            responseObserver.onCompleted();
            logger.info("User streaming completed");
        } catch (InterruptedException e) {
            // Restore the flag so the executor that owns this thread can see the interrupt
            Thread.currentThread().interrupt();
            logger.error("Error streaming users", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to stream users")
                    .withCause(e)
                    .asRuntimeException());
        } catch (Exception e) {
            logger.error("Error streaming users", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to stream users")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /** Returns whether the observer belongs to a server call the client has cancelled. */
    private static boolean isCancelled(StreamObserver<?> observer) {
        return observer instanceof io.grpc.stub.ServerCallStreamObserver
                && ((io.grpc.stub.ServerCallStreamObserver<?>) observer).isCancelled();
    }

    /** Returns the current time as a protobuf timestamp with second precision. */
    private static com.google.protobuf.Timestamp currentTimestamp() {
        return com.google.protobuf.Timestamp.newBuilder()
                .setSeconds(Instant.now().getEpochSecond())
                .build();
    }

    /**
     * Case-insensitive substring match of {@code filter} against the user's name or email.
     */
    private boolean matchesFilter(User user, String filter) {
        // Locale.ROOT keeps the comparison independent of the JVM's default locale
        String needle = filter.toLowerCase(Locale.ROOT);
        return user.getName().toLowerCase(Locale.ROOT).contains(needle)
                || user.getEmail().toLowerCase(Locale.ROOT).contains(needle);
    }
}

2. Order Service Implementation

package com.example.grpc.server;
import com.example.grpc.*;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * In-memory implementation of the {@code OrderService} gRPC API.
 *
 * <p>Orders are indexed twice: by order id, and per user id for streaming.
 * Nothing is persisted. Failures are reported through
 * {@link StreamObserver#onError(Throwable)} with a gRPC {@link Status}.
 */
public class OrderServiceImpl extends OrderServiceGrpc.OrderServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
    /** Source of the numeric part of order ids; shared by all instances. */
    private static final AtomicLong orderIdCounter = new AtomicLong(1000);

    /** Batch size used by {@code StreamOrders} when the request does not specify a positive one. */
    private static final int DEFAULT_BATCH_SIZE = 5;
    /** Pause inserted between streamed batches, in milliseconds. */
    private static final long BATCH_DELAY_MILLIS = 50L;

    private final Map<String, Order> orders = new ConcurrentHashMap<>();
    private final Map<String, List<Order>> userOrders = new ConcurrentHashMap<>();

    /**
     * Creates an order in status {@code PENDING} with id {@code ORD-<n>} and a
     * total of {@code sum(price * quantity)} over its items. Fails with
     * {@code INVALID_ARGUMENT} when the request has no items.
     */
    @Override
    public void createOrder(CreateOrderRequest request, StreamObserver<Order> responseObserver) {
        String userId = request.getUserId();
        logger.info("Creating order for user: {}", userId);
        try {
            // Validate request
            if (request.getItemsList().isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                        .withDescription("Order must contain at least one item")
                        .asRuntimeException());
                return;
            }
            // Calculate total amount
            double totalAmount = request.getItemsList().stream()
                    .mapToDouble(item -> item.getPrice() * item.getQuantity())
                    .sum();
            // One timestamp so created_at and updated_at are guaranteed equal
            com.google.protobuf.Timestamp now = com.google.protobuf.Timestamp.newBuilder()
                    .setSeconds(Instant.now().getEpochSecond())
                    .build();
            // Create order
            String orderId = "ORD-" + orderIdCounter.incrementAndGet();
            Order order = Order.newBuilder()
                    .setOrderId(orderId)
                    .setUserId(userId)
                    .addAllItems(request.getItemsList())
                    .setTotalAmount(totalAmount)
                    .setCurrency(request.getCurrency())
                    .setStatus(OrderStatus.PENDING)
                    .setCreatedAt(now)
                    .setUpdatedAt(now)
                    .build();
            // Store order. The per-user list is appended to after computeIfAbsent has
            // returned, and read concurrently by streamOrders, so it must itself be
            // thread-safe; a plain ArrayList is not.
            orders.put(orderId, order);
            userOrders
                    .computeIfAbsent(userId, k -> new java.util.concurrent.CopyOnWriteArrayList<>())
                    .add(order);
            logger.info("Order created successfully: {} for user: {}", orderId, userId);
            responseObserver.onNext(order);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error creating order for user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to create order")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Returns the order with the requested id, or {@code NOT_FOUND} if there is none.
     */
    @Override
    public void getOrder(GetOrderRequest request, StreamObserver<Order> responseObserver) {
        String orderId = request.getOrderId();
        logger.info("Getting order: {}", orderId);
        try {
            Order order = orders.get(orderId);
            if (order == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("Order not found: " + orderId)
                        .asRuntimeException());
                return;
            }
            responseObserver.onNext(order);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error getting order: {}", orderId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to get order")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Streams the orders of one user one message at a time, pausing briefly
     * after every {@code batch_size} messages. A user without orders gets an
     * empty, successfully completed stream. Stops silently once the client has
     * cancelled the call.
     */
    @Override
    public void streamOrders(StreamOrdersRequest request, StreamObserver<Order> responseObserver) {
        String userId = request.getUserId();
        logger.info("Streaming orders for user: {}", userId);
        try {
            // Snapshot: orders created while streaming are not part of this response
            List<Order> userOrderList =
                    new ArrayList<>(userOrders.getOrDefault(userId, Collections.emptyList()));
            int batchSize = request.getBatchSize() > 0 ? request.getBatchSize() : DEFAULT_BATCH_SIZE;
            for (int i = 0; i < userOrderList.size(); i++) {
                if (i > 0 && i % batchSize == 0) {
                    // Small delay between batches
                    Thread.sleep(BATCH_DELAY_MILLIS);
                }
                // Check before sending: writing to a cancelled call is wasted work
                if (isCancelled(responseObserver)) {
                    logger.info("Client cancelled order stream for user: {}", userId);
                    return;
                }
                responseObserver.onNext(userOrderList.get(i));
            }
            responseObserver.onCompleted();
            logger.info("Order streaming completed for user: {}", userId);
        } catch (InterruptedException e) {
            // Restore the flag so the executor that owns this thread can see the interrupt
            Thread.currentThread().interrupt();
            logger.error("Error streaming orders for user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to stream orders")
                    .withCause(e)
                    .asRuntimeException());
        } catch (Exception e) {
            logger.error("Error streaming orders for user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to stream orders")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /** Returns whether the observer belongs to a server call the client has cancelled. */
    private static boolean isCancelled(StreamObserver<?> observer) {
        return observer instanceof io.grpc.stub.ServerCallStreamObserver
                && ((io.grpc.stub.ServerCallStreamObserver<?>) observer).isCancelled();
    }
}

gRPC Server with Istio Features

1. Advanced gRPC Server

package com.example.grpc.server;
import com.example.grpc.health.HealthCheckResponse;
import com.example.grpc.health.HealthGrpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.services.HealthStatusManager;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class GrpcServer {
private static final Logger logger = LoggerFactory.getLogger(GrpcServer.class);
private final int port;
private final Server server;
private final HealthStatusManager healthStatusManager;
public GrpcServer(int port) {
this.port = port;
this.healthStatusManager = new HealthStatusManager();
this.server = ServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(
new UserServiceImpl(), 
new IstioHeadersInterceptor(),
new MetricsInterceptor()
))
.addService(ServerInterceptors.intercept(
new OrderServiceImpl(),
new IstioHeadersInterceptor(),
new MetricsInterceptor()
))
.addService(healthStatusManager.getHealthService())
.addService(new CustomHealthService())
.intercept(new AuthenticationInterceptor())
.build();
}
public void start() throws IOException {
server.start();
logger.info("gRPC Server started, listening on port: {}", port);
// Set health status to serving
healthStatusManager.setStatus("", HealthCheckResponse.ServingStatus.SERVING);
healthStatusManager.setStatus("user.UserService", HealthCheckResponse.ServingStatus.SERVING);
healthStatusManager.setStatus("order.OrderService", HealthCheckResponse.ServingStatus.SERVING);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down gRPC server...");
try {
GrpcServer.this.stop();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Server shutdown interrupted", e);
}
logger.info("gRPC server shut down successfully");
}));
}
public void stop() throws InterruptedException {
if (server != null) {
// Set health status to not serving
healthStatusManager.setStatus("", HealthCheckResponse.ServingStatus.NOT_SERVING);
healthStatusManager.clearStatus("user.UserService");
healthStatusManager.clearStatus("order.OrderService");
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
int port = Integer.parseInt(System.getenv().getOrDefault("GRPC_PORT", "50051"));
GrpcServer server = new GrpcServer(port);
server.start();
server.blockUntilShutdown();
}
}
// Custom Health Service for Istio readiness/liveness probes
class CustomHealthService extends HealthGrpc.HealthImplBase {
private static final Logger logger = LoggerFactory.getLogger(CustomHealthService.class);
@Override
public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
String service = request.getService();
HealthCheckResponse.ServingStatus status;
if (service.isEmpty() || "user.UserService".equals(service) || "order.OrderService".equals(service)) {
status = HealthCheckResponse.ServingStatus.SERVING;
} else {
status = HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN;
}
HealthCheckResponse response = HealthCheckResponse.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
logger.debug("Health check for service: {} - status: {}", service, status);
}
@Override
public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
// Implement watch functionality for continuous health monitoring
String service = request.getService();
// Send initial status
HealthCheckResponse initialResponse = HealthCheckResponse.newBuilder()
.setStatus(HealthCheckResponse.ServingStatus.SERVING)
.build();
responseObserver.onNext(initialResponse);
// In production, you would watch for health status changes
// For simplicity, we'll just complete the stream
responseObserver.onCompleted();
}
}

2. Istio Headers Interceptor

package com.example.grpc.server;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class IstioHeadersInterceptor implements ServerInterceptor {
private static final Logger logger = LoggerFactory.getLogger(IstioHeadersInterceptor.class);
// Istio-specific headers
private static final String[] ISTIO_HEADERS = {
"x-request-id",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-flags",
"x-ot-span-context",
"x-forwarded-client-cert",
"user-agent",
"x-envoy-external-address",
"x-forwarded-for",
"x-envoy-internal"
};
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Extract and log Istio headers
Map<String, String> istioHeaders = extractIstioHeaders(headers);
// Add to MDC for logging
MDC.put("x-request-id", istioHeaders.getOrDefault("x-request-id", "unknown"));
MDC.put("x-b3-traceid", istioHeaders.getOrDefault("x-b3-traceid", "unknown"));
logger.info("Received gRPC call with Istio headers: {}", istioHeaders);
// Create context with Istio headers
Context context = Context.current()
.withValue(IstioHeadersContextKey.INSTIO_HEADERS, istioHeaders);
return Contexts.interceptCall(context, call, headers, next);
}
private Map<String, String> extractIstioHeaders(Metadata headers) {
Map<String, String> istioHeaders = new ConcurrentHashMap<>();
for (String headerName : ISTIO_HEADERS) {
Metadata.Key<String> key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
String value = headers.get(key);
if (value != null) {
istioHeaders.put(headerName, value);
}
}
return istioHeaders;
}
// Context key for accessing Istio headers
public static class IstioHeadersContextKey {
public static final Context.Key<Map<String, String>> INSTIO_HEADERS = 
Context.keyWithDefault("istio-headers", Map.of());
}
}

3. Authentication Interceptor

package com.example.grpc.server;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Base64;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Server interceptor that requires an {@code authorization: Bearer <token>}
 * header on every call except the health-check methods.
 *
 * <p>On success the user id is made available to the handler through
 * {@link UserContextKey#USER_ID}; otherwise the call is closed with
 * {@code UNAUTHENTICATED}.
 *
 * <p><strong>SECURITY:</strong> {@link #validateToken(String)} only checks that
 * the token is shaped like a JWT. It does NOT verify the signature, expiry or
 * issuer, so any well-formed token is accepted. Replace it with a real JWT
 * library before exposing this server.
 */
public class AuthenticationInterceptor implements ServerInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(AuthenticationInterceptor.class);
    private static final Metadata.Key<String> AUTHORIZATION_HEADER =
            Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
    private static final String BEARER_PREFIX = "Bearer ";
    private static final Set<String> PUBLIC_METHODS = Set.of(
            "grpc.health.v1.Health/Check",
            "grpc.health.v1.Health/Watch"
    );

    /**
     * Authenticates the call and forwards it, or closes it with
     * {@code UNAUTHENTICATED}.
     */
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        String methodName = call.getMethodDescriptor().getFullMethodName();
        // Skip authentication for public methods
        if (PUBLIC_METHODS.contains(methodName)) {
            return next.startCall(call, headers);
        }
        // Extract and validate JWT token; the auth scheme name is case-insensitive
        String authHeader = headers.get(AUTHORIZATION_HEADER);
        if (authHeader == null
                || !authHeader.regionMatches(true, 0, BEARER_PREFIX, 0, BEARER_PREFIX.length())) {
            return reject(call, "Missing or invalid authorization header");
        }
        String token = authHeader.substring(BEARER_PREFIX.length());

        String userId;
        try {
            if (!validateToken(token)) {
                return reject(call, "Invalid token");
            }
            // Extract user info from token and add to context
            userId = extractUserIdFromToken(token);
        } catch (Exception e) {
            logger.error("Authentication error", e);
            return reject(call, "Authentication failed");
        }
        // Deliberately outside the try block: a failure while starting the downstream
        // handler is not an authentication failure and must not be reported as one.
        Context context = Context.current().withValue(UserContextKey.USER_ID, userId);
        logger.info("Authenticated user: {} for method: {}", userId, methodName);
        return Contexts.interceptCall(context, call, headers, next);
    }

    /** Closes the call as {@code UNAUTHENTICATED} and returns a listener that ignores all events. */
    private static <ReqT, RespT> ServerCall.Listener<ReqT> reject(
            ServerCall<ReqT, RespT> call, String description) {
        call.close(Status.UNAUTHENTICATED.withDescription(description), new Metadata());
        return new ServerCall.Listener<ReqT>() {};
    }

    /**
     * Checks that the token has three dot-separated segments whose header and
     * payload decode as base64url. This is a format check only - see the class
     * comment.
     */
    private boolean validateToken(String token) {
        // In production, use a proper JWT validation library
        // This is a simplified example
        try {
            String[] parts = token.split("\\.");
            if (parts.length != 3) {
                return false;
            }
            // JWT segments use the URL-safe alphabet ('-' and '_'); the basic decoder
            // rejects those characters and would turn away valid tokens.
            Base64.getUrlDecoder().decode(parts[0]);
            Base64.getUrlDecoder().decode(parts[1]);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Returns the user id for a token.
     *
     * @throws RuntimeException if the payload segment is missing or not valid base64url
     */
    private String extractUserIdFromToken(String token) {
        // In production, parse JWT and extract user ID from claims
        // This is a simplified example
        try {
            String[] parts = token.split("\\.");
            // Decoded only to prove the payload is well-formed; the claims are not read yet
            Base64.getUrlDecoder().decode(parts[1]);
            // Parse JSON payload and extract user_id
            // For now, return a dummy user ID
            return "user-123";
        } catch (Exception e) {
            throw new RuntimeException("Failed to extract user ID from token", e);
        }
    }

    /** Context key for user ID. */
    public static class UserContextKey {
        /** Id of the authenticated caller; {@code "unknown"} when no user was authenticated. */
        public static final Context.Key<String> USER_ID =
                Context.keyWithDefault("user-id", "unknown");
    }
}

4. Metrics Interceptor

package com.example.grpc.server;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Server interceptor that counts requests, successes and errors per gRPC
 * method and logs the duration of every call.
 *
 * <p>Counters are per interceptor instance and kept in memory only.
 */
public class MetricsInterceptor implements ServerInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(MetricsInterceptor.class);
    private final Map<String, AtomicLong> requestCounters = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> errorCounters = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> successCounters = new ConcurrentHashMap<>();

    /**
     * Counts the call, then wraps it so that the outcome and duration are
     * recorded when the call is closed.
     */
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        String methodName = call.getMethodDescriptor().getFullMethodName();
        // nanoTime is monotonic; wall-clock time can jump and produce bogus durations
        long startNanos = System.nanoTime();
        // Increment request counter
        increment(requestCounters, methodName);
        // Create wrapper for response monitoring
        ServerCall<ReqT, RespT> monitoringCall =
                new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata trailers) {
                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                if (status.isOk()) {
                    increment(successCounters, methodName);
                    logger.info("gRPC call completed successfully: {} - duration: {}ms",
                            methodName, duration);
                } else {
                    increment(errorCounters, methodName);
                    logger.warn("gRPC call failed: {} - status: {} - duration: {}ms",
                            methodName, status.getCode(), duration);
                }
                // Log metrics (in production, send to monitoring system)
                logMetrics(methodName, status, duration);
                super.close(status, trailers);
            }
        };
        return next.startCall(monitoringCall, headers);
    }

    /** Adds one to the counter of {@code methodName}, creating it on first use. */
    private static void increment(Map<String, AtomicLong> counters, String methodName) {
        counters.computeIfAbsent(methodName, k -> new AtomicLong(0)).incrementAndGet();
    }

    /** Returns the current value of the counter of {@code methodName}, or 0 if it was never incremented. */
    private static long valueOf(Map<String, AtomicLong> counters, String methodName) {
        AtomicLong counter = counters.get(methodName);
        return counter == null ? 0L : counter.get();
    }

    /** Logs the outcome of one call together with the running totals of its method. */
    private void logMetrics(String methodName, Status status, long duration) {
        // In production, send metrics to Prometheus, Stackdriver, etc.
        logger.debug("Metrics - Method: {}, Status: {}, Duration: {}ms, " +
                        "Total Requests: {}, Success: {}, Errors: {}",
                methodName,
                status.getCode(),
                duration,
                valueOf(requestCounters, methodName),
                valueOf(successCounters, methodName),
                valueOf(errorCounters, methodName));
    }

    /**
     * Returns a snapshot of the number of calls received per method.
     */
    public Map<String, Long> getRequestCounts() {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        requestCounters.forEach((method, counter) -> counts.put(method, counter.get()));
        return counts;
    }

    /**
     * Returns a snapshot of the number of calls per method that closed with a non-OK status.
     */
    public Map<String, Long> getErrorCounts() {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        errorCounters.forEach((method, counter) -> counts.put(method, counter.get()));
        return counts;
    }
}

gRPC Client with Istio Support

1. Advanced gRPC Client

package com.example.grpc.client;
import com.example.grpc.*;
import io.grpc.*;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class GrpcClient {
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
private final ManagedChannel channel;
private final UserServiceGrpc.UserServiceBlockingStub userServiceBlockingStub;
private final UserServiceGrpc.UserServiceStub userServiceAsyncStub;
private final OrderServiceGrpc.OrderServiceBlockingStub orderServiceBlockingStub;
private final Metadata headers;
public GrpcClient(String host, int port, String authToken) {
this.channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext() // In production, use TLS
.intercept(new ClientHeadersInterceptor(authToken))
.intercept(new ClientMetricsInterceptor())
.enableRetry() // Enable retry with Istio
.maxRetryAttempts(3)
.build();
this.headers = new Metadata();
if (authToken != null && !authToken.isEmpty()) {
headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), 
"Bearer " + authToken);
}
// Add Istio headers for distributed tracing
headers.put(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), 
generateRequestId());
headers.put(Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER), 
"java-grpc-client");
this.userServiceBlockingStub = UserServiceGrpc.newBlockingStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
this.userServiceAsyncStub = UserServiceGrpc.newStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
this.orderServiceBlockingStub = OrderServiceGrpc.newBlockingStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
// User Service Methods
public User getUser(String userId) {
logger.info("Getting user: {}", userId);
GetUserRequest request = GetUserRequest.newBuilder()
.setUserId(userId)
.build();
try {
User user = userServiceBlockingStub.getUser(request);
logger.info("Retrieved user: {} - {}", user.getUserId(), user.getName());
return user;
} catch (StatusRuntimeException e) {
logger.error("RPC failed: {}", e.getStatus());
throw e;
}
}
public User createUser(String name, String email, String phone) {
logger.info("Creating user: {} - {}", name, email);
CreateUserRequest request = CreateUserRequest.newBuilder()
.setName(name)
.setEmail(email)
.setPhone(phone)
.build();
try {
User user = userServiceBlockingStub.createUser(request);
logger.info("Created user: {} - {}", user.getUserId(), user.getName());
return user;
} catch (StatusRuntimeException e) {
logger.error("RPC failed: {}", e.getStatus());
throw e;
}
}
public void listUsersAsync(int pageSize) {
logger.info("Listing users asynchronously, page size: {}", pageSize);
ListUsersRequest request = ListUsersRequest.newBuilder()
.setPageSize(pageSize)
.build();
CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<ListUsersResponse> responseObserver = new StreamObserver<ListUsersResponse>() {
@Override
public void onNext(ListUsersResponse response) {
logger.info("Received {} users", response.getUsersCount());
for (User user : response.getUsersList()) {
logger.info("User: {} - {} - {}", user.getUserId(), user.getName(), user.getEmail());
}
if (!response.getNextPageToken().isEmpty()) {
logger.info("More users available, next page token: {}", response.getNextPageToken());
}
}
@Override
public void onError(Throwable t) {
logger.error("List users failed", t);
finishLatch.countDown();
}
@Override
public void onCompleted() {
logger.info("List users completed");
finishLatch.countDown();
}
};
userServiceAsyncStub.listUsers(request, responseObserver);
try {
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.warn("List users request timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("List users request interrupted");
}
}
public void streamUsers() {
logger.info("Starting user streaming");
StreamUsersRequest request = StreamUsersRequest.newBuilder()
.setBatchSize(5)
.build();
CountDownLatch finishLatch = new CountDownLatch(1);
AtomicInteger userCount = new AtomicInteger(0);
StreamObserver<User> responseObserver = new StreamObserver<User>() {
@Override
public void onNext(User user) {
int count = userCount.incrementAndGet();
logger.info("Received user {}: {} - {}", count, user.getUserId(), user.getName());
}
@Override
public void onError(Throwable t) {
logger.error("User streaming failed", t);
finishLatch.countDown();
}
@Override
public void onCompleted() {
logger.info("User streaming completed. Total users received: {}", userCount.get());
finishLatch.countDown();
}
};
userServiceAsyncStub.streamUsers(request, responseObserver);
try {
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
logger.warn("User streaming timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("User streaming interrupted");
}
}
// Order Service Methods
public Order createOrder(String userId, List<OrderItem> items, String currency) {
logger.info("Creating order for user: {}", userId);
CreateOrderRequest request = CreateOrderRequest.newBuilder()
.setUserId(userId)
.addAllItems(items)
.setCurrency(currency)
.build();
try {
Order order = orderServiceBlockingStub.createOrder(request);
logger.info("Created order: {} - Total: {} {}", 
order.getOrderId(), order.getTotalAmount(), order.getCurrency());
return order;
} catch (StatusRuntimeException e) {
logger.error("RPC failed: {}", e.getStatus());
throw e;
}
}
public void streamOrders(String userId) {
logger.info("Streaming orders for user: {}", userId);
StreamOrdersRequest request = StreamOrdersRequest.newBuilder()
.setUserId(userId)
.setBatchSize(3)
.build();
CountDownLatch finishLatch = new CountDownLatch(1);
AtomicInteger orderCount = new AtomicInteger(0);
Iterator<Order> orderIterator = orderServiceBlockingStub.streamOrders(request);
try {
while (orderIterator.hasNext()) {
Order order = orderIterator.next();
int count = orderCount.incrementAndGet();
logger.info("Received order {}: {} - {} - Status: {}", 
count, order.getOrderId(), order.getTotalAmount(), order.getStatus());
}
logger.info("Order streaming completed. Total orders received: {}", orderCount.get());
} catch (StatusRuntimeException e) {
logger.error("Order streaming failed", e);
} finally {
finishLatch.countDown();
}
try {
finishLatch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private String generateRequestId() {
return "req-" + System.currentTimeMillis() + "-" + Math.abs((int) (Math.random() * 10000));
}
public static void main(String[] args) throws InterruptedException {
String host = System.getenv().getOrDefault("GRPC_SERVER_HOST", "localhost");
int port = Integer.parseInt(System.getenv().getOrDefault("GRPC_SERVER_PORT", "50051"));
String authToken = System.getenv().getOrDefault("AUTH_TOKEN", "dummy-token");
GrpcClient client = new GrpcClient(host, port, authToken);
try {
// Test user operations
User user = client.createUser("John Doe", "[email protected]", "+1234567890");
client.getUser(user.getUserId());
client.listUsersAsync(10);
client.streamUsers();
// Test order operations
OrderItem item1 = OrderItem.newBuilder()
.setProductId("prod-1")
.setName("Laptop")
.setQuantity(1)
.setPrice(999.99)
.build();
OrderItem item2 = OrderItem.newBuilder()
.setProductId("prod-2")
.setName("Mouse")
.setQuantity(2)
.setPrice(29.99)
.build();
Order order = client.createOrder(user.getUserId(), List.of(item1, item2), "USD");
client.streamOrders(user.getUserId());
} finally {
client.shutdown();
}
}
}
// Client Interceptors
/**
 * Client interceptor that adds authentication and tracing headers to every outgoing call.
 *
 * <p>Each header is only added when the call does not already carry it, so values set
 * upstream (for example a propagated trace context or a caller-supplied request id) are
 * preserved and nothing is sent twice.
 */
class ClientHeadersInterceptor implements ClientInterceptor {
    // Metadata keys are immutable; build them once instead of on every call.
    private static final Metadata.Key<String> AUTHORIZATION_KEY =
            Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> REQUEST_ID_KEY =
            Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> B3_TRACE_ID_KEY =
            Metadata.Key.of("x-b3-traceid", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> B3_SPAN_ID_KEY =
            Metadata.Key.of("x-b3-spanid", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> B3_SAMPLED_KEY =
            Metadata.Key.of("x-b3-sampled", Metadata.ASCII_STRING_MARSHALLER);

    private final String authToken;

    /**
     * @param authToken bearer token to send; {@code null} or empty sends no authorization header
     */
    public ClientHeadersInterceptor(String authToken) {
        this.authToken = authToken;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // Add authentication header
                if (authToken != null && !authToken.isEmpty()) {
                    putIfAbsent(headers, AUTHORIZATION_KEY, "Bearer " + authToken);
                }
                // Add Istio headers for distributed tracing
                putIfAbsent(headers, REQUEST_ID_KEY, generateRequestId());
                putIfAbsent(headers, B3_TRACE_ID_KEY, generateTraceId());
                putIfAbsent(headers, B3_SPAN_ID_KEY, generateSpanId());
                putIfAbsent(headers, B3_SAMPLED_KEY, "1");
                super.start(responseListener, headers);
            }
        };
    }

    /** Adds {@code value} under {@code key} unless the metadata already holds that key. */
    private static void putIfAbsent(Metadata headers, Metadata.Key<String> key, String value) {
        if (!headers.containsKey(key)) {
            headers.put(key, value);
        }
    }

    /** Returns a random UUID, so ids stay unique for calls started in the same millisecond. */
    private static String generateRequestId() {
        return UUID.randomUUID().toString();
    }

    /** Returns a B3 trace id: 128 random bits as 32 zero-padded lower-case hex characters. */
    private static String generateTraceId() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        return String.format("%016x%016x", random.nextLong(), random.nextLong());
    }

    /** Returns a B3 span id: 64 random bits as 16 zero-padded lower-case hex characters. */
    private static String generateSpanId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }
}
/**
 * Client interceptor that logs the outcome and elapsed time of every call.
 *
 * <p>Timing starts when the call is started and stops when the call is closed. A
 * successful call is logged at INFO, a failed one at WARN with its status code.
 */
class ClientMetricsInterceptor implements ClientInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(ClientMetricsInterceptor.class);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // nanoTime is monotonic; currentTimeMillis can jump when the wall clock
                // is adjusted, which would produce negative or inflated durations.
                final long startNanos = System.nanoTime();
                super.start(new ForwardingClientCallListener<RespT>() {
                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                        if (status.isOk()) {
                            logger.info("gRPC client call succeeded: {} - duration: {}ms",
                                    method.getFullMethodName(), duration);
                        } else {
                            logger.warn("gRPC client call failed: {} - status: {} - duration: {}ms",
                                    method.getFullMethodName(), status.getCode(), duration);
                        }
                        // Always hand the close event on to the application's listener.
                        super.onClose(status, trailers);
                    }

                    @Override
                    protected Listener<RespT> delegate() {
                        return responseListener;
                    }
                }, headers);
            }
        };
    }
}

Kubernetes and Istio Configuration

1. Dockerfile for gRPC Service

# Stage 1: unpack the Spring Boot fat jar into its layers (layertools extract).
FROM eclipse-temurin:11-jre AS builder
WORKDIR /app
COPY target/grpc-istio-demo-1.0.0.jar app.jar
RUN java -Djarmode=layertools -jar app.jar extract

# Stage 2: runtime image running as an unprivileged user.
FROM eclipse-temurin:11-jre
# NOTE(review): some newer Ubuntu-based base images already ship a user with UID 1000,
# in which case useradd fails -- confirm against the pinned base image.
RUN useradd -m -u 1000 spring
USER 1000
WORKDIR /app
# Copy the layers from least to most frequently changing so image layers cache well.
COPY --from=builder /app/dependencies/ ./
COPY --from=builder /app/spring-boot-loader/ ./
COPY --from=builder /app/snapshot-dependencies/ ./
COPY --from=builder /app/application/ ./
# 50051: gRPC, 8080: HTTP (actuator / metrics)
EXPOSE 50051
EXPOSE 8080
# The JVM does not read JAVA_OPTS by itself and an exec-form ENTRYPOINT performs no
# variable expansion, so go through a shell to pass $JAVA_OPTS on. "exec" keeps java
# as PID 1 so it receives SIGTERM from the container runtime.
# NOTE(review): the launcher class name depends on the Spring Boot version in use
# (newer releases moved it to org.springframework.boot.loader.launch.JarLauncher) -- confirm.
ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS org.springframework.boot.loader.JarLauncher"]

2. Kubernetes Deployment

# k8s/deployment.yaml
# Deployment: three replicas of the gRPC user service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grpc-user-service
  labels:
    app: grpc-user-service
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: grpc-user-service
  template:
    metadata:
      labels:
        app: grpc-user-service
        version: v1
      annotations:
        # Prometheus scrapes the actuator endpoint on the HTTP port.
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
        prometheus.io/path: "/actuator/prometheus"
    spec:
      containers:
        - name: grpc-server
          image: your-registry/grpc-user-service:latest
          ports:
            # Port names carry the protocol prefix Istio uses for protocol selection.
            - containerPort: 50051
              name: grpc
            - containerPort: 8080
              name: http-metrics
          env:
            - name: GRPC_PORT
              value: "50051"
            # The JVM does not read JAVA_OPTS itself; it only takes effect when the
            # container entrypoint passes $JAVA_OPTS to the java command.
            - name: JAVA_OPTS
              value: "-Xmx512m -Xms256m"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
          # Allows up to failureThreshold * periodSeconds = 300s for startup.
          startupProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            failureThreshold: 30
            periodSeconds: 10
---
# Service: exposes the gRPC and metrics ports of the pods selected by the app label.
apiVersion: v1
kind: Service
metadata:
  name: grpc-user-service
  labels:
    app: grpc-user-service
spec:
  selector:
    app: grpc-user-service
  ports:
    - name: grpc
      port: 50051
      targetPort: 50051
    - name: http-metrics
      port: 8080
      targetPort: 8080

3. Istio Configuration

# k8s/istio/
# VirtualService for gRPC: routes port 50051 traffic to the user service with a
# 30s overall timeout and up to 3 retries of 2s each.
# NOTE(review): no "gateways" field is set, so this applies to in-mesh traffic only.
# Confirm whether it should also be bound to an ingress Gateway for external hosts.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: grpc-user-service
spec:
  hosts:
    - grpc-user-service
    - grpc-user-service.example.com
  http:
    - match:
        - port: 50051
      route:
        - destination:
            host: grpc-user-service
            port:
              number: 50051
          weight: 100
      timeout: 30s
      retries:
        attempts: 3
        perTryTimeout: 2s
      # CORS only matters for browser clients (for example gRPC-Web).
      corsPolicy:
        allowOrigins:
          - exact: "*"
        allowMethods:
          - POST
          - GET
          - OPTIONS
        allowHeaders:
          - content-type
          - authorization
          - x-grpc-web
          - x-user-agent
---
# DestinationRule for load balancing, connection limits and outlier ejection.
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: grpc-user-service
spec:
  host: grpc-user-service
  trafficPolicy:
    loadBalancer:
      # LEAST_CONN is deprecated; LEAST_REQUEST is its replacement.
      simple: LEAST_REQUEST
    connectionPool:
      http:
        http2MaxRequests: 100
        # NOTE(review): a low value recycles HTTP/2 connections often, which may hurt
        # long-lived gRPC channels -- confirm this limit is intended.
        maxRequestsPerConnection: 10
      tcp:
        maxConnections: 100
    outlierDetection:
      # consecutiveErrors is deprecated; consecutive5xxErrors is the explicit form.
      consecutive5xxErrors: 5
      interval: 10s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
---
# ServiceEntry for external gRPC services: registers an out-of-mesh host, resolved
# through DNS, so that workloads in the mesh can call it.
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: external-grpc-service
spec:
  hosts:
    - api.external-service.com
  ports:
    - number: 443
      name: https
      protocol: HTTPS
    - number: 50051
      name: grpc
      protocol: GRPC
  resolution: DNS
  location: MESH_EXTERNAL
---
# Gateway for external gRPC access, terminating TLS at the ingress gateway.
# NOTE(review): port 50051 must also be exposed by the ingress gateway Service -- confirm.
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: grpc-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
    - port:
        number: 50051
        name: https-grpc
        # TLS settings are only valid on HTTPS/TLS ports. "GRPC" denotes plaintext
        # HTTP/2, so a TLS-terminated gRPC listener must be declared as HTTPS.
        protocol: HTTPS
      hosts:
        - "grpc.example.com"
      tls:
        httpsRedirect: false
        mode: SIMPLE
        # Name of the secret holding the server certificate and key.
        credentialName: grpc-cert
---
# PeerAuthentication for mTLS: STRICT rejects plaintext traffic between workloads.
# No namespace is set here, so the policy lands in whichever namespace it is applied to.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
spec:
  mtls:
    mode: STRICT

4. Istio Telemetry Configuration

# k8s/istio/telemetry.yaml
# Tracing for the user service: sample every request and report to the zipkin provider.
# NOTE(review): the provider name must match one configured in the mesh config -- confirm.
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: grpc-telemetry
spec:
  selector:
    matchLabels:
      app: grpc-user-service
  tracing:
    - providers:
        - name: zipkin
      # 100% sampling is useful for a demo; lower it for production traffic volumes.
      randomSamplingPercentage: 100.0
      # These are literal tags: every span gets the fixed value "unknown".
      # NOTE(review): these look like placeholders -- confirm whether real values
      # (for example taken from request headers) are intended.
      customTags:
        "grpc.method":
          literal:
            value: "unknown"
        "grpc.service":
          literal:
            value: "unknown"
        "grpc.status_code":
          literal:
            value: "unknown"
---
# Access logging: turns on the envoy access-log provider for the user service workloads.
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: grpc-access-logging
spec:
  selector:
    matchLabels:
      app: grpc-user-service
  accessLogging:
    - providers:
        - name: envoy
      disabled: false

Testing and Monitoring

1. Health Check Endpoint

package com.example.grpc.health;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
/**
 * Actuator health indicator for the gRPC server.
 *
 * <p>Reports UP only when a TCP connection to the gRPC port on the loopback interface
 * succeeds. The previous implementation wrapped constant code in a try/catch, so it
 * could never report DOWN. The port is read from the {@code GRPC_PORT} environment
 * variable and falls back to 50051.
 *
 * <p>NOTE(review): this assumes the gRPC server listens on all interfaces (or on
 * loopback) inside the same container -- confirm against the server setup.
 */
@Component
public class GrpcHealthIndicator implements HealthIndicator {
    private static final int DEFAULT_GRPC_PORT = 50051;
    /** Kept short so a hung server cannot stall the health endpoint. */
    private static final int CONNECT_TIMEOUT_MILLIS = 1000;

    /**
     * Probes the gRPC port.
     *
     * @return UP with status "serving" when the port accepts a connection, otherwise
     *         DOWN with status "not_serving" and the causing exception
     */
    @Override
    public Health health() {
        int port = DEFAULT_GRPC_PORT;
        try {
            port = resolveGrpcPort();
            // Check if gRPC server is healthy: it must at least accept connections.
            // You can add more sophisticated checks here
            try (java.net.Socket socket = new java.net.Socket()) {
                socket.connect(
                        new java.net.InetSocketAddress(java.net.InetAddress.getLoopbackAddress(), port),
                        CONNECT_TIMEOUT_MILLIS);
            }
            return Health.up()
                    .withDetail("grpc_port", port)
                    .withDetail("status", "serving")
                    .build();
        } catch (Exception e) {
            // Covers connection failures as well as a malformed GRPC_PORT value.
            return Health.down(e)
                    .withDetail("grpc_port", port)
                    .withDetail("status", "not_serving")
                    .build();
        }
    }

    /** Returns the port from {@code GRPC_PORT}, or the default when it is unset or blank. */
    private static int resolveGrpcPort() {
        String configured = System.getenv("GRPC_PORT");
        if (configured == null || configured.isBlank()) {
            return DEFAULT_GRPC_PORT;
        }
        return Integer.parseInt(configured.trim());
    }
}

2. Integration Tests

package com.example.grpc.test;
import com.example.grpc.GrpcClient;
import com.example.grpc.User;
import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.*;
public class GrpcClientTest {
private static final Logger logger = LoggerFactory.getLogger(GrpcClientTest.class);
private GrpcClient client;
@BeforeEach
public void setUp() {
String host = System.getenv().getOrDefault("GRPC_SERVER_HOST", "localhost");
int port = Integer.parseInt(System.getenv().getOrDefault("GRPC_SERVER_PORT", "50051"));
client = new GrpcClient(host, port, "test-token");
}
@AfterEach
public void tearDown() throws InterruptedException {
if (client != null) {
client.shutdown();
}
}
@Test
public void testCreateAndGetUser() {
// Create user
User createdUser = client.createUser("Test User", "[email protected]", "+1234567890");
assertNotNull(createdUser);
assertNotNull(createdUser.getUserId());
assertEquals("Test User", createdUser.getName());
assertEquals("[email protected]", createdUser.getEmail());
// Get user
User retrievedUser = client.getUser(createdUser.getUserId());
assertNotNull(retrievedUser);
assertEquals(createdUser.getUserId(), retrievedUser.getUserId());
assertEquals(createdUser.getName(), retrievedUser.getName());
}
@Test
public void testGetNonExistentUser() {
assertThrows(StatusRuntimeException.class, () -> {
client.getUser("non-existent-user-id");
});
}
@Test
public void testCreateUserWithInvalidData() {
assertThrows(StatusRuntimeException.class, () -> {
client.createUser("", "", "");
});
}
}

This comprehensive guide covers gRPC with Istio in Java, including protocol buffer definitions, server and client implementations, Istio-specific interceptors, Kubernetes deployment configurations, and testing strategies. The implementation demonstrates how to leverage Istio's features for traffic management, security, and observability in gRPC-based microservices.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper