Distributed Transactions with XA in Java

Overview

Distributed transactions coordinate multiple resources (databases, message queues, etc.) across different systems using the X/Open XA specification. The XA protocol ensures ACID properties (Atomicity, Consistency, Isolation, Durability) across distributed resources.

Architecture

Two-Phase Commit (2PC) Protocol

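  1. Phase 1 (Prepare): The transaction manager asks every participant to prepare; each participant durably records its pending work and votes to commit or abort
  2. Phase 2 (Commit/Rollback): If every participant voted to commit, the transaction manager commits them all; otherwise it rolls them all back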

Dependencies

<dependencies>
    <!-- Spring Boot Starter JTA (Atomikos is the provider used by the configuration classes below) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    <!--
      Alternative JTA provider: Bitronix.
      Enable this INSTEAD of the Atomikos starter above, never together with it;
      only one JTA provider should be on the classpath.
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-bitronix</artifactId>
    </dependency>
    -->
    <!-- Database with XA support -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
    <!-- JMS for messaging -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
    </dependency>
    <!-- Monitoring -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
    </dependency>
</dependencies>

Core Implementation

1. XA Transaction Manager Configuration

/**
 * Spring configuration that wires an Atomikos-backed JTA setup: the transaction
 * manager, the user transaction, one XA-capable MySQL data source, one XA-capable
 * ActiveMQ connection factory, a {@link JmsTemplate} and Spring's
 * {@link JtaTransactionManager}.
 */
@Configuration
@EnableTransactionManagement
public class XATransactionConfig {

    /**
     * Atomikos transaction manager. Spring calls {@code init} after construction
     * and {@code close} on context shutdown.
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    /**
     * JTA user transaction with a 300 second timeout.
     *
     * @throws SystemException if the timeout cannot be applied
     */
    @Bean
    public UserTransaction userTransaction() throws SystemException {
        UserTransactionImp userTransaction = new UserTransactionImp();
        userTransaction.setTransactionTimeout(300); // 5 minutes
        return userTransaction;
    }

    /**
     * XA data source for MySQL, wrapped in an Atomikos pool of at most 50 connections.
     * The init/close hooks mirror the transaction manager bean so the pool is started
     * and released together with the application context.
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    public DataSource dataSource() {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        // NOTE(review): URL and credentials are hard-coded sample values; externalize them.
        xaDataSource.setUrl("jdbc:mysql://localhost:3306/xa_db");
        xaDataSource.setUser("username");
        xaDataSource.setPassword("password");

        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        dataSource.setXaDataSource(xaDataSource);
        dataSource.setUniqueResourceName("mysqlXADB");
        dataSource.setMaxPoolSize(50);
        dataSource.setTestQuery("SELECT 1");
        return dataSource;
    }

    /**
     * XA connection factory for JMS (ActiveMQ), wrapped in an Atomikos pool of at
     * most 50 connections.
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    public ConnectionFactory connectionFactory() {
        ActiveMQXAConnectionFactory xaConnectionFactory = new ActiveMQXAConnectionFactory();
        xaConnectionFactory.setBrokerURL("tcp://localhost:61616");

        AtomikosConnectionFactoryBean connectionFactory = new AtomikosConnectionFactoryBean();
        connectionFactory.setXaConnectionFactory(xaConnectionFactory);
        connectionFactory.setUniqueResourceName("activemqXA");
        connectionFactory.setMaxPoolSize(50);
        return connectionFactory;
    }

    /** JMS template bound to the XA-aware connection factory. */
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());
    }

    /**
     * Spring's JTA platform transaction manager.
     *
     * <p>This is the only bean named {@code transactionManager}. A second, no-argument
     * bean method with the same name was removed: it duplicated this definition and
     * called {@link #userTransaction()} without handling its checked
     * {@code SystemException}.
     *
     * @param userTransaction        the JTA user transaction bean
     * @param userTransactionManager the JTA transaction manager bean
     */
    @Bean
    public JtaTransactionManager transactionManager(UserTransaction userTransaction,
                                                    TransactionManager userTransactionManager) {
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }
}

2. Custom XA Transaction Manager

/**
 * Programmatic wrapper around a JTA {@link UserTransaction}: begins a transaction,
 * runs the supplied work, commits on success, rolls back on any exception, and
 * records duration/outcome metrics for every attempt.
 */
@Component
public class DistributedTransactionManager {
    private final UserTransaction userTransaction;
    private final DataSource dataSource;
    private final ConnectionFactory connectionFactory;
    private final MeterRegistry meterRegistry;

    public DistributedTransactionManager(UserTransaction userTransaction,
                                         DataSource dataSource,
                                         ConnectionFactory connectionFactory,
                                         MeterRegistry meterRegistry) {
        this.userTransaction = userTransaction;
        this.dataSource = dataSource;
        this.connectionFactory = connectionFactory;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Runs {@code operation} inside a new JTA transaction.
     *
     * @param operation the work to execute between begin and commit
     * @param <T>       result type of the operation
     * @return whatever {@code operation} returns
     * @throws TransactionException if begin, the operation, or commit fails
     *         ("Transaction failed", original exception as cause), or if the
     *         subsequent rollback itself fails ("Failed to rollback transaction",
     *         with the original failure attached as a suppressed exception)
     */
    public <T> T executeInTransaction(TransactionalOperation<T> operation)
            throws TransactionException {
        // Monotonic clock: elapsed time must not be affected by wall-clock adjustments.
        long startNanos = System.nanoTime();
        boolean success = false;
        try {
            userTransaction.begin();
            T result = operation.execute();
            userTransaction.commit();
            success = true;
            return result;
        } catch (Exception e) {
            rollbackAfterFailure(e);
            throw new TransactionException("Transaction failed", e);
        } finally {
            recordTransactionMetrics(startNanos, success);
        }
    }

    /**
     * Runs {@code operation} inside a transaction, handing it a
     * {@link TransactionContext} that is closed before the commit is attempted.
     *
     * @param operation the work to execute with the shared context
     * @throws TransactionException see {@link #executeInTransaction(TransactionalOperation)}
     */
    public void executeWithResources(Consumer<TransactionContext> operation)
            throws TransactionException {
        executeInTransaction(() -> {
            try (TransactionContext context = new TransactionContext(dataSource, connectionFactory)) {
                operation.accept(context);
                return null;
            }
        });
    }

    /**
     * Rolls back after {@code failure} without losing it.
     *
     * <p>If begin or commit was the failing call there may be no transaction left on
     * this thread, in which case {@code rollback()} throws {@link IllegalStateException};
     * that is recorded as suppressed so the original failure is what the caller sees.
     */
    private void rollbackAfterFailure(Exception failure) throws TransactionException {
        try {
            userTransaction.rollback();
        } catch (IllegalStateException noActiveTransaction) {
            failure.addSuppressed(noActiveTransaction);
        } catch (SystemException se) {
            TransactionException rollbackFailure =
                    new TransactionException("Failed to rollback transaction", se);
            rollbackFailure.addSuppressed(failure);
            throw rollbackFailure;
        }
    }

    /** Records the elapsed time and a success/failure counter for one transaction attempt. */
    private void recordTransactionMetrics(long startNanos, boolean success) {
        long elapsedNanos = System.nanoTime() - startNanos;
        meterRegistry.timer("xa.transaction.duration")
                .record(Duration.ofNanos(elapsedNanos));
        meterRegistry.counter("xa.transaction.outcome",
                "success", String.valueOf(success)).increment();
    }

    /** Unit of work that may throw any exception; a throw triggers rollback. */
    @FunctionalInterface
    public interface TransactionalOperation<T> {
        T execute() throws Exception;
    }
}

3. Transaction Context for Resource Management

public class TransactionContext implements AutoCloseable {
private final DataSource dataSource;
private final ConnectionFactory connectionFactory;
private Connection dbConnection;
private Session jmsSession;
private final List<AutoCloseable> resources;
public TransactionContext(DataSource dataSource, ConnectionFactory connectionFactory) {
this.dataSource = dataSource;
this.connectionFactory = connectionFactory;
this.resources = new ArrayList<>();
}
public Connection getDatabaseConnection() throws SQLException {
if (dbConnection == null) {
dbConnection = dataSource.getConnection();
resources.add(dbConnection);
}
return dbConnection;
}
public Session getJmsSession() throws JMSException {
if (jmsSession == null) {
Connection jmsConnection = connectionFactory.createConnection();
jmsSession = jmsConnection.createSession(true, Session.SESSION_TRANSACTED);
resources.add(jmsSession);
resources.add(jmsConnection);
}
return jmsSession;
}
public void executeSql(String sql, Object... params) throws SQLException {
try (PreparedStatement stmt = getDatabaseConnection().prepareStatement(sql)) {
for (int i = 0; i < params.length; i++) {
stmt.setObject(i + 1, params[i]);
}
stmt.executeUpdate();
}
}
public void sendMessage(String destination, String message) throws JMSException {
Session session = getJmsSession();
MessageProducer producer = session.createProducer(session.createQueue(destination));
TextMessage textMessage = session.createTextMessage(message);
producer.send(textMessage);
producer.close();
}
@Override
public void close() {
// Resources will be closed by transaction manager
// We just track them for reference
}
}

Business Service Implementation

1. Order Processing Service with XA

/**
 * Order workflow that runs its steps inside a transaction started through
 * {@link DistributedTransactionManager}.
 *
 * <p>NOTE(review): the class is also annotated {@code @Transactional} while every
 * public method demarcates its own transaction programmatically; confirm that this
 * double demarcation is intended.
 */
@Service
@Transactional
public class OrderProcessingService {
    private final DistributedTransactionManager transactionManager;
    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderProcessingService(DistributedTransactionManager transactionManager,
                                  OrderRepository orderRepository,
                                  InventoryService inventoryService,
                                  NotificationService notificationService) {
        this.transactionManager = transactionManager;
        this.orderRepository = orderRepository;
        this.inventoryService = inventoryService;
        this.notificationService = notificationService;
    }

    /**
     * Creates the order, reserves inventory, publishes a notification and builds an
     * audit record, all inside one transaction.
     *
     * @param request the order to process
     * @return the saved order
     * @throws TransactionException if any step or the commit fails
     */
    public Order processOrder(OrderRequest request) throws TransactionException {
        return transactionManager.executeInTransaction(() -> {
            // Step 1: Create order in database
            Order order = createOrder(request);
            // Step 2: Update inventory (different database)
            updateInventory(order);
            // Step 3: Send notification (message queue)
            sendOrderNotification(order);
            // Step 4: Record audit trail (another database)
            recordAuditTrail(order);
            return order;
        });
    }

    /**
     * Processes all requests in a single transaction using one shared
     * {@link TransactionContext}.
     *
     * @param requests the orders to process
     * @throws TransactionException if any order fails or the commit fails
     */
    public void processBatchOrders(List<OrderRequest> requests) throws TransactionException {
        transactionManager.executeWithResources(context -> {
            for (OrderRequest request : requests) {
                try {
                    processSingleOrderInContext(request, context);
                } catch (SQLException | JMSException e) {
                    // The callback is a Consumer and cannot throw checked exceptions;
                    // rethrow unchecked so the failure still aborts the batch.
                    throw new IllegalStateException(
                            "Failed to process order for customer " + request.getCustomerId(), e);
                }
            }
        });
    }

    /** Inserts the order row, decrements inventory and sends the queue message for one request. */
    private void processSingleOrderInContext(OrderRequest request, TransactionContext context)
            throws SQLException, JMSException {
        // Database operation
        String insertOrderSql = "INSERT INTO orders (id, customer_id, amount, status) VALUES (?, ?, ?, ?)";
        context.executeSql(insertOrderSql,
                UUID.randomUUID().toString(),
                request.getCustomerId(),
                request.getAmount(),
                "PROCESSING");
        // Inventory update
        String updateInventorySql = "UPDATE inventory SET quantity = quantity - ? WHERE product_id = ?";
        context.executeSql(updateInventorySql, request.getQuantity(), request.getProductId());
        // Send JMS message
        String message = String.format("Order processed: %s for customer %s",
                request.getProductId(), request.getCustomerId());
        context.sendMessage("ORDER_QUEUE", message);
    }

    /** Builds an order with status "CREATED" from the request and saves it. */
    private Order createOrder(OrderRequest request) {
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setCustomerId(request.getCustomerId());
        order.setProductId(request.getProductId());
        order.setQuantity(request.getQuantity());
        order.setAmount(request.getAmount());
        order.setStatus("CREATED");
        order.setCreatedAt(Instant.now());
        return orderRepository.save(order);
    }

    /** Reserves the ordered quantity through the inventory service. */
    private void updateInventory(Order order) {
        inventoryService.reserveProduct(
                order.getProductId(),
                order.getQuantity(),
                order.getId()
        );
    }

    /** Publishes an order-created event through the notification service. */
    private void sendOrderNotification(Order order) {
        OrderEvent event = new OrderEvent(
                order.getId(),
                order.getCustomerId(),
                order.getProductId(),
                order.getQuantity(),
                order.getAmount(),
                Instant.now()
        );
        notificationService.sendOrderCreatedEvent(event);
    }

    /**
     * Builds the audit record for a created order.
     * TODO: the record is not persisted yet; the save call below is still commented out.
     */
    private void recordAuditTrail(Order order) {
        // Audit to separate database
        AuditRecord audit = new AuditRecord(
                "ORDER_CREATED",
                order.getId(),
                order.getCustomerId(),
                Instant.now()
        );
        // auditRepository.save(audit); // Different datasource
    }
}

2. Saga Pattern for Long-Running Transactions

@Service
public class OrderSagaService {
private final DistributedTransactionManager transactionManager;
private final Map<String, SagaExecution> activeSagas;
public OrderSagaService(DistributedTransactionManager transactionManager) {
this.transactionManager = transactionManager;
this.activeSagas = new ConcurrentHashMap<>();
}
public String startOrderSaga(OrderRequest request) {
String sagaId = UUID.randomUUID().toString();
SagaExecution saga = new SagaExecution(sagaId, request);
activeSagas.put(sagaId, saga);
try {
executeSagaStep(saga, this::createOrder);
executeSagaStep(saga, this::reserveInventory);
executeSagaStep(saga, this::processPayment);
executeSagaStep(saga, this::confirmOrder);
saga.setStatus(SagaStatus.COMPLETED);
} catch (Exception e) {
saga.setStatus(SagaStatus.FAILED);
compensateSaga(saga);
}
return sagaId;
}
private void executeSagaStep(SagaExecution saga, SagaStep step) throws Exception {
transactionManager.executeInTransaction(() -> {
step.execute(saga.getRequest());
saga.addCompletedStep(step.getClass().getSimpleName());
});
}
private void compensateSaga(SagaExecution saga) {
// Execute compensation in reverse order
List<String> completedSteps = saga.getCompletedSteps();
Collections.reverse(completedSteps);
for (String step : completedSteps) {
try {
compensateStep(step, saga.getRequest());
} catch (Exception e) {
// Log compensation failure but continue
log.error("Compensation failed for step: {}", step, e);
}
}
}
private void compensateStep(String stepName, OrderRequest request) {
switch (stepName) {
case "createOrder":
// Cancel order
break;
case "reserveInventory":
// Release inventory
break;
case "processPayment":
// Refund payment
break;
// Add other compensation handlers
}
}
@FunctionalInterface
private interface SagaStep {
void execute(OrderRequest request) throws Exception;
}
private void createOrder(OrderRequest request) {
// Create order in database
}
private void reserveInventory(OrderRequest request) {
// Reserve inventory
}
private void processPayment(OrderRequest request) {
// Process payment
}
private void confirmOrder(OrderRequest request) {
// Confirm order
}
}

XA Recovery and Monitoring

1. Transaction Recovery Service

/**
 * Periodically looks for in-doubt XA transactions and tries to resolve them, and
 * exposes transaction counters read from the transaction manager.
 */
@Service
public class XARecoveryService {
    private final UserTransactionManager transactionManager;
    private final DataSource dataSource;
    private final ScheduledExecutorService scheduler;

    public XARecoveryService(UserTransactionManager transactionManager,
                             DataSource dataSource) {
        this.transactionManager = transactionManager;
        this.dataSource = dataSource;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
    }

    /** Schedules a recovery pass every 5 minutes, the first one 5 minutes after startup. */
    @PostConstruct
    public void startRecoveryMonitor() {
        scheduler.scheduleAtFixedRate(this::recoverPendingTransactions,
                5, 5, TimeUnit.MINUTES);
    }

    /**
     * Stops the scheduler, giving a recovery pass that is already running up to
     * 10 seconds to finish before it is interrupted.
     */
    @PreDestroy
    public void stopRecoveryMonitor() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs one recovery pass. Failures are logged and never propagated, so a failing
     * pass does not cancel the periodic schedule.
     */
    public void recoverPendingTransactions() {
        try {
            // Query for in-doubt transactions
            List<Xid> inDoubtTransactions = findInDoubtTransactions();
            for (Xid xid : inDoubtTransactions) {
                recoverTransaction(xid);
            }
        } catch (Exception e) {
            log.error("Failed to recover pending transactions", e);
        }
    }

    /**
     * Queries the database for prepared transactions.
     *
     * <p>As written this always returns an empty list: the row-to-Xid conversion is
     * still commented out.
     * NOTE(review): MySQL documents {@code innodb_trx.trx_state} as RUNNING, LOCK WAIT,
     * ROLLING BACK or COMMITTING, so the {@code 'PREPARED'} filter may never match;
     * prepared XA branches are normally listed with {@code XA RECOVER} — confirm
     * before enabling the conversion.
     *
     * @throws SQLException if the query fails
     */
    private List<Xid> findInDoubtTransactions() throws SQLException {
        List<Xid> inDoubtXids = new ArrayList<>();
        // This is vendor-specific - example for MySQL
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT * FROM information_schema.innodb_trx WHERE trx_state = 'PREPARED'");
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                // Convert to Xid - this is simplified
                // Real implementation would parse the XID from transaction data
                byte[] globalId = rs.getBytes("trx_id");
                // Xid xid = new CustomXid(globalId);
                // inDoubtXids.add(xid);
            }
        }
        return inDoubtXids;
    }

    /**
     * Tries to commit the prepared transaction and falls back to rollback when the
     * commit fails. Both failures are logged; neither is propagated.
     *
     * <p>NOTE(review): deciding the outcome here, independently of the coordinator
     * that prepared the transaction, can break atomicity across resources — confirm
     * this does not conflict with the transaction manager's own recovery.
     */
    private void recoverTransaction(Xid xid) {
        try {
            // Try to commit the prepared transaction
            transactionManager.commit(xid, true);
            log.info("Recovered and committed transaction: {}", xid);
        } catch (Exception commitException) {
            // Keep the commit failure visible instead of dropping it silently.
            log.warn("Commit failed for in-doubt transaction {}; attempting rollback",
                    xid, commitException);
            try {
                // If commit fails, try to rollback
                transactionManager.rollback(xid);
                log.info("Recovered and rolled back transaction: {}", xid);
            } catch (Exception rollbackException) {
                rollbackException.addSuppressed(commitException);
                log.error("Failed to recover transaction: {}", xid, rollbackException);
            }
        }
    }

    /**
     * Snapshots the total, active, committed and rolled-back counters exposed by the
     * transaction manager.
     *
     * @throws SystemException if the transaction manager cannot report its counters
     */
    public TransactionStatistics getTransactionStatistics() throws SystemException {
        // Get statistics from transaction manager
        return new TransactionStatistics(
                transactionManager.getTransactionCount(),
                transactionManager.getActiveTransactionCount(),
                transactionManager.getCommitedTransactionCount(),
                transactionManager.getRolledBackTransactionCount()
        );
    }
}

2. Transaction Monitoring

/**
 * Publishes XA transaction metrics: started/completed counters, a duration timer
 * per transaction id, resource enlistment counters and a gauge of currently active
 * transactions.
 */
@Component
public class TransactionMonitor {
    private final MeterRegistry meterRegistry;
    private final UserTransactionManager transactionManager;
    private final Map<String, Timer.Sample> activeTransactions;
    // Single mutable holder behind the "xa.transaction.active" gauge. It is registered
    // once and updated in place; registering a fresh boxed value on every poll would
    // leave the gauge bound to the first value only.
    private final java.util.concurrent.atomic.AtomicInteger activeTransactionCount;

    public TransactionMonitor(MeterRegistry meterRegistry,
                              UserTransactionManager transactionManager) {
        this.meterRegistry = meterRegistry;
        this.transactionManager = transactionManager;
        this.activeTransactions = new ConcurrentHashMap<>();
        this.activeTransactionCount = new java.util.concurrent.atomic.AtomicInteger();
        // This field keeps a strong reference to the holder for the gauge to read.
        meterRegistry.gauge("xa.transaction.active", this.activeTransactionCount);
    }

    /** Starts timing the given transaction and counts it as started. */
    public void transactionStarted(String transactionId) {
        Timer.Sample sample = Timer.start(meterRegistry);
        activeTransactions.put(transactionId, sample);
        meterRegistry.counter("xa.transaction.started").increment();
    }

    /**
     * Stops the timer for the transaction (if one was started) and counts the outcome.
     *
     * @param transactionId id previously passed to {@link #transactionStarted(String)}
     * @param success       whether the transaction committed
     */
    public void transactionCompleted(String transactionId, boolean success) {
        Timer.Sample sample = activeTransactions.remove(transactionId);
        if (sample != null) {
            sample.stop(meterRegistry.timer("xa.transaction.duration"));
        }
        meterRegistry.counter("xa.transaction.completed",
                "success", String.valueOf(success)).increment();
    }

    /** Counts one enlistment of the named resource. */
    public void recordResourceEnlistment(String resourceName) {
        meterRegistry.counter("xa.resource.enlisted",
                "resource", resourceName).increment();
    }

    /** Refreshes the active-transaction gauge from the transaction manager. */
    @Scheduled(fixedRate = 30000) // Every 30 seconds
    public void recordActiveTransactions() {
        try {
            activeTransactionCount.set(transactionManager.getActiveTransactionCount());
        } catch (SystemException e) {
            log.error("Failed to get active transaction count", e);
        }
    }
}

Error Handling and Resilience

1. XA Exception Handling

@Service
public class XAErrorHandler {
private final TransactionMonitor transactionMonitor;
private final XARecoveryService recoveryService;
public XAErrorHandler(TransactionMonitor transactionMonitor,
XARecoveryService recoveryService) {
this.transactionMonitor = transactionMonitor;
this.recoveryService = recoveryService;
}
@EventListener
public void handleTransactionException(TransactionExceptionEvent event) {
TransactionException exception = event.getException();
if (isHeuristicException(exception)) {
handleHeuristicException(exception);
} else if (isXAException(exception)) {
handleXAException((XAException) exception.getCause());
} else {
handleGenericTransactionException(exception);
}
}
private boolean isHeuristicException(TransactionException exception) {
return exception.getCause() instanceof HeuristicMixedException ||
exception.getCause() instanceof HeuristicRollbackException;
}
private boolean isXAException(TransactionException exception) {
return exception.getCause() instanceof XAException;
}
private void handleHeuristicException(TransactionException exception) {
log.error("Heuristic transaction exception occurred", exception);
// Manual intervention required
// Alert administrators
sendHeuristicAlert(exception);
}
private void handleXAException(XAException xaException) {
switch (xaException.errorCode) {
case XAException.XA_RBROLLBACK:
log.warn("Transaction rolled back by resource manager");
break;
case XAException.XA_RBTIMEOUT:
log.warn("Transaction timed out");
break;
case XAException.XA_RBTRANSIENT:
log.warn("Transient transaction failure, may retry");
break;
case XAException.XAER_NOTA:
log.error("Invalid XID specified");
break;
case XAException.XAER_RMERR:
log.error("Resource manager error occurred");
break;
default:
log.error("Unknown XA error code: {}", xaException.errorCode);
}
// Trigger recovery for certain error types
if (shouldTriggerRecovery(xaException)) {
recoveryService.recoverPendingTransactions();
}
}
private boolean shouldTriggerRecovery(XAException xaException) {
return xaException.errorCode == XAException.XAER_RMFAIL ||
xaException.errorCode == XAException.XAER_RMERR;
}
private void sendHeuristicAlert(TransactionException exception) {
// Send alert to operations team
// This could be email, Slack, PagerDuty, etc.
}
}

Testing Distributed Transactions

1. Integration Tests

/**
 * Integration tests for {@code OrderProcessingService}: one happy path and one
 * rollback path, each verified against the orders table.
 */
@SpringBootTest
@Transactional
class DistributedTransactionTest {
    @Autowired
    private OrderProcessingService orderService;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private ConnectionFactory connectionFactory;

    @Test
    void testSuccessfulOrderProcessing() throws Exception {
        OrderRequest request = new OrderRequest("customer1", "product1", 2, 100.0);
        Order order = orderService.processOrder(request);
        assertNotNull(order);
        // processOrder returns the order saved by createOrder, which sets status "CREATED".
        assertEquals("CREATED", order.getStatus());
        // Verify database state
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT COUNT(*) FROM orders WHERE id = ?")) {
            stmt.setString(1, order.getId());
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                assertEquals(1, rs.getInt(1));
            }
        }
    }

    @Test
    void testOrderProcessingRollback() throws Exception {
        // Dedicated customer id so rows left by other tests cannot affect the count below.
        OrderRequest request = new OrderRequest("rollback-customer", "invalid-product", 2, 100.0);
        assertThrows(TransactionException.class, () -> {
            orderService.processOrder(request);
        });
        // Verify rollback - no order should be created
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT COUNT(*) FROM orders WHERE customer_id = ?")) {
            stmt.setString(1, "rollback-customer");
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                assertEquals(0, rs.getInt(1));
            }
        }
    }
}

2. Test Configuration

/**
 * Test-only beans: an embedded H2 database and an in-VM ActiveMQ broker connection.
 */
@TestConfiguration
public class XATestConfig {

    /** Embedded H2 database built with Spring's embedded database builder. */
    @Bean
    public DataSource testDataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .build();
    }

    /** Connection factory pointing at an in-VM broker URL with persistence switched off. */
    @Bean
    public ConnectionFactory testConnectionFactory() {
        ActiveMQConnectionFactory inVmFactory = new ActiveMQConnectionFactory();
        inVmFactory.setBrokerURL("vm://localhost?broker.persistent=false");
        return inVmFactory;
    }
}

Best Practices

  1. Keep Transactions Short: XA transactions hold locks across systems
  2. Use Timeouts: Prevent long-running transactions from blocking resources
  3. Monitor Heuristic Outcomes: Manual intervention may be required
  4. Test Recovery Scenarios: Ensure system can recover from failures
  5. Use Saga Pattern: For long-running business transactions
  6. Implement Idempotency: Retries should be safe
  7. Monitor Resource Usage: XA resources can be expensive

This implementation provides a comprehensive foundation for distributed transactions with XA in Java, covering transaction management, error handling, recovery, and monitoring.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper