Complete Guide to Input and Output in Programming

Introduction to Input and Output

Input and Output (I/O) operations are fundamental to programming, enabling communication between programs and the outside world. From reading user input to writing files, displaying data, and communicating over networks, I/O forms the bridge between software and its environment. Understanding I/O concepts across different programming languages is essential for building interactive, data-driven applications.

Key Concepts

  • Streams: Flow of data between program and external sources
  • Buffering: Temporary storage for efficient I/O operations
  • Standard I/O: stdin, stdout, stderr
  • File I/O: Reading from and writing to files
  • Serialization: Converting data structures to transmittable formats
  • Synchronization: Handling concurrent I/O operations

1. Standard Input/Output

Console I/O

# Python console I/O
# Input
name = input("Enter your name: ")
age = int(input("Enter your age: "))
# Output
print(f"Hello, {name}! You are {age} years old.")
print("Multiple", "values", "separated", sep=", ", end="!\n")
# Formatted output
print("Name: %s, Age: %d" % (name, age))
print("Name: {}, Age: {}".format(name, age))
// Java console I/O
import java.util.Scanner;
public class ConsoleIO {
public static void main(String[] args) {
// Input
Scanner scanner = new Scanner(System.in);
System.out.print("Enter your name: ");
String name = scanner.nextLine();
System.out.print("Enter your age: ");
int age = scanner.nextInt();
// Output
System.out.println("Hello, " + name + "! You are " + age + " years old.");
System.out.printf("Name: %s, Age: %d%n", name, age);
scanner.close();
}
}
// C console I/O
#include <stdio.h>
int main() {
char name[100];
int age;
printf("Enter your name: ");
fgets(name, sizeof(name), stdin);
printf("Enter your age: ");
scanf("%d", &age);
printf("Hello, %sYou are %d years old.\n", name, age);
return 0;
}
// Node.js console I/O
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
rl.question('Enter your name: ', (name) => {
rl.question('Enter your age: ', (age) => {
console.log(`Hello, ${name}! You are ${age} years old.`);
rl.close();
});
});

Standard Streams

# Standard streams in Python
import sys
# stdout (standard output)
sys.stdout.write("Writing to stdout\n")
# stderr (standard error)
sys.stderr.write("Error message\n")
# stdin (standard input)
data = sys.stdin.read()
# Redirecting stdout
original_stdout = sys.stdout
with open('output.txt', 'w') as f:
sys.stdout = f
print("This goes to file")
sys.stdout = original_stdout
# Shell redirection
# Redirect stdout to file
python script.py > output.txt
# Redirect stderr to file
python script.py 2> error.txt
# Redirect both
python script.py &> all_output.txt
# Pipe to another command
ls -la | grep "txt" | head -5

2. File I/O

Reading Files

# Python file reading
# Method 1: read all
with open('file.txt', 'r') as f:
content = f.read()
print(content)
# Method 2: read line by line
with open('file.txt', 'r') as f:
for line in f:
print(line.strip())
# Method 3: read all lines into list
with open('file.txt', 'r') as f:
lines = f.readlines()
for line in lines:
print(line.strip())
# Method 4: read chunks
with open('large_file.txt', 'r') as f:
while chunk := f.read(1024):  # Read 1KB chunks
process_chunk(chunk)
// Java file reading
import java.nio.file.*;
import java.io.*;
import java.util.List;
public class FileRead {
public static void main(String[] args) throws IOException {
// Method 1: Files.readAllLines (Java 8+)
List<String> lines = Files.readAllLines(Paths.get("file.txt"));
for (String line : lines) {
System.out.println(line);
}
// Method 2: BufferedReader
try (BufferedReader br = new BufferedReader(new FileReader("file.txt"))) {
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}
// Method 3: Files.lines (stream)
try (Stream<String> stream = Files.lines(Paths.get("file.txt"))) {
stream.forEach(System.out::println);
}
}
}
// C file reading
#include <stdio.h>
#include <stdlib.h>
int main() {
FILE *file = fopen("file.txt", "r");
if (file == NULL) {
perror("Error opening file");
return 1;
}
// Method 1: read character by character
int ch;
while ((ch = fgetc(file)) != EOF) {
putchar(ch);
}
// Method 2: read line by line
rewind(file);
char buffer[256];
while (fgets(buffer, sizeof(buffer), file)) {
printf("%s", buffer);
}
fclose(file);
return 0;
}

