Introduction
Vert.x Native Transport leverages operating-system-specific transports (epoll on Linux, kqueue on BSD/macOS) to provide high-performance, non-blocking I/O. For network-intensive applications this can offer significant performance benefits over the standard Java NIO transport.
Understanding Native Transports
Transport Comparison
/**
 * Shows how the {@code preferNativeTransport} option affects which transport
 * a Vert.x instance ends up using, and prints what is available on this host.
 *
 * <p>NOTE(review): {@code VertxNativeTransport} is referenced here but is not
 * defined in this snippet; confirm it exists on the classpath.
 */
public class TransportComparison {

    /**
     * Creates one Vert.x instance that opts out of the native transport and one
     * that prefers it, then prints which transport each instance actually uses.
     * Both instances are closed before the method returns.
     */
    public void demonstrateTransportTypes() {
        // Standard Java NIO transport
        VertxOptions nioOptions = new VertxOptions()
                .setPreferNativeTransport(false);

        // Native transport (epoll on Linux, kqueue on BSD/macOS), if available
        VertxOptions nativeOptions = new VertxOptions()
                .setPreferNativeTransport(true);

        Vertx nioVertx = Vertx.vertx(nioOptions);
        Vertx nativeVertx = Vertx.vertx(nativeOptions);
        try {
            // The label asks "is the JDK transport in use?", which is the
            // negation of the native flag.
            System.out.println("NIO Vertx using JDK transport: "
                    + !nioVertx.isNativeTransportEnabled());
            System.out.println("Native Vertx using native transport: "
                    + nativeVertx.isNativeTransportEnabled());
        } finally {
            // Release event loop and worker threads held by the demo instances.
            nioVertx.close();
            nativeVertx.close();
        }
    }

    /**
     * Prints whether the native transport is enabled for a freshly created
     * Vert.x instance and which native transports are reported as available.
     * The instance is closed before the method returns.
     */
    public void checkTransportCapabilities() {
        Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
        try {
            boolean nativeEnabled = vertx.isNativeTransportEnabled();
            System.out.println("Native transport enabled: " + nativeEnabled);

            System.out.println("Available transports:");
            System.out.println(" native: " + VertxNativeTransport.isAvailable());
            System.out.println(" epoll available: " + VertxNativeTransport.isEpollAvailable());
            System.out.println(" kqueue available: " + VertxNativeTransport.isKqueueAvailable());

            if (nativeEnabled) {
                // Only epoll and kqueue are considered here, so "not epoll"
                // is reported as kqueue.
                System.out.println("Using transport: "
                        + (VertxNativeTransport.isEpollAvailable() ? "epoll" : "kqueue"));
            }
        } finally {
            vertx.close();
        }
    }
}
Setting Up Native Transport
Maven Dependencies
<!-- Vert.x Core -->
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-core</artifactId>
  <version>4.4.6</version>
</dependency>

<!-- Linux epoll native transport -->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <version>4.1.100.Final</version>
  <classifier>linux-x86_64</classifier>
</dependency>

<!-- macOS/BSD kqueue native transport -->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <version>4.1.100.Final</version>
  <classifier>osx-x86_64</classifier>
</dependency>

<!-- Universal native transport (auto-detects platform) -->
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-netty-transport-native</artifactId>
  <version>4.4.6</version>
  <classifier>linux-x86_64</classifier> <!-- or osx-x86_64 -->
</dependency>
Basic Native Transport Configuration
/**
 * Factory-style examples for creating Vert.x instances that prefer the
 * platform-native transport.
 *
 * <p>NOTE(review): {@code VertxNativeTransport} is referenced here but is not
 * defined in this snippet; confirm it exists on the classpath.
 */
public class NativeTransportSetup {

    /**
     * Creates a standalone Vert.x instance that prefers the native transport,
     * with fixed pool sizes (4 event loops, 20 worker threads, 20 internal
     * blocking threads).
     *
     * @return the new Vert.x instance; the caller is responsible for closing it
     */
    public Vertx createVertxWithNativeTransport() {
        VertxOptions options = new VertxOptions()
                .setPreferNativeTransport(true)
                .setEventLoopPoolSize(4)
                .setWorkerPoolSize(20)
                .setInternalBlockingPoolSize(20);
        return Vertx.vertx(options);
    }

