Overview
gRPC is a high-performance RPC framework, and Istio is a service mesh that provides traffic management, security, and observability. Combining them enables robust microservices communication with advanced features like load balancing, circuit breaking, and mutual TLS.
Setup and Dependencies
1. Maven Configuration
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the gRPC + Istio demo.
  Compiles the .proto files with protoc and the grpc-java plugin, then builds the
  server/client code against the gRPC runtime declared below.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>grpc-istio-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Shared by the runtime artifacts and the protoc plugins so they stay in sync. -->
        <grpc.version>1.59.0</grpc.version>
        <protobuf.version>3.25.1</protobuf.version>
        <istio-api.version>1.19.0</istio-api.version>
    </properties>

    <dependencies>
        <!-- gRPC -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-services</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <!--
          The stubs generated by protoc-gen-grpc-java are annotated with
          javax.annotation.Generated, which is no longer part of the JDK on Java 9+.
          Without this compile-only dependency the generated sources do not compile
          on the Java 11 target configured above.
        -->
        <dependency>
            <groupId>org.apache.tomcat</groupId>
            <artifactId>annotations-api</artifactId>
            <version>6.0.53</version>
            <scope>provided</scope>
        </dependency>

        <!-- Protobuf -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <!-- Istio API -->
        <!-- NOTE(review): confirm that these coordinates resolve in your repository;
             none of the code in this guide references classes from this artifact. -->
        <dependency>
            <groupId>io.istio</groupId>
            <artifactId>istio-api</artifactId>
            <version>${istio-api.version}</version>
        </dependency>

        <!-- Observability -->
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-api</artifactId>
            <version>1.34.1</version>
        </dependency>

        <!-- Utilities -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <!-- Supplies ${os.detected.classifier}, used to pick the native protoc binaries. -->
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.7.1</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <!-- compile: message classes; compile-custom: gRPC service stubs. -->
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Protocol Buffer Definitions
1. Core Service Definitions
// protos/user_service.proto
//
// Service and message definitions for the user and order APIs.
syntax = "proto3";

package com.example.grpc;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

option java_package = "com.example.grpc";
option java_multiple_files = true;

// CRUD, paginated listing and server-streaming access to users.
service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(UpdateUserRequest) returns (User);
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  // Server-streaming: one User message per matching user.
  rpc StreamUsers(StreamUsersRequest) returns (stream User);
}

// Order creation, lookup and server-streaming access to a user's orders.
service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (Order);
  rpc GetOrder(GetOrderRequest) returns (Order);
  // Server-streaming: one Order message per order.
  rpc StreamOrders(StreamOrdersRequest) returns (stream Order);
}

// ---------------------------------------------------------------------------
// User messages
// ---------------------------------------------------------------------------

message GetUserRequest {
  string user_id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  string phone = 3;
  map<string, string> metadata = 4;
}

message UpdateUserRequest {
  string user_id = 1;
  string name = 2;
  string email = 3;
  string phone = 4;
  map<string, string> metadata = 5;
}

message DeleteUserRequest {
  string user_id = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  // Token taken from a previous ListUsersResponse.next_page_token.
  string page_token = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
  int32 total_size = 3;
}

message StreamUsersRequest {
  int32 batch_size = 1;
  string filter = 2;
}

message User {
  string user_id = 1;
  string name = 2;
  string email = 3;
  string phone = 4;
  google.protobuf.Timestamp created_at = 5;
  google.protobuf.Timestamp updated_at = 6;
  map<string, string> metadata = 7;
  UserStatus status = 8;
}

enum UserStatus {
  UNKNOWN = 0;
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}

// ---------------------------------------------------------------------------
// Order messages
// ---------------------------------------------------------------------------

message CreateOrderRequest {
  string user_id = 1;
  repeated OrderItem items = 2;
  string currency = 3;
}

message GetOrderRequest {
  string order_id = 1;
}

message StreamOrdersRequest {
  string user_id = 1;
  int32 batch_size = 2;
}

message Order {
  string order_id = 1;
  string user_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  string currency = 5;
  OrderStatus status = 6;
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
}

message OrderItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
  double price = 4;
}

enum OrderStatus {
  PENDING = 0;
  CONFIRMED = 1;
  SHIPPED = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}
2. Health Check Protocol
// protos/health.proto
//
// Health-check messages and service in the grpc.health.v1 proto package.
// Java classes are generated into com.example.grpc.health.
syntax = "proto3";

package grpc.health.v1;

option java_package = "com.example.grpc.health";
option java_multiple_files = true;

message HealthCheckRequest {
  // Name of the service being queried.
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }

  ServingStatus status = 1;
}