Writing Files

# Python file writing
# Write entire content
with open('output.txt', 'w') as f:
f.write("Hello, World!\n")
f.write("Second line\n")
# Write multiple lines
lines = ["Line 1\n", "Line 2\n", "Line 3\n"]
with open('output.txt', 'w') as f:
f.writelines(lines)
# Append to file
with open('output.txt', 'a') as f:
f.write("Appended line\n")
# Binary file writing
with open('binary.dat', 'wb') as f:
f.write(b'\x00\x01\x02\x03')
// Java file writing
import java.nio.file.*;
import java.io.*;
public class FileWrite {
public static void main(String[] args) throws IOException {
// Method 1: Files.write (Java 7+)
List<String> lines = Arrays.asList("Line 1", "Line 2", "Line 3");
Files.write(Paths.get("output.txt"), lines);
// Method 2: BufferedWriter
try (BufferedWriter bw = new BufferedWriter(new FileWriter("output.txt"))) {
bw.write("Hello, World!");
bw.newLine();
bw.write("Second line");
}
// Append mode
try (BufferedWriter bw = new BufferedWriter(new FileWriter("output.txt", true))) {
bw.write("Appended line");
}
}
}
// C file writing
#include <stdio.h>
int main() {
// Write text file
FILE *file = fopen("output.txt", "w");
if (file == NULL) {
perror("Error opening file");
return 1;
}
fprintf(file, "Hello, World!\n");
fprintf(file, "Second line\n");
fclose(file);
// Append mode
file = fopen("output.txt", "a");
fprintf(file, "Appended line\n");
fclose(file);
return 0;
}

File Position and Navigation

# Python file positioning
with open('file.txt', 'r+') as f:
# Get current position
position = f.tell()
print(f"Position: {position}")
# Read first 10 bytes
data = f.read(10)
print(f"Data: {data}")
# Seek to beginning
f.seek(0)
# Seek from current position
f.seek(5, 1)  # 5 bytes forward from current
# Seek from end
f.seek(-10, 2)  # 10 bytes from end
# Read from position
f.seek(20)
data = f.read()
// C file positioning
#include <stdio.h>
int main() {
FILE *file = fopen("file.txt", "r+");
if (file == NULL) return 1;
// Get current position
long pos = ftell(file);
printf("Position: %ld\n", pos);
// Seek to beginning
fseek(file, 0, SEEK_SET);
// Seek from current
fseek(file, 10, SEEK_CUR);
// Seek from end
fseek(file, -10, SEEK_END);
// Get new position
pos = ftell(file);
fclose(file);
return 0;
}

3. Buffered vs Unbuffered I/O

Understanding Buffering

# Python buffering examples
import time
# Unbuffered (line buffered by default for terminal)
def unbuffered_demo():
import sys
for i in range(5):
sys.stdout.write(f"{i} ")
time.sleep(0.5)
print()  # Flush buffer
# Flush manually
def flush_demo():
import sys
for i in range(5):
print(i, end=' ', flush=True)
time.sleep(0.5)
# File buffering
def file_buffering():
# Line buffered (default for text files)
with open('output.txt', 'w') as f:
for i in range(1000):
f.write(f"{i}\n")
# May not write to disk immediately
# Fully buffered with custom buffer size
with open('output.txt', 'w', buffering=8192) as f:
for i in range(10000):
f.write(f"{i}\n")
# Unbuffered
with open('output.txt', 'w', buffering=0) as f:
f.write("Immediate write to disk\n")
// C buffering control
#include <stdio.h>
#include <unistd.h>
int main() {
// Standard output is line buffered by default
printf("Hello");
sleep(1);
printf(" World\n");  // Buffer flushed with newline
// Set unbuffered
setbuf(stdout, NULL);
printf("Immediately visible");
sleep(2);
// Set line buffered
setlinebuf(stdout);
// Set fully buffered with custom buffer
char buffer[4096];
setvbuf(stdout, buffer, _IOFBF, sizeof(buffer));
// Flush manually
fflush(stdout);
return 0;
}

4. Binary I/O

