Introduction to Snowflake Data Warehouse
Snowflake is a cloud-native data warehouse that offers elastic scalability, strong performance, and ease of use. This guide covers comprehensive Java integration with Snowflake for building robust data warehouse solutions.
Table of Contents
- Project Setup and Dependencies
- Snowflake Configuration and Connection Management
- Data Warehouse Schema Management
- Data Ingestion Service
- Data Query and Analytics Service
- Data Warehouse Management Service
- REST Controllers
- Spring Boot Application
- Testing
1. Project Setup and Dependencies
Maven Configuration
<properties>
<snowflake.jdbc.version>3.13.22</snowflake.jdbc.version>
<spring-boot.version>3.1.0</spring-boot.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Snowflake JDBC Driver -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>${snowflake.jdbc.version}</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.13.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.13.0</version>
</dependency>
<!-- Connection Pool -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
</dependency>
<!-- Lombok -->
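<!-- Version omitted: assumes a Spring Boot parent POM (or a dependencyManagement section) supplies it -->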
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Application Configuration
# application.yml
spring:
  datasource:
    url: jdbc:snowflake://${SNOWFLAKE_ACCOUNT}.snowflakecomputing.com/?db=${SNOWFLAKE_DATABASE}&warehouse=${SNOWFLAKE_WAREHOUSE}&schema=${SNOWFLAKE_SCHEMA}
    username: ${SNOWFLAKE_USERNAME}
    password: ${SNOWFLAKE_PASSWORD}
    driver-class-name: net.snowflake.client.jdbc.SnowflakeDriver
    hikari:
      maximum-pool-size: 10
      minimum-idle: 2
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000

snowflake:
  account: ${SNOWFLAKE_ACCOUNT:your_account}
  username: ${SNOWFLAKE_USERNAME}
  password: ${SNOWFLAKE_PASSWORD}
  database: ${SNOWFLAKE_DATABASE:PROD_DW}
  warehouse: ${SNOWFLAKE_WAREHOUSE:COMPUTE_WH}
  schema: ${SNOWFLAKE_SCHEMA:PUBLIC}
  role: ${SNOWFLAKE_ROLE:SYSADMIN}
  stage: ${SNOWFLAKE_STAGE:MY_STAGE}
  file-format: ${SNOWFLAKE_FILE_FORMAT:MY_CSV_FORMAT}

app:
  data-warehouse:
    batch-size: 10000
    max-retries: 3
    retry-delay-ms: 1000
    staging-table-prefix: STG_
    history-table-suffix: _HISTORY
2. Snowflake Configuration and Connection Management
Snowflake Configuration Class
package com.example.snowflake.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "snowflake")
public class SnowflakeConfig {
private String account;
private String username;
private String password;
private String database;
private String warehouse;
private String schema;
private String role;
private String stage;
private String fileFormat;
public String getJdbcUrl() {
return String.format(
"jdbc:snowflake://%s.snowflakecomputing.com/?db=%s&warehouse=%s&schema=%s&role=%s",
account, database, warehouse, schema, role
);
}
}
Snowflake Connection Manager
package com.example.snowflake.connection;
import com.example.snowflake.config.SnowflakeConfig;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import jakarta.annotation.PreDestroy;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
@Component
public class SnowflakeConnectionManager {
private static final Logger logger = LoggerFactory.getLogger(SnowflakeConnectionManager.class);
private final HikariDataSource dataSource;
private final SnowflakeConfig config;
public SnowflakeConnectionManager(SnowflakeConfig config) {
this.config = config;
this.dataSource = createDataSource();
testConnection();
}
private HikariDataSource createDataSource() {
HikariConfig hikariConfig = new HikariConfig();
// Basic connection configuration
hikariConfig.setJdbcUrl(config.getJdbcUrl());
hikariConfig.setUsername(config.getUsername());
hikariConfig.setPassword(config.getPassword());
hikariConfig.setDriverClassName("net.snowflake.client.jdbc.SnowflakeDriver");
// Connection pool settings
hikariConfig.setMaximumPoolSize(10);
hikariConfig.setMinimumIdle(2);
hikariConfig.setConnectionTimeout(30000);
hikariConfig.setIdleTimeout(600000);
hikariConfig.setMaxLifetime(1800000);
// Snowflake-specific properties
Properties properties = new Properties();
properties.put("application", "JavaDataWarehouseApp");
properties.put("query_tag", "data_warehouse_ingestion");
hikariConfig.setDataSourceProperties(properties);
return new HikariDataSource(hikariConfig);
}
private void testConnection() {
try (Connection connection = dataSource.getConnection()) {
logger.info("Successfully connected to Snowflake: {}", config.getDatabase());
} catch (SQLException e) {
logger.error("Failed to connect to Snowflake", e);
throw new RuntimeException("Snowflake connection failed", e);
}
}
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
public DataSource getDataSource() {
return dataSource;
}
@PreDestroy
public void close() {
if (dataSource != null && !dataSource.isClosed()) {
dataSource.close();
logger.info("Snowflake connection pool closed");
}
}
}
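Note that the spring.datasource settings in application.yml and this manager each build their own connection pool. If Spring components such as JdbcTemplate should share the manager's pool instead, it can be exposed as a bean. A minimal sketch (the DataSourceWiring class name is introduced here for illustration):

package com.example.snowflake.config;
import com.example.snowflake.connection.SnowflakeConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
// Hypothetical wiring: reuses the manager's HikariCP pool for Spring's JdbcTemplate,
// so the application does not open a second pool via spring.datasource.
@Configuration
public class DataSourceWiring {
    @Bean
    public JdbcTemplate jdbcTemplate(SnowflakeConnectionManager manager) {
        return new JdbcTemplate(manager.getDataSource());
    }
}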
3. Data Warehouse Schema Management
DDL Management Service
package com.example.snowflake.schema;
import com.example.snowflake.connection.SnowflakeConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
@Service
public class SchemaManager {
private static final Logger logger = LoggerFactory.getLogger(SchemaManager.class);
private final SnowflakeConnectionManager connectionManager;
public SchemaManager(SnowflakeConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
public void createDatabase(String databaseName) {
String sql = String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName);
executeUpdate(sql);
logger.info("Created database: {}", databaseName);
}
public void createSchema(String schemaName) {
String sql = String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName);
executeUpdate(sql);
logger.info("Created schema: {}", schemaName);
}
public void createWarehouse(String warehouseName, String size) {
String sql = String.format(
"CREATE WAREHOUSE IF NOT EXISTS %s WITH WAREHOUSE_SIZE = '%s' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE",
warehouseName, size
);
executeUpdate(sql);
logger.info("Created warehouse: {} with size {}", warehouseName, size);
}
// Dimension Tables
public void createDateDimensionTable() {
String sql = """
CREATE TABLE IF NOT EXISTS DIM_DATE (
DATE_KEY NUMBER(8) PRIMARY KEY,
FULL_DATE DATE NOT NULL,
DAY_NAME VARCHAR(10) NOT NULL,
DAY_OF_WEEK NUMBER(1) NOT NULL,
DAY_OF_MONTH NUMBER(2) NOT NULL,
DAY_OF_YEAR NUMBER(3) NOT NULL,
WEEK_OF_YEAR NUMBER(2) NOT NULL,
MONTH_NAME VARCHAR(10) NOT NULL,
MONTH_NUMBER NUMBER(2) NOT NULL,
YEAR NUMBER(4) NOT NULL,
QUARTER NUMBER(1) NOT NULL,
IS_WEEKEND BOOLEAN NOT NULL,
IS_HOLIDAY BOOLEAN NOT NULL,
CREATED_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
""";
executeUpdate(sql);
logger.info("Created date dimension table");
}
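    // The date dimension must be populated before fact loads can join to it.
    // A minimal sketch using Snowflake's GENERATOR table function; adjust the start
    // date and row count as needed. IS_HOLIDAY is left FALSE because holiday
    // calendars are organization-specific, and the weekend check assumes the default
    // WEEK_START setting (Sunday = 0, Saturday = 6).
    public void populateDateDimension() {
        String sql = """
            INSERT INTO DIM_DATE (DATE_KEY, FULL_DATE, DAY_NAME, DAY_OF_WEEK, DAY_OF_MONTH,
                                  DAY_OF_YEAR, WEEK_OF_YEAR, MONTH_NAME, MONTH_NUMBER,
                                  YEAR, QUARTER, IS_WEEKEND, IS_HOLIDAY)
            SELECT
                TO_NUMBER(TO_CHAR(d, 'YYYYMMDD')),
                d,
                DAYNAME(d),
                DAYOFWEEK(d),
                DAYOFMONTH(d),
                DAYOFYEAR(d),
                WEEKOFYEAR(d),
                MONTHNAME(d),
                MONTH(d),
                YEAR(d),
                QUARTER(d),
                DAYOFWEEK(d) IN (0, 6),
                FALSE
            FROM (
                SELECT DATEADD(day, ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1, '2020-01-01'::DATE) AS d
                FROM TABLE(GENERATOR(ROWCOUNT => 3653))
            )
            """;
        executeUpdate(sql);
        logger.info("Populated date dimension");
    }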
public void createCustomerDimensionTable() {
String sql = """
CREATE TABLE IF NOT EXISTS DIM_CUSTOMER (
CUSTOMER_KEY NUMBER(38) PRIMARY KEY,
CUSTOMER_ID VARCHAR(50) NOT NULL,
CUSTOMER_NAME VARCHAR(100) NOT NULL,
EMAIL VARCHAR(100),
PHONE VARCHAR(20),
ADDRESS VARCHAR(200),
CITY VARCHAR(50),
STATE VARCHAR(50),
COUNTRY VARCHAR(50),
POSTAL_CODE VARCHAR(20),
SEGMENT VARCHAR(50),
IS_ACTIVE BOOLEAN NOT NULL,
VALID_FROM DATE NOT NULL,
VALID_TO DATE,
IS_CURRENT BOOLEAN NOT NULL,
CREATED_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
UPDATED_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
""";
executeUpdate(sql);
logger.info("Created customer dimension table");
}
public void createProductDimensionTable() {
String sql = """
CREATE TABLE IF NOT EXISTS DIM_PRODUCT (
PRODUCT_KEY NUMBER(38) PRIMARY KEY,
PRODUCT_ID VARCHAR(50) NOT NULL,
PRODUCT_NAME VARCHAR(200) NOT NULL,
CATEGORY VARCHAR(100),
SUBCATEGORY VARCHAR(100),
BRAND VARCHAR(100),
PRICE NUMBER(10,2),
COST NUMBER(10,2),
IS_ACTIVE BOOLEAN NOT NULL,
VALID_FROM DATE NOT NULL,
VALID_TO DATE,
IS_CURRENT BOOLEAN NOT NULL,
CREATED_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
UPDATED_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
""";
executeUpdate(sql);
logger.info("Created product dimension table");
}
// Fact Tables
public void createSalesFactTable() {
String sql = """
CREATE TABLE IF NOT EXISTS FACT_SALES (
SALES_KEY NUMBER(38) PRIMARY KEY,
DATE_KEY NUMBER(8) NOT NULL REFERENCES DIM_DATE(DATE_KEY),
CUSTOMER_KEY NUMBER(38) NOT NULL REFERENCES DIM_CUSTOMER(CUSTOMER_KEY),
PRODUCT_KEY NUMBER(38) NOT NULL REFERENCES DIM_PRODUCT(PRODUCT_KEY),
ORDER_ID VARCHAR(50) NOT NULL,
ORDER_DATE DATE NOT NULL,
QUANTITY NUMBER(10) NOT NULL,
UNIT_PRICE NUMBER(10,2) NOT NULL,
TOTAL_AMOUNT NUMBER(10,2) NOT NULL,
DISCOUNT_AMOUNT NUMBER(10,2) DEFAULT 0,
NET_AMOUNT NUMBER(10,2) NOT NULL,
SHIPPING_COST NUMBER(10,2) DEFAULT 0,
TAX_AMOUNT NUMBER(10,2) DEFAULT 0,
PAYMENT_METHOD VARCHAR(50),
SALES_REP_ID VARCHAR(50),
CREATED_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
""";
executeUpdate(sql);
logger.info("Created sales fact table");
}
// Staging Tables
public void createStagingTable(String tableName, String columns) {
String sql = String.format(
"CREATE TABLE IF NOT EXISTS %s (%s)",
tableName, columns
);
executeUpdate(sql);
logger.info("Created staging table: {}", tableName);
}
    // Clustering keys (Snowflake does not support user-defined indexes on standard
    // tables; a clustering key is the equivalent tuning mechanism, and each table
    // can have at most one, optionally spanning multiple columns. Point lookups on
    // dimension IDs can instead use the search optimization service.)
    public void createClusteringKeys() {
        String sql = "ALTER TABLE FACT_SALES CLUSTER BY (DATE_KEY, CUSTOMER_KEY, PRODUCT_KEY)";
        executeUpdate(sql);
        logger.info("Defined clustering key on FACT_SALES");
    }
// File Format for Data Loading
public void createFileFormat() {
String sql = """
CREATE FILE FORMAT IF NOT EXISTS CSV_FORMAT
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
NULL_IF = ('NULL', 'null')
EMPTY_FIELD_AS_NULL = TRUE
COMPRESSION = AUTO
""";
executeUpdate(sql);
logger.info("Created CSV file format");
}
// Stage for Data Loading
public void createStage(String stageName, String fileFormat) {
String sql = String.format(
"CREATE STAGE IF NOT EXISTS %s FILE_FORMAT = %s",
stageName, fileFormat
);
executeUpdate(sql);
logger.info("Created stage: {}", stageName);
}
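    // Sequences for surrogate keys. The MERGE and fact-load statements in the
    // ingestion service reference DIM_CUSTOMER_SEQ and FACT_SALES_SEQ, so these
    // must exist before those statements run (a sketch; the names match the SQL
    // used later in this guide and could be invoked from the /schema/initialize
    // endpoint alongside the other DDL calls).
    public void createSequences() {
        executeUpdate("CREATE SEQUENCE IF NOT EXISTS DIM_CUSTOMER_SEQ START = 1 INCREMENT = 1");
        executeUpdate("CREATE SEQUENCE IF NOT EXISTS DIM_PRODUCT_SEQ START = 1 INCREMENT = 1");
        executeUpdate("CREATE SEQUENCE IF NOT EXISTS FACT_SALES_SEQ START = 1 INCREMENT = 1");
        logger.info("Created surrogate key sequences");
    }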
private void executeUpdate(String sql) {
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
logger.error("Failed to execute SQL: {}", sql, e);
throw new SchemaManagementException("Schema operation failed", e);
}
}
public static class SchemaManagementException extends RuntimeException {
public SchemaManagementException(String message) {
super(message);
}
public SchemaManagementException(String message, Throwable cause) {
super(message, cause);
}
}
}
4. Data Ingestion Service
Data Ingestion Service
package com.example.snowflake.ingestion;
import com.example.snowflake.connection.SnowflakeConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
@Service
public class DataIngestionService {
private static final Logger logger = LoggerFactory.getLogger(DataIngestionService.class);
private final SnowflakeConnectionManager connectionManager;
@Value("${snowflake.stage:MY_STAGE}")
private String stageName;
@Value("${snowflake.file-format:MY_CSV_FORMAT}")
private String fileFormat;
@Value("${app.data-warehouse.batch-size:10000}")
private int batchSize;
public DataIngestionService(SnowflakeConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
    // Bulk load from stage. COPY INTO's PATTERN option takes a regular expression,
    // not a glob (e.g. '.*\.csv' rather than '*.csv').
    public void bulkLoadFromStage(String targetTable, String stagePath, String filePattern) {
        String sql = String.format(
            "COPY INTO %s FROM @%s/%s PATTERN = '%s' FILE_FORMAT = (FORMAT_NAME = '%s')",
            targetTable, stageName, stagePath, filePattern, fileFormat
        );
        executeUpdate(sql);
        logger.info("Bulk loaded data into {} from stage path: {}", targetTable, stagePath);
    }
// Bulk load from local file
public void bulkLoadFromLocalFile(String targetTable, File localFile) {
try {
// First, upload file to stage
String fileName = localFile.getName();
uploadFileToStage(localFile, fileName);
            // Then load from the stage root; PUT gzips files by default, so the
            // pattern matches both fileName and fileName.gz
            bulkLoadFromStage(targetTable, "", ".*" + fileName + ".*");
logger.info("Successfully loaded data from local file: {} into {}", fileName, targetTable);
} catch (Exception e) {
logger.error("Failed to load data from local file: {}", localFile.getName(), e);
throw new DataIngestionException("Bulk load from local file failed", e);
}
}
    private void uploadFileToStage(File file, String fileName) {
        // PUT uploads to the stage root; AUTO_COMPRESS (the default) stores the file as <name>.gz
        String sql = String.format("PUT file://%s @%s AUTO_COMPRESS = TRUE", file.getAbsolutePath(), stageName);
        executeUpdate(sql);
logger.debug("Uploaded file {} to stage", fileName);
}
// Batch insert for real-time data
public void batchInsert(String tableName, List<Map<String, Object>> records) {
if (records.isEmpty()) {
return;
}
// Generate insert statement based on first record's keys
Map<String, Object> firstRecord = records.get(0);
String columns = String.join(", ", firstRecord.keySet());
String placeholders = String.join(", ", firstRecord.keySet().stream()
.map(k -> "?").toList());
String sql = String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, placeholders);
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
int count = 0;
for (Map<String, Object> record : records) {
int paramIndex = 1;
for (String key : firstRecord.keySet()) {
Object value = record.get(key);
statement.setObject(paramIndex++, value);
}
statement.addBatch();
if (++count % batchSize == 0) {
statement.executeBatch();
logger.debug("Executed batch of {} records", batchSize);
}
}
// Execute remaining batches
if (count % batchSize != 0) {
statement.executeBatch();
}
logger.info("Successfully inserted {} records into {}", records.size(), tableName);
} catch (SQLException e) {
logger.error("Failed to batch insert into {}: {}", tableName, e.getMessage());
throw new DataIngestionException("Batch insert failed", e);
}
}
    // Merge (UPSERT) operation for SCD Type 2 dimensions.
    // NOTE: a single MERGE can expire the current row when attributes change and
    // insert brand-new customers, but it cannot also insert the new version of a
    // changed customer; a second pass (see insertNewCustomerVersions below)
    // completes the Type 2 pattern.
public void mergeCustomerDimension(List<Map<String, Object>> customerUpdates) {
String mergeSql = """
MERGE INTO DIM_CUSTOMER AS target
USING (SELECT ? AS CUSTOMER_ID, ? AS CUSTOMER_NAME, ? AS EMAIL, ? AS PHONE,
? AS ADDRESS, ? AS CITY, ? AS STATE, ? AS COUNTRY, ? AS POSTAL_CODE,
? AS SEGMENT, ? AS IS_ACTIVE, ? AS VALID_FROM) AS source
ON target.CUSTOMER_ID = source.CUSTOMER_ID AND target.IS_CURRENT = TRUE
WHEN MATCHED AND (target.CUSTOMER_NAME != source.CUSTOMER_NAME OR
target.EMAIL != source.EMAIL OR target.PHONE != source.PHONE OR
target.ADDRESS != source.ADDRESS OR target.CITY != source.CITY OR
target.STATE != source.STATE OR target.COUNTRY != source.COUNTRY OR
target.POSTAL_CODE != source.POSTAL_CODE OR target.SEGMENT != source.SEGMENT OR
target.IS_ACTIVE != source.IS_ACTIVE)
THEN UPDATE SET
target.VALID_TO = source.VALID_FROM - INTERVAL '1 day',
target.IS_CURRENT = FALSE,
target.UPDATED_TIMESTAMP = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (CUSTOMER_KEY, CUSTOMER_ID, CUSTOMER_NAME, EMAIL, PHONE, ADDRESS, CITY,
STATE, COUNTRY, POSTAL_CODE, SEGMENT, IS_ACTIVE, VALID_FROM, VALID_TO, IS_CURRENT)
VALUES (DIM_CUSTOMER_SEQ.NEXTVAL, source.CUSTOMER_ID, source.CUSTOMER_NAME, source.EMAIL,
source.PHONE, source.ADDRESS, source.CITY, source.STATE, source.COUNTRY,
source.POSTAL_CODE, source.SEGMENT, source.IS_ACTIVE, source.VALID_FROM,
'9999-12-31', TRUE)
""";
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(mergeSql)) {
for (Map<String, Object> customer : customerUpdates) {
statement.setString(1, (String) customer.get("CUSTOMER_ID"));
statement.setString(2, (String) customer.get("CUSTOMER_NAME"));
statement.setString(3, (String) customer.get("EMAIL"));
statement.setString(4, (String) customer.get("PHONE"));
statement.setString(5, (String) customer.get("ADDRESS"));
statement.setString(6, (String) customer.get("CITY"));
statement.setString(7, (String) customer.get("STATE"));
statement.setString(8, (String) customer.get("COUNTRY"));
statement.setString(9, (String) customer.get("POSTAL_CODE"));
statement.setString(10, (String) customer.get("SEGMENT"));
statement.setBoolean(11, (Boolean) customer.get("IS_ACTIVE"));
statement.setDate(12, java.sql.Date.valueOf(customer.get("VALID_FROM").toString()));
statement.addBatch();
}
int[] results = statement.executeBatch();
logger.info("Merged {} customer records", results.length);
} catch (SQLException e) {
logger.error("Failed to merge customer dimension", e);
throw new DataIngestionException("Customer dimension merge failed", e);
}
}
// ETL Process for loading fact table
public void loadSalesFactFromStaging() {
String etlSql = """
INSERT INTO FACT_SALES (
SALES_KEY, DATE_KEY, CUSTOMER_KEY, PRODUCT_KEY, ORDER_ID, ORDER_DATE,
QUANTITY, UNIT_PRICE, TOTAL_AMOUNT, DISCOUNT_AMOUNT, NET_AMOUNT,
SHIPPING_COST, TAX_AMOUNT, PAYMENT_METHOD, SALES_REP_ID
)
SELECT
FACT_SALES_SEQ.NEXTVAL,
d.DATE_KEY,
c.CUSTOMER_KEY,
p.PRODUCT_KEY,
s.ORDER_ID,
s.ORDER_DATE,
s.QUANTITY,
s.UNIT_PRICE,
s.TOTAL_AMOUNT,
s.DISCOUNT_AMOUNT,
s.NET_AMOUNT,
s.SHIPPING_COST,
s.TAX_AMOUNT,
s.PAYMENT_METHOD,
s.SALES_REP_ID
FROM STG_SALES s
JOIN DIM_DATE d ON s.ORDER_DATE = d.FULL_DATE
JOIN DIM_CUSTOMER c ON s.CUSTOMER_ID = c.CUSTOMER_ID AND c.IS_CURRENT = TRUE
JOIN DIM_PRODUCT p ON s.PRODUCT_ID = p.PRODUCT_ID AND p.IS_CURRENT = TRUE
WHERE NOT EXISTS (
SELECT 1 FROM FACT_SALES fs
WHERE fs.ORDER_ID = s.ORDER_ID AND fs.PRODUCT_KEY = p.PRODUCT_KEY
)
""";
int affectedRows = executeUpdateWithCount(etlSql);
logger.info("Loaded {} records into sales fact table", affectedRows);
}
private void executeUpdate(String sql) {
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
logger.error("Failed to execute SQL: {}", sql, e);
throw new DataIngestionException("Data ingestion operation failed", e);
}
}
private int executeUpdateWithCount(String sql) {
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement()) {
return statement.executeUpdate(sql);
} catch (SQLException e) {
logger.error("Failed to execute SQL: {}", sql, e);
throw new DataIngestionException("Data ingestion operation failed", e);
}
}
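    // Completing the SCD Type 2 flow: the MERGE above expires changed rows but
    // cannot also insert their new versions in the same statement. A second-pass
    // sketch, assuming the incoming updates were first landed in a staging table
    // named STG_CUSTOMER_UPDATES (a hypothetical name, not created elsewhere in
    // this guide):
    public void insertNewCustomerVersions() {
        String sql = """
            INSERT INTO DIM_CUSTOMER (CUSTOMER_KEY, CUSTOMER_ID, CUSTOMER_NAME, EMAIL, PHONE,
                                      ADDRESS, CITY, STATE, COUNTRY, POSTAL_CODE, SEGMENT,
                                      IS_ACTIVE, VALID_FROM, VALID_TO, IS_CURRENT)
            SELECT DIM_CUSTOMER_SEQ.NEXTVAL, s.CUSTOMER_ID, s.CUSTOMER_NAME, s.EMAIL, s.PHONE,
                   s.ADDRESS, s.CITY, s.STATE, s.COUNTRY, s.POSTAL_CODE, s.SEGMENT,
                   s.IS_ACTIVE, s.VALID_FROM, '9999-12-31', TRUE
            FROM STG_CUSTOMER_UPDATES s
            WHERE NOT EXISTS (
                SELECT 1 FROM DIM_CUSTOMER d
                WHERE d.CUSTOMER_ID = s.CUSTOMER_ID AND d.IS_CURRENT = TRUE
            )
            """;
        int inserted = executeUpdateWithCount(sql);
        logger.info("Inserted {} new customer versions", inserted);
    }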
public static class DataIngestionException extends RuntimeException {
public DataIngestionException(String message) {
super(message);
}
public DataIngestionException(String message, Throwable cause) {
super(message, cause);
}
}
}
5. Data Query and Analytics Service
Analytics Query Service
package com.example.snowflake.analytics;
import com.example.snowflake.connection.SnowflakeConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class AnalyticsQueryService {
private static final Logger logger = LoggerFactory.getLogger(AnalyticsQueryService.class);
private final SnowflakeConnectionManager connectionManager;
public AnalyticsQueryService(SnowflakeConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
// Sales Analytics
public List<SalesSummary> getSalesSummary(LocalDate startDate, LocalDate endDate) {
String sql = """
SELECT
d.YEAR,
d.QUARTER,
d.MONTH_NAME,
SUM(fs.QUANTITY) as TOTAL_QUANTITY,
SUM(fs.TOTAL_AMOUNT) as TOTAL_SALES,
SUM(fs.NET_AMOUNT) as NET_SALES,
AVG(fs.UNIT_PRICE) as AVG_UNIT_PRICE,
COUNT(DISTINCT fs.ORDER_ID) as ORDER_COUNT,
COUNT(DISTINCT fs.CUSTOMER_KEY) as CUSTOMER_COUNT
FROM FACT_SALES fs
JOIN DIM_DATE d ON fs.DATE_KEY = d.DATE_KEY
WHERE d.FULL_DATE BETWEEN ? AND ?
            GROUP BY d.YEAR, d.QUARTER, d.MONTH_NUMBER, d.MONTH_NAME
            ORDER BY d.YEAR, d.QUARTER, d.MONTH_NUMBER
""";
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setDate(1, java.sql.Date.valueOf(startDate));
statement.setDate(2, java.sql.Date.valueOf(endDate));
ResultSet rs = statement.executeQuery();
List<SalesSummary> results = new ArrayList<>();
while (rs.next()) {
SalesSummary summary = new SalesSummary(
rs.getInt("YEAR"),
rs.getInt("QUARTER"),
rs.getString("MONTH_NAME"),
rs.getLong("TOTAL_QUANTITY"),
rs.getBigDecimal("TOTAL_SALES"),
rs.getBigDecimal("NET_SALES"),
rs.getBigDecimal("AVG_UNIT_PRICE"),
rs.getLong("ORDER_COUNT"),
rs.getLong("CUSTOMER_COUNT")
);
results.add(summary);
}
return results;
} catch (SQLException e) {
logger.error("Failed to get sales summary", e);
throw new AnalyticsException("Sales summary query failed", e);
}
}
// Customer Analytics
public List<CustomerAnalysis> getTopCustomers(int limit) {
String sql = """
SELECT
c.CUSTOMER_ID,
c.CUSTOMER_NAME,
c.SEGMENT,
c.COUNTRY,
COUNT(DISTINCT fs.ORDER_ID) as ORDER_COUNT,
SUM(fs.NET_AMOUNT) as TOTAL_SPENT,
AVG(fs.NET_AMOUNT) as AVG_ORDER_VALUE,
MAX(fs.ORDER_DATE) as LAST_ORDER_DATE
FROM FACT_SALES fs
JOIN DIM_CUSTOMER c ON fs.CUSTOMER_KEY = c.CUSTOMER_KEY
WHERE c.IS_CURRENT = TRUE
GROUP BY c.CUSTOMER_ID, c.CUSTOMER_NAME, c.SEGMENT, c.COUNTRY
ORDER BY TOTAL_SPENT DESC
LIMIT ?
""";
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setInt(1, limit);
ResultSet rs = statement.executeQuery();
List<CustomerAnalysis> results = new ArrayList<>();
while (rs.next()) {
CustomerAnalysis analysis = new CustomerAnalysis(
rs.getString("CUSTOMER_ID"),
rs.getString("CUSTOMER_NAME"),
rs.getString("SEGMENT"),
rs.getString("COUNTRY"),
rs.getLong("ORDER_COUNT"),
rs.getBigDecimal("TOTAL_SPENT"),
rs.getBigDecimal("AVG_ORDER_VALUE"),
rs.getDate("LAST_ORDER_DATE").toLocalDate()
);
results.add(analysis);
}
return results;
} catch (SQLException e) {
logger.error("Failed to get top customers", e);
throw new AnalyticsException("Top customers query failed", e);
}
}
// Product Performance
public List<ProductPerformance> getProductPerformance(String category) {
String sql = """
SELECT
p.PRODUCT_ID,
p.PRODUCT_NAME,
p.CATEGORY,
p.BRAND,
SUM(fs.QUANTITY) as TOTAL_QUANTITY_SOLD,
SUM(fs.NET_AMOUNT) as TOTAL_REVENUE,
AVG(fs.UNIT_PRICE) as AVG_SELLING_PRICE,
p.COST as UNIT_COST,
(AVG(fs.UNIT_PRICE) - p.COST) as AVG_PROFIT_MARGIN
FROM FACT_SALES fs
JOIN DIM_PRODUCT p ON fs.PRODUCT_KEY = p.PRODUCT_KEY
WHERE p.IS_CURRENT = TRUE AND p.CATEGORY = ?
GROUP BY p.PRODUCT_ID, p.PRODUCT_NAME, p.CATEGORY, p.BRAND, p.COST
ORDER BY TOTAL_REVENUE DESC
""";
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, category);
ResultSet rs = statement.executeQuery();
List<ProductPerformance> results = new ArrayList<>();
while (rs.next()) {
ProductPerformance performance = new ProductPerformance(
rs.getString("PRODUCT_ID"),
rs.getString("PRODUCT_NAME"),
rs.getString("CATEGORY"),
rs.getString("BRAND"),
rs.getLong("TOTAL_QUANTITY_SOLD"),
rs.getBigDecimal("TOTAL_REVENUE"),
rs.getBigDecimal("AVG_SELLING_PRICE"),
rs.getBigDecimal("UNIT_COST"),
rs.getBigDecimal("AVG_PROFIT_MARGIN")
);
results.add(performance);
}
return results;
} catch (SQLException e) {
logger.error("Failed to get product performance", e);
throw new AnalyticsException("Product performance query failed", e);
}
}
// Monthly Sales Trend
public List<MonthlyTrend> getMonthlySalesTrend(int year) {
String sql = """
SELECT
d.MONTH_NAME,
d.MONTH_NUMBER,
SUM(fs.NET_AMOUNT) as MONTHLY_SALES,
LAG(SUM(fs.NET_AMOUNT)) OVER (ORDER BY d.MONTH_NUMBER) as PREV_MONTH_SALES,
(SUM(fs.NET_AMOUNT) - LAG(SUM(fs.NET_AMOUNT)) OVER (ORDER BY d.MONTH_NUMBER))
/ LAG(SUM(fs.NET_AMOUNT)) OVER (ORDER BY d.MONTH_NUMBER) * 100 as GROWTH_PERCENT
FROM FACT_SALES fs
JOIN DIM_DATE d ON fs.DATE_KEY = d.DATE_KEY
WHERE d.YEAR = ?
GROUP BY d.MONTH_NAME, d.MONTH_NUMBER
ORDER BY d.MONTH_NUMBER
""";
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setInt(1, year);
ResultSet rs = statement.executeQuery();
List<MonthlyTrend> results = new ArrayList<>();
while (rs.next()) {
MonthlyTrend trend = new MonthlyTrend(
rs.getString("MONTH_NAME"),
rs.getInt("MONTH_NUMBER"),
rs.getBigDecimal("MONTHLY_SALES"),
rs.getBigDecimal("PREV_MONTH_SALES"),
rs.getBigDecimal("GROWTH_PERCENT")
);
results.add(trend);
}
return results;
} catch (SQLException e) {
logger.error("Failed to get monthly sales trend", e);
throw new AnalyticsException("Monthly trend query failed", e);
}
}
// Custom Query Execution
public List<Map<String, Object>> executeCustomQuery(String sql, Object... params) {
try (Connection connection = connectionManager.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
// Set parameters
for (int i = 0; i < params.length; i++) {
statement.setObject(i + 1, params[i]);
}
ResultSet rs = statement.executeQuery();
List<Map<String, Object>> results = new ArrayList<>();
var metadata = rs.getMetaData();
int columnCount = metadata.getColumnCount();
while (rs.next()) {
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metadata.getColumnName(i);
Object value = rs.getObject(i);
row.put(columnName, value);
}
results.add(row);
}
return results;
} catch (SQLException e) {
logger.error("Failed to execute custom query", e);
throw new AnalyticsException("Custom query execution failed", e);
}
}
// Data Models
public record SalesSummary(int year, int quarter, String month, long totalQuantity,
java.math.BigDecimal totalSales, java.math.BigDecimal netSales,
java.math.BigDecimal avgUnitPrice, long orderCount, long customerCount) {}
public record CustomerAnalysis(String customerId, String customerName, String segment,
String country, long orderCount, java.math.BigDecimal totalSpent,
java.math.BigDecimal avgOrderValue, java.time.LocalDate lastOrderDate) {}
public record ProductPerformance(String productId, String productName, String category,
String brand, long quantitySold, java.math.BigDecimal totalRevenue,
java.math.BigDecimal avgSellingPrice, java.math.BigDecimal unitCost,
java.math.BigDecimal profitMargin) {}
public record MonthlyTrend(String monthName, int monthNumber, java.math.BigDecimal monthlySales,
java.math.BigDecimal prevMonthSales, java.math.BigDecimal growthPercent) {}
public static class AnalyticsException extends RuntimeException {
public AnalyticsException(String message) {
super(message);
}
public AnalyticsException(String message, Throwable cause) {
super(message, cause);
}
}
}
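The executeCustomQuery helper accepts any parameterized SQL. A usage sketch (the query and cutoff date below are illustrative, not part of the service):

// Illustrative ad-hoc query: revenue by product category since a given date
List<Map<String, Object>> rows = analyticsService.executeCustomQuery(
    """
    SELECT p.CATEGORY, SUM(fs.NET_AMOUNT) AS REVENUE
    FROM FACT_SALES fs
    JOIN DIM_PRODUCT p ON fs.PRODUCT_KEY = p.PRODUCT_KEY
    WHERE fs.ORDER_DATE >= ?
    GROUP BY p.CATEGORY
    ORDER BY REVENUE DESC
    """,
    java.sql.Date.valueOf("2023-01-01"));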
6. Data Warehouse Management Service
Warehouse Management Service
package com.example.snowflake.management;
import com.example.snowflake.connection.SnowflakeConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@Service
public class WarehouseManagementService {
private static final Logger logger = LoggerFactory.getLogger(WarehouseManagementService.class);
private final SnowflakeConnectionManager connectionManager;
public WarehouseManagementService(SnowflakeConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
// Warehouse operations
public void resizeWarehouse(String warehouseName, String newSize) {
String sql = String.format("ALTER WAREHOUSE %s SET WAREHOUSE_SIZE = %s", warehouseName, newSize);
executeUpdate(sql);
logger.info("Resized warehouse {} to {}", warehouseName, newSize);
}
public void suspendWarehouse(String warehouseName) {
String sql = String.format("ALTER WAREHOUSE %s SUSPEND", warehouseName);
executeUpdate(sql);
logger.info("Suspended warehouse: {}", warehouseName);
}
public void resumeWarehouse(String warehouseName) {
String sql = String.format("ALTER WAREHOUSE %s RESUME", warehouseName);
executeUpdate(sql);
logger.info("Resumed warehouse: {}", warehouseName);
}
// Table maintenance
public void cloneTable(String sourceTable, String targetTable) {
String sql = String.format("CREATE TABLE %s CLONE %s", targetTable, sourceTable);
executeUpdate(sql);
logger.info("Cloned table {} to {}", sourceTable, targetTable);
}
public void swapTables(String table1, String table2) {
String sql = String.format("ALTER TABLE %s SWAP WITH %s", table1, table2);
executeUpdate(sql);
logger.info("Swapped tables {} and {}", table1, table2);
}
    // Query history and monitoring (the QUERY_HISTORY table function returns at
    // most 100 rows unless RESULT_LIMIT is raised; it accepts values up to 10000)
public List<QueryHistory> getRecentQueryHistory(int limit) {
String sql = String.format("""
SELECT
QUERY_ID,
QUERY_TEXT,
DATABASE_NAME,
SCHEMA_NAME,
QUERY_TYPE,
SESSION_ID,
USER_NAME,
ROLE_NAME,
WAREHOUSE_NAME,
WAREHOUSE_SIZE,
EXECUTION_STATUS,
ERROR_CODE,
ERROR_MESSAGE,
START_TIME,
END_TIME,
TOTAL_ELAPSED_TIME,
BYTES_SCANNED,
ROWS_PRODUCED
            FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(RESULT_LIMIT => %d))
            ORDER BY START_TIME DESC
            """, limit);
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql)) {
List<QueryHistory> history = new ArrayList<>();
while (rs.next()) {
QueryHistory query = new QueryHistory(
rs.getString("QUERY_ID"),
rs.getString("QUERY_TEXT"),
rs.getString("DATABASE_NAME"),
rs.getString("SCHEMA_NAME"),
rs.getString("QUERY_TYPE"),
rs.getString("SESSION_ID"),
rs.getString("USER_NAME"),
rs.getString("ROLE_NAME"),
rs.getString("WAREHOUSE_NAME"),
rs.getString("WAREHOUSE_SIZE"),
rs.getString("EXECUTION_STATUS"),
rs.getString("ERROR_CODE"),
rs.getString("ERROR_MESSAGE"),
rs.getTimestamp("START_TIME"),
rs.getTimestamp("END_TIME"),
rs.getLong("TOTAL_ELAPSED_TIME"),
rs.getLong("BYTES_SCANNED"),
rs.getLong("ROWS_PRODUCED")
);
history.add(query);
}
return history;
} catch (SQLException e) {
logger.error("Failed to get query history", e);
throw new ManagementException("Query history retrieval failed", e);
}
}
    // Storage monitoring. There is no INFORMATION_SCHEMA.STORAGE_USAGE() table
    // function; the SNOWFLAKE.ACCOUNT_USAGE.STORAGE_USAGE view is used instead,
    // which requires ACCOUNTADMIN or explicitly granted access and can lag by an
    // hour or more.
    public StorageUsage getStorageUsage() {
        String sql = """
            SELECT
                USAGE_DATE,
                STORAGE_BYTES,
                STAGE_BYTES,
                FAILSAFE_BYTES
            FROM SNOWFLAKE.ACCOUNT_USAGE.STORAGE_USAGE
            ORDER BY USAGE_DATE DESC
            LIMIT 1
            """;
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql)) {
if (rs.next()) {
return new StorageUsage(
rs.getDate("USAGE_DATE"),
rs.getLong("STORAGE_BYTES"),
rs.getLong("STAGE_BYTES"),
rs.getLong("FAILSAFE_BYTES")
);
}
return null;
} catch (SQLException e) {
logger.error("Failed to get storage usage", e);
throw new ManagementException("Storage usage retrieval failed", e);
}
}
    // Recent user sessions. INFORMATION_SCHEMA has no SESSIONS table function;
    // the SNOWFLAKE.ACCOUNT_USAGE.SESSIONS view is used instead, subject to the
    // same access requirements and latency as other ACCOUNT_USAGE views.
    public List<UserSession> getActiveSessions() {
        String sql = """
            SELECT
                SESSION_ID,
                USER_NAME,
                AUTHENTICATION_METHOD,
                CLIENT_APPLICATION_ID,
                CLIENT_APPLICATION_VERSION,
                CREATED_ON
            FROM SNOWFLAKE.ACCOUNT_USAGE.SESSIONS
            WHERE CREATED_ON >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
            ORDER BY CREATED_ON DESC
            """;
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql)) {
List<UserSession> sessions = new ArrayList<>();
while (rs.next()) {
                UserSession session = new UserSession(
                    rs.getString("SESSION_ID"),
                    rs.getString("USER_NAME"),
                    rs.getString("AUTHENTICATION_METHOD"),
                    rs.getString("CLIENT_APPLICATION_ID"),
                    rs.getString("CLIENT_APPLICATION_VERSION"),
                    rs.getTimestamp("CREATED_ON")
                );
sessions.add(session);
}
return sessions;
} catch (SQLException e) {
logger.error("Failed to get active sessions", e);
throw new ManagementException("Active sessions retrieval failed", e);
}
}
// Data sharing (if using Snowflake Data Sharing)
public void createShare(String shareName, String databases) {
String sql = String.format("CREATE SHARE %s", shareName);
executeUpdate(sql);
// Add databases to share
for (String database : databases.split(",")) {
String addDbSql = String.format("GRANT USAGE ON DATABASE %s TO SHARE %s", database, shareName);
executeUpdate(addDbSql);
}
logger.info("Created share: {} with databases: {}", shareName, databases);
}
private void executeUpdate(String sql) {
try (Connection connection = connectionManager.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
logger.error("Failed to execute SQL: {}", sql, e);
throw new ManagementException("Management operation failed", e);
}
}
// Data Models
public record QueryHistory(String queryId, String queryText, String databaseName, String schemaName,
String queryType, String sessionId, String userName, String roleName,
String warehouseName, String warehouseSize, String executionStatus,
String errorCode, String errorMessage, java.sql.Timestamp startTime,
java.sql.Timestamp endTime, long totalElapsedTime, long bytesScanned,
long rowsProduced) {}
public record StorageUsage(java.sql.Date usageDate, long storageBytes, long stageBytes, long failsafeBytes) {}
    public record UserSession(String sessionId, String userName, String authenticationMethod,
                              String clientApplicationId, String clientApplicationVersion,
                              java.sql.Timestamp createdOn) {}
public static class ManagementException extends RuntimeException {
public ManagementException(String message) {
super(message);
}
public ManagementException(String message, Throwable cause) {
super(message, cause);
}
}
}
7. REST Controllers
Data Warehouse Controller
package com.example.snowflake.web;
import com.example.snowflake.analytics.AnalyticsQueryService;
import com.example.snowflake.ingestion.DataIngestionService;
import com.example.snowflake.management.WarehouseManagementService;
import com.example.snowflake.schema.SchemaManager;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/data-warehouse")
public class DataWarehouseController {
private final SchemaManager schemaManager;
private final DataIngestionService ingestionService;
private final AnalyticsQueryService analyticsService;
private final WarehouseManagementService managementService;
public DataWarehouseController(SchemaManager schemaManager, DataIngestionService ingestionService,
AnalyticsQueryService analyticsService, WarehouseManagementService managementService) {
this.schemaManager = schemaManager;
this.ingestionService = ingestionService;
this.analyticsService = analyticsService;
this.managementService = managementService;
}
// Schema Management Endpoints
@PostMapping("/schema/initialize")
public ResponseEntity<String> initializeSchema() {
schemaManager.createDateDimensionTable();
schemaManager.createCustomerDimensionTable();
schemaManager.createProductDimensionTable();
schemaManager.createSalesFactTable();
        schemaManager.createClusteringKeys();
schemaManager.createFileFormat();
schemaManager.createStage("MY_STAGE", "CSV_FORMAT");
return ResponseEntity.ok("Data warehouse schema initialized successfully");
}
// Data Ingestion Endpoints
@PostMapping("/ingest/csv")
public ResponseEntity<String> ingestCsvFile(
@RequestParam String targetTable,
@RequestParam MultipartFile file) {
        try {
            // Save the uploaded file temporarily
            Path tempFile = Files.createTempFile("snowflake_upload_", ".csv");
            try {
                Files.copy(file.getInputStream(), tempFile, StandardCopyOption.REPLACE_EXISTING);
                // Ingest data
                ingestionService.bulkLoadFromLocalFile(targetTable, tempFile.toFile());
            } finally {
                // Clean up even if ingestion fails
                Files.deleteIfExists(tempFile);
            }
return ResponseEntity.ok("Data ingested successfully into " + targetTable);
} catch (IOException e) {
return ResponseEntity.badRequest().body("File upload failed: " + e.getMessage());
}
}
@PostMapping("/ingest/staging")
public ResponseEntity<String> ingestFromStaging(
@RequestParam String targetTable,
@RequestParam String stagePath) {
        ingestionService.bulkLoadFromStage(targetTable, stagePath, ".*\\.csv");
return ResponseEntity.ok("Data ingested from staging area");
}
@PostMapping("/etl/sales-fact")
public ResponseEntity<String> runSalesFactETL() {
ingestionService.loadSalesFactFromStaging();
return ResponseEntity.ok("Sales fact ETL process completed");
}
// Analytics Endpoints
@GetMapping("/analytics/sales-summary")
public ResponseEntity<List<AnalyticsQueryService.SalesSummary>> getSalesSummary(
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate startDate,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate endDate) {
List<AnalyticsQueryService.SalesSummary> summary =
analyticsService.getSalesSummary(startDate, endDate);
return ResponseEntity.ok(summary);
}
@GetMapping("/analytics/top-customers")
public ResponseEntity<List<AnalyticsQueryService.CustomerAnalysis>> getTopCustomers(
@RequestParam(defaultValue = "10") int limit) {
List<AnalyticsQueryService.CustomerAnalysis> topCustomers =
analyticsService.getTopCustomers(limit);
return ResponseEntity.ok(topCustomers);
}
@GetMapping("/analytics/product-performance")
public ResponseEntity<List<AnalyticsQueryService.ProductPerformance>> getProductPerformance(
@RequestParam String category) {
List<AnalyticsQueryService.ProductPerformance> performance =
analyticsService.getProductPerformance(category);
return ResponseEntity.ok(performance);
}
@GetMapping("/analytics/monthly-trend")
public ResponseEntity<List<AnalyticsQueryService.MonthlyTrend>> getMonthlyTrend(
@RequestParam int year) {
List<AnalyticsQueryService.MonthlyTrend> trend =
analyticsService.getMonthlySalesTrend(year);
return ResponseEntity.ok(trend);
}
// Management Endpoints
@GetMapping("/management/query-history")
public ResponseEntity<List<WarehouseManagementService.QueryHistory>> getQueryHistory(
@RequestParam(defaultValue = "50") int limit) {
List<WarehouseManagementService.QueryHistory> history =
managementService.getRecentQueryHistory(limit);
return ResponseEntity.ok(history);
}
@GetMapping("/management/storage-usage")
    public ResponseEntity<WarehouseManagementService.StorageUsage> getStorageUsage() {
        WarehouseManagementService.StorageUsage usage = managementService.getStorageUsage();
        return usage != null ? ResponseEntity.ok(usage) : ResponseEntity.noContent().build();
    }
@GetMapping("/management/active-sessions")
public ResponseEntity<List<WarehouseManagementService.UserSession>> getActiveSessions() {
List<WarehouseManagementService.UserSession> sessions = managementService.getActiveSessions();
return ResponseEntity.ok(sessions);
}
@PostMapping("/management/warehouse/{warehouseName}/resize")
public ResponseEntity<String> resizeWarehouse(
@PathVariable String warehouseName,
@RequestParam String newSize) {
managementService.resizeWarehouse(warehouseName, newSize);
return ResponseEntity.ok("Warehouse resized successfully");
}
// Custom Query Execution
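    // WARNING: executing caller-supplied SQL is inherently dangerous. In production,
    // restrict this endpoint to trusted roles and validate or allow-list statements.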
@PostMapping("/query/custom")
public ResponseEntity<List<Map<String, Object>>> executeCustomQuery(@RequestBody CustomQueryRequest request) {
List<Map<String, Object>> results = analyticsService.executeCustomQuery(
request.sql(), request.params().toArray()
);
return ResponseEntity.ok(results);
}
public record CustomQueryRequest(String sql, List<Object> params) {}
}
8. Spring Boot Application
Main Application Class
package com.example.snowflake;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class SnowflakeDataWarehouseApplication {
public static void main(String[] args) {
SpringApplication.run(SnowflakeDataWarehouseApplication.class, args);
}
}
Scheduled Tasks
package com.example.snowflake.scheduling;
import com.example.snowflake.ingestion.DataIngestionService;
import com.example.snowflake.management.WarehouseManagementService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final DataIngestionService ingestionService;
private final WarehouseManagementService managementService;
public ScheduledTasks(DataIngestionService ingestionService,
WarehouseManagementService managementService) {
this.ingestionService = ingestionService;
this.managementService = managementService;
}
// Run ETL process every hour
@Scheduled(cron = "0 0 * * * *")
public void runScheduledETL() {
logger.info("Starting scheduled ETL process");
try {
ingestionService.loadSalesFactFromStaging();
logger.info("Scheduled ETL process completed successfully");
} catch (Exception e) {
logger.error("Scheduled ETL process failed", e);
}
}
// Monitor warehouse usage daily
@Scheduled(cron = "0 0 6 * * *")
public void monitorWarehouseUsage() {
logger.info("Checking warehouse storage usage");
try {
var storageUsage = managementService.getStorageUsage();
if (storageUsage != null) {
                double storageGB = storageUsage.storageBytes() / (1024.0 * 1024.0 * 1024.0);
                double stageGB = storageUsage.stageBytes() / (1024.0 * 1024.0 * 1024.0);
                // SLF4J placeholders do not support printf-style specifiers, so format the values first
                logger.info("Storage Usage - Data: {} GB, Stage: {} GB",
                        String.format("%.2f", storageGB), String.format("%.2f", stageGB));
                // Alert if storage exceeds threshold
                if (storageGB > 100) { // 100 GB threshold
                    logger.warn("Storage usage exceeds 100 GB: {} GB", String.format("%.2f", storageGB));
                }
}
} catch (Exception e) {
logger.error("Storage monitoring failed", e);
}
}
}
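The configuration earlier defines app.data-warehouse.max-retries and retry-delay-ms, but nothing above consumes them. A minimal sketch of a helper that honors those settings (the RetryHelper class and its wiring are assumptions, not part of the services above); the scheduled ETL could then call retryHelper.runWithRetry(ingestionService::loadSalesFactFromStaging):

package com.example.snowflake.scheduling;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
// Sketch: retries a task using the max-retries / retry-delay-ms settings from
// application.yml. Assumes maxRetries >= 1; the delay is fixed for brevity.
@Component
public class RetryHelper {
    @Value("${app.data-warehouse.max-retries:3}")
    private int maxRetries;
    @Value("${app.data-warehouse.retry-delay-ms:1000}")
    private long retryDelayMs;
    public void runWithRetry(Runnable task) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                task.run();
                return;
            } catch (RuntimeException e) {
                last = e;
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        throw last;
    }
}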
9. Testing
Integration Test
package com.example.snowflake.test;
import com.example.snowflake.analytics.AnalyticsQueryService;
import com.example.snowflake.connection.SnowflakeConnectionManager;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import java.time.LocalDate;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
@ActiveProfiles("test")
class SnowflakeIntegrationTest {
@Autowired
private SnowflakeConnectionManager connectionManager;
@Autowired
private AnalyticsQueryService analyticsService;
@Test
    void testConnection() throws Exception {
        try (var connection = connectionManager.getConnection()) {
            assertThat(connection).isNotNull();
        }
    }
@Test
void testSalesSummaryQuery() {
LocalDate startDate = LocalDate.of(2023, 1, 1);
LocalDate endDate = LocalDate.of(2023, 12, 31);
var summary = analyticsService.getSalesSummary(startDate, endDate);
assertThat(summary).isNotNull();
// Add more assertions based on your test data
}
}
Summary
This comprehensive Snowflake Data Warehouse implementation provides:
Key Features:
- Schema Management: Automated creation of star schema with dimensions and facts
- Data Ingestion: Bulk loading from stages and real-time batch inserts
- ETL Processes: Automated data transformation and loading
- Analytics: Pre-built analytical queries for business intelligence
- Management: Warehouse monitoring and maintenance operations
- REST API: Full REST interface for all operations
Benefits:
- Scalability: Leverages Snowflake's elastic scaling
- Performance: Optimized queries with clustering keys and partition pruning
- Maintainability: Clean architecture with separation of concerns
- Monitoring: Comprehensive monitoring and alerting
- Integration: Easy integration with existing systems
Use Cases:
- Enterprise data warehousing
- Business intelligence and reporting
- Real-time analytics
- Data lake integration
- Multi-cloud data strategies
This implementation provides a production-ready foundation for building scalable data warehouse solutions with Snowflake and Java.