service Health {
  // Unary status query.
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  // Server-streaming status query.
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
gRPC Server Implementation
1. User Service Implementation
package com.example.grpc.server;
import com.example.grpc.*;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory implementation of the {@code UserService} gRPC API.
 *
 * <p>Users live in a {@link ConcurrentHashMap} keyed by user id. Every handler reports
 * failures through {@code responseObserver.onError} with a gRPC {@link Status}; unexpected
 * exceptions are logged and mapped to {@code INTERNAL}.
 */
public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

    /** Page size used by {@code listUsers} when the request does not supply a positive one. */
    private static final int DEFAULT_PAGE_SIZE = 10;
    /** Batch size used by {@code streamUsers} when the request does not supply a positive one. */
    private static final int DEFAULT_BATCH_SIZE = 10;
    /** Pause inserted between streamed batches, in milliseconds. */
    private static final long BATCH_DELAY_MILLIS = 100;

    private final Map<String, User> users = new ConcurrentHashMap<>();
    private final Map<String, String> userSessions = new ConcurrentHashMap<>();

    /**
     * Returns the user with the requested id, or fails with {@code NOT_FOUND}.
     */
    @Override
    public void getUser(GetUserRequest request, StreamObserver<User> responseObserver) {
        String userId = request.getUserId();
        logger.info("Getting user: {}", userId);
        try {
            User user = users.get(userId);
            if (user == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("User not found: " + userId)
                        .asRuntimeException());
                return;
            }
            responseObserver.onNext(user);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error getting user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Internal server error")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Creates a user with a random UUID as id and status {@code ACTIVE}.
     *
     * <p>Fails with {@code INVALID_ARGUMENT} when name or email is empty and with
     * {@code ALREADY_EXISTS} when another stored user has the same email.
     */
    @Override
    public void createUser(CreateUserRequest request, StreamObserver<User> responseObserver) {
        logger.info("Creating user: {}", request.getEmail());
        try {
            if (request.getName().isEmpty() || request.getEmail().isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                        .withDescription("Name and email are required")
                        .asRuntimeException());
                return;
            }
            // NOTE: the uniqueness scan and the insert below are separate steps, so two
            // concurrent requests with the same email can both pass this check.
            boolean userExists = users.values().stream()
                    .anyMatch(user -> user.getEmail().equals(request.getEmail()));
            if (userExists) {
                responseObserver.onError(Status.ALREADY_EXISTS
                        .withDescription("User with email already exists: " + request.getEmail())
                        .asRuntimeException());
                return;
            }
            String userId = UUID.randomUUID().toString();
            // One timestamp for both fields so created_at and updated_at cannot differ.
            com.google.protobuf.Timestamp now = currentTimestamp();
            User user = User.newBuilder()
                    .setUserId(userId)
                    .setName(request.getName())
                    .setEmail(request.getEmail())
                    .setPhone(request.getPhone())
                    .putAllMetadata(request.getMetadataMap())
                    .setCreatedAt(now)
                    .setUpdatedAt(now)
                    .setStatus(UserStatus.ACTIVE)
                    .build();
            users.put(userId, user);
            logger.info("User created successfully: {}", userId);
            responseObserver.onNext(user);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error creating user", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to create user")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Replaces name, email, phone and metadata of an existing user and refreshes
     * {@code updated_at}. Fails with {@code NOT_FOUND} when the id is unknown.
     */
    @Override
    public void updateUser(UpdateUserRequest request, StreamObserver<User> responseObserver) {
        String userId = request.getUserId();
        logger.info("Updating user: {}", userId);
        try {
            com.google.protobuf.Timestamp now = currentTimestamp();
            // computeIfPresent makes read-modify-write atomic per key: a concurrent delete
            // cannot be undone by this update, and concurrent updates cannot be lost.
            User updatedUser = users.computeIfPresent(userId, (id, existingUser) ->
                    User.newBuilder(existingUser)
                            .setName(request.getName())
                            .setEmail(request.getEmail())
                            .setPhone(request.getPhone())
                            .setUpdatedAt(now)
                            .clearMetadata()
                            .putAllMetadata(request.getMetadataMap())
                            .build());
            if (updatedUser == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("User not found: " + userId)
                        .asRuntimeException());
                return;
            }
            logger.info("User updated successfully: {}", userId);
            responseObserver.onNext(updatedUser);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error updating user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to update user")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Removes a user and any session entries that map to it. Fails with
     * {@code NOT_FOUND} when the id is unknown.
     */
    @Override
    public void deleteUser(DeleteUserRequest request, StreamObserver<com.google.protobuf.Empty> responseObserver) {
        String userId = request.getUserId();
        logger.info("Deleting user: {}", userId);
        try {
            // A single remove() is atomic; the previous containsKey()+remove() pair let two
            // concurrent deletes of the same user both report success.
            if (users.remove(userId) == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("User not found: " + userId)
                        .asRuntimeException());
                return;
            }
            userSessions.values().removeIf(userId::equals);
            logger.info("User deleted successfully: {}", userId);
            responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance());
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error deleting user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to delete user")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Returns one page of users, optionally filtered by name/email substring.
     *
     * <p>The page token is the decimal start index into the filtered list, ordered by user
     * id. A token that is not a number or lies outside {@code [0, size]} fails with
     * {@code INVALID_ARGUMENT}.
     */
    @Override
    public void listUsers(ListUsersRequest request, StreamObserver<ListUsersResponse> responseObserver) {
        logger.info("Listing users, page size: {}", request.getPageSize());
        try {
            List<User> userList = new ArrayList<>(users.values());
            if (!request.getFilter().isEmpty()) {
                userList.removeIf(user -> !matchesFilter(user, request.getFilter()));
            }
            // ConcurrentHashMap iteration order is unspecified; sort so that index-based
            // page tokens refer to a deterministic ordering.
            userList.sort(Comparator.comparing(User::getUserId));

            int pageSize = request.getPageSize() > 0 ? request.getPageSize() : DEFAULT_PAGE_SIZE;
            int fromIndex = 0;
            if (!request.getPageToken().isEmpty()) {
                try {
                    fromIndex = Integer.parseInt(request.getPageToken());
                } catch (NumberFormatException e) {
                    responseObserver.onError(Status.INVALID_ARGUMENT
                            .withDescription("Invalid page token")
                            .asRuntimeException());
                    return;
                }
                // Without this check subList() throws and the client sees INTERNAL for
                // what is really a bad argument.
                if (fromIndex < 0 || fromIndex > userList.size()) {
                    responseObserver.onError(Status.INVALID_ARGUMENT
                            .withDescription("Invalid page token")
                            .asRuntimeException());
                    return;
                }
            }
            // Widen to long so a huge page size cannot overflow into a negative index.
            int toIndex = (int) Math.min((long) fromIndex + pageSize, userList.size());
            List<User> pageUsers = userList.subList(fromIndex, toIndex);
            String nextPageToken = toIndex < userList.size() ? String.valueOf(toIndex) : "";
            ListUsersResponse response = ListUsersResponse.newBuilder()
                    .addAllUsers(pageUsers)
                    .setNextPageToken(nextPageToken)
                    .setTotalSize(userList.size())
                    .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error listing users", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to list users")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Streams every matching user, pausing briefly between batches.
     *
     * <p>Stops without completing the stream when the client has cancelled the call. If the
     * handler thread is interrupted, the interrupt flag is restored and the call ends with
     * {@code CANCELLED}.
     */
    @Override
    public void streamUsers(StreamUsersRequest request, StreamObserver<User> responseObserver) {
        logger.info("Streaming users, batch size: {}", request.getBatchSize());
        try {
            List<User> userList = new ArrayList<>(users.values());
            if (!request.getFilter().isEmpty()) {
                userList.removeIf(user -> !matchesFilter(user, request.getFilter()));
            }
            int batchSize = request.getBatchSize() > 0 ? request.getBatchSize() : DEFAULT_BATCH_SIZE;
            for (int i = 0; i < userList.size(); i++) {
                if (i > 0 && i % batchSize == 0) {
                    // Small delay between batches to simulate processing
                    Thread.sleep(BATCH_DELAY_MILLIS);
                }
                // Check before sending: writing to an already-cancelled call is an error.
                if (isCancelled(responseObserver)) {
                    logger.info("Client cancelled user stream");
                    return;
                }
                responseObserver.onNext(userList.get(i));
            }
            if (isCancelled(responseObserver)) {
                logger.info("Client cancelled user stream");
                return;
            }
            responseObserver.onCompleted();
            logger.info("User streaming completed");
        } catch (InterruptedException e) {
            // Restore the flag so the executor that owns this thread can observe it.
            Thread.currentThread().interrupt();
            logger.warn("User streaming interrupted", e);
            responseObserver.onError(Status.CANCELLED
                    .withDescription("User stream interrupted")
                    .withCause(e)
                    .asRuntimeException());
        } catch (Exception e) {
            logger.error("Error streaming users", e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to stream users")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /** Case-insensitive substring match of {@code filter} against the user's name or email. */
    private boolean matchesFilter(User user, String filter) {
        // Locale.ROOT keeps the comparison independent of the JVM's default locale.
        String needle = filter.toLowerCase(Locale.ROOT);
        return user.getName().toLowerCase(Locale.ROOT).contains(needle)
                || user.getEmail().toLowerCase(Locale.ROOT).contains(needle);
    }

    /** Returns true when the observer is a server call observer whose call has been cancelled. */
    private static boolean isCancelled(StreamObserver<?> observer) {
        return observer instanceof io.grpc.stub.ServerCallStreamObserver
                && ((io.grpc.stub.ServerCallStreamObserver<?>) observer).isCancelled();
    }

    /** Builds a protobuf timestamp (seconds and nanos) for the current instant. */
    private static com.google.protobuf.Timestamp currentTimestamp() {
        Instant now = Instant.now();
        return com.google.protobuf.Timestamp.newBuilder()
                .setSeconds(now.getEpochSecond())
                .setNanos(now.getNano())
                .build();
    }
}
2. Order Service Implementation
package com.example.grpc.server;
import com.example.grpc.*;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * In-memory implementation of the {@code OrderService} gRPC API.
 *
 * <p>Orders are indexed both by order id and by user id. Order ids are
 * {@code "ORD-"} followed by a process-wide counter that starts above 1000.
 */
public class OrderServiceImpl extends OrderServiceGrpc.OrderServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
    private static final AtomicLong orderIdCounter = new AtomicLong(1000);

    /** Batch size used by {@code streamOrders} when the request does not supply a positive one. */
    private static final int DEFAULT_BATCH_SIZE = 5;
    /** Pause inserted between streamed batches, in milliseconds. */
    private static final long BATCH_DELAY_MILLIS = 50;

    private final Map<String, Order> orders = new ConcurrentHashMap<>();
    private final Map<String, List<Order>> userOrders = new ConcurrentHashMap<>();

    /**
     * Creates a {@code PENDING} order whose total is the sum of {@code price * quantity}
     * over all items.
     *
     * <p>Fails with {@code INVALID_ARGUMENT} when the user id is empty, when there are no
     * items, or when an item has a non-positive quantity or a negative/NaN price.
     */
    @Override
    public void createOrder(CreateOrderRequest request, StreamObserver<Order> responseObserver) {
        String userId = request.getUserId();
        logger.info("Creating order for user: {}", userId);
        try {
            if (userId.isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                        .withDescription("user_id is required")
                        .asRuntimeException());
                return;
            }
            if (request.getItemsList().isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                        .withDescription("Order must contain at least one item")
                        .asRuntimeException());
                return;
            }
            for (OrderItem item : request.getItemsList()) {
                // Reject values that would produce a negative or meaningless total.
                if (item.getQuantity() <= 0 || !(item.getPrice() >= 0)) {
                    responseObserver.onError(Status.INVALID_ARGUMENT
                            .withDescription("Order items must have a positive quantity and a non-negative price")
                            .asRuntimeException());
                    return;
                }
            }
            double totalAmount = request.getItemsList().stream()
                    .mapToDouble(item -> item.getPrice() * item.getQuantity())
                    .sum();
            String orderId = "ORD-" + orderIdCounter.incrementAndGet();
            // One timestamp for both fields so created_at and updated_at cannot differ.
            com.google.protobuf.Timestamp now = currentTimestamp();
            Order order = Order.newBuilder()
                    .setOrderId(orderId)
                    .setUserId(userId)
                    .addAllItems(request.getItemsList())
                    .setTotalAmount(totalAmount)
                    .setCurrency(request.getCurrency())
                    .setStatus(OrderStatus.PENDING)
                    .setCreatedAt(now)
                    .setUpdatedAt(now)
                    .build();
            orders.put(orderId, order);
            // The per-user list is appended to and read from different request threads, so
            // it must be thread-safe itself; ConcurrentHashMap only protects the map.
            userOrders.computeIfAbsent(userId, k -> new java.util.concurrent.CopyOnWriteArrayList<>())
                    .add(order);
            logger.info("Order created successfully: {} for user: {}", orderId, userId);
            responseObserver.onNext(order);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error creating order for user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to create order")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Returns the order with the requested id, or fails with {@code NOT_FOUND}.
     */
    @Override
    public void getOrder(GetOrderRequest request, StreamObserver<Order> responseObserver) {
        String orderId = request.getOrderId();
        logger.info("Getting order: {}", orderId);
        try {
            Order order = orders.get(orderId);
            if (order == null) {
                responseObserver.onError(Status.NOT_FOUND
                        .withDescription("Order not found: " + orderId)
                        .asRuntimeException());
                return;
            }
            responseObserver.onNext(order);
            responseObserver.onCompleted();
        } catch (Exception e) {
            logger.error("Error getting order: {}", orderId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to get order")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /**
     * Streams the orders recorded for a user, pausing briefly between batches.
     *
     * <p>Stops without completing the stream when the client has cancelled the call. If the
     * handler thread is interrupted, the interrupt flag is restored and the call ends with
     * {@code CANCELLED}.
     */
    @Override
    public void streamOrders(StreamOrdersRequest request, StreamObserver<Order> responseObserver) {
        String userId = request.getUserId();
        logger.info("Streaming orders for user: {}", userId);
        try {
            // Work on a snapshot so orders created while streaming do not change the
            // sequence being iterated.
            List<Order> userOrderList =
                    new ArrayList<>(userOrders.getOrDefault(userId, Collections.emptyList()));
            int batchSize = request.getBatchSize() > 0 ? request.getBatchSize() : DEFAULT_BATCH_SIZE;
            for (int i = 0; i < userOrderList.size(); i++) {
                if (i > 0 && i % batchSize == 0) {
                    // Small delay between batches
                    Thread.sleep(BATCH_DELAY_MILLIS);
                }
                // Check before sending: writing to an already-cancelled call is an error.
                if (isCancelled(responseObserver)) {
                    logger.info("Client cancelled order stream for user: {}", userId);
                    return;
                }
                responseObserver.onNext(userOrderList.get(i));
            }
            if (isCancelled(responseObserver)) {
                logger.info("Client cancelled order stream for user: {}", userId);
                return;
            }
            responseObserver.onCompleted();
            logger.info("Order streaming completed for user: {}", userId);
        } catch (InterruptedException e) {
            // Restore the flag so the executor that owns this thread can observe it.
            Thread.currentThread().interrupt();
            logger.warn("Order streaming interrupted for user: {}", userId, e);
            responseObserver.onError(Status.CANCELLED
                    .withDescription("Order stream interrupted")
                    .withCause(e)
                    .asRuntimeException());
        } catch (Exception e) {
            logger.error("Error streaming orders for user: {}", userId, e);
            responseObserver.onError(Status.INTERNAL
                    .withDescription("Failed to stream orders")
                    .withCause(e)
                    .asRuntimeException());
        }
    }

    /** Returns true when the observer is a server call observer whose call has been cancelled. */
    private static boolean isCancelled(StreamObserver<?> observer) {
        return observer instanceof io.grpc.stub.ServerCallStreamObserver
                && ((io.grpc.stub.ServerCallStreamObserver<?>) observer).isCancelled();
    }

    /** Builds a protobuf timestamp (seconds and nanos) for the current instant. */
    private static com.google.protobuf.Timestamp currentTimestamp() {
        Instant now = Instant.now();
        return com.google.protobuf.Timestamp.newBuilder()
                .setSeconds(now.getEpochSecond())
                .setNanos(now.getNano())
                .build();
    }
}
gRPC Server with Istio Features
1. Advanced gRPC Server
package com.example.grpc.server;
import com.example.grpc.health.HealthCheckResponse;
import com.example.grpc.health.HealthGrpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.services.HealthStatusManager;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class GrpcServer {
private static final Logger logger = LoggerFactory.getLogger(GrpcServer.class);
private final int port;
private final Server server;
private final HealthStatusManager healthStatusManager;
public GrpcServer(int port) {
this.port = port;
this.healthStatusManager = new HealthStatusManager();
this.server = ServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(
new UserServiceImpl(),
new IstioHeadersInterceptor(),
new MetricsInterceptor()
))
.addService(ServerInterceptors.intercept(
new OrderServiceImpl(),
new IstioHeadersInterceptor(),
new MetricsInterceptor()
))
.addService(healthStatusManager.getHealthService())
.addService(new CustomHealthService())
.intercept(new AuthenticationInterceptor())
.build();
}
public void start() throws IOException {
server.start();
logger.info("gRPC Server started, listening on port: {}", port);
// Set health status to serving
healthStatusManager.setStatus("", HealthCheckResponse.ServingStatus.SERVING);
healthStatusManager.setStatus("user.UserService", HealthCheckResponse.ServingStatus.SERVING);
healthStatusManager.setStatus("order.OrderService", HealthCheckResponse.ServingStatus.SERVING);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down gRPC server...");
try {
GrpcServer.this.stop();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Server shutdown interrupted", e);
}
logger.info("gRPC server shut down successfully");
}));
}
public void stop() throws InterruptedException {
if (server != null) {
// Set health status to not serving
healthStatusManager.setStatus("", HealthCheckResponse.ServingStatus.NOT_SERVING);
healthStatusManager.clearStatus("user.UserService");
healthStatusManager.clearStatus("order.OrderService");
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
int port = Integer.parseInt(System.getenv().getOrDefault("GRPC_PORT", "50051"));
GrpcServer server = new GrpcServer(port);
server.start();
server.blockUntilShutdown();
}
}
// Custom Health Service for Istio readiness/liveness probes
class CustomHealthService extends HealthGrpc.HealthImplBase {
private static final Logger logger = LoggerFactory.getLogger(CustomHealthService.class);
@Override
public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
String service = request.getService();
HealthCheckResponse.ServingStatus status;
if (service.isEmpty() || "user.UserService".equals(service) || "order.OrderService".equals(service)) {
status = HealthCheckResponse.ServingStatus.SERVING;
} else {
status = HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN;
}
HealthCheckResponse response = HealthCheckResponse.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
logger.debug("Health check for service: {} - status: {}", service, status);
}
@Override
public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
// Implement watch functionality for continuous health monitoring
String service = request.getService();
// Send initial status
HealthCheckResponse initialResponse = HealthCheckResponse.newBuilder()
.setStatus(HealthCheckResponse.ServingStatus.SERVING)
.build();
responseObserver.onNext(initialResponse);
// In production, you would watch for health status changes
// For simplicity, we'll just complete the stream
responseObserver.onCompleted();
}
}
2. Istio Headers Interceptor
package com.example.grpc.server;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class IstioHeadersInterceptor implements ServerInterceptor {
private static final Logger logger = LoggerFactory.getLogger(IstioHeadersInterceptor.class);
// Istio-specific headers
private static final String[] ISTIO_HEADERS = {
"x-request-id",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-flags",
"x-ot-span-context",
"x-forwarded-client-cert",
"user-agent",
"x-envoy-external-address",
"x-forwarded-for",
"x-envoy-internal"
};
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Extract and log Istio headers
Map<String, String> istioHeaders = extractIstioHeaders(headers);
// Add to MDC for logging
MDC.put("x-request-id", istioHeaders.getOrDefault("x-request-id", "unknown"));
MDC.put("x-b3-traceid", istioHeaders.getOrDefault("x-b3-traceid", "unknown"));
logger.info("Received gRPC call with Istio headers: {}", istioHeaders);
// Create context with Istio headers
Context context = Context.current()
.withValue(IstioHeadersContextKey.INSTIO_HEADERS, istioHeaders);
return Contexts.interceptCall(context, call, headers, next);
}
private Map<String, String> extractIstioHeaders(Metadata headers) {
Map<String, String> istioHeaders = new ConcurrentHashMap<>();
for (String headerName : ISTIO_HEADERS) {
Metadata.Key<String> key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
String value = headers.get(key);
if (value != null) {
istioHeaders.put(headerName, value);
}
}
return istioHeaders;
}
// Context key for accessing Istio headers
public static class IstioHeadersContextKey {
public static final Context.Key<Map<String, String>> INSTIO_HEADERS =
Context.keyWithDefault("istio-headers", Map.of());
}
}
3. Authentication Interceptor
package com.example.grpc.server;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Base64;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Server interceptor that requires a {@code Bearer} token on every call except the
 * health-check methods and stores the caller's user id in the gRPC {@link Context}.
 *
 * <p><strong>Security note:</strong> the token check here only verifies that the value is
 * shaped like a JWT. The signature, issuer and expiry are NOT verified, so any
 * well-formed token is accepted. Replace {@link #validateToken} with a real JWT library
 * before using this outside a demo.
 */
public class AuthenticationInterceptor implements ServerInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(AuthenticationInterceptor.class);
    private static final Metadata.Key<String> AUTHORIZATION_HEADER =
            Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
    private static final String BEARER_PREFIX = "Bearer ";
    private static final Set<String> PUBLIC_METHODS = Set.of(
            "grpc.health.v1.Health/Check",
            "grpc.health.v1.Health/Watch"
    );

    /**
     * Closes the call with {@code UNAUTHENTICATED} when the authorization header is
     * missing, is not a bearer token, or fails validation; otherwise continues the call
     * with {@link UserContextKey#USER_ID} set.
     */
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        String methodName = call.getMethodDescriptor().getFullMethodName();
        // Skip authentication for public methods
        if (PUBLIC_METHODS.contains(methodName)) {
            return next.startCall(call, headers);
        }
        String authHeader = headers.get(AUTHORIZATION_HEADER);
        // HTTP authentication scheme names are case-insensitive, so "bearer " is accepted too.
        if (authHeader == null
                || !authHeader.regionMatches(true, 0, BEARER_PREFIX, 0, BEARER_PREFIX.length())) {
            return reject(call, "Missing or invalid authorization header");
        }
        String token = authHeader.substring(BEARER_PREFIX.length());
        try {
            if (!validateToken(token)) {
                return reject(call, "Invalid token");
            }
            // Extract user info from token and add to context
            String userId = extractUserIdFromToken(token);
            Context context = Context.current().withValue(UserContextKey.USER_ID, userId);
            logger.info("Authenticated user: {} for method: {}", userId, methodName);
            return Contexts.interceptCall(context, call, headers, next);
        } catch (Exception e) {
            logger.error("Authentication error", e);
            return reject(call, "Authentication failed");
        }
    }

    /** Closes the call with {@code UNAUTHENTICATED} and returns a listener that ignores all events. */
    private static <ReqT, RespT> ServerCall.Listener<ReqT> reject(
            ServerCall<ReqT, RespT> call, String description) {
        call.close(Status.UNAUTHENTICATED.withDescription(description), new Metadata());
        return new ServerCall.Listener<ReqT>() {};
    }

    /**
     * Structural check only: three dot-separated segments whose header and payload decode
     * as base64url. In production, use a proper JWT validation library.
     */
    private boolean validateToken(String token) {
        try {
            String[] parts = token.split("\\.");
            if (parts.length != 3) {
                return false;
            }
            // JWT segments use the URL-safe base64 alphabet ('-' and '_'); the basic
            // decoder rejects those characters and would refuse genuine tokens.
            Base64.Decoder decoder = Base64.getUrlDecoder();
            decoder.decode(parts[0]);
            decoder.decode(parts[1]);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Returns the user id for the token. This simplified example decodes the payload
     * segment to confirm it is readable and then returns a fixed placeholder id; in
     * production, parse the JWT claims and return the real subject.
     *
     * @throws RuntimeException if the payload segment is missing or cannot be decoded
     */
    private String extractUserIdFromToken(String token) {
        try {
            String[] parts = token.split("\\.");
            Base64.getUrlDecoder().decode(parts[1]);
            // For now, return a dummy user ID
            return "user-123";
        } catch (Exception e) {
            throw new RuntimeException("Failed to extract user ID from token", e);
        }
    }

    /** Context key under which the authenticated user id is stored. */
    public static class UserContextKey {
        public static final Context.Key<String> USER_ID =
                Context.keyWithDefault("user-id", "unknown");
    }
}
4. Metrics Interceptor
package com.example.grpc.server;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Server interceptor that counts requests, successes and errors per full method name and
 * logs the duration of every call when it closes.
 *
 * <p>Counters are held per interceptor instance.
 */
public class MetricsInterceptor implements ServerInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(MetricsInterceptor.class);
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final Map<String, AtomicLong> requestCounters = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> errorCounters = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> successCounters = new ConcurrentHashMap<>();

    /**
     * Counts the request and wraps the call so that its final status and duration are
     * recorded when the call is closed.
     */
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        String methodName = call.getMethodDescriptor().getFullMethodName();
        // nanoTime is monotonic; wall-clock time can jump and yield negative durations.
        long startNanos = System.nanoTime();
        increment(requestCounters, methodName);

        ServerCall<ReqT, RespT> monitoringCall =
                new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata trailers) {
                try {
                    long duration = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
                    if (status.isOk()) {
                        increment(successCounters, methodName);
                        logger.info("gRPC call completed successfully: {} - duration: {}ms",
                                methodName, duration);
                    } else {
                        increment(errorCounters, methodName);
                        logger.warn("gRPC call failed: {} - status: {} - duration: {}ms",
                                methodName, status.getCode(), duration);
                    }
                    // Log metrics (in production, send to monitoring system)
                    logMetrics(methodName, status, duration);
                } finally {
                    // The call must be closed even if metric recording fails.
                    super.close(status, trailers);
                }
            }
        };
        return next.startCall(monitoringCall, headers);
    }

    /** Logs the running totals for {@code methodName} at debug level. */
    private void logMetrics(String methodName, Status status, long duration) {
        // In production, send metrics to Prometheus, Stackdriver, etc.
        logger.debug("Metrics - Method: {}, Status: {}, Duration: {}ms, " +
                        "Total Requests: {}, Success: {}, Errors: {}",
                methodName,
                status.getCode(),
                duration,
                currentValue(requestCounters, methodName),
                currentValue(successCounters, methodName),
                currentValue(errorCounters, methodName));
    }

    /** Returns a snapshot of the number of calls received per method. */
    public Map<String, Long> getRequestCounts() {
        return snapshot(requestCounters);
    }

    /** Returns a snapshot of the number of calls that closed with a non-OK status per method. */
    public Map<String, Long> getErrorCounts() {
        return snapshot(errorCounters);
    }

    /** Returns a snapshot of the number of calls that closed with an OK status per method. */
    public Map<String, Long> getSuccessCounts() {
        return snapshot(successCounters);
    }

    /** Adds one to the counter for {@code methodName}, creating it on first use. */
    private static void increment(Map<String, AtomicLong> counters, String methodName) {
        counters.computeIfAbsent(methodName, k -> new AtomicLong(0)).incrementAndGet();
    }

    /** Reads the counter for {@code methodName}, or 0 when none exists yet. */
    private static long currentValue(Map<String, AtomicLong> counters, String methodName) {
        AtomicLong counter = counters.get(methodName);
        return counter == null ? 0L : counter.get();
    }

    /** Copies the current counter values into a new map that is independent of the live counters. */
    private static Map<String, Long> snapshot(Map<String, AtomicLong> counters) {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        counters.forEach((method, counter) -> counts.put(method, counter.get()));
        return counts;
    }
}
gRPC Client with Istio Support
1. Advanced gRPC Client
package com.example.grpc.client;
import com.example.grpc.*;
import io.grpc.*;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class GrpcClient {
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
private final ManagedChannel channel;
private final UserServiceGrpc.UserServiceBlockingStub userServiceBlockingStub;
private final UserServiceGrpc.UserServiceStub userServiceAsyncStub;
private final OrderServiceGrpc.OrderServiceBlockingStub orderServiceBlockingStub;
private final Metadata headers;
public GrpcClient(String host, int port, String authToken) {
this.channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext() // In production, use TLS
.intercept(new ClientHeadersInterceptor(authToken))
.intercept(new ClientMetricsInterceptor())
.enableRetry() // Enable retry with Istio
.maxRetryAttempts(3)
.build();
this.headers = new Metadata();
if (authToken != null && !authToken.isEmpty()) {
headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER),
"Bearer " + authToken);
}
// Add Istio headers for distributed tracing
headers.put(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER),
generateRequestId());
headers.put(Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER),
"java-grpc-client");
this.userServiceBlockingStub = UserServiceGrpc.newBlockingStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
this.userServiceAsyncStub = UserServiceGrpc.newStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
this.orderServiceBlockingStub = OrderServiceGrpc.newBlockingStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
// User Service Methods
public User getUser(String userId) {
logger.info("Getting user: {}", userId);
GetUserRequest request = GetUserRequest.newBuilder()
.setUserId(userId)
.build();
try {
User user = userServiceBlockingStub.getUser(request);
logger.info("Retrieved user: {} - {}", user.getUserId(), user.getName());
return user;
} catch (StatusRuntimeException e) {
logger.error("RPC failed: {}", e.getStatus());
throw e;
}
}
public User createUser(String name, String email, String phone) {
logger.info("Creating user: {} - {}", name, email);
CreateUserRequest request = CreateUserRequest.newBuilder()
.setName(name)
.setEmail(email)
.setPhone(phone)
.build();
try {
User user = userServiceBlockingStub.createUser(request);
logger.info("Created user: {} - {}", user.getUserId(), user.getName());
return user;
} catch (StatusRuntimeException e) {
logger.error("RPC failed: {}", e.getStatus());
throw e;
}
}
public void listUsersAsync(int pageSize) {
logger.info("Listing users asynchronously, page size: {}", pageSize);
ListUsersRequest request = ListUsersRequest.newBuilder()
.setPageSize(pageSize)
.build();
CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<ListUsersResponse> responseObserver = new StreamObserver<ListUsersResponse>() {
@Override
public void onNext(ListUsersResponse response) {
logger.info("Received {} users", response.getUsersCount());
for (User user : response.getUsersList()) {
logger.info("User: {} - {} - {}", user.getUserId(), user.getName(), user.getEmail());
}
if (!response.getNextPageToken().isEmpty()) {
logger.info("More users available, next page token: {}", response.getNextPageToken());
}
}
@Override
public void onError(Throwable t) {
logger.error("List users failed", t);
finishLatch.countDown();
}
@Override
public void onCompleted() {
logger.info("List users completed");
finishLatch.countDown();
}
};
userServiceAsyncStub.listUsers(request, responseObserver);
try {
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.warn("List users request timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("List users request interrupted");
}
}
public void streamUsers() {
logger.info("Starting user streaming");
StreamUsersRequest request = StreamUsersRequest.newBuilder()
.setBatchSize(5)
.build();
CountDownLatch finishLatch = new CountDownLatch(1);
AtomicInteger userCount = new AtomicInteger(0);
StreamObserver<User> responseObserver = new StreamObserver<User>() {
@Override
public void onNext(User user) {
int count = userCount.incrementAndGet();
logger.info("Received user {}: {} - {}", count, user.getUserId(), user.getName());
}
@Override
public void onError(Throwable t) {
logger.error("User streaming failed", t);
finishLatch.countDown();
}
@Override
public void onCompleted() {
logger.info("User streaming completed. Total users received: {}", userCount.get());
finishLatch.countDown();
}
};
userServiceAsyncStub.streamUsers(request, responseObserver);
try {
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
logger.warn("User streaming timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("User streaming interrupted");
}
}
// Order Service Methods
public Order createOrder(String userId, List<OrderItem> items, String currency) {
logger.info("Creating order for user: {}", userId);
CreateOrderRequest request = CreateOrderRequest.newBuilder()
.setUserId(userId)
.addAllItems(items)
.setCurrency(currency)
.build();
try {
Order order = orderServiceBlockingStub.createOrder(request);
logger.info("Created order: {} - Total: {} {}",
order.getOrderId(), order.getTotalAmount(), order.getCurrency());
return order;
} catch (StatusRuntimeException e) {
logger.error("RPC failed: {}", e.getStatus());
throw e;
}
}
public void streamOrders(String userId) {
logger.info("Streaming orders for user: {}", userId);
StreamOrdersRequest request = StreamOrdersRequest.newBuilder()
.setUserId(userId)
.setBatchSize(3)
.build();
CountDownLatch finishLatch = new CountDownLatch(1);
AtomicInteger orderCount = new AtomicInteger(0);
Iterator<Order> orderIterator = orderServiceBlockingStub.streamOrders(request);
try {
while (orderIterator.hasNext()) {
Order order = orderIterator.next();
int count = orderCount.incrementAndGet();
logger.info("Received order {}: {} - {} - Status: {}",
count, order.getOrderId(), order.getTotalAmount(), order.getStatus());
}
logger.info("Order streaming completed. Total orders received: {}", orderCount.get());
} catch (StatusRuntimeException e) {
logger.error("Order streaming failed", e);
} finally {
finishLatch.countDown();
}
try {
finishLatch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private String generateRequestId() {
return "req-" + System.currentTimeMillis() + "-" + Math.abs((int) (Math.random() * 10000));
}
public static void main(String[] args) throws InterruptedException {
String host = System.getenv().getOrDefault("GRPC_SERVER_HOST", "localhost");
int port = Integer.parseInt(System.getenv().getOrDefault("GRPC_SERVER_PORT", "50051"));
String authToken = System.getenv().getOrDefault("AUTH_TOKEN", "dummy-token");
GrpcClient client = new GrpcClient(host, port, authToken);
try {
// Test user operations
User user = client.createUser("John Doe", "[email protected]", "+1234567890");
client.getUser(user.getUserId());
client.listUsersAsync(10);
client.streamUsers();
// Test order operations
OrderItem item1 = OrderItem.newBuilder()
.setProductId("prod-1")
.setName("Laptop")
.setQuantity(1)
.setPrice(999.99)
.build();
OrderItem item2 = OrderItem.newBuilder()
.setProductId("prod-2")
.setName("Mouse")
.setQuantity(2)
.setPrice(29.99)
.build();
Order order = client.createOrder(user.getUserId(), List.of(item1, item2), "USD");
client.streamOrders(user.getUserId());
} finally {
client.shutdown();
}
}
}
// Client Interceptors
/**
 * Client interceptor that stamps every outgoing call with an optional bearer token, a request
 * id, and B3 tracing headers.
 *
 * <p>Each call gets a brand-new trace id and span id, i.e. every call starts its own trace.
 */
class ClientHeadersInterceptor implements ClientInterceptor {
    // Keys are immutable, so build them once instead of on every call.
    private static final Metadata.Key<String> AUTHORIZATION =
            Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> REQUEST_ID =
            Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> B3_TRACE_ID =
            Metadata.Key.of("x-b3-traceid", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> B3_SPAN_ID =
            Metadata.Key.of("x-b3-spanid", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> B3_SAMPLED =
            Metadata.Key.of("x-b3-sampled", Metadata.ASCII_STRING_MARSHALLER);

    private final String authToken;

    /**
     * @param authToken bearer token to send; {@code null} or empty sends no authorization header
     */
    public ClientHeadersInterceptor(String authToken) {
        this.authToken = authToken;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // Add authentication header
                if (authToken != null && !authToken.isEmpty()) {
                    headers.put(AUTHORIZATION, "Bearer " + authToken);
                }
                // Add Istio headers for distributed tracing
                headers.put(REQUEST_ID, generateRequestId());
                headers.put(B3_TRACE_ID, generateTraceId());
                headers.put(B3_SPAN_ID, generateSpanId());
                headers.put(B3_SAMPLED, "1");
                super.start(responseListener, headers);
            }
        };
    }

    /** Returns a random UUID, so ids stay unique even for calls started in the same millisecond. */
    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }

    /** Returns a 64-bit B3 trace id as exactly 16 lower-case hex characters. */
    private String generateTraceId() {
        return randomHex64();
    }

    /** Returns a 64-bit B3 span id as exactly 16 lower-case hex characters. */
    private String generateSpanId() {
        return randomHex64();
    }

    /** Formats a random non-zero 64-bit value as zero-padded lower-case hex (B3 ids must not be all zeros). */
    private static String randomHex64() {
        long value;
        do {
            value = ThreadLocalRandom.current().nextLong();
        } while (value == 0L);
        return String.format("%016x", value);
    }
}
class ClientMetricsInterceptor implements ClientInterceptor {
private static final Logger logger = LoggerFactory.getLogger(ClientMetricsInterceptor.class);
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
private long startTime;
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
startTime = System.currentTimeMillis();
super.start(new ForwardingClientCallListener<RespT>() {
@Override
public void onClose(Status status, Metadata trailers) {
long duration = System.currentTimeMillis() - startTime;
if (status.isOk()) {
logger.info("gRPC client call succeeded: {} - duration: {}ms",
method.getFullMethodName(), duration);
} else {
logger.warn("gRPC client call failed: {} - status: {} - duration: {}ms",
method.getFullMethodName(), status.getCode(), duration);
}
super.onClose(status, trailers);
}
@Override
protected Listener<RespT> delegate() {
return responseListener;
}
}, headers);
}
};
}
}
Kubernetes and Istio Configuration
1. Dockerfile for gRPC Service
# Stage 1: unpack the Spring Boot jar into its layers.
FROM eclipse-temurin:11-jre AS builder
WORKDIR /app
COPY target/grpc-istio-demo-1.0.0.jar app.jar
RUN java -Djarmode=layertools -jar app.jar extract