Reading and Writing Binary Data

# Python binary I/O
import struct
import array
# Write binary data
with open('data.bin', 'wb') as f:
# Write bytes
f.write(b'\x00\x01\x02\x03')
# Write integers
f.write(struct.pack('i', 42))
# Write multiple values
f.write(struct.pack('iii', 1, 2, 3))
# Write array
arr = array.array('i', [10, 20, 30])
arr.tofile(f)
# Read binary data
with open('data.bin', 'rb') as f:
# Read bytes
data = f.read(4)
print(data.hex())
# Read integers
num = struct.unpack('i', f.read(4))[0]
print(num)
# Read multiple
nums = struct.unpack('iii', f.read(12))
print(nums)
# Read array
arr = array.array('i')
arr.fromfile(f, 3)
print(arr)
// C binary I/O
#include <stdio.h>
#include <stdint.h>
typedef struct {
int id;
float value;
char name[32];
} Record;
int main() {
// Write binary file
FILE *file = fopen("data.bin", "wb");
// Write single value
int num = 42;
fwrite(&num, sizeof(int), 1, file);
// Write array
int arr[] = {1, 2, 3, 4, 5};
fwrite(arr, sizeof(int), 5, file);
// Write structure
Record rec = {1, 3.14, "Example"};
fwrite(&rec, sizeof(Record), 1, file);
fclose(file);
// Read binary file
file = fopen("data.bin", "rb");
// Read integer
fread(&num, sizeof(int), 1, file);
printf("num: %d\n", num);
// Read array
fread(arr, sizeof(int), 5, file);
for (int i = 0; i < 5; i++) {
printf("%d ", arr[i]);
}
printf("\n");
// Read structure
fread(&rec, sizeof(Record), 1, file);
printf("id: %d, value: %f, name: %s\n", rec.id, rec.value, rec.name);
fclose(file);
return 0;
}

5. Formatted I/O

String Formatting

# Python formatted output
name = "Alice"
age = 30
height = 5.7
# f-strings (Python 3.6+)
print(f"Name: {name}, Age: {age}, Height: {height}")
# format() method
print("Name: {}, Age: {}, Height: {}".format(name, age, height))
print("Name: {0}, Age: {1}, Height: {2}".format(name, age, height))
print("Name: {n}, Age: {a}, Height: {h}".format(n=name, a=age, h=height))
# Format specifiers
print(f"Age: {age:5d}")          # Right-aligned width 5
print(f"Age: {age:<5d}")         # Left-aligned
print(f"Height: {height:.2f}")   # 2 decimal places
print(f"Percent: {0.12345:.2%}") # Percentage format
print(f"Hex: {255:04x}")         # Hexadecimal with leading zeros
print(f"Binary: {255:08b}")      # Binary with 8 bits
# Old style formatting
print("Name: %s, Age: %d, Height: %.2f" % (name, age, height))
// Java formatted output
String name = "Alice";
int age = 30;
double height = 5.7;
// System.out.printf
System.out.printf("Name: %s, Age: %d, Height: %.2f%n", name, age, height);
// String.format
String formatted = String.format("Name: %s, Age: %d, Height: %.2f", name, age, height);
System.out.println(formatted);
// Format specifiers
System.out.printf("Age: %5d%n", age);        // Right-aligned width 5
System.out.printf("Age: %-5d%n", age);       // Left-aligned
System.out.printf("Height: %.2f%n", height); // 2 decimal places
System.out.printf("Hex: %04x%n", 255);       // Hexadecimal
System.out.printf("Octal: %04o%n", 255);     // Octal
// MessageFormat
import java.text.MessageFormat;
String msg = MessageFormat.format("Name: {0}, Age: {1}", name, age);
System.out.println(msg);

6. Network I/O

TCP Socket Programming

# Python TCP server
import socket
def tcp_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8080))
server.listen(5)
print("Server listening on port 8080")
while True:
client, address = server.accept()
print(f"Connected to {address}")
# Receive data
data = client.recv(1024)
print(f"Received: {data.decode()}")
# Send response
client.send(b"Hello from server!")
client.close()
# Python TCP client
def tcp_client():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8080))
# Send data
client.send(b"Hello from client!")
# Receive response
response = client.recv(1024)
print(f"Server response: {response.decode()}")
client.close()
// Java TCP server
import java.io.*;
import java.net.*;
public class TCPServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("Server listening on port 8080");
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("Client connected: " + clientSocket.getInetAddress());
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
String inputLine = in.readLine();
System.out.println("Received: " + inputLine);
out.println("Hello from server!");
clientSocket.close();
}
}
}