    /**
     * Creates a clustered Vert.x instance (Hazelcast cluster manager) that
     * prefers the native transport, blocking the calling thread until the
     * cluster has been joined.
     *
     * <p>Because this method blocks, it must not be called from an event loop
     * thread.
     *
     * @return the clustered Vert.x instance; the caller is responsible for closing it
     * @throws java.util.concurrent.CompletionException if clustering fails;
     *         the cause is the failure reported by Vert.x
     */
    public Vertx createClusteredVertxWithNativeTransport() {
        ClusterManager clusterManager = new HazelcastClusterManager();

        // In Vert.x 4 the cluster manager is supplied through the options;
        // clusteredVertx has no overload that takes it as a separate argument.
        VertxOptions options = new VertxOptions()
                .setPreferNativeTransport(true)
                .setEventLoopPoolSize(Runtime.getRuntime().availableProcessors())
                .setWorkerPoolSize(40)
                .setClusterManager(clusterManager);

        CompletableFuture<Vertx> future = new CompletableFuture<>();
        Vertx.clusteredVertx(options, result -> {
            if (result.succeeded()) {
                future.complete(result.result());
            } else {
                future.completeExceptionally(result.cause());
            }
        });
        return future.join();
    }

    /**
     * Creates a temporary Vert.x instance that prefers the native transport
     * and prints which transport was selected. The instance is closed before
     * the method returns.
     */
    public void demonstrateTransportSelection() {
        VertxOptions options = new VertxOptions()
                .setPreferNativeTransport(true);
        Vertx vertx = Vertx.vertx(options);
        try {
            if (vertx.isNativeTransportEnabled()) {
                String transportName = VertxNativeTransport.isEpollAvailable() ? "epoll" : "kqueue";
                System.out.println("Using native transport: " + transportName);
            } else {
                System.out.println("Falling back to Java NIO transport");
            }
        } finally {
            // The instance only exists for this check; do not leak its threads.
            vertx.close();
        }
    }
}
High-Performance HTTP Server
Native Transport HTTP Server
/**
 * HTTP server examples that prefer the native transport and enable the
 * Linux-specific TCP options exposed by {@code HttpServerOptions}.
 */
public class NativeHttpServer {
    private static final Logger logger = LoggerFactory.getLogger(NativeHttpServer.class);

    /** Path prefix of the file-serving route; stripped to obtain the file path. */
    private static final String FILE_ROUTE_PREFIX = "/file/";

    /**
     * Starts an HTTP server on port 8080 with three routes:
     * {@code /ping} (plain text), {@code /api/info} (JSON with transport and
     * thread information) and {@code /file/*} (serves a file relative to the
     * working directory).
     */
    public void createHighPerformanceServer() {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setPreferNativeTransport(true)
                .setEventLoopPoolSize(4));

        HttpServerOptions serverOptions = new HttpServerOptions()
                .setTcpFastOpen(true)   // Linux TCP fast open
                .setTcpCork(true)       // Linux TCP cork
                .setTcpQuickAck(true)   // Linux TCP quick ACK
                .setReusePort(true)     // SO_REUSEPORT
                .setLogActivity(true)   // Log network activity
                .setSsl(false);         // Disable SSL for benchmarking

        HttpServer server = vertx.createHttpServer(serverOptions);
        Router router = Router.router(vertx);

        // Simple ping endpoint
        router.get("/ping").handler(ctx -> {
            ctx.response()
                    .putHeader("Content-Type", "text/plain")
                    .end("pong");
        });

        // JSON response endpoint
        router.get("/api/info").handler(ctx -> {
            JsonObject response = new JsonObject()
                    .put("timestamp", System.currentTimeMillis())
                    .put("transport", vertx.isNativeTransportEnabled() ? "native" : "nio")
                    .put("thread", Thread.currentThread().getName());
            ctx.response()
                    .putHeader("Content-Type", "application/json")
                    .end(response.encode());
        });

        // File serving endpoint
        router.get("/file/*").handler(ctx -> {
            String requestPath = ctx.request().path();
            // Guard against a path with nothing after the prefix; substring
            // would otherwise throw or yield an empty file name.
            if (requestPath.length() <= FILE_ROUTE_PREFIX.length()) {
                ctx.response().setStatusCode(404).end();
                return;
            }
            String relativePath = requestPath.substring(FILE_ROUTE_PREFIX.length());
            // The path comes straight from the client: refuse anything that
            // would resolve outside the working directory ("..", absolute paths).
            if (!isInsideWorkingDirectory(relativePath)) {
                ctx.response().setStatusCode(403).end();
                return;
            }
            vertx.fileSystem().readFile(relativePath)
                    .onSuccess(buffer -> {
                        ctx.response()
                                .putHeader("Content-Type", "application/octet-stream")
                                .end(buffer);
                    })
                    .onFailure(err -> ctx.response().setStatusCode(404).end());
        });