# Stage 2: runtime image, running as a non-root user.
FROM eclipse-temurin:11-jre
RUN useradd -m -u 1000 spring
USER 1000
WORKDIR /app
# Copy the extracted layers one by one so each becomes its own image layer.
COPY --from=builder /app/dependencies/ ./
COPY --from=builder /app/spring-boot-loader/ ./
COPY --from=builder /app/snapshot-dependencies/ ./
COPY --from=builder /app/application/ ./
EXPOSE 50051
EXPOSE 8080
# Run through a shell so $JAVA_OPTS (e.g. heap flags supplied by the container environment) is
# expanded; the exec-form `["java", ...]` ignores it. `exec` keeps java as PID 1 so it receives
# termination signals directly.
ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS org.springframework.boot.loader.JarLauncher"]
2. Kubernetes Deployment
# k8s/deployment.yaml
# Deployment: three replicas of the gRPC user service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grpc-user-service
  labels:
    app: grpc-user-service
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: grpc-user-service
  template:
    metadata:
      labels:
        app: grpc-user-service
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
        prometheus.io/path: "/actuator/prometheus"
    spec:
      containers:
        - name: grpc-server
          image: your-registry/grpc-user-service:latest
          ports:
            - containerPort: 50051
              name: grpc
            - containerPort: 8080
              name: http-metrics
          env:
            - name: GRPC_PORT
              value: "50051"
            - name: JAVA_OPTS
              value: "-Xmx512m -Xms256m"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          # All three probes hit the actuator health endpoint on the HTTP port.
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
          startupProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            failureThreshold: 30
            periodSeconds: 10