HTTP Requests

# Python HTTP requests
import requests
import urllib.request
import urllib.parse
# Using requests library (third-party)
response = requests.get('https://api.example.com/data')
print(response.status_code)
print(response.json())
# POST request
data = {'key': 'value'}
response = requests.post('https://api.example.com/submit', json=data)
# Headers
headers = {'Authorization': 'Bearer token123'}
response = requests.get('https://api.example.com/protected', headers=headers)
# Using urllib (standard library)
with urllib.request.urlopen('https://api.example.com/data') as response:
data = response.read()
print(data.decode())
// Node.js HTTP requests
const https = require('https');
const axios = require('axios');
// Using built-in https
https.get('https://api.example.com/data', (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
console.log(JSON.parse(data));
});
});
// Using axios (third-party)
async function fetchData() {
try {
const response = await axios.get('https://api.example.com/data');
console.log(response.data);
const postResponse = await axios.post('https://api.example.com/submit', {
key: 'value'
});
} catch (error) {
console.error(error);
}
}

7. Serialization

JSON Serialization

# Python JSON
import json
# Serialize
data = {
'name': 'Alice',
'age': 30,
'hobbies': ['reading', 'coding'],
'active': True
}
# To JSON string
json_string = json.dumps(data, indent=2)
print(json_string)
# To file
with open('data.json', 'w') as f:
json.dump(data, f, indent=2)
# Deserialize
with open('data.json', 'r') as f:
loaded_data = json.load(f)
print(loaded_data)
# Handle custom objects
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def to_json(self):
return {'name': self.name, 'age': self.age}
person = Person('Bob', 25)
json_string = json.dumps(person, default=lambda o: o.__dict__)
// Java JSON (using Jackson)
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
public class JsonExample {
public static void main(String[] args) throws Exception {
ObjectMapper mapper = new ObjectMapper();
// Serialize
Map<String, Object> data = new HashMap<>();
data.put("name", "Alice");
data.put("age", 30);
data.put("hobbies", Arrays.asList("reading", "coding"));
data.put("active", true);
String json = mapper.writeValueAsString(data);
System.out.println(json);
// Deserialize
Map<String, Object> loaded = mapper.readValue(json, Map.class);
System.out.println(loaded.get("name"));
}
}

XML Serialization

# Python XML
import xml.etree.ElementTree as ET
# Create XML
root = ET.Element("person")
name = ET.SubElement(root, "name")
name.text = "Alice"
age = ET.SubElement(root, "age")
age.text = "30"
hobbies = ET.SubElement(root, "hobbies")
hobby1 = ET.SubElement(hobbies, "hobby")
hobby1.text = "reading"
hobby2 = ET.SubElement(hobbies, "hobby")
hobby2.text = "coding"
# Write to file
tree = ET.ElementTree(root)
tree.write("person.xml")
# Read XML
tree = ET.parse("person.xml")
root = tree.getroot()
print(root.find("name").text)
print(root.find("age").text)
for hobby in root.find("hobbies").findall("hobby"):
print(hobby.text)

Binary Serialization

# Python pickle (Python-specific)
import pickle
# Serialize
data = {'name': 'Alice', 'age': 30, 'hobbies': ['reading', 'coding']}
with open('data.pkl', 'wb') as f:
pickle.dump(data, f)
# Deserialize
with open('data.pkl', 'rb') as f:
loaded = pickle.load(f)
print(loaded)
# Custom object serialization
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
person = Person('Bob', 25)
with open('person.pkl', 'wb') as f:
pickle.dump(person, f)
with open('person.pkl', 'rb') as f:
loaded = pickle.load(f)
print(loaded.name, loaded.age)

8. Asynchronous I/O

Python Asyncio