        // Start server
        server.requestHandler(router).listen(8080, result -> {
            if (result.succeeded()) {
                logger.info("HTTP server listening on port 8080");
                logger.info("Native transport enabled: {}", vertx.isNativeTransportEnabled());
            } else {
                logger.error("Failed to start server", result.cause());
            }
        });
    }

    /**
     * Starts one single-event-loop Vert.x instance per available processor,
     * each with its own HTTP server bound to port 8080 with
     * {@code SO_REUSEPORT} enabled.
     */
    public void createMultipleInstances() {
        int instances = Runtime.getRuntime().availableProcessors();
        for (int i = 0; i < instances; i++) {
            // Passing the index as a method argument gives the lambdas an
            // effectively final value to capture; the loop variable is not one.
            startInstance(i);
        }
    }

    /**
     * Starts a single server instance on port 8080.
     *
     * @param instanceId index of the instance, used in responses and log messages
     */
    private void startInstance(int instanceId) {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setPreferNativeTransport(true)
                .setEventLoopPoolSize(1)); // One event loop per instance

        HttpServerOptions options = new HttpServerOptions()
                .setPort(8080)
                .setReusePort(true); // Allow several listeners on the same port

        HttpServer server = vertx.createHttpServer(options);
        Router router = Router.router(vertx);
        router.get("/").handler(ctx -> {
            ctx.response().end("Served by instance " + instanceId +
                    " on thread " + Thread.currentThread().getName());
        });

        server.requestHandler(router).listen(result -> {
            if (result.succeeded()) {
                logger.info("Server instance {} started on port 8080", instanceId);
            } else {
                logger.error("Instance {} failed to start", instanceId, result.cause());
            }
        });
    }

    /**
     * Tells whether a client-supplied path stays inside the process working
     * directory once resolved and normalized.
     *
     * @param relativePath path taken from the request URI
     * @return {@code true} if the resolved path is the working directory or
     *         below it; {@code false} if it escapes it or is not a valid path
     */
    private static boolean isInsideWorkingDirectory(String relativePath) {
        try {
            java.nio.file.Path base = java.nio.file.Paths.get("").toAbsolutePath().normalize();
            return base.resolve(relativePath).normalize().startsWith(base);
        } catch (java.nio.file.InvalidPathException e) {
            return false;
        }
    }
}
TCP Server with Native Optimizations
High-Performance TCP Server
/**
 * TCP server examples that prefer the native transport: a simple echo server
 * and a newline-delimited command protocol.
 */
public class NativeTcpServer {
    private static final Logger logger = LoggerFactory.getLogger(NativeTcpServer.class);

    /**
     * Starts an echo server on port 8888. Each received chunk is echoed back
     * prefixed with a timestamp; a chunk equal to "quit" (ignoring case and
     * surrounding whitespace) closes the connection after the echo is written.
     */
    public void createTcpServer() {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setPreferNativeTransport(true));

        NetServerOptions options = new NetServerOptions()
                .setTcpFastOpen(true)
                .setTcpCork(true)
                .setTcpQuickAck(true)
                .setReusePort(true)
                .setReceiveBufferSize(32 * 1024) // 32KB receive buffer
                .setSendBufferSize(32 * 1024)    // 32KB send buffer
                .setLogActivity(true);

        NetServer server = vertx.createNetServer(options);
        server.connectHandler(socket -> {
            logger.info("Client connected from: {}", socket.remoteAddress());

            // Handle incoming data
            socket.handler(buffer -> {
                String message = buffer.toString();
                logger.info("Received: {}", message);

                // Echo back with timestamp
                String response = String.format("[%s] Echo: %s",
                        Instant.now(), message);
                socket.write(response);

                // Close connection on specific command
                if ("quit".equalsIgnoreCase(message.trim())) {
                    socket.close();
                }
            });

            socket.closeHandler(v -> {
                logger.info("Client disconnected: {}", socket.remoteAddress());
            });

            socket.exceptionHandler(err -> {
                logger.error("Socket error", err);
            });

            // Send welcome message
            socket.write("Welcome to Native TCP Server!\n");
        });