---
# Service: exposes the gRPC port and the HTTP metrics port of the pods above.
apiVersion: v1
kind: Service
metadata:
  name: grpc-user-service
  labels:
    app: grpc-user-service
spec:
  selector:
    app: grpc-user-service
  ports:
    - name: grpc
      port: 50051
      targetPort: 50051
    - name: http-metrics
      port: 8080
      targetPort: 8080
3. Istio Configuration
# k8s/istio/
# VirtualService for gRPC
# NOTE(review): no `gateways:` list is set, so this route applies to mesh-internal traffic only;
# it is not bound to the `grpc-gateway` Gateway below, whose host is grpc.example.com — confirm
# whether ingress traffic is meant to be routed by this resource.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: grpc-user-service
spec:
  hosts:
    - grpc-user-service
    - grpc-user-service.example.com
  http:
    - match:
        - port: 50051
      route:
        - destination:
            host: grpc-user-service
            port:
              number: 50051
          weight: 100
      timeout: 30s
      retries:
        attempts: 3
        perTryTimeout: 2s
      corsPolicy:
        allowOrigins:
          - exact: "*"
        allowMethods:
          - POST
          - GET
          - OPTIONS
        allowHeaders:
          - content-type
          - authorization
          - x-grpc-web
          - x-user-agent