# Python asyncio
import asyncio
import aiohttp
async def read_file_async(filename):
"""Asynchronous file reading"""
loop = asyncio.get_event_loop()
with open(filename, 'r') as f:
return await loop.run_in_executor(None, f.read)
async def fetch_url(session, url):
"""Asynchronous HTTP request"""
async with session.get(url) as response:
return await response.text()
async def main():
# File I/O
content = await read_file_async('file.txt')
print(f"File content: {content[:100]}")
# Network I/O
async with aiohttp.ClientSession() as session:
tasks = []
urls = ['http://example.com'] * 5
for url in urls:
tasks.append(fetch_url(session, url))
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} pages")
asyncio.run(main())

Node.js Async I/O

// Node.js asynchronous I/O
const fs = require('fs').promises;
const http = require('https');
// Promise-based file I/O
async function readFiles() {
try {
const data = await fs.readFile('file.txt', 'utf8');
console.log('File content:', data);
// Multiple files in parallel
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
const contents = await Promise.all(
files.map(file => fs.readFile(file, 'utf8'))
);
console.log('All files:', contents);
} catch (error) {
console.error('Error:', error);
}
}
// Stream-based I/O
function streamRead() {
const readStream = fs.createReadStream('large_file.txt', { encoding: 'utf8' });
readStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
readStream.on('end', () => {
console.log('File read complete');
});
}
// Async network requests
async function fetchUrls() {
const urls = [
'https://api.example.com/1',
'https://api.example.com/2',
'https://api.example.com/3'
];
const fetchPromises = urls.map(url => 
fetch(url).then(res => res.json())
);
const results = await Promise.all(fetchPromises);
console.log('All responses:', results);
}
readFiles();

9. Error Handling in I/O

Exception Handling

# Python I/O error handling
import os
import errno
def safe_read_file(filename):
try:
with open(filename, 'r') as f:
return f.read()
except FileNotFoundError:
print(f"File not found: {filename}")
return None
except PermissionError:
print(f"Permission denied: {filename}")
return None
except IOError as e:
print(f"I/O error: {e}")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
def safe_write_file(filename, content):
try:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w') as f:
f.write(content)
return True
except OSError as e:
if e.errno == errno.ENOSPC:
print("No space left on device")
elif e.errno == errno.EROFS:
print("Read-only filesystem")
else:
print(f"OS error: {e}")
return False
# Using context managers for resource cleanup
def process_file(filename):
f = None
try:
f = open(filename, 'r')
data = f.read()
# Process data...
return data
except Exception as e:
print(f"Error: {e}")
return None
finally:
if f:
f.close()
// Java I/O error handling
import java.io.*;
import java.nio.file.*;
public class FileIO {
public static String safeReadFile(String filename) {
try {
return Files.readString(Path.of(filename));
} catch (FileNotFoundException e) {
System.err.println("File not found: " + filename);
} catch (AccessDeniedException e) {
System.err.println("Permission denied: " + filename);
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
}
return null;
}
public static boolean safeWriteFile(String filename, String content) {
try {
// Create parent directories if needed
Path path = Path.of(filename);
Files.createDirectories(path.getParent());
Files.writeString(path, content);
return true;
} catch (IOException e) {
System.err.println("Write error: " + e.getMessage());
return false;
}
}
// Try-with-resources for automatic cleanup
public static String processFile(String filename) {
try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
StringBuilder sb = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
sb.append(line).append("\n");
}
return sb.toString();
} catch (IOException e) {
System.err.println("Error reading file: " + e.getMessage());
return null;
}
}
}

10. Performance Optimization

Buffering Strategies

# Python I/O performance
import time
import io
def test_read_performance():
# Create test data
data = "Hello World\n" * 1000000
# Write test file
with open('test.txt', 'w') as f:
f.write(data)
# Read without buffering (line buffered)
start = time.time()
with open('test.txt', 'r') as f:
lines = f.readlines()
print(f"Default buffering: {time.time() - start:.3f}s")
# Read with custom buffer size
start = time.time()
with open('test.txt', 'r', buffering=1024*1024) as f:  # 1MB buffer
lines = f.readlines()
print(f"1MB buffer: {time.time() - start:.3f}s")
# Memory-mapped file (for large files)
import mmap
start = time.time()
with open('test.txt', 'r+b') as f:
with mmap.mmap(f.fileno(), 0) as mm:
data = mm.read()
print(f"Memory-mapped: {time.time() - start:.3f}s")
def test_write_performance():
lines = [f"Line {i}\n" for i in range(1000000)]
# Write line by line
start = time.time()
with open('output1.txt', 'w') as f:
for line in lines:
f.write(line)
print(f"Line by line: {time.time() - start:.3f}s")
# Write with join
start = time.time()
with open('output2.txt', 'w') as f:
f.write(''.join(lines))
print(f"Join and write: {time.time() - start:.3f}s")
# Write with buffering
start = time.time()
with open('output3.txt', 'w', buffering=1024*1024) as f:
f.write(''.join(lines))
print(f"Large buffer: {time.time() - start:.3f}s")