        server.listen(8888, result -> {
            if (result.succeeded()) {
                logger.info("TCP Server listening on port 8888 with native transport");
            } else {
                logger.error("Failed to start TCP server", result.cause());
            }
        });
    }

    /**
     * Starts a server on port 9999 that speaks the newline-delimited command
     * protocol implemented by {@link SimpleProtocolHandler}.
     */
    public void createProtocolHandler() {
        Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
        NetServer server = vertx.createNetServer(new NetServerOptions()
                .setTcpFastOpen(true)
                .setReusePort(true));

        server.connectHandler(socket -> {
            // One handler per connection; it owns that connection's buffer.
            SimpleProtocolHandler handler = new SimpleProtocolHandler(socket);
            handler.start();
        });

        server.listen(9999);
    }

    /**
     * Per-connection handler for a newline-delimited text protocol.
     * Supported commands: {@code TIME} and {@code ECHO <text>}; anything else
     * is answered with {@code UNKNOWN COMMAND}.
     */
    private static class SimpleProtocolHandler {
        /** Upper bound on bytes buffered while waiting for a newline. */
        private static final int MAX_PENDING_BYTES = 64 * 1024;

        private final NetSocket socket;
        /** Bytes received so far that do not yet form a complete line. */
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        public SimpleProtocolHandler(NetSocket socket) {
            this.socket = socket;
        }

        /** Registers the data, end-of-stream and error handlers on the socket. */
        public void start() {
            socket.handler(this::handleData);
            socket.endHandler(v -> cleanup());
            socket.exceptionHandler(err -> {
                logger.error("Protocol handler error", err);
                cleanup();
            });
        }

        /**
         * Appends a received chunk to the pending buffer and processes every
         * complete line it now contains. Closes the socket on any processing
         * error or when a peer sends too much data without a newline.
         */
        private void handleData(Buffer data) {
            try {
                byte[] chunk = data.getBytes();
                // The three-argument write declares no checked exception,
                // unlike OutputStream.write(byte[]).
                buffer.write(chunk, 0, chunk.length);
                processBuffer();
                if (buffer.size() > MAX_PENDING_BYTES) {
                    logger.error("Closing connection: {} bytes received without a line terminator",
                            buffer.size());
                    cleanup();
                    socket.close();
                }
            } catch (Exception e) {
                logger.error("Error processing data", e);
                socket.close();
            }
        }

        /**
         * Dispatches every newline-terminated message currently in the buffer
         * and keeps only the trailing partial message (if any) for later.
         */
        private void processBuffer() {
            byte[] data = buffer.toByteArray();
            int messageStart = 0;
            // A single chunk may carry several messages; handle all of them
            // rather than stopping at the first newline.
            for (int i = 0; i < data.length; i++) {
                if (data[i] == '\n') {
                    handleMessage(new String(data, messageStart, i - messageStart,
                            StandardCharsets.UTF_8));
                    messageStart = i + 1;
                }
            }
            if (messageStart > 0) {
                // Drop the processed bytes, keep the unterminated remainder.
                buffer.reset();
                buffer.write(data, messageStart, data.length - messageStart);
            }
        }

        /** Executes one command line and writes the newline-terminated reply. */
        private void handleMessage(String message) {
            logger.info("Processing message: {}", message);

            // Simple command processing
            String response;
            if (message.startsWith("TIME")) {
                response = "TIME: " + Instant.now();
            } else if (message.startsWith("ECHO ")) {
                response = message.substring(5);
            } else {
                response = "UNKNOWN COMMAND: " + message;
            }
            socket.write(response + "\n");
        }

        /** Discards any buffered, unprocessed bytes. */
        private void cleanup() {
            buffer.reset();
        }
    }
}
UDP Server with Native Transport
High-Performance UDP Server
/**
 * UDP examples that prefer the native transport: a unicast echo server and a
 * multicast listener/sender.
 */
public class NativeUdpServer {
    private static final Logger logger = LoggerFactory.getLogger(NativeUdpServer.class);

    /**
     * Starts a UDP echo server on 0.0.0.0:9999. Every received packet is sent
     * back to its sender prefixed with a timestamp.
     */
    public void createUdpServer() {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setPreferNativeTransport(true));

        DatagramSocketOptions options = new DatagramSocketOptions()
                .setReusePort(true)
                .setReuseAddress(true)
                .setLogActivity(true);

        vertx.createDatagramSocket(options)
                .listen(9999, "0.0.0.0", result -> {
                    if (result.failed()) {
                        logger.error("Failed to start UDP server", result.cause());
                        return;
                    }
                    logger.info("UDP server listening on port 9999");
                    installEchoHandlers(result.result());
                });
    }

    /** Registers the packet echo handler and the error handler on a bound socket. */
    private void installEchoHandlers(DatagramSocket socket) {
        // Handle incoming packets
        socket.handler(packet -> {
            SocketAddress sender = packet.sender();
            Buffer data = packet.data();
            String message = data.toString();
            logger.info("Received UDP packet from {}: {}",
                    sender, message);

            // Echo back with timestamp
            String response = String.format("[%s] Echo: %s",
                    Instant.now(), message);
            socket.send(Buffer.buffer(response),
                    sender.port(), sender.host(), sendResult -> {
                        if (sendResult.failed()) {
                            logger.error("Failed to send UDP response",
                                    sendResult.cause());
                        }
                    });
        });

        // Handle socket errors
        socket.exceptionHandler(err -> {
            logger.error("UDP socket error", err);
        });
    }

    /**
     * Binds a UDP socket on 0.0.0.0:8888, joins multicast group 230.0.0.1,
     * logs every packet received and sends a message to the group every
     * five seconds.
     */
    public void createMulticastServer() {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setPreferNativeTransport(true));

        vertx.createDatagramSocket(new DatagramSocketOptions()
                        .setReusePort(true)
                        .setReuseAddress(true))
                .listen(8888, "0.0.0.0", result -> {
                    if (result.failed()) {
                        logger.error("Failed to start multicast server", result.cause());
                        return;
                    }
                    joinGroupAndBroadcast(vertx, result.result());
                });
    }

    /**
     * Joins the multicast group on the given bound socket and, once joined,
     * installs the receive handler and the periodic sender.
     */
    private void joinGroupAndBroadcast(Vertx vertx, DatagramSocket socket) {
        socket.listenMulticastGroup("230.0.0.1", listenResult -> {
            if (listenResult.failed()) {
                logger.error("Failed to join multicast group",
                        listenResult.cause());
                return;
            }
            logger.info("Joined multicast group 230.0.0.1");

            // Handle multicast packets
            socket.handler(packet -> {
                logger.info("Multicast from {}: {}",
                        packet.sender(), packet.data().toString());
            });

            // Send periodic multicast messages
            vertx.setPeriodic(5000, timerId -> {
                String message = "Multicast message at " + Instant.now();
                socket.send(Buffer.buffer(message),
                        8888, "230.0.0.1", sendResult -> {
                            if (sendResult.succeeded()) {
                                logger.debug("Sent multicast message");
                            } else {
                                // Surface failures instead of dropping them silently.
                                logger.error("Failed to send multicast message",
                                        sendResult.cause());
                            }
                        });
            });
        });
    }
}
Performance Monitoring and Metrics
Native Transport Metrics
/**
 * Publishes a few transport-related gauges to a Micrometer registry and
 * offers helpers to time HTTP requests and report transport statistics.
 *
 * <p>NOTE(review): {@code VertxNativeTransport} is referenced here but is not
 * defined in this snippet; confirm it exists on the classpath.
 */