---
# DestinationRule for load balancing
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: grpc-user-service
spec:
  host: grpc-user-service
  trafficPolicy:
    loadBalancer:
      # LEAST_CONN is deprecated; LEAST_REQUEST is its replacement.
      simple: LEAST_REQUEST
    connectionPool:
      http:
        http2MaxRequests: 100
        maxRequestsPerConnection: 10
      tcp:
        maxConnections: 100
    outlierDetection:
      # consecutiveErrors is deprecated; consecutive5xxErrors is its replacement.
      consecutive5xxErrors: 5
      interval: 10s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
---
# ServiceEntry for external gRPC services
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: external-grpc-service
spec:
  hosts:
    - api.external-service.com
  ports:
    - number: 443
      name: https
      protocol: HTTPS
    - number: 50051
      name: grpc
      protocol: GRPC
  resolution: DNS
  location: MESH_EXTERNAL
---
# Gateway for external gRPC access
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: grpc-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
    - port:
        number: 50051
        name: grpc
        # TLS termination (tls.mode: SIMPLE) requires an HTTPS server; with protocol GRPC the
        # server is treated as plaintext HTTP/2 and the certificate below would not be used.
        # gRPC clients connect over the terminated TLS/HTTP2 connection.
        protocol: HTTPS
      hosts:
        - "grpc.example.com"
      tls:
        httpsRedirect: false
        mode: SIMPLE
        credentialName: grpc-cert