Memory-Mapped I/O

# Python memory-mapped files
import mmap
import os
def memory_mapped_read(filename):
"""Read file using memory mapping (very fast for large files)"""
with open(filename, 'r+b') as f:
with mmap.mmap(f.fileno(), 0) as mm:
# Access file as if it were memory
# Read entire file
content = mm.read()
# Search for patterns
pos = mm.find(b'pattern')
# Modify in place
mm[0:5] = b'Hello'
return content
def memory_mapped_shared():
"""Create shared memory between processes"""
# Create shared memory
size = 1024 * 1024  # 1MB
fd = os.open('/dev/shm/shared_mem', os.O_CREAT | os.O_RDWR)
os.ftruncate(fd, size)
with mmap.mmap(fd, size, mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE) as mm:
# Write to shared memory
mm[0:11] = b'Hello World'
# Another process can read the same memory
# (using the same file descriptor)

11. Real-World Examples

Logging System

# Python logging system
import logging
import sys
from datetime import datetime
class Logger:
def __init__(self, name, log_file=None, level=logging.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_format)
self.logger.addHandler(console_handler)
# File handler
if log_file:
file_handler = logging.FileHandler(log_file)
file_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_format)
self.logger.addHandler(file_handler)
def debug(self, msg):
self.logger.debug(msg)
def info(self, msg):
self.logger.info(msg)
def warning(self, msg):
self.logger.warning(msg)
def error(self, msg):
self.logger.error(msg)
def critical(self, msg):
self.logger.critical(msg)
# Usage
logger = Logger('myapp', 'app.log')
logger.info("Application started")
logger.debug("Debug information")
logger.error("An error occurred")

Configuration Parser

# Python configuration file parser
import configparser
import json
import yaml
class ConfigManager:
def __init__(self, filename):
self.filename = filename
self.config = {}
self.load()
def load(self):
"""Load configuration based on file extension"""
if self.filename.endswith('.json'):
self._load_json()
elif self.filename.endswith('.yaml') or self.filename.endswith('.yml'):
self._load_yaml()
elif self.filename.endswith('.ini') or self.filename.endswith('.conf'):
self._load_ini()
else:
raise ValueError(f"Unsupported config format: {self.filename}")
def _load_json(self):
with open(self.filename, 'r') as f:
self.config = json.load(f)
def _load_yaml(self):
with open(self.filename, 'r') as f:
self.config = yaml.safe_load(f)
def _load_ini(self):
config = configparser.ConfigParser()
config.read(self.filename)
self.config = {section: dict(config.items(section)) for section in config.sections()}
def get(self, key, default=None):
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
else:
return default
return value if value is not None else default
def save(self):
"""Save configuration"""
if self.filename.endswith('.json'):
with open(self.filename, 'w') as f:
json.dump(self.config, f, indent=2)
elif self.filename.endswith('.yaml') or self.filename.endswith('.yml'):
with open(self.filename, 'w') as f:
yaml.dump(self.config, f)
elif self.filename.endswith('.ini') or self.filename.endswith('.conf'):
config = configparser.ConfigParser()
for section, values in self.config.items():
config[section] = values
with open(self.filename, 'w') as f:
config.write(f)
# Usage
config = ConfigManager('config.json')
database_url = config.get('database.url', 'postgresql://localhost/db')
print(f"Database URL: {database_url}")

CSV Data Processor