public class NativeTransportMetrics {
    private final Vertx vertx;
    private final MeterRegistry meterRegistry;

    /**
     * Stores the collaborators and immediately registers the gauges.
     *
     * @param vertx         Vert.x instance to observe
     * @param meterRegistry registry that receives the meters
     */
    public NativeTransportMetrics(Vertx vertx, MeterRegistry meterRegistry) {
        this.vertx = vertx;
        this.meterRegistry = meterRegistry;
        setupMetrics();
    }

    /** Registers the event loop, worker pool and native transport gauges. */
    private void setupMetrics() {
        // Micrometer gauges take the observed object and value function in
        // builder(...), and only the registry in register(...).

        // Event loop metrics
        Gauge.builder("vertx.eventloop.size", vertx, NativeTransportMetrics::eventLoopCount)
                .description("Event loop pool size")
                .register(meterRegistry);

        // Worker pool metrics
        // NOTE(review): getWorkerPool()/maxPoolSize() are not part of the
        // public Vertx interface as far as this snippet shows; verify this
        // expression against the Vert.x version in use.
        Gauge.builder("vertx.workerpool.size", vertx, v ->
                        ((WorkerPool) v.getWorkerPool()).maxPoolSize())
                .description("Worker pool size")
                .register(meterRegistry);

        // Native transport metrics
        Gauge.builder("vertx.native.enabled", vertx, v -> v.isNativeTransportEnabled() ? 1 : 0)
                .description("Native transport enabled")
                .register(meterRegistry);
    }

    /**
     * Installs a request handler on the server that answers "OK" and records,
     * per request, a duration sample and a counter increment once the response
     * has ended.
     *
     * @param server server whose request handler is replaced
     */
    public void monitorHttpServer(HttpServer server) {
        Timer requestTimer = Timer.builder("http.requests.duration")
                .description("HTTP request duration")
                .register(meterRegistry);
        Counter requestCounter = Counter.builder("http.requests.total")
                .description("Total HTTP requests")
                .register(meterRegistry);

        // You would typically integrate this with your request handlers
        server.requestHandler(request -> {
            long startTime = System.nanoTime();
            // Measure until the response ends; the request's own end handler
            // only signals that the request body has been fully read.
            request.response().endHandler(v -> {
                long duration = System.nanoTime() - startTime;
                requestTimer.record(duration, TimeUnit.NANOSECONDS);
                requestCounter.increment();
            });

            // Your normal request processing...
            request.response().end("OK");
        });
    }

    /**
     * Builds a JSON snapshot with the native transport flag, the event loop
     * count, the transport type and the current timestamp.
     *
     * @return a new JSON object describing the transport
     */
    public JsonObject getTransportStats() {
        return new JsonObject()
                .put("nativeTransportEnabled", vertx.isNativeTransportEnabled())
                .put("eventLoopPoolSize", eventLoopCount(vertx))
                .put("transportType", getTransportType())
                .put("timestamp", Instant.now().toString());
    }

    /** Returns "NIO" when the native transport is off, otherwise "epoll" or "kqueue". */
    private String getTransportType() {
        if (!vertx.isNativeTransportEnabled()) {
            return "NIO";
        }
        return VertxNativeTransport.isEpollAvailable() ? "epoll" : "kqueue";
    }

