Complete Guide to Database Tables for Data Science

Introduction to Database Tables in Data Science

Database tables are the fundamental building blocks of data storage in data science. They provide a structured way to organize, store, and retrieve data for analysis. Understanding how to work with database tables effectively is crucial for data scientists who need to extract insights from structured data.

Key Concepts

  • Tables: Organized collections of data with rows and columns
  • Rows (Records): Individual entries in a table
  • Columns (Fields): Attributes or features of the data
  • Primary Keys: Unique identifiers for each row
  • Foreign Keys: References to other tables
  • Relationships: How tables connect to each other

1. Database Table Fundamentals

What is a Database Table?

A database table is a structured set of data organized in rows and columns, similar to a spreadsheet but with additional capabilities for data integrity, relationships, and querying.

-- Example: Customer table structure
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Table Components

import sqlite3

# Create a sample database table in memory
conn = sqlite3.connect(':memory:')

# Create table
conn.execute('''
CREATE TABLE sales (
    sale_id INTEGER PRIMARY KEY,
    product_name TEXT NOT NULL,
    quantity INTEGER CHECK(quantity > 0),
    price REAL CHECK(price >= 0),
    sale_date DATE,
    customer_id INTEGER,
    FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
)
''')

print("Table structure:")
for row in conn.execute("PRAGMA table_info(sales)"):
    # row = (cid, name, type, notnull, default_value, pk)
    print(f"  {row[1]}: {row[2]} (nullable: {not row[3]})")

Row and Column Operations

import numpy as np
import pandas as pd
import sqlalchemy as sa

# Create engine
engine = sa.create_engine('sqlite:///:memory:')

# Create sample data
df = pd.DataFrame({
    'order_id': range(1, 101),
    'customer_name': [f'Customer_{i}' for i in range(1, 101)],
    'amount': np.random.uniform(10, 1000, 100),
    'order_date': pd.date_range('2023-01-01', periods=100, freq='D')
})

# Insert into database
df.to_sql('orders', engine, index=False, if_exists='replace')

# Read back
result = pd.read_sql("SELECT * FROM orders LIMIT 5", engine)
print(result)

2. Table Design Principles

Normalization

Normalization is the process of organizing data to reduce redundancy and improve data integrity.

-- Denormalized table (bad design)
CREATE TABLE orders_denormalized (
    order_id INT,
    customer_name VARCHAR(100),
    customer_email VARCHAR(100),
    customer_address TEXT,
    product_name VARCHAR(100),
    product_price DECIMAL,
    quantity INT,
    order_date DATE
);
-- Issues:
-- - Data duplication (customer info repeated for each order)
-- - Update anomalies (changing a customer's email requires multiple updates)
-- - Insert anomalies (can't add a customer without an order)
-- - Delete anomalies (customer info is lost when deleting their last order)

-- Normalized design (good practice)
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    address TEXT
);
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL
);
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);
CREATE TABLE order_items (
    order_id INT,
    product_id INT,
    quantity INT,
    PRIMARY KEY (order_id, product_id),
    FOREIGN KEY (order_id) REFERENCES orders(order_id),
    FOREIGN KEY (product_id) REFERENCES products(product_id)
);
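To see that the normalized layout loses nothing for analysis, the four tables above can be joined back into a flat view on demand. A minimal pandas sketch, with made-up sample rows (column names follow the schema above):

```python
import pandas as pd

# Tiny samples mirroring the normalized schema above
customers = pd.DataFrame({'customer_id': [1, 2], 'name': ['Ann', 'Bob'],
                          'email': ['ann@example.com', 'bob@example.com'],
                          'address': ['1 Main St', '2 Elm St']})
products = pd.DataFrame({'product_id': [10, 20], 'name': ['Pen', 'Pad'],
                         'price': [1.50, 4.00]})
orders = pd.DataFrame({'order_id': [100, 101], 'customer_id': [1, 2],
                       'order_date': ['2023-01-05', '2023-01-06']})
order_items = pd.DataFrame({'order_id': [100, 100, 101],
                            'product_id': [10, 20, 10],
                            'quantity': [2, 1, 3]})

# Joining the four tables reconstructs the denormalized view on demand
denorm = (order_items
          .merge(orders, on='order_id')
          .merge(customers, on='customer_id')
          .merge(products, on='product_id', suffixes=('_customer', '_product')))
print(denorm[['order_id', 'name_customer', 'name_product', 'quantity']])
```

Each fact (a customer's email, a product's price) still lives in exactly one place; the join materializes the wide view only when needed.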

Normalization Levels

def explain_normalization():
    """Explain the main normalization forms"""
    forms = {
        '1NF (First Normal Form)': [
            "Each cell contains a single value (atomic)",
            "Each column has a unique name",
            "Rows are uniquely identifiable",
            "No repeating groups"
        ],
        '2NF (Second Normal Form)': [
            "Meets 1NF requirements",
            "No partial dependencies on composite keys"
        ],
        '3NF (Third Normal Form)': [
            "Meets 2NF requirements",
            "No transitive dependencies",
            "All non-key attributes depend only on the primary key"
        ],
        'BCNF (Boyce-Codd Normal Form)': [
            "Meets 3NF requirements",
            "Every determinant is a candidate key"
        ]
    }
    for form, rules in forms.items():
        print(f"\n{form}:")
        for rule in rules:
            print(f"  • {rule}")

explain_normalization()

Denormalization for Data Warehouses

import pandas as pd
import numpy as np

def create_star_schema():
    """Create a star schema for a data warehouse"""
    # Dimension tables
    customers = pd.DataFrame({
        'customer_id': range(1, 1001),
        'customer_name': [f'Customer_{i}' for i in range(1, 1001)],
        'segment': np.random.choice(['Gold', 'Silver', 'Bronze'], 1000),
        'region': np.random.choice(['North', 'South', 'East', 'West'], 1000)
    })
    products = pd.DataFrame({
        'product_id': range(1, 101),
        'product_name': [f'Product_{i}' for i in range(1, 101)],
        'category': np.random.choice(['Electronics', 'Clothing', 'Books'], 100),
        'price': np.random.uniform(10, 500, 100)
    })
    dates = pd.date_range('2023-01-01', '2023-12-31')
    time_dim = pd.DataFrame({
        'date_id': dates,
        'year': dates.year,
        'month': dates.month,
        'quarter': dates.quarter,
        'day': dates.day,
        'day_of_week': dates.dayofweek
    })
    # Fact table
    fact_sales = pd.DataFrame({
        'sale_id': range(1, 10001),
        'customer_id': np.random.randint(1, 1001, 10000),
        'product_id': np.random.randint(1, 101, 10000),
        'date_id': np.random.choice(time_dim['date_id'], 10000),
        'quantity': np.random.randint(1, 10, 10000),
        'unit_price': np.random.uniform(10, 500, 10000),
        'discount': np.random.uniform(0, 0.3, 10000)
    })
    fact_sales['total_amount'] = (fact_sales['quantity'] * fact_sales['unit_price']
                                  * (1 - fact_sales['discount']))
    return {
        'customers': customers,
        'products': products,
        'time': time_dim,
        'sales': fact_sales
    }

# Create star schema
star_schema = create_star_schema()
for name, df in star_schema.items():
    print(f"{name}: {df.shape[0]} rows, {df.shape[1]} columns")

3. Table Indexing

Types of Indexes

-- Creating different types of indexes
-- Primary key index (created automatically)
CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100)
);
-- Unique index
CREATE UNIQUE INDEX idx_users_email ON users(email);
-- Single-column index
CREATE INDEX idx_orders_customer ON orders(customer_id);
-- Composite index (multiple columns)
CREATE INDEX idx_orders_date_customer ON orders(order_date, customer_id);
-- Full-text index (MySQL syntax)
CREATE FULLTEXT INDEX idx_products_description ON products(description);
-- Bitmap index (Oracle syntax; useful for low-cardinality columns)
CREATE BITMAP INDEX idx_customers_segment ON customers(segment);

Index Performance Analysis

import sqlite3
import time

import numpy as np
import pandas as pd

def analyze_index_performance():
    """Compare query performance with and without an index"""
    conn = sqlite3.connect(':memory:')
    # Create a large table
    n_rows = 100000
    df = pd.DataFrame({
        'id': range(n_rows),
        'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'value': np.random.randn(n_rows),
        'date': pd.date_range('2023-01-01', periods=n_rows, freq='h')
    })
    df.to_sql('data', conn, index=False, if_exists='replace')
    # Query without index
    start = time.time()
    pd.read_sql("SELECT * FROM data WHERE category = 'A'", conn)
    time_no_index = time.time() - start
    # Create index
    conn.execute("CREATE INDEX idx_category ON data(category)")
    # Query with index
    start = time.time()
    pd.read_sql("SELECT * FROM data WHERE category = 'A'", conn)
    time_with_index = time.time() - start
    print(f"Without index: {time_no_index:.4f} seconds")
    print(f"With index: {time_with_index:.4f} seconds")
    print(f"Speed improvement: {time_no_index/time_with_index:.2f}x")
    # Analyze query plan
    print("\nQuery plan:")
    for row in conn.execute("EXPLAIN QUERY PLAN SELECT * FROM data WHERE category = 'A'"):
        print(f"  {row}")

analyze_index_performance()

When to Use Indexes

def index_recommendations():
    """Provide recommendations for index usage"""
    recommendations = {
        "Primary Keys": "Always create an index (usually automatically)",
        "Foreign Keys": "Index columns used in JOIN operations",
        "WHERE Clauses": "Index columns frequently used in filtering",
        "ORDER BY": "Index columns used for sorting",
        "GROUP BY": "Index columns used for grouping",
        "Unique Constraints": "Index columns that must be unique",
        "Low Cardinality": "Consider bitmap indexes for few distinct values",
        "Small Tables": "May not need indexes (< 1000 rows)",
        "Frequent Updates": "Indexes slow down INSERT/UPDATE/DELETE",
        "Large Tables": "Indexes are crucial for performance"
    }
    for scenario, recommendation in recommendations.items():
        print(f"{scenario}: {recommendation}")

index_recommendations()

4. Table Relationships

One-to-One Relationships

-- One-to-One relationship (e.g., user and profile)
CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR(50) UNIQUE
);
CREATE TABLE profiles (
    profile_id INT PRIMARY KEY,
    user_id INT UNIQUE,
    bio TEXT,
    avatar_url VARCHAR(200),
    FOREIGN KEY (user_id) REFERENCES users(user_id)
);

One-to-Many Relationships

-- One-to-Many relationship (e.g., customer and orders)
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    name VARCHAR(100)
);
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);

Many-to-Many Relationships

-- Many-to-Many relationship (e.g., students and courses)
CREATE TABLE students (
    student_id INT PRIMARY KEY,
    name VARCHAR(100)
);
CREATE TABLE courses (
    course_id INT PRIMARY KEY,
    title VARCHAR(100)
);
-- Junction table
CREATE TABLE enrollments (
    student_id INT,
    course_id INT,
    enrollment_date DATE,
    grade VARCHAR(2),
    PRIMARY KEY (student_id, course_id),
    FOREIGN KEY (student_id) REFERENCES students(student_id),
    FOREIGN KEY (course_id) REFERENCES courses(course_id)
);

Visualizing Relationships

import networkx as nx
import matplotlib.pyplot as plt

def visualize_table_relationships():
    """Create a graph of table relationships"""
    # Define schema
    tables = {
        'customers': ['customer_id', 'name', 'email'],
        'orders': ['order_id', 'customer_id', 'order_date', 'total'],
        'products': ['product_id', 'name', 'price'],
        'order_items': ['order_id', 'product_id', 'quantity', 'price'],
        'reviews': ['review_id', 'product_id', 'customer_id', 'rating', 'comment']
    }
    relationships = [
        ('customers', 'orders', 'one-to-many', 'customer_id'),
        ('orders', 'order_items', 'one-to-many', 'order_id'),
        ('products', 'order_items', 'one-to-many', 'product_id'),
        ('products', 'reviews', 'one-to-many', 'product_id'),
        ('customers', 'reviews', 'one-to-many', 'customer_id')
    ]
    # Create graph
    G = nx.DiGraph()
    for table, columns in tables.items():
        G.add_node(table, columns=columns)
    for source, target, rel_type, key in relationships:
        G.add_edge(source, target, relationship=rel_type, key=key)
    # Draw graph
    pos = nx.spring_layout(G, k=2, iterations=50)
    plt.figure(figsize=(12, 8))
    nx.draw_networkx_nodes(G, pos, node_size=3000, node_color='lightblue')
    nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')
    nx.draw_networkx_edges(G, pos, edge_color='gray', arrows=True, arrowsize=20)
    edge_labels = {(u, v): f"{d['relationship']}\n({d['key']})"
                   for u, v, d in G.edges(data=True)}
    nx.draw_networkx_edge_labels(G, pos, edge_labels, font_size=8)
    plt.title("Database Schema Relationships")
    plt.axis('off')
    plt.tight_layout()
    plt.show()

visualize_table_relationships()

5. Data Types in Tables

SQL Data Types

-- Common SQL data types
-- Numeric types
INT, INTEGER           -- 4-byte integer
SMALLINT               -- 2-byte integer
BIGINT                 -- 8-byte integer
DECIMAL(10,2)          -- Exact numeric with 10 digits, 2 decimal places
FLOAT                  -- Floating point (approximate)
NUMERIC                -- Exact numeric
REAL                   -- Single-precision float
DOUBLE PRECISION       -- Double-precision float
-- Character types
CHAR(50)               -- Fixed-length string
VARCHAR(255)           -- Variable-length string
TEXT                   -- Large text
CLOB                   -- Character Large Object
-- Date and time
DATE                   -- YYYY-MM-DD
TIME                   -- HH:MM:SS
TIMESTAMP              -- YYYY-MM-DD HH:MM:SS
DATETIME               -- Combined date and time
INTERVAL               -- Time interval
-- Boolean
BOOLEAN                -- TRUE/FALSE
-- Binary
BLOB                   -- Binary Large Object
BYTEA                  -- Byte array
-- JSON/XML
JSON                   -- JSON data
JSONB                  -- Binary JSON (PostgreSQL)
XML                    -- XML data
-- Array and composite
ARRAY                  -- Array of values
ENUM                   -- Enumerated type

Choosing Appropriate Data Types

import pandas as pd
import numpy as np

def optimize_data_types(df):
    """Optimize DataFrame memory usage by choosing appropriate types"""
    memory_before = df.memory_usage(deep=True).sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtype
        if col_type == 'int64':
            # Downcast integers to the smallest type that fits the range
            min_val, max_val = df[col].min(), df[col].max()
            if min_val >= 0 and max_val <= 255:
                df[col] = df[col].astype('uint8')
            elif min_val >= 0 and max_val <= 65535:
                df[col] = df[col].astype('uint16')
            elif min_val >= -32768 and max_val <= 32767:
                df[col] = df[col].astype('int16')
            elif min_val >= -2147483648 and max_val <= 2147483647:
                df[col] = df[col].astype('int32')
        elif col_type == 'float64':
            # Note: float32 halves memory but reduces precision
            df[col] = df[col].astype('float32')
        elif col_type == 'object':
            # Convert low-cardinality strings to categories
            unique_ratio = df[col].nunique() / len(df)
            if unique_ratio < 0.5:  # Less than 50% unique
                df[col] = df[col].astype('category')
    memory_after = df.memory_usage(deep=True).sum() / 1024**2
    print(f"Memory before: {memory_before:.2f} MB")
    print(f"Memory after: {memory_after:.2f} MB")
    print(f"Reduction: {(1 - memory_after/memory_before)*100:.1f}%")
    return df

# Example
df = pd.DataFrame({
    'id': range(1, 100001),
    'small_int': np.random.randint(0, 100, 100000),
    'medium_int': np.random.randint(0, 10000, 100000),
    'large_int': np.random.randint(0, 1000000, 100000),
    'float_val': np.random.randn(100000),
    'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], 100000),
    'text': [f'Text_{i}' for i in range(100000)]
})
optimize_data_types(df)

6. Working with Large Tables

Partitioning

-- Table partitioning strategies (MySQL-style syntax; details vary by database)
-- Range partitioning (by date)
CREATE TABLE sales (
    sale_id INT,
    sale_date DATE,
    amount DECIMAL(10,2)
) PARTITION BY RANGE COLUMNS (sale_date) (
    PARTITION p2023_q1 VALUES LESS THAN ('2023-04-01'),
    PARTITION p2023_q2 VALUES LESS THAN ('2023-07-01'),
    PARTITION p2023_q3 VALUES LESS THAN ('2023-10-01'),
    PARTITION p2023_q4 VALUES LESS THAN ('2024-01-01')
);
-- List partitioning (by category)
CREATE TABLE products (
    product_id INT,
    category VARCHAR(50),
    name VARCHAR(100)
) PARTITION BY LIST COLUMNS (category) (
    PARTITION p_electronics VALUES IN ('Electronics'),
    PARTITION p_clothing VALUES IN ('Clothing'),
    PARTITION p_books VALUES IN ('Books')
);
-- Hash partitioning (for even distribution)
CREATE TABLE logs (
    log_id INT,
    log_message TEXT,
    log_level VARCHAR(10)
) PARTITION BY HASH (log_id) PARTITIONS 10;

Sharding Strategy

import hashlib

class DatabaseShard:
    """Simple sharding implementation"""
    def __init__(self, num_shards):
        self.num_shards = num_shards
        self.shards = [{} for _ in range(num_shards)]

    def _get_shard_key(self, key):
        """Determine which shard a key belongs to"""
        return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % self.num_shards

    def insert(self, key, value):
        shard = self._get_shard_key(key)
        self.shards[shard][key] = value

    def get(self, key):
        shard = self._get_shard_key(key)
        return self.shards[shard].get(key)

    def get_shard_stats(self):
        stats = []
        for i, shard in enumerate(self.shards):
            stats.append({
                'shard': i,
                'size': len(shard),
                'memory': sum(len(str(v)) for v in shard.values()) / 1024  # KB
            })
        return stats

# Example
shard = DatabaseShard(4)

# Insert data
for i in range(1000):
    shard.insert(f'key_{i}', f'value_{i}')

# View distribution
for stat in shard.get_shard_stats():
    print(f"Shard {stat['shard']}: {stat['size']} items, {stat['memory']:.1f} KB")

Handling Large Table Queries

import sqlite3

import pandas as pd

class LargeTableHandler:
    """Handle queries on large tables efficiently"""
    def __init__(self, connection):
        self.conn = connection
        self.chunk_size = 10000

    def query_in_chunks(self, query, columns):
        """Execute a query in chunks to limit memory use.

        Note: LIMIT/OFFSET pagination rescans the skipped rows, so deep
        offsets get progressively slower; keyset pagination scales better.
        """
        results = []
        offset = 0
        while True:
            chunk_query = f"{query} LIMIT {self.chunk_size} OFFSET {offset}"
            chunk = pd.read_sql(chunk_query, self.conn)
            if chunk.empty:
                break
            results.append(chunk)
            offset += self.chunk_size
            print(f"Processed {offset} rows...")
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame(columns=columns)

    def aggregate_in_db(self, table, group_by, agg_func):
        """Perform aggregation in the database instead of in memory"""
        query = f"""
            SELECT {group_by}, {agg_func}
            FROM {table}
            GROUP BY {group_by}
        """
        return pd.read_sql(query, self.conn)

    def create_indexes_for_queries(self, table, columns):
        """Create indexes for commonly queried columns"""
        for col in columns:
            try:
                self.conn.execute(f"CREATE INDEX idx_{table}_{col} ON {table}({col})")
                print(f"Created index on {table}.{col}")
            except sqlite3.OperationalError:
                print(f"Index on {table}.{col} already exists")

7. Table Analytics

Summary Statistics

-- Basic statistical analysis on tables
-- Count, sum, average, min, max
SELECT
    COUNT(*) as total_rows,
    COUNT(DISTINCT customer_id) as unique_customers,
    SUM(amount) as total_sales,
    AVG(amount) as avg_order_value,
    MIN(amount) as min_order,
    MAX(amount) as max_order,
    STDDEV(amount) as std_dev
FROM orders;
-- Percentiles
SELECT
    PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY amount) as q1,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) as median,
    PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY amount) as q3
FROM orders;
-- Frequency distribution
SELECT
    category,
    COUNT(*) as count,
    COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () as percentage
FROM products
GROUP BY category
ORDER BY count DESC;
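For comparison, the same aggregates and quartiles can be computed client-side with pandas. A sketch using synthetic order data (the `orders` columns are assumed to match the SQL above):

```python
import pandas as pd
import numpy as np

rng = np.random.default_rng(0)
orders = pd.DataFrame({
    'customer_id': rng.integers(1, 50, 500),
    'amount': rng.uniform(10, 1000, 500),
})

# Equivalent of the SQL aggregate query above
summary = {
    'total_rows': len(orders),
    'unique_customers': orders['customer_id'].nunique(),
    'total_sales': orders['amount'].sum(),
    'avg_order_value': orders['amount'].mean(),
    'min_order': orders['amount'].min(),
    'max_order': orders['amount'].max(),
    'std_dev': orders['amount'].std(),
}

# Quartiles, matching PERCENTILE_CONT(0.25 / 0.5 / 0.75)
q1, median, q3 = orders['amount'].quantile([0.25, 0.5, 0.75])
print(summary)
print(f"q1={q1:.2f}, median={median:.2f}, q3={q3:.2f}")
```

For large tables, prefer pushing these aggregates into the database (as in the SQL version) so only the summary crosses the wire.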

Python Analytics on Tables

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def analyze_table(df, table_name):
    """Comprehensive analysis of a database table"""
    print(f"\n{'='*50}")
    print(f"Analysis of {table_name}")
    print(f"{'='*50}")
    # Basic info
    print(f"\nShape: {df.shape[0]} rows, {df.shape[1]} columns")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    # Missing values
    missing = df.isnull().sum()
    if missing.sum() > 0:
        print(f"\nMissing values:\n{missing[missing > 0]}")
    # Numeric columns summary
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        print("\nNumeric Columns Summary:")
        print(df[numeric_cols].describe())
    # Categorical columns summary
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns
    if len(categorical_cols) > 0:
        print("\nCategorical Columns Summary:")
        for col in categorical_cols:
            print(f"\n{col}:")
            print(df[col].value_counts().head(10))
    # Correlation analysis
    if len(numeric_cols) > 1:
        corr = df[numeric_cols].corr()
        print("\nCorrelation Matrix:")
        print(corr)
        # Correlation heatmap
        plt.figure(figsize=(10, 8))
        sns.heatmap(corr, annot=True, cmap='coolwarm', center=0)
        plt.title(f'Correlation Matrix - {table_name}')
        plt.tight_layout()
        plt.show()
    # Distribution plots (guard against the no-numeric-column case)
    if len(numeric_cols) > 0:
        n_plots = min(3, len(numeric_cols))
        fig, axes = plt.subplots(1, n_plots, figsize=(15, 4), squeeze=False)
        for i, col in enumerate(numeric_cols[:n_plots]):
            df[col].hist(ax=axes[0][i], bins=30, edgecolor='black')
            axes[0][i].set_title(f'Distribution of {col}')
            axes[0][i].set_xlabel(col)
            axes[0][i].set_ylabel('Frequency')
        plt.suptitle(f'Column Distributions - {table_name}')
        plt.tight_layout()
        plt.show()

# Example
df = pd.DataFrame({
    'customer_id': range(1, 1001),
    'age': np.random.randint(18, 80, 1000),
    'income': np.random.normal(50000, 20000, 1000),
    'purchase_frequency': np.random.poisson(5, 1000),
    'avg_spend': np.random.gamma(2, 50, 1000),
    'segment': np.random.choice(['Premium', 'Standard', 'Basic'], 1000, p=[0.2, 0.5, 0.3])
})
analyze_table(df, 'customers')

8. Table Operations

CRUD Operations

import sqlite3

import pandas as pd

class TableCRUD:
    """Complete CRUD operations for database tables"""
    def __init__(self, connection, table_name):
        self.conn = connection
        self.table = table_name

    def create(self, **columns):
        """Create a new record"""
        cols = ', '.join(columns.keys())
        placeholders = ', '.join(['?'] * len(columns))
        query = f"INSERT INTO {self.table} ({cols}) VALUES ({placeholders})"
        cursor = self.conn.cursor()
        cursor.execute(query, list(columns.values()))
        self.conn.commit()
        return cursor.lastrowid

    def read(self, condition=None, columns='*'):
        """Read records"""
        query = f"SELECT {columns} FROM {self.table}"
        if condition:
            query += f" WHERE {condition}"
        return pd.read_sql(query, self.conn)

    def update(self, updates, condition):
        """Update records"""
        set_clause = ', '.join([f"{col} = ?" for col in updates.keys()])
        query = f"UPDATE {self.table} SET {set_clause} WHERE {condition}"
        cursor = self.conn.cursor()
        cursor.execute(query, list(updates.values()))
        self.conn.commit()
        return cursor.rowcount

    def delete(self, condition):
        """Delete records"""
        query = f"DELETE FROM {self.table} WHERE {condition}"
        cursor = self.conn.cursor()
        cursor.execute(query)
        self.conn.commit()
        return cursor.rowcount

# Example usage
conn = sqlite3.connect(':memory:')

# Create table
conn.execute('''
CREATE TABLE inventory (
    id INTEGER PRIMARY KEY,
    product TEXT NOT NULL,
    quantity INTEGER,
    price REAL
)
''')
crud = TableCRUD(conn, 'inventory')

# Create
id1 = crud.create(product='Laptop', quantity=10, price=999.99)
id2 = crud.create(product='Mouse', quantity=50, price=29.99)
print(f"Created records with IDs: {id1}, {id2}")

# Read
print("\nAll records:")
print(crud.read())

# Update
updated = crud.update({'quantity': 45, 'price': 24.99}, "product = 'Mouse'")
print(f"\nUpdated {updated} record(s)")
print("\nAfter update:")
print(crud.read())

# Delete
deleted = crud.delete("product = 'Mouse'")
print(f"\nDeleted {deleted} record(s)")
print("\nFinal records:")
print(crud.read())

Batch Operations

def batch_insert(conn, table_name, data, batch_size=1000):
    """Insert data in batches for better performance"""
    cursor = conn.cursor()
    columns = list(data[0].keys())
    placeholders = ', '.join(['?'] * len(columns))
    query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
    total_rows = 0
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        values = [list(row.values()) for row in batch]
        cursor.executemany(query, values)
        conn.commit()
        total_rows += len(batch)
        print(f"Inserted {total_rows}/{len(data)} rows...")
    return total_rows

def batch_update(conn, table_name, updates, condition, batch_size=1000):
    """Update records in batches"""
    cursor = conn.cursor()
    # Get the IDs to update
    rows = conn.execute(f"SELECT id FROM {table_name} WHERE {condition}").fetchall()
    ids = [row[0] for row in rows]
    total_updated = 0
    for i in range(0, len(ids), batch_size):
        batch_ids = ids[i:i+batch_size]
        placeholders = ','.join(['?'] * len(batch_ids))
        set_clause = ', '.join([f"{col} = ?" for col in updates.keys()])
        query = f"UPDATE {table_name} SET {set_clause} WHERE id IN ({placeholders})"
        values = list(updates.values()) + batch_ids
        cursor.execute(query, values)
        conn.commit()
        total_updated += len(batch_ids)
        print(f"Updated {total_updated}/{len(ids)} rows...")
    return total_updated

9. Table Joins and Queries

Types of Joins

-- INNER JOIN: Only matching records
SELECT o.order_id, c.name, o.amount
FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id;
-- LEFT JOIN: All records from left table
SELECT c.name, o.order_id, o.amount
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id;
-- RIGHT JOIN: All records from right table
SELECT c.name, o.order_id, o.amount
FROM orders o
RIGHT JOIN customers c ON o.customer_id = c.customer_id;
-- FULL OUTER JOIN: All records from both tables
SELECT c.name, o.order_id, o.amount
FROM customers c
FULL OUTER JOIN orders o ON c.customer_id = o.customer_id;
-- CROSS JOIN: Cartesian product
SELECT c.name, p.name
FROM customers c
CROSS JOIN products p;
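The same join semantics map directly onto pandas' `merge` through its `how` parameter. A small sketch with toy data:

```python
import pandas as pd

customers = pd.DataFrame({'customer_id': [1, 2, 3],
                          'name': ['Ann', 'Bob', 'Cam']})
orders = pd.DataFrame({'order_id': [100, 101], 'customer_id': [1, 1],
                       'amount': [25.0, 40.0]})

inner = customers.merge(orders, on='customer_id', how='inner')  # matching rows only
left  = customers.merge(orders, on='customer_id', how='left')   # all customers
outer = customers.merge(orders, on='customer_id', how='outer')  # both sides
cross = customers.merge(orders, how='cross')                    # Cartesian product

print(len(inner), len(left), len(outer), len(cross))
```

Customers without orders appear with NaN order columns in the left and outer results, matching the NULLs a SQL outer join would produce.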

Advanced Query Techniques

-- Window functions
SELECT
    order_id,
    customer_id,
    amount,
    AVG(amount) OVER (PARTITION BY customer_id) as customer_avg,
    RANK() OVER (ORDER BY amount DESC) as rank,
    LAG(amount, 1) OVER (ORDER BY order_date) as previous_amount
FROM orders;
-- Common Table Expressions (CTEs)
WITH customer_stats AS (
    SELECT
        customer_id,
        COUNT(*) as order_count,
        SUM(amount) as total_spent
    FROM orders
    GROUP BY customer_id
)
SELECT
    c.name,
    cs.order_count,
    cs.total_spent,
    RANK() OVER (ORDER BY cs.total_spent DESC) as spending_rank
FROM customers c
JOIN customer_stats cs ON c.customer_id = cs.customer_id;
-- Recursive CTE (hierarchical data)
WITH RECURSIVE employee_hierarchy AS (
    SELECT employee_id, manager_id, name, 1 as level
    FROM employees
    WHERE manager_id IS NULL
    UNION ALL
    SELECT e.employee_id, e.manager_id, e.name, eh.level + 1
    FROM employees e
    JOIN employee_hierarchy eh ON e.manager_id = eh.employee_id
)
SELECT * FROM employee_hierarchy;
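SQLite supports recursive CTEs, so the hierarchy query above can be run as-is from Python. A self-contained sketch with a four-person org chart (sample data invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE employees (employee_id INT, manager_id INT, name TEXT)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?)", [
    (1, None, 'CEO'),        # NULL manager_id = root of the hierarchy
    (2, 1, 'VP Eng'),
    (3, 1, 'VP Sales'),
    (4, 2, 'Engineer'),
])

rows = conn.execute("""
WITH RECURSIVE employee_hierarchy AS (
    SELECT employee_id, manager_id, name, 1 AS level
    FROM employees
    WHERE manager_id IS NULL
    UNION ALL
    SELECT e.employee_id, e.manager_id, e.name, eh.level + 1
    FROM employees e
    JOIN employee_hierarchy eh ON e.manager_id = eh.employee_id
)
SELECT name, level FROM employee_hierarchy ORDER BY level, name
""").fetchall()

# Indent each name by its depth in the tree
for name, level in rows:
    print('  ' * (level - 1) + name)
```

Each recursive step joins employees to the rows found so far, so levels accumulate until no new reports are found.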

Python Query Builder

class QueryBuilder:
    """Dynamic SQL query builder"""
    def __init__(self, table):
        self.table = table
        self.select_columns = ['*']
        self.where_conditions = []
        self.join_clauses = []
        self.group_by_columns = []
        self.having_conditions = []
        self.order_by_columns = []
        self.limit_value = None
        self.offset_value = None

    def select(self, *columns):
        self.select_columns = columns
        return self

    def where(self, condition):
        self.where_conditions.append(condition)
        return self

    def join(self, table, condition, join_type='INNER'):
        self.join_clauses.append(f"{join_type} JOIN {table} ON {condition}")
        return self

    def group_by(self, *columns):
        self.group_by_columns = columns
        return self

    def having(self, condition):
        self.having_conditions.append(condition)
        return self

    def order_by(self, *columns):
        self.order_by_columns = columns
        return self

    def limit(self, limit):
        self.limit_value = limit
        return self

    def offset(self, offset):
        self.offset_value = offset
        return self

    def build(self):
        query = f"SELECT {', '.join(self.select_columns)} FROM {self.table}"
        if self.join_clauses:
            query += f" {' '.join(self.join_clauses)}"
        if self.where_conditions:
            query += f" WHERE {' AND '.join(self.where_conditions)}"
        if self.group_by_columns:
            query += f" GROUP BY {', '.join(self.group_by_columns)}"
        if self.having_conditions:
            query += f" HAVING {' AND '.join(self.having_conditions)}"
        if self.order_by_columns:
            query += f" ORDER BY {', '.join(self.order_by_columns)}"
        if self.limit_value is not None:
            query += f" LIMIT {self.limit_value}"
        if self.offset_value is not None:
            query += f" OFFSET {self.offset_value}"
        return query

# Example
query = (QueryBuilder('orders')
         .select('customer_id', 'SUM(amount) as total')
         .where("order_date >= '2023-01-01'")
         .group_by('customer_id')
         .having('SUM(amount) > 1000')
         .order_by('total DESC')
         .limit(10)
         .build())
print(query)

10. Table Optimization

Query Optimization

import re
import sqlite3
import statistics
import time

import numpy as np

class QueryOptimizer:
    """Tools for query optimization"""
    def __init__(self, connection):
        self.conn = connection

    def explain_query(self, query):
        """Get the query execution plan"""
        plan = self.conn.execute(f"EXPLAIN QUERY PLAN {query}").fetchall()
        print(f"Query Plan for: {query[:100]}...")
        print("-" * 50)
        for step in plan:
            indent = "  " * step[1]
            print(f"{indent}{step[3]}")
        return plan

    def analyze_query_performance(self, query, iterations=10):
        """Measure query performance"""
        times = []
        for _ in range(iterations):
            start = time.time()
            self.conn.execute(query).fetchall()
            times.append(time.time() - start)
        avg_time = sum(times) / len(times)
        print("Query Performance Analysis:")
        print(f"  Average: {avg_time*1000:.2f} ms")
        print(f"  Min: {min(times)*1000:.2f} ms")
        print(f"  Max: {max(times)*1000:.2f} ms")
        print(f"  Std Dev: {statistics.stdev(times)*1000:.2f} ms")
        return avg_time

    def suggest_indexes(self, table, query):
        """Suggest indexes based on the query plan (a simple heuristic)"""
        plan = self.explain_query(query)
        suggested_indexes = []
        for step in plan:
            if 'SCAN' in step[3]:
                # Full table scan: the column filtered in the WHERE clause
                # may benefit from an index
                where_match = re.search(r'WHERE\s+(\w+)', query, re.IGNORECASE)
                if where_match:
                    suggested_indexes.append(where_match.group(1))
        if suggested_indexes:
            print("\nSuggested indexes:")
            for col in set(suggested_indexes):
                print(f"  CREATE INDEX idx_{table}_{col} ON {table}({col});")
        return suggested_indexes

# Example
conn = sqlite3.connect(':memory:')
optimizer = QueryOptimizer(conn)

# Create sample data
conn.execute('''
CREATE TABLE sales (
    id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    product_id INTEGER,
    amount REAL,
    sale_date DATE
)
''')
data = [(i, np.random.randint(1, 100), np.random.randint(1, 50),
         np.random.uniform(10, 1000),
         f'2023-{np.random.randint(1, 13):02d}-{np.random.randint(1, 29):02d}')
        for i in range(10000)]
conn.executemany("INSERT INTO sales VALUES (?,?,?,?,?)", data)

# Analyze query
query = "SELECT * FROM sales WHERE customer_id = 42 AND amount > 500"
optimizer.analyze_query_performance(query)
optimizer.suggest_indexes('sales', query)

Table Maintenance

def table_maintenance(conn, table_name):
    """Perform table maintenance tasks.

    Note: the dbstat virtual table requires SQLite built with
    SQLITE_ENABLE_DBSTAT_VTAB (enabled in most distributions).
    """
    print(f"Maintenance for table: {table_name}")
    print("-" * 40)
    # Get table size
    size = conn.execute(
        "SELECT SUM(pgsize) FROM dbstat WHERE name = ?", (table_name,)
    ).fetchone()[0]
    print(f"Table size: {size/1024:.2f} KB")
    # Get row count
    count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
    print(f"Row count: {count:,}")
    # Analyze table statistics
    conn.execute(f"ANALYZE {table_name}")
    print("Analyzed table statistics")
    # Vacuum (reclaim space)
    conn.execute("VACUUM")
    print("Vacuumed database")
    # Estimate fragmentation from page ordering
    pages = conn.execute(
        "SELECT pageno FROM dbstat WHERE name = ? ORDER BY path", (table_name,)
    ).fetchall()
    fragmentation = 0.0
    if len(pages) > 1:
        contiguous = sum(1 for prev, cur in zip(pages, pages[1:])
                         if cur[0] == prev[0] + 1)
        fragmentation = (1 - contiguous / (len(pages) - 1)) * 100
        print(f"Fragmentation: {fragmentation:.1f}%")
    # Get index information
    indexes = conn.execute(f"PRAGMA index_list({table_name})").fetchall()
    if indexes:
        print("\nIndexes:")
        for idx in indexes:
            idx_name = idx[1]
            idx_info = conn.execute(f"PRAGMA index_info({idx_name})").fetchall()
            columns = [col[2] for col in idx_info]
            print(f"  {idx_name}: ({', '.join(columns)})")
    return {
        'size_kb': size / 1024,
        'row_count': count,
        'fragmentation_percent': fragmentation
    }

11. Data Science Queries

Customer Segmentation

-- RFM Analysis (Recency, Frequency, Monetary)
WITH customer_rfm AS (
    SELECT
        customer_id,
        MAX(order_date) as last_order,
        COUNT(*) as frequency,
        SUM(amount) as monetary,
        JULIANDAY('2024-01-01') - JULIANDAY(MAX(order_date)) as recency
    FROM orders
    GROUP BY customer_id
),
rfm_scores AS (
    SELECT
        customer_id,
        -- Higher score = better: most recent, most frequent, highest spend
        NTILE(4) OVER (ORDER BY recency DESC) as r_score,
        NTILE(4) OVER (ORDER BY frequency) as f_score,
        NTILE(4) OVER (ORDER BY monetary) as m_score
    FROM customer_rfm
)
SELECT
    customer_id,
    r_score,
    f_score,
    m_score,
    CASE
        WHEN r_score >= 3 AND f_score >= 3 AND m_score >= 3 THEN 'Champions'
        WHEN r_score >= 3 AND f_score >= 3 THEN 'Loyal'
        WHEN r_score >= 3 AND f_score <= 2 THEN 'New'
        WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
        WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
        ELSE 'Others'
    END as segment
FROM rfm_scores;
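An equivalent RFM computation can be sketched in pandas. Quartile scores use `pd.qcut` over ranks (ranking first avoids duplicate-bin-edge errors on tied values), mirroring NTILE(4); the order data is synthetic:

```python
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
orders = pd.DataFrame({
    'customer_id': rng.integers(1, 101, 1000),
    'order_date': pd.to_datetime('2023-01-01')
                  + pd.to_timedelta(rng.integers(0, 365, 1000), unit='D'),
    'amount': rng.uniform(10, 500, 1000),
})
snapshot = pd.Timestamp('2024-01-01')

# Recency, frequency, and monetary value per customer
rfm = orders.groupby('customer_id').agg(
    recency=('order_date', lambda d: (snapshot - d.max()).days),
    frequency=('order_date', 'count'),
    monetary=('amount', 'sum'),
)

# Quartile scores; higher = better, so recency labels are reversed
rfm['r_score'] = pd.qcut(rfm['recency'].rank(method='first'), 4,
                         labels=[4, 3, 2, 1]).astype(int)
rfm['f_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 4,
                         labels=[1, 2, 3, 4]).astype(int)
rfm['m_score'] = pd.qcut(rfm['monetary'].rank(method='first'), 4,
                         labels=[1, 2, 3, 4]).astype(int)
print(rfm.head())
```

The same CASE logic as the SQL can then be applied with `np.select` or nested conditions to label segments.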

Time Series Analysis

-- Daily sales with moving averages
WITH daily_sales AS (
    SELECT
        DATE(order_date) as sale_date,
        SUM(amount) as daily_total,
        COUNT(*) as order_count
    FROM orders
    GROUP BY DATE(order_date)
)
SELECT
    sale_date,
    daily_total,
    order_count,
    AVG(daily_total) OVER (ORDER BY sale_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as ma_7d,
    AVG(daily_total) OVER (ORDER BY sale_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) as ma_30d,
    LAG(daily_total, 1) OVER (ORDER BY sale_date) as prev_day
FROM daily_sales;
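The same windows translate directly to pandas rolling operations. A minimal sketch, using made-up daily totals in place of the real `daily_sales` result:

```python
import pandas as pd

# Hypothetical daily sales totals standing in for the query result above
daily = pd.DataFrame({
    'sale_date': pd.date_range('2024-01-01', periods=10, freq='D'),
    'daily_total': [100, 120, 90, 110, 130, 95, 105, 140, 115, 125],
})

# min_periods=1 mirrors the SQL window, which averages whatever rows exist
# before a full 7-row window has accumulated
daily['ma_7d'] = daily['daily_total'].rolling(window=7, min_periods=1).mean()
daily['prev_day'] = daily['daily_total'].shift(1)  # equivalent of LAG(daily_total, 1)
print(daily)
```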

Cohort Analysis

-- Cohort analysis by customer acquisition month
WITH cohorts AS (
    SELECT
        customer_id,
        MIN(DATE(order_date)) as first_order,
        STRFTIME('%Y-%m', MIN(DATE(order_date))) as cohort_month
    FROM orders
    GROUP BY customer_id
),
customer_orders AS (
    SELECT
        o.customer_id,
        c.cohort_month,
        STRFTIME('%Y-%m', o.order_date) as order_month,
        o.amount
    FROM orders o
    JOIN cohorts c ON o.customer_id = c.customer_id
)
SELECT
    cohort_month,
    order_month,
    COUNT(DISTINCT customer_id) as customers,
    SUM(amount) as revenue,
    AVG(amount) as avg_order_value,
    COUNT(DISTINCT customer_id) * 100.0 /
        FIRST_VALUE(COUNT(DISTINCT customer_id)) OVER (PARTITION BY cohort_month ORDER BY order_month) as retention_pct
FROM customer_orders
GROUP BY cohort_month, order_month
ORDER BY cohort_month, order_month;
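The query's long output is easier to read as a retention matrix (one row per cohort, one column per activity month). A sketch of the pivot, using hypothetical values in place of the real query result:

```python
import pandas as pd

# Hypothetical output of the cohort query above
cohort_df = pd.DataFrame({
    'cohort_month': ['2023-01', '2023-01', '2023-01', '2023-02', '2023-02'],
    'order_month':  ['2023-01', '2023-02', '2023-03', '2023-02', '2023-03'],
    'retention_pct': [100.0, 60.0, 45.0, 100.0, 70.0],
})

# Rows = acquisition cohort, columns = activity month, values = retention %
matrix = cohort_df.pivot(index='cohort_month', columns='order_month',
                         values='retention_pct')
print(matrix)
```

Cells below the diagonal are NaN because a cohort cannot have activity before its acquisition month.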

12. Best Practices

Table Design Best Practices

def table_design_checklist():
    """Checklist for good table design"""
    checklist = {
        "Naming": [
            "Use descriptive, meaningful names",
            "Use singular nouns (customer, not customers)",
            "Use snake_case naming convention",
            "Be consistent (all lowercase, underscores)",
            "Avoid reserved words"
        ],
        "Columns": [
            "Each column has a single purpose",
            "Use appropriate data types",
            "Define NOT NULL where applicable",
            "Use DEFAULT values appropriately",
            "Consider adding comments/documentation"
        ],
        "Constraints": [
            "Always define PRIMARY KEY",
            "Use FOREIGN KEY for relationships",
            "Add CHECK constraints for data validation",
            "Use UNIQUE for unique columns",
            "Consider NOT NULL constraints"
        ],
        "Normalization": [
            "Follow 3NF for OLTP systems",
            "Denormalize for data warehouses",
            "Avoid data duplication",
            "Consider performance vs. normalization",
            "Document design decisions"
        ],
        "Indexes": [
            "Index foreign key columns",
            "Index columns used in WHERE clauses",
            "Consider composite indexes",
            "Avoid over-indexing",
            "Monitor index usage"
        ],
        "Performance": [
            "Consider partitioning for large tables",
            "Use appropriate storage engine",
            "Monitor query performance",
            "Regular maintenance (VACUUM, ANALYZE)",
            "Archive old data when appropriate"
        ]
    }
    for category, items in checklist.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  ✓ {item}")

table_design_checklist()
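Applied to a concrete schema, several of these items can be seen at once. The sketch below uses SQLite and illustrative table names: singular snake_case naming, a surrogate primary key, NOT NULL/UNIQUE/CHECK/DEFAULT constraints, a foreign key, and an index on the foreign key column (the UNIQUE email column already gets an implicit index, so no extra one is added there).

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("PRAGMA foreign_keys = ON")  # SQLite disables FK enforcement by default

# Singular noun, snake_case, surrogate key, constraints on every column
conn.execute("""
CREATE TABLE customer (
    customer_id INTEGER PRIMARY KEY,
    email       TEXT NOT NULL UNIQUE,
    signup_date TEXT NOT NULL DEFAULT CURRENT_DATE,
    age         INTEGER CHECK (age >= 0)
)
""")
conn.execute("""
CREATE TABLE purchase (
    purchase_id INTEGER PRIMARY KEY,
    customer_id INTEGER NOT NULL REFERENCES customer(customer_id),
    amount      REAL NOT NULL CHECK (amount >= 0)
)
""")
# Index the foreign key column, since it is used in JOINs
conn.execute("CREATE INDEX idx_purchase_customer ON purchase(customer_id)")
```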

Performance Best Practices

def performance_checklist():
    """Performance optimization checklist"""
    print("Database Table Performance Checklist")
    print("=" * 40)
    practices = [
        ("Index Strategy", "Create indexes on columns used in WHERE, JOIN, ORDER BY"),
        ("Query Optimization", "Use EXPLAIN to analyze query plans"),
        ("Batch Operations", "Use bulk inserts/updates for large datasets"),
        ("Connection Management", "Use connection pooling"),
        ("Data Types", "Use smallest appropriate data types"),
        ("Partitioning", "Partition large tables by date or key ranges"),
        ("Archiving", "Move old data to archive tables"),
        ("Statistics", "Keep table statistics updated"),
        ("Caching", "Use query caching for expensive operations"),
        ("Monitoring", "Monitor slow queries and index usage")
    ]
    for practice, description in practices:
        print(f"\n{practice}:")
        print(f"  {description}")
        print(f"  Status: [ ] Implemented")
        print(f"  Notes: _________________")

performance_checklist()
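For the "Query Optimization" item, SQLite's EXPLAIN QUERY PLAN shows whether a query hits an index. A minimal sketch with an illustrative table (the exact wording of the plan detail varies by SQLite version):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER, amount REAL)"
)

# Without an index, the planner must scan every row
plan_scan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM orders WHERE customer_id = 42"
).fetchall()
print(plan_scan[0][3])  # the 'detail' column, e.g. 'SCAN orders'

# After indexing the filtered column, the planner can search the index instead
conn.execute("CREATE INDEX idx_orders_customer ON orders(customer_id)")
plan_search = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM orders WHERE customer_id = 42"
).fetchall()
print(plan_search[0][3])
```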

Conclusion

Database tables are the foundation of structured data storage for data science. Understanding how to design, optimize, and query tables effectively is essential for extracting insights from data.

Key Takeaways

  1. Design Matters: Good table design prevents data anomalies and improves performance
  2. Normalization: Balance between normalization and denormalization based on use case
  3. Indexing: Critical for query performance, but over-indexing hurts write operations
  4. Data Types: Choose appropriate types for storage efficiency and data integrity
  5. Relationships: Properly define relationships between tables
  6. Query Optimization: Use indexes, analyze query plans, and batch operations
  7. Maintenance: Regular maintenance keeps tables performing well
  8. Documentation: Document schema, relationships, and design decisions

Quick Reference

Concept      | Best Practice                            | Common Pitfall
-------------|------------------------------------------|-------------------------------------------
Primary Key  | Use surrogate keys (auto-increment)      | Using natural keys that may change
Foreign Keys | Always define for referential integrity  | Missing constraints, orphaned records
Indexes      | Index selective columns                  | Over-indexing, unused indexes
Data Types   | Use smallest appropriate type            | Using TEXT for everything
NULLs        | Use NOT NULL when possible               | Allowing NULLs unnecessarily
Joins        | Use appropriate join types               | Cartesian products from missing conditions

Mastering database tables is a fundamental skill for any data scientist working with structured data!