# Python CSV processing
import csv
import pandas as pd
from pathlib import Path
class CSVProcessor:
def __init__(self, filename):
self.filename = Path(filename)
self.data = []
self.headers = []
def read(self):
"""Read CSV file"""
with open(self.filename, 'r') as f:
reader = csv.reader(f)
self.headers = next(reader)
self.data = list(reader)
return self
def read_dict(self):
"""Read as dictionaries"""
with open(self.filename, 'r') as f:
reader = csv.DictReader(f)
return list(reader)
def write(self, data, headers=None):
"""Write CSV file"""
with open(self.filename, 'w', newline='') as f:
writer = csv.writer(f)
if headers:
writer.writerow(headers)
writer.writerows(data)
def filter(self, column, value):
"""Filter rows by column value"""
if column not in self.headers:
raise ValueError(f"Column {column} not found")
col_index = self.headers.index(column)
filtered = [row for row in self.data if row[col_index] == value]
return filtered
def transform(self, column, func):
"""Transform column values"""
if column not in self.headers:
raise ValueError(f"Column {column} not found")
col_index = self.headers.index(column)
for row in self.data:
row[col_index] = func(row[col_index])
return self
def to_dataframe(self):
"""Convert to pandas DataFrame"""
return pd.DataFrame(self.data, columns=self.headers)
# Usage
processor = CSVProcessor('data.csv')
processor.read()
processor.transform('age', lambda x: int(x) + 1)
processor.write('output.csv')

12. Best Practices

Resource Management

# Always use context managers (with statements)
# Bad
f = open('file.txt', 'r')
data = f.read()
f.close()  # Might be missed if exception occurs
# Good
with open('file.txt', 'r') as f:
data = f.read()
# Automatically closed even if exception occurs
# Multiple resources
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
for line in infile:
outfile.write(line.upper())
# Custom context manager
class ManagedFile:
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
self.file = open(self.filename, self.mode)
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file:
self.file.close()

Input Validation

# Validate user input
def safe_input(prompt, type_converter=str, validator=None):
"""Get validated user input"""
while True:
try:
value = input(prompt)
converted = type_converter(value)
if validator and not validator(converted):
print("Invalid input. Please try again.")
continue
return converted
except ValueError:
print(f"Please enter a valid {type_converter.__name__}")
# Usage
age = safe_input("Enter your age: ", int, lambda x: 0 <= x <= 150)
name = safe_input("Enter your name: ", str, lambda x: len(x) > 0)

Buffering and Flushing

# Proper buffering for performance
import sys
# For real-time output
def progress_bar(iterable, prefix='', length=50):
"""Display progress bar"""
total = len(iterable)
sys.stdout.write(prefix)
for i, item in enumerate(iterable):
percent = (i + 1) / total
bar_length = int(length * percent)
bar = '█' * bar_length + '░' * (length - bar_length)
sys.stdout.write(f'\r{prefix} [{bar}] {percent:.1%}')
sys.stdout.flush()  # Force update
yield item
print()  # New line when done
# Usage
for item in progress_bar(range(100), 'Processing:'):
# Process item
pass

Conclusion

Input and Output operations are essential for any program that interacts with users, files, or networks. Understanding I/O concepts across different programming languages enables you to build robust, efficient applications.

Key Takeaways

  1. Standard Streams: stdin, stdout, stderr are universal
  2. File I/O: Always close resources (use context managers)
  3. Buffering: Balance between performance and responsiveness
  4. Error Handling: Always handle I/O exceptions
  5. Serialization: Choose appropriate format (JSON, XML, binary)
  6. Asynchronous I/O: Essential for scalable network applications
  7. Performance: Use appropriate buffering and memory mapping

Best Practices Checklist

  • [ ] Always close files/resources (use with/try-finally)
  • [ ] Validate input before processing
  • [ ] Handle I/O exceptions appropriately
  • [ ] Use appropriate buffering for performance
  • [ ] Consider character encoding (UTF-8)
  • [ ] Use async I/O for network operations
  • [ ] Log I/O errors for debugging
  • [ ] Test with edge cases (empty files, large files)
  • [ ] Use memory mapping for large files
  • [ ] Implement retry logic for transient failures

Mastering I/O operations is crucial for building real-world applications that interact with the outside world. Whether you're reading user input, processing files, or communicating over networks, understanding I/O patterns will make you a more effective programmer!

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper