Step Functions Security in Java: Secure AWS Step Functions Implementation

AWS Step Functions is a serverless orchestration service that coordinates multiple AWS services into workflows defined as state machines. This guide focuses on securing Step Functions workflows that are started and inspected from Java applications.

Step Functions Security Overview

Key security aspects:

  • IAM roles and policies for execution permissions
  • Encryption for data at rest and in transit
  • Input/Output validation and sanitization
  • Access control and resource-based policies
  • Audit logging and monitoring
  • VPC endpoints for private network access

Dependencies and Setup

Maven Configuration:

<dependencies>
<!-- AWS SDK v2 -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>stepfunctions</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kms</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>secretsmanager</artifactId>
<version>2.20.0</version>
</dependency>
<!-- Required by AuditLogger, which uses CloudWatchLogsClient -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatchlogs</artifactId>
<version>2.20.0</version>
</dependency>
<!-- Encryption -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.76</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Validation -->
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>8.0.0.Final</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<!-- Lombok (@Data, @Builder, @Slf4j); version assumed to be managed by the Spring Boot parent, as for the starter above -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>

Configuration Management

StepFunctionsConfig.java:

@Configuration
@ConfigurationProperties(prefix = "aws.step-functions")
@Data
public class StepFunctionsConfig {
private String region = "us-east-1";
private String endpointOverride;
private ExecutionConfig execution = new ExecutionConfig();
private SecurityConfig security = new SecurityConfig();
private MonitoringConfig monitoring = new MonitoringConfig();
@Data
public static class ExecutionConfig {
private String defaultStateMachineArn;
private int maxExecutionHistorySize = 1000;
private Duration executionTimeout = Duration.ofHours(24);
private boolean enableXRayTracing = true;
private boolean enableCloudWatchLogs = true;
}
@Data
public static class SecurityConfig {
private String kmsKeyId;
private String encryptionKeyAlias;
private boolean enforceInputValidation = true;
private boolean enableVpcEndpoints = false;
private List<String> allowedIamRoles = new ArrayList<>();
private List<String> restrictedStates = new ArrayList<>();
private int maxInputSizeBytes = 256 * 1024; // 256 KiB, the Step Functions payload limit
}
@Data
public static class MonitoringConfig {
private boolean enableCloudWatchMetrics = true;
private boolean enableDetailedMetrics = true;
private String logGroupName = "/aws/stepfunctions";
private Duration auditLogRetention = Duration.ofDays(365);
}
public StepFunctionsClient createStepFunctionsClient() {
StepFunctionsClientBuilder builder = StepFunctionsClient.builder()
.region(Region.of(region));
if (endpointOverride != null) {
builder.endpointOverride(URI.create(endpointOverride));
}
return builder.build();
}
}
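
Since the values above are only defaults, it can help to check the bound settings once at startup. The following is a minimal sketch rather than part of the classes shown here: `SecurityConfigCheck` and `validate` are hypothetical names, and the 262,144-byte ceiling is the documented Step Functions quota for execution input and output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical startup check for the security-related settings (illustrative only). */
final class SecurityConfigCheck {

    /** Step Functions caps execution input and output at 256 KiB of UTF-8 text. */
    static final int STEP_FUNCTIONS_MAX_PAYLOAD_BYTES = 256 * 1024;

    private SecurityConfigCheck() {
    }

    /** Returns a list of problems; an empty list means the settings look usable. */
    static List<String> validate(String region, String kmsKeyId, int maxInputSizeBytes) {
        List<String> problems = new ArrayList<>();
        if (region == null || region.isBlank()) {
            problems.add("region must be set");
        }
        if (kmsKeyId == null || kmsKeyId.isBlank()) {
            problems.add("kmsKeyId must be set when field-level encryption is used");
        }
        if (maxInputSizeBytes <= 0 || maxInputSizeBytes > STEP_FUNCTIONS_MAX_PAYLOAD_BYTES) {
            problems.add("maxInputSizeBytes must be between 1 and " + STEP_FUNCTIONS_MAX_PAYLOAD_BYTES);
        }
        return problems;
    }
}
```

One way to use this would be a `@PostConstruct` method on `StepFunctionsConfig` that passes in `region`, `security.getKmsKeyId()` and `security.getMaxInputSizeBytes()` and throws if the returned list is not empty.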

IAM Policy Management

IamPolicyManager.java:

@Service
@Slf4j
public class IamPolicyManager {
private final ObjectMapper objectMapper;
public IamPolicyManager() {
this.objectMapper = new ObjectMapper();
}
public String createLeastPrivilegePolicy(String stateMachineArn, 
List<String> allowedActions,
Map<String, String> resourceConstraints) {
try {
JsonNode policy = createPolicyDocument(stateMachineArn, allowedActions, resourceConstraints);
return objectMapper.writeValueAsString(policy);
} catch (Exception e) {
throw new PolicyException("Failed to create IAM policy", e);
}
}
public String createScopedExecutionPolicy(String stateMachineArn, 
String executionId,
Map<String, Object> context) {
try {
// ObjectNode.set() returns a generic JsonNode, so chained set() calls do not compile
ObjectNode policy = objectMapper.createObjectNode();
policy.put("Version", "2012-10-17");
policy.set("Statement", createScopedStatements(stateMachineArn, executionId, context));
return objectMapper.writeValueAsString(policy);
} catch (Exception e) {
throw new PolicyException("Failed to create scoped execution policy", e);
}
}
public boolean validatePolicy(String policyJson) {
try {
JsonNode policy = objectMapper.readTree(policyJson);
// Basic policy validation
if (!policy.has("Version") || !policy.has("Statement")) {
return false;
}
JsonNode statements = policy.get("Statement");
if (!statements.isArray()) {
return false;
}
// Check for dangerous permissions
for (JsonNode statement : statements) {
if (hasDangerousPermissions(statement)) {
return false;
}
}
return true;
} catch (Exception e) {
log.warn("Policy validation failed", e);
return false;
}
}
private JsonNode createPolicyDocument(String stateMachineArn,
List<String> allowedActions,
Map<String, String> resourceConstraints) {
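// Build the node step by step; chained set() calls do not compile on the generic return type
ObjectNode policy = objectMapper.createObjectNode();
policy.put("Version", "2012-10-17");
policy.set("Statement", createPolicyStatements(stateMachineArn, allowedActions, resourceConstraints));
return policy;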
}
private JsonNode createPolicyStatements(String stateMachineArn,
List<String> allowedActions,
Map<String, String> resourceConstraints) {
ArrayNode statements = objectMapper.createArrayNode();
// Allow specific Step Functions actions
ObjectNode stepFunctionsStatement = objectMapper.createObjectNode();
stepFunctionsStatement.put("Effect", "Allow");
stepFunctionsStatement.set("Action", objectMapper.valueToTree(allowedActions));
stepFunctionsStatement.set("Resource", objectMapper.valueToTree(List.of(stateMachineArn)));
// Add resource constraints if provided
if (resourceConstraints != null && !resourceConstraints.isEmpty()) {
stepFunctionsStatement.set("Condition", createConditions(resourceConstraints));
}
statements.add(stepFunctionsStatement);
// Add necessary service actions
statements.add(createServiceStatement("logs", List.of(
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
)));
statements.add(createServiceStatement("xray", List.of(
"xray:PutTraceSegments",
"xray:PutTelemetryRecords"
)));
return statements;
}
private JsonNode createScopedStatements(String stateMachineArn,
String executionId,
Map<String, Object> context) {
ArrayNode statements = objectMapper.createArrayNode();
// Allow execution-specific actions
ObjectNode executionStatement = objectMapper.createObjectNode();
executionStatement.put("Effect", "Allow");
executionStatement.set("Action", objectMapper.valueToTree(List.of(
"states:DescribeExecution",
"states:GetExecutionHistory"
)));
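// Execution ARNs have the form arn:aws:states:<region>:<account>:execution:<stateMachineName>:<executionName>
executionStatement.put("Resource",
stateMachineArn.replace(":stateMachine:", ":execution:") + ":" + executionId);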
// Add context-based conditions
if (context != null && !context.isEmpty()) {
executionStatement.set("Condition", createContextConditions(context));
}
statements.add(executionStatement);
return statements;
}
private JsonNode createServiceStatement(String service, List<String> actions) {
ObjectNode statement = objectMapper.createObjectNode();
statement.put("Effect", "Allow");
statement.set("Action", objectMapper.valueToTree(actions));
statement.set("Resource", objectMapper.valueToTree("*"));
return statement;
}
private JsonNode createConditions(Map<String, String> constraints) {
ObjectNode conditions = objectMapper.createObjectNode();
// Collect every key under a single StringEquals block; setting the block once per entry keeps only the last entry
ObjectNode stringEquals = conditions.putObject("StringEquals");
constraints.forEach((key, value) ->
stringEquals.set(key, objectMapper.valueToTree(List.of(value))));
return conditions;
}
private JsonNode createContextConditions(Map<String, Object> context) {
ObjectNode conditions = objectMapper.createObjectNode();
ObjectNode stringEquals = objectMapper.createObjectNode();
context.forEach((key, value) -> {
if (value instanceof String) {
stringEquals.set(key, objectMapper.valueToTree(List.of(value)));
}
});
// Only emit the operator when at least one string-valued key was found
if (!stringEquals.isEmpty()) {
conditions.set("StringEquals", stringEquals);
}
return conditions;
}
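private boolean hasDangerousPermissions(JsonNode statement) {
// path() never returns null, so a statement without "Effect" is treated as not an Allow
if (!"Allow".equals(statement.path("Effect").asText())) {
return false;
}
// IAM accepts either a single string or an array for Action and Resource
for (JsonNode action : asNodes(statement.get("Action"))) {
// IAM action names are case-insensitive
String actionStr = action.asText().toLowerCase(Locale.ROOT);
if (actionStr.equals("*") ||
actionStr.startsWith("iam:") ||
actionStr.startsWith("sts:") ||
actionStr.contains("passrole") ||
actionStr.contains("putrolepolicy")) {
return true;
}
}
// Note: this also rejects the wildcard-resource statements that createServiceStatement emits
for (JsonNode resource : asNodes(statement.get("Resource"))) {
if ("*".equals(resource.asText())) {
return true;
}
}
return false;
}
private Iterable<JsonNode> asNodes(JsonNode node) {
if (node == null || node.isNull()) {
return List.of();
}
return node.isArray() ? node : List.of(node);
}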
}
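
The scoped policy above needs the ARN of a single execution, which is not simply the state machine ARN with a suffix: the resource type segment changes from `stateMachine` to `execution`, and the execution name is appended after the state machine name. A minimal sketch of that derivation follows; `ExecutionArns` and `forExecution` are hypothetical names used only for illustration.

```java
/** Hypothetical helper for deriving execution ARNs (illustrative only). */
final class ExecutionArns {

    private static final String STATE_MACHINE_SEGMENT = ":stateMachine:";

    private ExecutionArns() {
    }

    /**
     * Turns arn:aws:states:REGION:ACCOUNT:stateMachine:NAME into
     * arn:aws:states:REGION:ACCOUNT:execution:NAME:EXECUTION_NAME.
     */
    static String forExecution(String stateMachineArn, String executionName) {
        int idx = stateMachineArn.indexOf(STATE_MACHINE_SEGMENT);
        if (!stateMachineArn.startsWith("arn:") || idx < 0) {
            throw new IllegalArgumentException("Not a state machine ARN: " + stateMachineArn);
        }
        String prefix = stateMachineArn.substring(0, idx);
        String machineName = stateMachineArn.substring(idx + STATE_MACHINE_SEGMENT.length());
        return prefix + ":execution:" + machineName + ":" + executionName;
    }
}
```

Rejecting anything that is not a state machine ARN keeps a malformed value from silently producing a policy that matches nothing.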

Input Validation and Sanitization

InputValidator.java:

@Service
@Slf4j
public class InputValidator {
private final ObjectMapper objectMapper;
private final Validator validator;
private final Set<String> allowedJsonPaths;
public InputValidator() {
this.objectMapper = new ObjectMapper();
this.validator = Validation.buildDefaultValidatorFactory().getValidator();
this.allowedJsonPaths = initializeAllowedJsonPaths();
}
public ValidationResult validateExecutionInput(String input, 
Class<?> inputType,
ValidationContext context) {
try {
List<ValidationError> errors = new ArrayList<>();
// Check input size
if (input.getBytes(StandardCharsets.UTF_8).length > context.getMaxInputSize()) {
errors.add(new ValidationError("INPUT_SIZE", 
"Input exceeds maximum allowed size: " + context.getMaxInputSize()));
}
// Parse and validate JSON
JsonNode jsonInput = objectMapper.readTree(input);
// Validate against schema
errors.addAll(validateJsonSchema(jsonInput, context.getSchema()));
// Validate sensitive data
errors.addAll(validateSensitiveData(jsonInput));
// Validate against input type constraints
if (inputType != null) {
errors.addAll(validateInputType(jsonInput, inputType));
}
// Sanitize input if validation passes
JsonNode sanitizedInput = errors.isEmpty() ? 
sanitizeInput(jsonInput, context) : jsonInput;
return ValidationResult.builder()
.valid(errors.isEmpty())
.errors(errors)
.originalInput(input)
.sanitizedInput(objectMapper.writeValueAsString(sanitizedInput))
.build();
} catch (Exception e) {
log.error("Input validation failed", e);
return ValidationResult.builder()
.valid(false)
.errors(List.of(new ValidationError("VALIDATION_ERROR", 
"Input validation failed: " + e.getMessage())))
.originalInput(input)
.build();
}
}
public ValidationResult validateStateInput(String input, 
String stateName,
Map<String, Object> stateConstraints) {
try {
List<ValidationError> errors = new ArrayList<>();
JsonNode jsonInput = objectMapper.readTree(input);
// State-specific validation
if (stateConstraints != null) {
errors.addAll(validateStateConstraints(jsonInput, stateConstraints));
}
// Check for prohibited patterns
errors.addAll(validateProhibitedPatterns(jsonInput));
return ValidationResult.builder()
.valid(errors.isEmpty())
.errors(errors)
.originalInput(input)
.sanitizedInput(objectMapper.writeValueAsString(sanitizeStateInput(jsonInput, stateName)))
.build();
} catch (Exception e) {
throw new ValidationException("State input validation failed", e);
}
}
private List<ValidationError> validateJsonSchema(JsonNode input, JsonNode schema) {
List<ValidationError> errors = new ArrayList<>();
if (schema == null) {
return errors; // No schema defined
}
try {
// Use JSON Schema validator (implementation would use library like networknt/json-schema-validator)
// This is a simplified version
if ("object".equals(schema.path("type").asText()) && !input.isObject()) {
errors.add(new ValidationError("SCHEMA_TYPE", "Input must be a JSON object"));
}
// Add more schema validation logic...
} catch (Exception e) {
errors.add(new ValidationError("SCHEMA_VALIDATION", 
"Schema validation error: " + e.getMessage()));
}
return errors;
}
private List<ValidationError> validateSensitiveData(JsonNode input) {
List<ValidationError> errors = new ArrayList<>();
// Check for sensitive data patterns
detectSensitiveData(input, "", errors);
return errors;
}
private void detectSensitiveData(JsonNode node, String path, List<ValidationError> errors) {
if (node.isObject()) {
node.fields().forEachRemaining(entry -> {
String fieldName = entry.getKey();
JsonNode fieldValue = entry.getValue();
String currentPath = path.isEmpty() ? fieldName : path + "." + fieldName;
// Check field name for sensitive indicators
if (isSensitiveField(fieldName)) {
errors.add(new ValidationError("SENSITIVE_FIELD", 
"Potential sensitive field detected: " + currentPath));
}
// Check field value for sensitive patterns
if (fieldValue.isTextual()) {
String value = fieldValue.asText();
if (containsSensitivePattern(value)) {
errors.add(new ValidationError("SENSITIVE_DATA", 
"Potential sensitive data detected in: " + currentPath));
}
}
// Recursively check nested objects
detectSensitiveData(fieldValue, currentPath, errors);
});
} else if (node.isArray()) {
for (int i = 0; i < node.size(); i++) {
detectSensitiveData(node.get(i), path + "[" + i + "]", errors);
}
}
}
private List<ValidationError> validateInputType(JsonNode input, Class<?> inputType) {
List<ValidationError> errors = new ArrayList<>();
try {
Object typedInput = objectMapper.treeToValue(input, inputType);
Set<ConstraintViolation<Object>> violations = validator.validate(typedInput);
violations.forEach(violation -> 
errors.add(new ValidationError(
"TYPE_VALIDATION", 
violation.getPropertyPath() + ": " + violation.getMessage())));
} catch (Exception e) {
errors.add(new ValidationError("TYPE_CONVERSION", 
"Failed to convert input to type: " + e.getMessage()));
}
return errors;
}
private List<ValidationError> validateStateConstraints(JsonNode input, 
Map<String, Object> constraints) {
List<ValidationError> errors = new ArrayList<>();
constraints.forEach((key, value) -> {
JsonNode node = input.at(key);
if (!node.isMissingNode()) {
// Validate constraint
if (value instanceof Number) {
if (node.isNumber() && node.decimalValue().compareTo(
BigDecimal.valueOf(((Number) value).doubleValue())) > 0) {
errors.add(new ValidationError("CONSTRAINT_VIOLATION", 
key + " exceeds maximum value: " + value));
}
} else if (value instanceof String) {
String pattern = (String) value;
if (node.isTextual() && !node.asText().matches(pattern)) {
errors.add(new ValidationError("CONSTRAINT_VIOLATION", 
key + " does not match pattern: " + pattern));
}
}
}
});
return errors;
}
private List<ValidationError> validateProhibitedPatterns(JsonNode input) {
List<ValidationError> errors = new ArrayList<>();
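// Check for SQL keywords as whole words. This is a heuristic, not a substitute for parameterized queries.
// OR and AND are left out because they match ordinary prose, and without \b "OR" would match "order".
checkForPatterns(input, "",
Pattern.compile("(?i)\\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION)\\b"),
"SQL_INJECTION", errors);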
// Check for command injection patterns
checkForPatterns(input, "", 
Pattern.compile("[|&;`$<>]"), 
"COMMAND_INJECTION", errors);
// Check for path traversal patterns
checkForPatterns(input, "", 
Pattern.compile("\\.\\./|\\.\\.\\\\"), 
"PATH_TRAVERSAL", errors);
return errors;
}
private void checkForPatterns(JsonNode node, String path, Pattern pattern, 
String errorCode, List<ValidationError> errors) {
if (node.isTextual()) {
String value = node.asText();
if (pattern.matcher(value).find()) {
errors.add(new ValidationError(errorCode, 
"Prohibited pattern detected in: " + (path.isEmpty() ? "root" : path)));
}
} else if (node.isObject()) {
node.fields().forEachRemaining(entry -> 
checkForPatterns(entry.getValue(), 
path.isEmpty() ? entry.getKey() : path + "." + entry.getKey(), 
pattern, errorCode, errors));
} else if (node.isArray()) {
for (int i = 0; i < node.size(); i++) {
checkForPatterns(node.get(i), path + "[" + i + "]", pattern, errorCode, errors);
}
}
}
private JsonNode sanitizeInput(JsonNode input, ValidationContext context) {
try {
// Remove sensitive fields from a copy so the caller's parsed input is left unchanged
JsonNode sanitized = removeSensitiveFields(input.deepCopy(), context.getSensitiveFields());
// Truncate large fields
sanitized = truncateLargeFields(sanitized, context.getMaxFieldSizes());
// Escape special characters
sanitized = escapeSpecialCharacters(sanitized);
return sanitized;
} catch (Exception e) {
log.warn("Input sanitization failed, returning original input", e);
return input;
}
}
private JsonNode sanitizeStateInput(JsonNode input, String stateName) {
// State-specific sanitization
JsonNode sanitized = input.deepCopy();
// Remove fields not allowed for this state; entries are stored as "<stateName>.<jsonPointer>"
if (allowedJsonPaths.stream().anyMatch(p -> p.startsWith(stateName + "."))) {
sanitized = filterAllowedPaths(sanitized, stateName);
}
return sanitized;
}
private JsonNode removeSensitiveFields(JsonNode node, Set<String> sensitiveFields) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
sensitiveFields.forEach(objectNode::remove);
return objectNode;
}
return node;
}
private JsonNode truncateLargeFields(JsonNode node, Map<String, Integer> maxSizes) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
maxSizes.forEach((field, maxSize) -> {
if (objectNode.has(field) && objectNode.get(field).isTextual()) {
String value = objectNode.get(field).asText();
if (value.length() > maxSize) {
objectNode.put(field, value.substring(0, maxSize) + "...");
}
}
});
return objectNode;
}
return node;
}
private JsonNode escapeSpecialCharacters(JsonNode node) {
// Implementation would escape HTML, JSON, etc.
return node;
}
private JsonNode filterAllowedPaths(JsonNode node, String stateName) {
// Implementation would filter JSON based on allowed paths for the state
return node;
}
private boolean isSensitiveField(String fieldName) {
String lowerField = fieldName.toLowerCase();
return lowerField.contains("password") ||
lowerField.contains("secret") ||
lowerField.contains("key") ||
lowerField.contains("token") ||
lowerField.contains("credential") ||
lowerField.contains("auth");
}
private boolean containsSensitivePattern(String value) {
// Heuristic checks; find() scans the whole value, including multi-line text that ".*" does not cross
return Pattern.compile("[A-Za-z0-9+/]{40,}").matcher(value).find() || // Base64-like
Pattern.compile("[0-9]{9,}").matcher(value).find() || // Potential SSN/credit card
Pattern.compile("[a-fA-F0-9]{32,}").matcher(value).find(); // MD5 hash-like
}
private Set<String> initializeAllowedJsonPaths() {
return Set.of(
"PaymentState./amount",
"PaymentState./currency",
"UserState./userId",
"UserState./email"
);
}
@Data
@Builder
public static class ValidationResult {
private boolean valid;
private List<ValidationError> errors;
private String originalInput;
private String sanitizedInput;
}
@Data
@AllArgsConstructor
public static class ValidationError {
private String code;
private String message;
}
@Data
@Builder
public static class ValidationContext {
private int maxInputSize;
private JsonNode schema;
private Set<String> sensitiveFields;
private Map<String, Integer> maxFieldSizes;
}
}
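
Two of the checks above are easy to get subtly wrong: the size limit applies to UTF-8 bytes rather than characters, and keyword patterns need word boundaries to avoid flagging harmless text. The sketch below illustrates both using only the JDK; `InputChecks` and its methods are hypothetical names, and the keyword list is an illustrative heuristic rather than a complete injection defense.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

/** Hypothetical stand-alone versions of two validator checks (illustrative only). */
final class InputChecks {

    // \b restricts matches to whole words, so "selection" does not trigger on SELECT.
    private static final Pattern SQL_KEYWORD = Pattern.compile(
            "\\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION)\\b", Pattern.CASE_INSENSITIVE);

    private InputChecks() {
    }

    /** Size of the payload as Step Functions measures it: bytes of the UTF-8 encoding. */
    static int utf8Length(String input) {
        return input.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True when the UTF-8 encoding of the input is larger than maxBytes. */
    static boolean exceedsLimit(String input, int maxBytes) {
        return utf8Length(input) > maxBytes;
    }

    /** True when the value contains one of the listed SQL keywords as a whole word. */
    static boolean containsSqlKeyword(String value) {
        return SQL_KEYWORD.matcher(value).find();
    }
}
```

Compiling the pattern once as a constant also avoids recompiling it for every field that is scanned.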

Secure Step Functions Client

SecureStepFunctionsClient.java:

@Service
@Slf4j
public class SecureStepFunctionsClient {
private final StepFunctionsConfig config;
private final InputValidator inputValidator;
private final EncryptionService encryptionService;
private final AuditLogger auditLogger;
private final StepFunctionsClient client;
private final KmsClient kmsClient;
public SecureStepFunctionsClient(StepFunctionsConfig config,
InputValidator inputValidator,
EncryptionService encryptionService,
AuditLogger auditLogger) {
this.config = config;
this.inputValidator = inputValidator;
this.encryptionService = encryptionService;
this.auditLogger = auditLogger;
this.client = config.createStepFunctionsClient();
this.kmsClient = KmsClient.builder()
.region(Region.of(config.getRegion()))
.build();
}
public SecureExecution startSecureExecution(StartExecutionRequest request,
SecurityContext securityContext) {
try {
// Validate input
InputValidator.ValidationResult validation = inputValidator.validateExecutionInput(
request.input(), null, createValidationContext());
if (!validation.isValid()) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("INPUT_VALIDATION_FAILED")
.resourceArn(request.stateMachineArn())
.principal(securityContext.getPrincipal())
.details(Map.of("errors", validation.getErrors()))
.build());
throw new ValidationException("Input validation failed: " + validation.getErrors());
}
// Encrypt sensitive data in input
String encryptedInput = encryptionService.encryptSensitiveData(
validation.getSanitizedInput(), securityContext);
// Create secure execution request
StartExecutionRequest secureRequest = request.toBuilder()
.input(encryptedInput)
.build();
// Start execution
StartExecutionResponse response = client.startExecution(secureRequest);
// Log audit event
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTION_STARTED")
.resourceArn(request.stateMachineArn())
.executionArn(response.executionArn())
.principal(securityContext.getPrincipal())
.details(Map.of(
"inputSize", encryptedInput.length(),
"validated", true
))
.build());
return SecureExecution.builder()
.executionArn(response.executionArn())
.startDate(response.startDate())
.securityContext(securityContext)
.validationResult(validation)
.build();
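} catch (Exception e) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTION_FAILED")
.resourceArn(request.stateMachineArn())
.principal(securityContext.getPrincipal())
// Map.of rejects null values, and getMessage() may return null
.details(Map.of("error", String.valueOf(e.getMessage())))
.severity(Severity.ERROR)
.build());
// Keep validation failures distinguishable from service failures
if (e instanceof ValidationException) {
throw (ValidationException) e;
}
throw new ExecutionException("Secure execution failed", e);
}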
}
public DescribeExecutionResponse describeSecureExecution(String executionArn,
SecurityContext securityContext) {
try {
// Verify access permissions
if (!hasExecutionAccess(executionArn, securityContext)) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("UNAUTHORIZED_ACCESS")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
.severity(Severity.HIGH)
.build());
throw new AccessDeniedException("Access denied to execution: " + executionArn);
}
DescribeExecutionResponse response = client.describeExecution(
DescribeExecutionRequest.builder()
.executionArn(executionArn)
.build());
// Decrypt output if present
String decryptedOutput = response.output() != null ? 
encryptionService.decryptData(response.output(), securityContext) : null;
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTION_ACCESSED")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
.build());
return response.toBuilder()
.output(decryptedOutput)
.build();
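} catch (Exception e) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTION_ACCESS_FAILED")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
// Map.of rejects null values, and getMessage() may return null
.details(Map.of("error", String.valueOf(e.getMessage())))
.build());
// Rethrow denials unchanged (assumes an unchecked AccessDeniedException such as Spring Security's)
if (e instanceof AccessDeniedException) {
throw (AccessDeniedException) e;
}
throw new ExecutionException("Failed to describe execution", e);
}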
}
public GetExecutionHistoryResponse getSecureExecutionHistory(String executionArn,
SecurityContext securityContext,
HistoryFilter filter) {
try {
// Verify access permissions
if (!hasExecutionAccess(executionArn, securityContext)) {
throw new AccessDeniedException("Access denied to execution history: " + executionArn);
}
GetExecutionHistoryRequest request = GetExecutionHistoryRequest.builder()
.executionArn(executionArn)
.maxResults(filter.getMaxResults())
.reverseOrder(filter.isReverseOrder())
.build();
GetExecutionHistoryResponse response = client.getExecutionHistory(request);
// Sanitize history events
List<HistoryEvent> sanitizedEvents = response.events().stream()
.map(this::sanitizeHistoryEvent)
.collect(Collectors.toList());
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("HISTORY_ACCESSED")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
.details(Map.of("eventCount", sanitizedEvents.size()))
.build());
return response.toBuilder()
.events(sanitizedEvents)
.build();
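} catch (Exception e) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("HISTORY_ACCESS_FAILED")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
// Map.of rejects null values, and getMessage() may return null
.details(Map.of("error", String.valueOf(e.getMessage())))
.build());
// Rethrow denials unchanged (assumes an unchecked AccessDeniedException such as Spring Security's)
if (e instanceof AccessDeniedException) {
throw (AccessDeniedException) e;
}
throw new ExecutionException("Failed to get execution history", e);
}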
}
public void stopSecureExecution(String executionArn, 
String cause,
SecurityContext securityContext) {
try {
// Verify stop permissions
if (!hasStopPermission(executionArn, securityContext)) {
throw new AccessDeniedException("Stop permission denied for execution: " + executionArn);
}
client.stopExecution(StopExecutionRequest.builder()
.executionArn(executionArn)
.cause(cause)
.build());
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTION_STOPPED")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
.details(Map.of("cause", cause != null ? cause : "")) // Map.of rejects null values
.build());
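} catch (Exception e) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTION_STOP_FAILED")
.executionArn(executionArn)
.principal(securityContext.getPrincipal())
// Map.of rejects null values, and getMessage() may return null
.details(Map.of("error", String.valueOf(e.getMessage())))
.build());
// Rethrow denials unchanged (assumes an unchecked AccessDeniedException such as Spring Security's)
if (e instanceof AccessDeniedException) {
throw (AccessDeniedException) e;
}
throw new ExecutionException("Failed to stop execution", e);
}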
}
public List<SecureExecution> listSecureExecutions(String stateMachineArn,
SecurityContext securityContext,
ExecutionFilter filter) {
try {
// Apply security filters
ListExecutionsRequest request = applySecurityFilters(
ListExecutionsRequest.builder()
.stateMachineArn(stateMachineArn)
.maxResults(filter.getMaxResults())
.statusFilter(filter.getStatusFilter())
.build(),
securityContext);
ListExecutionsResponse response = client.listExecutions(request);
// Filter results based on security context
List<ExecutionListItem> secureExecutions = response.executions().stream()
.filter(execution -> hasExecutionAccess(execution.executionArn(), securityContext))
.collect(Collectors.toList());
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTIONS_LISTED")
.resourceArn(stateMachineArn)
.principal(securityContext.getPrincipal())
.details(Map.of("count", secureExecutions.size()))
.build());
return secureExecutions.stream()
.map(execution -> SecureExecution.builder()
.executionArn(execution.executionArn())
.stateMachineArn(execution.stateMachineArn())
.name(execution.name())
.status(execution.status())
.startDate(execution.startDate())
.stopDate(execution.stopDate())
.securityContext(securityContext)
.build())
.collect(Collectors.toList());
} catch (Exception e) {
auditLogger.logSecurityEvent(SecurityEvent.builder()
.eventType("EXECUTIONS_LIST_FAILED")
.resourceArn(stateMachineArn)
.principal(securityContext.getPrincipal())
.details(Map.of("error", String.valueOf(e.getMessage()))) // getMessage() may return null
.build());
throw new ExecutionException("Failed to list executions", e);
}
}
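private boolean hasExecutionAccess(String executionArn, SecurityContext securityContext) {
// Implement access control logic
if (securityContext.isAdmin()) {
return true;
}
// Check if principal owns the execution
if (isOwnedBy(executionArn, securityContext.getPrincipal())) {
return true;
}
// Check resource-based policies
return checkResourcePolicy(executionArn, securityContext);
}
private boolean hasStopPermission(String executionArn, SecurityContext securityContext) {
// Only admins and execution owners can stop executions
return securityContext.isAdmin() || isOwnedBy(executionArn, securityContext.getPrincipal());
}
private boolean isOwnedBy(String executionArn, String principal) {
// Assumes executions are named "<principal>-<suffix>". A plain contains() check would let
// principal "al" match an execution that belongs to "alice".
if (principal == null || principal.isEmpty()) {
return false;
}
String executionName = executionArn.substring(executionArn.lastIndexOf(':') + 1);
return executionName.startsWith(principal + "-");
}
private boolean checkResourcePolicy(String resourceArn, SecurityContext securityContext) {
// Implement resource-based policy checking
// This would typically check IAM policies and resource tags
return false; // Fail closed until a real policy check is implemented
}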
private ListExecutionsRequest applySecurityFilters(ListExecutionsRequest request,
SecurityContext securityContext) {
// Apply security-based filters to the request
if (!securityContext.isAdmin()) {
// Non-admins can only see their own executions
// This would typically use resource tags or naming conventions
}
return request;
}
private HistoryEvent sanitizeHistoryEvent(HistoryEvent event) {
// Remove sensitive data from history events
return event.toBuilder()
.build(); // Implementation would sanitize event details
}
private InputValidator.ValidationContext createValidationContext() {
return InputValidator.ValidationContext.builder()
.maxInputSize(config.getSecurity().getMaxInputSizeBytes())
.sensitiveFields(Set.of("password", "secret", "token"))
.maxFieldSizes(Map.of("description", 500, "notes", 1000))
.build();
}
}
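
The audit calls above pass exception messages straight into `Map.of`, which throws a `NullPointerException` for null values, and `Throwable.getMessage()` is allowed to return null. Messages can also carry line breaks that make a single audit entry look like several. The following sketch shows one way to normalize such values before logging; `AuditDetails`, `safe` and the 256-character cap are hypothetical choices, not part of the classes above.

```java
/** Hypothetical helper that makes free-text values safe to put in audit details (illustrative only). */
final class AuditDetails {

    /** Arbitrary cap chosen for this sketch so one bad message cannot bloat a log entry. */
    private static final int MAX_LENGTH = 256;

    private AuditDetails() {
    }

    /** Replaces null with a marker, collapses line breaks, and truncates long values. */
    static String safe(String value) {
        if (value == null) {
            return "unknown";
        }
        // Collapse CR/LF runs so attacker-controlled text cannot forge extra log lines.
        String singleLine = value.replaceAll("[\\r\\n]+", " ");
        return singleLine.length() > MAX_LENGTH
                ? singleLine.substring(0, MAX_LENGTH)
                : singleLine;
    }
}
```

With such a helper, a call like `Map.of("error", AuditDetails.safe(e.getMessage()))` no longer depends on the exception carrying a message.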

Encryption Service

EncryptionService.java:

@Service
@Slf4j
public class EncryptionService {
private final StepFunctionsConfig config;
private final KmsClient kmsClient;
private final ObjectMapper objectMapper;
public EncryptionService(StepFunctionsConfig config) {
this.config = config;
this.kmsClient = KmsClient.builder()
.region(Region.of(config.getRegion()))
.build();
this.objectMapper = new ObjectMapper();
}
public String encryptSensitiveData(String input, SecurityContext securityContext) {
try {
JsonNode jsonInput = objectMapper.readTree(input);
JsonNode encrypted = encryptJsonNode(jsonInput, securityContext);
return objectMapper.writeValueAsString(encrypted);
} catch (Exception e) {
throw new EncryptionException("Failed to encrypt sensitive data", e);
}
}
public String decryptData(String encryptedData, SecurityContext securityContext) {
try {
JsonNode jsonData = objectMapper.readTree(encryptedData);
JsonNode decrypted = decryptJsonNode(jsonData, securityContext);
return objectMapper.writeValueAsString(decrypted);
} catch (Exception e) {
throw new EncryptionException("Failed to decrypt data", e);
}
}
public String encryptField(String plaintext, String fieldName, SecurityContext context) {
try {
EncryptRequest request = EncryptRequest.builder()
.keyId(config.getSecurity().getKmsKeyId())
.plaintext(SdkBytes.fromUtf8String(plaintext))
.encryptionContext(createEncryptionContext(fieldName, context))
.build();
EncryptResponse response = kmsClient.encrypt(request);
return Base64.getEncoder().encodeToString(response.ciphertextBlob().asByteArray());
} catch (Exception e) {
throw new EncryptionException("Failed to encrypt field: " + fieldName, e);
}
}
public String decryptField(String encryptedBase64, String fieldName, SecurityContext context) {
try {
byte[] ciphertext = Base64.getDecoder().decode(encryptedBase64);
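DecryptRequest request = DecryptRequest.builder()
// Pin the key so KMS rejects ciphertext that was produced under any other key
.keyId(config.getSecurity().getKmsKeyId())
.ciphertextBlob(SdkBytes.fromByteArray(ciphertext))
.encryptionContext(createEncryptionContext(fieldName, context))
.build();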
DecryptResponse response = kmsClient.decrypt(request);
return response.plaintext().asUtf8String();
} catch (Exception e) {
throw new EncryptionException("Failed to decrypt field: " + fieldName, e);
}
}
private JsonNode encryptJsonNode(JsonNode node, SecurityContext context) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey();
JsonNode fieldValue = field.getValue();
if (isSensitiveField(fieldName)) {
if (fieldValue.isTextual()) {
String encrypted = encryptField(fieldValue.asText(), fieldName, context);
objectNode.put(fieldName, "encrypted:" + encrypted);
}
} else if (fieldValue.isObject() || fieldValue.isArray()) {
encryptJsonNode(fieldValue, context);
}
}
} else if (node.isArray()) {
ArrayNode arrayNode = (ArrayNode) node;
for (int i = 0; i < arrayNode.size(); i++) {
JsonNode element = arrayNode.get(i);
if (element.isObject() || element.isArray()) {
encryptJsonNode(element, context);
}
}
}
return node;
}
private JsonNode decryptJsonNode(JsonNode node, SecurityContext context) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey();
JsonNode fieldValue = field.getValue();
if (fieldValue.isTextual()) {
String value = fieldValue.asText();
if (value.startsWith("encrypted:")) {
String encrypted = value.substring("encrypted:".length());
String decrypted = decryptField(encrypted, fieldName, context);
objectNode.put(fieldName, decrypted);
}
} else if (fieldValue.isObject() || fieldValue.isArray()) {
decryptJsonNode(fieldValue, context);
}
}
} else if (node.isArray()) {
ArrayNode arrayNode = (ArrayNode) node;
for (int i = 0; i < arrayNode.size(); i++) {
JsonNode element = arrayNode.get(i);
if (element.isObject() || element.isArray()) {
decryptJsonNode(element, context);
}
}
}
return node;
}
private Map<String, String> createEncryptionContext(String fieldName, SecurityContext context) {
// KMS only decrypts when it receives exactly the same context that was used to encrypt,
// so every value here must be stable across both calls (a current timestamp is not).
Map<String, String> encryptionContext = new HashMap<>();
encryptionContext.put("field", fieldName);
encryptionContext.put("principal", context.getPrincipal());
encryptionContext.put("purpose", "step-functions-security");
return encryptionContext;
}
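private boolean isSensitiveField(String fieldName) {
// Locale.ROOT keeps the lower-casing independent of the JVM's default locale
String lowerField = fieldName.toLowerCase(java.util.Locale.ROOT);
// Substring matching is deliberately broad: "key" also matches names such as "monkey"
return lowerField.contains("password") ||
lowerField.contains("secret") ||
lowerField.contains("key") ||
lowerField.contains("token") ||
lowerField.contains("credential");
}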
}
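
With symmetric KMS keys, the encryption context is bound to the ciphertext as additional authenticated data, and a decrypt call only succeeds when it supplies exactly the same key-value pairs. A value that changes between calls, such as a current timestamp, would therefore make the ciphertext undecryptable. The sketch below illustrates this locally, using the JDK's AES-GCM as a stand-in for KMS. The class name EncryptionContextDemo and its helper methods are hypothetical and not part of the implementation above.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.TreeMap;
import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Local stand-in for KMS (assumption): AES-GCM with the encryption context bound as AAD.
final class EncryptionContextDemo {

    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serializes the context in sorted key order so equal maps always give equal bytes.
    static byte[] canonicalize(Map<String, String> context) {
        StringBuilder sb = new StringBuilder();
        new TreeMap<>(context).forEach((k, v) -> sb.append(k).append('=').append(v).append('\n'));
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Returns the random IV followed by the ciphertext and authentication tag.
    static byte[] encrypt(SecretKey key, String plaintext, Map<String, String> context) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            cipher.updateAAD(canonicalize(context));
            byte[] body = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[iv.length + body.length];
            System.arraycopy(iv, 0, out, 0, iv.length);
            System.arraycopy(body, 0, out, iv.length, body.length);
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the plaintext, or null when the supplied context does not match.
    static String decrypt(SecretKey key, byte[] blob, Map<String, String> context) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, blob, 0, IV_BYTES));
            cipher.updateAAD(canonicalize(context));
            byte[] plain = cipher.doFinal(blob, IV_BYTES, blob.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (AEADBadTagException e) {
            return null; // context (or ciphertext) differs from what was encrypted
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        Map<String, String> context = Map.of("field", "password", "principal", "alice");
        byte[] blob = encrypt(key, "s3cret", context);
        System.out.println("same context:    " + decrypt(key, blob, context));
        Map<String, String> drifted = new TreeMap<>(context);
        drifted.put("timestamp", "2024-01-01T00:00:00Z");
        System.out.println("drifted context: " + decrypt(key, blob, drifted));
    }
}
```

In this sketch the second decrypt call returns null because the extra timestamp entry changes the authenticated data. The same reasoning applies to the principal entry: only a caller whose context carries the same principal can decrypt a field.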

Audit Logging

AuditLogger.java:

@Service
@Slf4j
public class AuditLogger {
private final CloudWatchLogsClient cloudWatchLogsClient;
private final String logGroupName;
private final String logStreamName;
public AuditLogger(StepFunctionsConfig config) {
this.cloudWatchLogsClient = CloudWatchLogsClient.builder()
.region(Region.of(config.getRegion()))
.build();
this.logGroupName = config.getMonitoring().getLogGroupName() + "-audit";
this.logStreamName = "step-functions-security-" + Instant.now().getEpochSecond();
initializeLogStream();
}
public void logSecurityEvent(SecurityEvent event) {
try {
String logMessage = createLogMessage(event);
PutLogEventsRequest request = PutLogEventsRequest.builder()
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.logEvents(InputLogEvent.builder() // PutLogEvents takes InputLogEvent entries
.message(logMessage)
.timestamp(System.currentTimeMillis())
.build())
.build();
cloudWatchLogsClient.putLogEvents(request);
// Also log to application logs
log.info("Security Event: {}", logMessage);
} catch (Exception e) {
log.error("Failed to log security event", e);
// Fallback to application logs
log.warn("Security Event (Fallback): {}", event);
}
}
public void logDataAccessEvent(String executionArn, String principal, 
String operation, Map<String, Object> details) {
SecurityEvent event = SecurityEvent.builder()
.eventType("DATA_ACCESS")
.executionArn(executionArn)
.principal(principal)
.operation(operation)
.details(details)
.timestamp(Instant.now())
.build();
logSecurityEvent(event);
}
public List<SecurityEvent> querySecurityEvents(Instant startTime, Instant endTime,
String filterPattern) {
try {
FilterLogEventsRequest request = FilterLogEventsRequest.builder()
.logGroupName(logGroupName)
.startTime(startTime.toEpochMilli())
.endTime(endTime.toEpochMilli())
.filterPattern(filterPattern)
.build();
// Only the first page of results is read here; follow response.nextToken() to fetch more
FilterLogEventsResponse response = cloudWatchLogsClient.filterLogEvents(request);
return response.events().stream()
.map(this::parseLogEvent)
.filter(Objects::nonNull)
.collect(Collectors.toList());
} catch (Exception e) {
throw new AuditException("Failed to query security events", e);
}
}
private String createLogMessage(SecurityEvent event) {
try {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> logData = new HashMap<>();
logData.put("eventType", event.getEventType());
logData.put("timestamp", event.getTimestamp().toString());
logData.put("principal", event.getPrincipal());
logData.put("resourceArn", event.getResourceArn());
logData.put("executionArn", event.getExecutionArn());
logData.put("operation", event.getOperation());
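// severity is optional (logDataAccessEvent does not set it), so guard against null
logData.put("severity", event.getSeverity() != null ? event.getSeverity().name() : null);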
logData.put("details", event.getDetails());
logData.put("sourceIp", event.getSourceIp());
logData.put("userAgent", event.getUserAgent());
return mapper.writeValueAsString(logData);
} catch (Exception e) {
return String.format("SecurityEvent[type=%s, principal=%s, resource=%s]", 
event.getEventType(), event.getPrincipal(), event.getResourceArn());
}
}
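private SecurityEvent parseLogEvent(FilteredLogEvent logEvent) {
try {
// findAndRegisterModules() picks up jackson-datatype-jsr310 when it is on the classpath;
// without that module the Instant timestamp cannot be deserialized.
ObjectMapper mapper = new ObjectMapper().findAndRegisterModules();
return mapper.readValue(logEvent.message(), SecurityEvent.class);
} catch (Exception e) {
log.warn("Failed to parse log event: {}", logEvent.message());
return null;
}
}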
private void initializeLogStream() {
try {
// Create log group if it doesn't exist
try {
cloudWatchLogsClient.createLogGroup(CreateLogGroupRequest.builder()
.logGroupName(logGroupName)
.build());
} catch (ResourceAlreadyExistsException e) {
// Log group already exists, this is fine
}
// Create log stream
cloudWatchLogsClient.createLogStream(CreateLogStreamRequest.builder()
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.build());
} catch (Exception e) {
log.warn("Failed to initialize CloudWatch log stream", e);
}
}
}
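
Because createLogMessage writes each audit event as a JSON object, the filterPattern argument of querySecurityEvents can use the CloudWatch Logs JSON filter syntax, for example { $.eventType = "DATA_ACCESS" }. The following is a minimal sketch of a helper that builds such patterns from field/value pairs. The class AuditFilterPatterns and its method allEqual are hypothetical names, and the helper is deliberately conservative: it rejects values containing quotes, backslashes, or asterisks instead of trying to escape them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: builds CloudWatch Logs JSON filter patterns for audit queries.
final class AuditFilterPatterns {

    private static final Pattern FIELD_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private AuditFilterPatterns() {
    }

    // Builds a pattern that matches events where every given field equals its value.
    static String allEqual(Map<String, String> criteria) {
        if (criteria == null || criteria.isEmpty()) {
            throw new IllegalArgumentException("at least one criterion is required");
        }
        if (criteria.size() == 1) {
            Map.Entry<String, String> only = criteria.entrySet().iterator().next();
            return "{ " + term(only.getKey(), only.getValue()) + " }";
        }
        return criteria.entrySet().stream()
                .map(e -> "(" + term(e.getKey(), e.getValue()) + ")")
                .collect(Collectors.joining(" && ", "{ ", " }"));
    }

    private static String term(String field, String value) {
        if (field == null || !FIELD_NAME.matcher(field).matches()) {
            throw new IllegalArgumentException("unsupported field name: " + field);
        }
        // Reject rather than escape: these characters could change the pattern's meaning.
        if (value == null || value.indexOf('"') >= 0 || value.indexOf('\\') >= 0
                || value.indexOf('*') >= 0) {
            throw new IllegalArgumentException("unsupported value for field " + field);
        }
        return "$." + field + " = \"" + value + "\"";
    }

    public static void main(String[] args) {
        Map<String, String> criteria = new LinkedHashMap<>();
        criteria.put("eventType", "DATA_ACCESS");
        criteria.put("principal", "alice");
        // prints: { ($.eventType = "DATA_ACCESS") && ($.principal = "alice") }
        System.out.println(allEqual(criteria));
    }
}
```

The resulting string could then be passed as the filterPattern argument, for example querySecurityEvents(start, end, AuditFilterPatterns.allEqual(criteria)). Building the pattern from validated parts avoids concatenating caller-supplied text directly into the query.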

Security Models

SecurityContext.java:

@Data
@Builder
public class SecurityContext {
private String principal;
private Set<String> roles;
private Set<String> permissions;
private String sourceIp;
private String userAgent;
private Map<String, String> attributes;
private Instant timestamp;
public boolean isAdmin() {
// roles and permissions stay null when the builder does not set them
return roles != null && (roles.contains("admin") || roles.contains("Administrator"));
}
public boolean hasPermission(String permission) {
return isAdmin() || (permissions != null && permissions.contains(permission));
}
public boolean hasRole(String role) {
return isAdmin() || (roles != null && roles.contains(role));
}
}
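@Data
@Builder
@NoArgsConstructor // lets Jackson instantiate the class when audit events are parsed back
@AllArgsConstructor // needed by @Builder once a no-arg constructor is declared
public class SecurityEvent {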
private String eventType;
private Instant timestamp;
private String principal;
private String resourceArn;
private String executionArn;
private String operation;
private Severity severity;
private Map<String, Object> details;
private String sourceIp;
private String userAgent;
public enum Severity {
LOW, MEDIUM, HIGH, CRITICAL
}
}
@Data
@Builder
public class SecureExecution {
private String executionArn;
private String stateMachineArn;
private String name;
private String status;
private Instant startDate;
private Instant stopDate;
private SecurityContext securityContext;
private InputValidator.ValidationResult validationResult;
}
@Data
@Builder
public class ExecutionFilter {
private Integer maxResults;
private ExecutionStatus statusFilter;
private Instant startDate;
private Instant endDate;
}
@Data
@Builder
public class HistoryFilter {
private Integer maxResults;
private boolean reverseOrder;
private Set<String> includedEventTypes;
}

Best Practices

  1. Use least-privilege IAM roles for Step Functions executions
  2. Encrypt sensitive data in state machine inputs/outputs
  3. Validate and sanitize all inputs before processing
  4. Implement comprehensive audit logging for all operations
  5. Use VPC endpoints for private network access
  6. Monitor execution patterns for anomalies
  7. Regularly review and update security policies
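
As an illustration of practice 3, the following is a minimal sketch of a pre-flight check that runs before an execution is started. It assumes the limits documented for StartExecution at the time of writing (execution names of at most 80 characters, input of at most 262,144 bytes in UTF-8) and uses a conservative allow-list of letters, digits, hyphens, and underscores for names. The class ExecutionRequestGuard is a hypothetical name and is separate from the InputValidator used elsewhere in this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical pre-flight check for execution names and input size.
final class ExecutionRequestGuard {

    // Assumed limits, taken from the StartExecution documentation at the time of writing.
    static final int MAX_NAME_LENGTH = 80;
    static final int MAX_INPUT_BYTES = 256 * 1024;

    // Conservative allow-list: letters, digits, hyphen, underscore.
    private static final Pattern SAFE_NAME =
            Pattern.compile("[A-Za-z0-9_-]{1," + MAX_NAME_LENGTH + "}");

    private ExecutionRequestGuard() {
    }

    // Returns a list of problems; an empty list means the request passed the checks.
    static List<String> check(String name, String inputJson) {
        List<String> problems = new ArrayList<>();
        if (name == null || !SAFE_NAME.matcher(name).matches()) {
            problems.add("name must be 1-" + MAX_NAME_LENGTH
                    + " characters from [A-Za-z0-9_-]");
        }
        if (inputJson == null) {
            problems.add("input is required");
        } else {
            int size = inputJson.getBytes(StandardCharsets.UTF_8).length;
            if (size > MAX_INPUT_BYTES) {
                problems.add("input is " + size + " bytes, limit is " + MAX_INPUT_BYTES);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check("order-123", "{\"orderId\":\"123\"}"));
        System.out.println(check("order 123; drop", "{}"));
    }
}
```

A check like this only covers size and naming. Schema validation and sanitization of the payload itself still need to happen before the input reaches the state machine.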

Conclusion

The Java implementation described in this article provides:

  • Secure execution with input validation and encryption
  • Fine-grained access control using IAM policies
  • Comprehensive audit logging for compliance
  • Data protection through encryption and sanitization
  • Monitoring and alerting for security events

Together, these components help keep Step Functions workflows secure and auditable in line with AWS security best practices, without giving up the flexibility of serverless orchestration.