    /**
     * Counts the event loops of a Vert.x instance by iterating its Netty event
     * loop group, which exposes its executors as an {@code Iterable} rather
     * than through a size accessor.
     */
    private static int eventLoopCount(Vertx vertx) {
        int count = 0;
        for (Object ignored : vertx.nettyEventLoopGroup()) {
            count++;
        }
        return count;
    }
}
Advanced Configuration
Custom Event Loop Groups
/**
 * Examples of tuning Vert.x and server options for the native transport.
 *
 * <p>NOTE(review): {@code VertxNativeTransport} is referenced here but is not
 * defined in this snippet; confirm it exists on the classpath.
 */
public class AdvancedNativeConfiguration {

    /**
     * Creates a Vert.x instance with four event loops that prefers the native
     * transport.
     *
     * <p>The previous version also constructed a Netty event loop group by
     * hand, but it was never passed to Vert.x nor shut down, so it only leaked
     * resources. Vert.x builds its own group from the options below.
     *
     * @return the new Vert.x instance; the caller is responsible for closing it
     */
    public Vertx createVertxWithCustomEventLoop() {
        VertxOptions options = new VertxOptions()
                .setEventLoopPoolSize(4)
                .setPreferNativeTransport(true);
        return Vertx.vertx(options);
    }

    /**
     * Builds HTTP and TCP server options, enabling the Linux-only TCP options
     * only when epoll is reported as available. The options are built for
     * illustration and are not used further.
     */
    public void configureNetworkOptions() {
        boolean epoll = VertxNativeTransport.isEpollAvailable();

        HttpServerOptions httpOptions = new HttpServerOptions()
                // Linux-specific optimizations
                .setTcpFastOpen(epoll)
                .setTcpCork(epoll)
                .setTcpQuickAck(epoll)
                // General optimizations
                .setReusePort(true)
                .setReuseAddress(true)
                .setTcpNoDelay(true)               // Disable Nagle's algorithm
                .setSendBufferSize(32 * 1024)      // 32KB send buffer
                .setReceiveBufferSize(32 * 1024)   // 32KB receive buffer
                .setLogActivity(false)             // Disable in production
                .setIdleTimeout(30)                // 30 second idle timeout
                .setCompressionSupported(true)     // Enable compression
                .setMaxWebSocketFrameSize(65536);  // 64KB WebSocket frames

        NetServerOptions netOptions = new NetServerOptions()
                .setTcpFastOpen(epoll)
                .setTcpCork(epoll)
                .setReusePort(true)
                .setReceiveBufferSize(64 * 1024)   // 64KB for high-throughput TCP
                .setSendBufferSize(64 * 1024);
    }

    /**
     * Builds Vert.x and HTTP server options sized for many concurrent
     * connections: one event loop per processor and explicit execution-time
     * limits. The options are built for illustration and are not used further.
     */
    public void configureForHighConcurrency() {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // Time limits are given as raw nanosecond values.
        VertxOptions options = new VertxOptions()
                .setPreferNativeTransport(true)
                .setEventLoopPoolSize(cpuCores)               // One event loop per core
                .setWorkerPoolSize(20)                        // Worker threads for blocking ops
                .setInternalBlockingPoolSize(20)              // Internal blocking operations
                .setMaxEventLoopExecuteTime(2_000_000_000L)   // 2 seconds max event loop time
                .setMaxWorkerExecuteTime(60_000_000_000L)     // 60 seconds max worker time
                .setWarningExceptionTime(5_000_000_000L);     // 5 second warning threshold

        HttpServerOptions serverOptions = new HttpServerOptions()
                .setReusePort(true)
                .setTcpFastOpen(true)
                .setTcpNoDelay(true)
                .setAcceptBacklog(1024)                 // Connection backlog
                .setMaxChunkSize(8192)                  // 8KB max chunk size
                .setInitialBufferSizeHttpDecoder(128);  // Initial HTTP decoder buffer
    }
}
SSL/TLS with Native Transport
Native SSL Performance
/**
 * HTTPS server example that prefers the native transport and uses the OpenSSL
 * engine when it is available.
 */
public class NativeSslConfiguration {

    /**
     * Starts an HTTPS server on port 8443 using the JKS keystore
     * {@code server-keystore.jks}, with ALPN enabled. The OpenSSL engine is
     * selected only when it is available; otherwise the default engine is kept.
     */
    public void createHttpsServer() {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setPreferNativeTransport(true));

        HttpServerOptions options = new HttpServerOptions()
                .setSsl(true)
                .setKeyCertOptions(new JksOptions()
                        .setPath("server-keystore.jks")
                        .setPassword("password"))
                .setUseAlpn(true) // HTTP/2 with ALPN
                .setTcpFastOpen(true)
                .setReusePort(true);

        // Requesting the OpenSSL engine unconditionally would not fall back
        // when the native library is missing, so check first.
        if (OpenSSLEngineOptions.isAvailable()) {
            options.setSslEngineOptions(new OpenSSLEngineOptions());
        }

        HttpServer server = vertx.createHttpServer(options);
        Router router = Router.router(vertx);
        router.get("/").handler(ctx -> {
            ctx.response().end("Secure connection with native transport!");
        });

        server.requestHandler(router).listen(8443, result -> {
            if (result.succeeded()) {
                System.out.println("HTTPS server listening on port 8443");
            } else {
                result.cause().printStackTrace();
            }
        });
    }

    /** Prints whether OpenSSL and HTTP/2 are available in this runtime. */
    public void checkSslCapabilities() {
        boolean openSslAvailable = OpenSSLEngineOptions.isAvailable();
        System.out.println("OpenSSL available: " + openSslAvailable);
        // Parentheses are required: without them the string concatenation is
        // evaluated first and the comparison result is printed on its own.
        System.out.println("HTTP/2 available: " + (HttpVersion.HTTP_2 != null));
        if (openSslAvailable) {
            System.out.println("Using OpenSSL for better SSL performance");
        }
    }
}
Testing and Benchmarking
Performance Testing
/**
 * Rough throughput comparison between the JDK NIO transport and the native
 * transport using a local HTTP server and client.
 */
public class NativeTransportBenchmark {
    private static final int WARMUP_REQUESTS = 1000;
    private static final int BENCHMARK_REQUESTS = 10000;

    /**
     * Runs the same benchmark against a Vert.x instance with the native
     * transport disabled and one with it preferred, printing the number of
     * requests each server handled, then closes both instances.
     */
    public void benchmarkThroughput() {
        Vertx nioVertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(false));
        Vertx nativeVertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
        try {
            System.out.println("=== Benchmarking Transports ===");
            System.out.println("NIO Transport: " + benchmarkServer(nioVertx));
            System.out.println("Native Transport: " + benchmarkServer(nativeVertx));
        } finally {
            nioVertx.close();
            nativeVertx.close();
        }
    }

    /**
     * Starts an HTTP server on an ephemeral port, sends a warm-up batch and
     * then a timed batch of GET requests, and prints the timed batch's
     * duration and request rate.
     *
     * <p>This method blocks the calling thread and must not be invoked from an
     * event loop.
     *
     * @param vertx Vert.x instance providing the server and client
     * @return total number of requests the server handled, warm-up included
     * @throws java.util.concurrent.CompletionException if the server fails to start
     */
    private long benchmarkServer(Vertx vertx) {
        AtomicLong requestCount = new AtomicLong();

        HttpServer server = vertx.createHttpServer();
        server.requestHandler(req -> {
            requestCount.incrementAndGet();
            req.response().end("OK");
        });

        CompletableFuture<Void> serverReady = new CompletableFuture<>();
        server.listen(0, result -> {
            if (result.succeeded()) {
                serverReady.complete(null);
            } else {
                serverReady.completeExceptionally(result.cause());
            }
        });
        serverReady.join();

        int port = server.actualPort();
        HttpClient client = vertx.createHttpClient();
        try {
            // Warm up, and wait for it to finish so it does not overlap with
            // the timed run. Reading result() on a pending future does not wait.
            awaitAll(sendRequests(client, port, WARMUP_REQUESTS));

            // Benchmark
            int requests = BENCHMARK_REQUESTS;
            long startTime = System.nanoTime();
            List<Future> futures = sendRequests(client, port, requests);
            awaitAll(futures);
            long duration = System.nanoTime() - startTime;

            double rps = (requests * 1e9) / duration;
            System.out.printf("Completed %d requests in %.2f ms (%.0f req/s)%n",
                    requests, duration / 1e6, rps);

            long failed = futures.stream().filter(Future::failed).count();
            if (failed > 0) {
                System.out.printf("Warning: %d of %d requests failed%n", failed, requests);
            }
        } finally {
            // Release the sockets held by this run.
            client.close();
            server.close();
        }
        return requestCount.get();
    }

    /**
     * Fires {@code count} GET requests at the local server without waiting.
     *
     * @return one future per request, in submission order
     */
    // Raw Future is used because the CompositeFuture list overloads take List<Future>.
    @SuppressWarnings("rawtypes")
    private static List<Future> sendRequests(HttpClient client, int port, int count) {
        List<Future> futures = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // NOTE(review): get(...).send() has the shape of the WebClient API
            // rather than the core HttpClient; confirm which client is intended.
            futures.add(client.get(port, "localhost", "/").send());
        }
        return futures;
    }

    /**
     * Blocks until every future has completed, successfully or not.
     * {@code join} is used rather than {@code all} so that one early failure
     * does not end the wait while other requests are still in flight.
     */
    @SuppressWarnings("rawtypes")
    private static void awaitAll(List<Future> futures) {
        CountDownLatch done = new CountDownLatch(1);
        CompositeFuture.join(futures).onComplete(result -> done.countDown());
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Troubleshooting and Best Practices
Common Issues and Solutions
/**
 * Diagnostics that help find out why the native transport is or is not in
 * use, plus a sample of recommended option settings.
 *
 * <p>NOTE(review): {@code VertxNativeTransport} is referenced here but is not
 * defined in this snippet; confirm it exists on the classpath.
 */
public class NativeTransportTroubleshooting {

    /** Runs the dependency, platform and configuration checks in order and prints the results. */
    public void diagnoseNativeTransport() {
        System.out.println("=== Native Transport Diagnostics ===");
        // Check dependencies
        checkDependencies();
        // Check platform support
        checkPlatformSupport();
        // Check configuration
        checkConfiguration();
    }

    /** Reports whether the Netty epoll and kqueue classes can be loaded. */
    private void checkDependencies() {
        reportDependency("io.netty.channel.epoll.Epoll", "epoll");
        reportDependency("io.netty.channel.kqueue.KQueue", "kqueue");
    }

    /**
     * Prints a check mark or a cross depending on whether the class can be loaded.
     *
     * @param className fully qualified class that marks the dependency
     * @param label     short transport name used in the message
     */
    private void reportDependency(String className, String label) {
        try {
            Class.forName(className);
            System.out.println("✓ " + label + " dependency available");
        } catch (ClassNotFoundException e) {
            System.out.println("✗ " + label + " dependency missing");
        }
    }

    /** Prints OS name/architecture and which native transports are reported as available. */
    private void checkPlatformSupport() {
        System.out.println("OS Name: " + System.getProperty("os.name"));
        System.out.println("OS Arch: " + System.getProperty("os.arch"));
        System.out.println("Linux: " + isLinux());
        System.out.println("macOS: " + isMacOS());
        System.out.println("epoll available: " + VertxNativeTransport.isEpollAvailable());
        System.out.println("kqueue available: " + VertxNativeTransport.isKqueueAvailable());
    }

    /**
     * Creates a temporary Vert.x instance that prefers the native transport,
     * prints whether it is enabled and how many event loops exist, then
     * closes the instance.
     */
    private void checkConfiguration() {
        VertxOptions options = new VertxOptions().setPreferNativeTransport(true);
        Vertx vertx = Vertx.vertx(options);
        try {
            System.out.println("Native transport enabled: " + vertx.isNativeTransportEnabled());
            // The event loop group exposes its executors as an Iterable rather
            // than through a size accessor, so count by iterating.
            int eventLoops = 0;
            for (Object ignored : vertx.nettyEventLoopGroup()) {
                eventLoops++;
            }
            System.out.println("Event loop size: " + eventLoops);
        } finally {
            vertx.close();
        }
    }

    /** Returns whether the {@code os.name} system property contains "linux". */
    private boolean isLinux() {
        return osName().contains("linux");
    }

    /** Returns whether the {@code os.name} system property contains "mac". */
    private boolean isMacOS() {
        return osName().contains("mac");
    }

    /**
     * Returns the {@code os.name} property in lower case, or an empty string
     * if it is unset. {@code Locale.ROOT} keeps the case mapping independent
     * of the default locale.
     */
    private static String osName() {
        return System.getProperty("os.name", "").toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Builds a recommended set of Vert.x and HTTP server options, adding the
     * Linux-only TCP options when epoll is reported as available. The options
     * are built for illustration and are not used further.
     */
    public void applyBestPractices() {
        VertxOptions options = new VertxOptions()
                .setPreferNativeTransport(true)
                .setEventLoopPoolSize(Runtime.getRuntime().availableProcessors())
                .setWorkerPoolSize(20)
                .setInternalBlockingPoolSize(20)
                .setWarningExceptionTime(5_000_000_000L); // 5 seconds

        HttpServerOptions serverOptions = new HttpServerOptions()
                .setReusePort(true)
                .setTcpNoDelay(true)
                .setSendBufferSize(32 * 1024)
                .setReceiveBufferSize(32 * 1024);

        if (VertxNativeTransport.isEpollAvailable()) {
            serverOptions
                    .setTcpFastOpen(true)
                    .setTcpCork(true)
                    .setTcpQuickAck(true);
        }
    }
}
Conclusion
Vert.x Native Transport provides significant performance benefits for I/O-intensive applications:
- Higher Throughput - Reduced context switching and system calls
- Lower Latency - Optimized event notification mechanisms
- Better Resource Utilization - More efficient connection handling
- Platform Optimizations - Linux epoll and BSD/macOS kqueue specific optimizations
Key configuration aspects:
- Enable with `setPreferNativeTransport(true)`
- Use platform-specific dependencies
- Configure appropriate pool sizes
- Leverage transport-specific optimizations (TCP fast open, cork, etc.)
- Monitor performance and adjust based on workload
By properly configuring and leveraging native transports, you can achieve substantial performance improvements in your Vert.x applications.