---
# PeerAuthentication for mTLS
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
spec:
  mtls:
    mode: STRICT
4. Istio Telemetry Configuration
# k8s/istio/telemetry.yaml
# Tracing: sample every request for the user service and attach three literal tags.
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: grpc-telemetry
spec:
  selector:
    matchLabels:
      app: grpc-user-service
  tracing:
    - providers:
        - name: zipkin
      randomSamplingPercentage: 100.0
      customTags:
        "grpc.method":
          literal:
            value: "unknown"
        "grpc.service":
          literal:
            value: "unknown"
        "grpc.status_code":
          literal:
            value: "unknown"
---
# Access logging
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: grpc-access-logging
spec:
  selector:
    matchLabels:
      app: grpc-user-service
  accessLogging:
    - providers:
        - name: envoy
      disabled: false
Testing and Monitoring
1. Health Check Endpoint
package com.example.grpc.health;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
@Component
public class GrpcHealthIndicator implements HealthIndicator {
@Override
public Health health() {
try {
// Check if gRPC server is healthy
// You can add more sophisticated checks here
return Health.up()
.withDetail("grpc_port", 50051)
.withDetail("status", "serving")
.build();
} catch (Exception e) {
return Health.down(e)
.withDetail("grpc_port", 50051)
.withDetail("status", "not_serving")
.build();
}
}
}
2. Integration Tests
package com.example.grpc.test;
import com.example.grpc.GrpcClient;
import com.example.grpc.User;
import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.*;
public class GrpcClientTest {
private static final Logger logger = LoggerFactory.getLogger(GrpcClientTest.class);
private GrpcClient client;
@BeforeEach
public void setUp() {
String host = System.getenv().getOrDefault("GRPC_SERVER_HOST", "localhost");
int port = Integer.parseInt(System.getenv().getOrDefault("GRPC_SERVER_PORT", "50051"));
client = new GrpcClient(host, port, "test-token");
}
@AfterEach
public void tearDown() throws InterruptedException {
if (client != null) {
client.shutdown();
}
}
@Test
public void testCreateAndGetUser() {
// Create user
User createdUser = client.createUser("Test User", "[email protected]", "+1234567890");
assertNotNull(createdUser);
assertNotNull(createdUser.getUserId());
assertEquals("Test User", createdUser.getName());
assertEquals("[email protected]", createdUser.getEmail());
// Get user
User retrievedUser = client.getUser(createdUser.getUserId());
assertNotNull(retrievedUser);
assertEquals(createdUser.getUserId(), retrievedUser.getUserId());
assertEquals(createdUser.getName(), retrievedUser.getName());
}
@Test
public void testGetNonExistentUser() {
assertThrows(StatusRuntimeException.class, () -> {
client.getUser("non-existent-user-id");
});
}
@Test
public void testCreateUserWithInvalidData() {
assertThrows(StatusRuntimeException.class, () -> {
client.createUser("", "", "");
});
}
}
This comprehensive guide covers gRPC with Istio in Java, including protocol buffer definitions, server and client implementations, Istio-specific interceptors, Kubernetes deployment configurations, and testing strategies. The implementation demonstrates how to leverage Istio's features for traffic management, security, and observability in gRPC-based microservices.