Building a High-Performance Streaming Media Server in Java

From Basic HTTP Streaming to Adaptive Bitrate with HLS



Streaming media has become the backbone of modern digital experiences, from video platforms to live broadcasts. While specialized tools such as FFmpeg and the Nginx RTMP module exist, sometimes you need a custom solution integrated directly into your Java ecosystem.

In this guide, we'll build a streaming media server in Java, starting with basic file streaming and progressing to more advanced features like adaptive bitrate streaming with HLS.

Architecture Overview

Our streaming server will handle:

  • Progressive Download: Simple HTTP range requests for MP4 files
  • Live Streaming: Real-time media delivery
  • Adaptive Bitrate: HLS (HTTP Live Streaming) for quality adjustment
  • Chunked Transfer: Efficient delivery of large files

1. Core Dependencies

First, let's set up our dependencies in pom.xml:

<dependencies>
    <!-- Netty for high-performance networking -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.86.Final</version>
    </dependency>
    <!-- SLF4J simple binding for logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.6</version>
    </dependency>
</dependencies>

2. Basic HTTP Streaming Server

Let's start with a simple server that handles range requests for media files.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;

public class BasicStreamingServer {

    private final int port;
    private final String mediaDirectory;

    public BasicStreamingServer(int port, String mediaDirectory) {
        this.port = port;
        this.mediaDirectory = mediaDirectory;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(
                            new HttpServerCodec(),
                            new HttpObjectAggregator(65536),
                            new ChunkedWriteHandler(),
                            new MediaStreamHandler(mediaDirectory)
                        );
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            System.out.println("Streaming Server started on port " + port);
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        String mediaDir = "./media";
        new BasicStreamingServer(port, mediaDir).run();
    }
}

3. Media Stream Handler

This handler processes HTTP requests and serves media files with proper range support.
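Range support comes down to mapping a Range header onto a byte window of the file. As a rough, Netty-independent sketch of that mapping: the class name ByteRange and the choice to return null for anything malformed or unsatisfiable are assumptions made for illustration, and only a single "bytes=" range is handled.

```java
/** Hypothetical helper: resolves an HTTP Range header against a file length. */
final class ByteRange {
    final long start; // first byte, inclusive
    final long end;   // last byte, inclusive

    private ByteRange(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long length() {
        return end - start + 1;
    }

    /**
     * Returns the resolved range, or null when the header is malformed,
     * names several ranges, or cannot be satisfied for this file length.
     */
    static ByteRange parse(String header, long fileLength) {
        if (header == null || !header.startsWith("bytes=") || fileLength <= 0) {
            return null;
        }
        String spec = header.substring("bytes=".length()).trim();
        int dash = spec.indexOf('-');
        if (dash < 0 || spec.indexOf(',') >= 0) {
            return null; // no dash at all, or a multi-range request
        }
        String first = spec.substring(0, dash).trim();
        String last = spec.substring(dash + 1).trim();
        try {
            if (first.isEmpty()) {
                // Suffix form "bytes=-N": the final N bytes of the file.
                if (last.isEmpty()) {
                    return null;
                }
                long suffix = Long.parseLong(last);
                if (suffix <= 0) {
                    return null;
                }
                return new ByteRange(Math.max(0, fileLength - suffix), fileLength - 1);
            }
            long start = Long.parseLong(first);
            // Open-ended form "bytes=N-": everything from N to the end of the file.
            long end = last.isEmpty() ? fileLength - 1 : Long.parseLong(last);
            if (start < 0 || start >= fileLength || end < start) {
                return null;
            }
            // Clamp an end position that runs past the end of the file.
            return new ByteRange(start, Math.min(end, fileLength - 1));
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

A caller would typically answer 206 with a Content-Range header when parse returns a range, and decide between 416 and a plain 200 when it returns null. The handler that follows does its range parsing inline.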

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class MediaStreamHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String mediaDirectory;

    public MediaStreamHandler(String mediaDirectory) {
        this.mediaDirectory = mediaDirectory;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (!request.decoderResult().isSuccess()) {
            sendError(ctx, BAD_REQUEST);
            return;
        }
        if (!HttpMethod.GET.equals(request.method())) {
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }
        final String uri = request.uri();
        final String path = sanitizeUri(uri);
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        File file = new File(path);
        if (file.isHidden() || !file.exists()) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        if (file.isDirectory()) {
            sendListing(ctx, file, uri);
            return;
        }
        if (!file.isFile()) {
            sendError(ctx, FORBIDDEN);
            return;
        }

        RandomAccessFile raf = new RandomAccessFile(file, "r");
        long fileLength = raf.length();

        // Default to the whole file; narrow the window if a Range header is present.
        long start = 0;
        long end = fileLength - 1;
        HttpResponseStatus status = OK;

        String rangeHeader = request.headers().get(HttpHeaderNames.RANGE);
        if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
            try {
                String spec = rangeHeader.substring("bytes=".length()).trim();
                int dash = spec.indexOf('-');
                if (dash < 0) {
                    throw new NumberFormatException("missing '-' in range");
                }
                String first = spec.substring(0, dash).trim();
                String last = spec.substring(dash + 1).trim();
                if (first.isEmpty()) {
                    // Suffix form "bytes=-N": the last N bytes of the file
                    start = Math.max(0, fileLength - Long.parseLong(last));
                } else {
                    start = Long.parseLong(first);
                    if (!last.isEmpty()) {
                        end = Math.min(Long.parseLong(last), fileLength - 1);
                    }
                }
            } catch (NumberFormatException e) {
                start = -1; // unparsable (or multi-range) header: reject below
            }
            if (start < 0 || start > end) {
                raf.close();
                FullHttpResponse unsatisfiable =
                    new DefaultFullHttpResponse(HTTP_1_1, REQUESTED_RANGE_NOT_SATISFIABLE);
                unsatisfiable.headers().set(HttpHeaderNames.CONTENT_RANGE, "bytes */" + fileLength);
                ctx.writeAndFlush(unsatisfiable).addListener(ChannelFutureListener.CLOSE);
                return;
            }
            status = PARTIAL_CONTENT;
        }

        long contentLength = end - start + 1;
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
        HttpUtil.setContentLength(response, contentLength);
        setContentTypeHeader(response, file);
        // Tell players that seeking via range requests is supported
        response.headers().set(HttpHeaderNames.ACCEPT_RANGES, HttpHeaderValues.BYTES);
        if (status == PARTIAL_CONTENT) {
            response.headers().set(HttpHeaderNames.CONTENT_RANGE,
                "bytes " + start + "-" + end + "/" + fileLength);
        }
        boolean keepAlive = HttpUtil.isKeepAlive(request);
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

        ctx.write(response);
        // ChunkedFile closes the RandomAccessFile once the transfer has finished
        ctx.write(new ChunkedFile(raf, start, contentLength, 8192));
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }

    private String sanitizeUri(String uri) {
        // Ignore the query string; only the path selects a file
        int query = uri.indexOf('?');
        if (query >= 0) {
            uri = uri.substring(0, query);
        }
        try {
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new Error(e);
        }
        if (uri.isEmpty() || uri.charAt(0) != '/') {
            return null;
        }
        uri = uri.replace('/', File.separatorChar);
        // Reject dot segments and dot files to block path traversal
        if (uri.contains(File.separator + '.')
                || uri.contains('.' + File.separator)
                || uri.charAt(0) == '.' || uri.charAt(uri.length() - 1) == '.') {
            return null;
        }
        // uri already starts with a separator, so plain concatenation is enough
        return mediaDirectory + uri;
    }

    private void setContentTypeHeader(HttpResponse response, File file) {
        String filename = file.getName().toLowerCase();
        if (filename.endsWith(".mp4")) {
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "video/mp4");
        } else if (filename.endsWith(".m3u8")) {
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/vnd.apple.mpegurl");
        } else if (filename.endsWith(".ts")) {
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "video/MP2T");
        } else if (filename.endsWith(".mp3")) {
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "audio/mpeg");
        } else {
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
        }
    }

    private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private void sendListing(ChannelHandlerContext ctx, File dir, String dirUri) {
        // Directory listing implementation
        // (omitted for brevity)
    }
}
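The sanitizeUri method above filters suspicious substrings. An alternative worth considering is to resolve the request against the media root with java.nio.file.Path and then check that the normalized result is still inside that root. The sketch below assumes the URI path has already been decoded and stripped of its query string; SafePathResolver is a hypothetical name, and symbolic links are not followed or checked.

```java
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: maps a decoded URI path to a file under a fixed root. */
final class SafePathResolver {
    private final Path root;

    SafePathResolver(String mediaDirectory) {
        // Absolute, normalized root so the containment check below is reliable.
        this.root = Paths.get(mediaDirectory).toAbsolutePath().normalize();
    }

    Path root() {
        return root;
    }

    /** Returns the resolved path, or null if it would fall outside the root. */
    Path resolve(String uriPath) {
        if (uriPath == null || !uriPath.startsWith("/")) {
            return null;
        }
        // Drop leading slashes so the request is always treated as relative to the root.
        String relative = uriPath.replaceFirst("^/+", "");
        try {
            // normalize() collapses "." and ".." segments before the check.
            Path candidate = root.resolve(relative).normalize();
            // Path.startsWith compares whole name elements, not string prefixes.
            return candidate.startsWith(root) ? candidate : null;
        } catch (InvalidPathException e) {
            return null; // characters the file system cannot represent
        }
    }
}
```

Because the check runs after normalization, a request such as /video/../../etc/passwd is rejected, while /a/../b.mp4 resolves to b.mp4 directly under the root.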

4. HLS (HTTP Live Streaming) Server

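For adaptive bitrate streaming, the class below caches the HLS playlists found on disk and generates a master playlist that points to three renditions (360p, 480p, and 720p).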

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

public class HLSServer {

    private final Path hlsRoot;
    // Concurrent map, since playlists may be read from several event-loop threads
    private final Map<String, List<String>> playlistCache = new ConcurrentHashMap<>();

    public HLSServer(String hlsDirectory) {
        this.hlsRoot = Paths.get(hlsDirectory);
        loadPlaylists();
    }

    public void loadPlaylists() {
        // Files.walk holds open directory handles, so close the stream when done
        try (Stream<Path> paths = Files.walk(hlsRoot)) {
            paths.filter(path -> path.toString().endsWith(".m3u8"))
                .forEach(this::cachePlaylist);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void cachePlaylist(Path playlistPath) {
        try {
            List<String> lines = Files.readAllLines(playlistPath);
            // Key each playlist by its path relative to the HLS root, e.g. "movie/360p.m3u8"
            String relativePath = hlsRoot.relativize(playlistPath).toString().replace('\\', '/');
            playlistCache.put(relativePath, lines);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String getMasterPlaylist(String videoId) {
        return "#EXTM3U\n" +
            "#EXT-X-VERSION:3\n" +
            "#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360\n" +
            videoId + "/360p.m3u8\n" +
            "#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=854x480\n" +
            videoId + "/480p.m3u8\n" +
            "#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720\n" +
            videoId + "/720p.m3u8\n";
    }
}
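The master playlist above points at per-quality playlists such as 360p.m3u8, which in turn list the media segments. Those files would normally be produced by the packaging tool, but to show what one contains, here is a minimal sketch that builds a video-on-demand media playlist from segment names and durations. MediaPlaylistBuilder and the segment names in the usage note are made up for illustration.

```java
import java.util.Locale;

/** Hypothetical helper: renders a VOD media playlist for one rendition. */
final class MediaPlaylistBuilder {
    private MediaPlaylistBuilder() {}

    static String buildVod(String[] segmentUris, double[] durationsSeconds) {
        if (segmentUris.length != durationsSeconds.length) {
            throw new IllegalArgumentException("one duration per segment is required");
        }
        double longest = 0;
        for (double d : durationsSeconds) {
            longest = Math.max(longest, d);
        }
        StringBuilder sb = new StringBuilder();
        sb.append("#EXTM3U\n");
        sb.append("#EXT-X-VERSION:3\n");
        // The target duration is an integer upper bound on segment length.
        sb.append("#EXT-X-TARGETDURATION:").append((long) Math.ceil(longest)).append('\n');
        sb.append("#EXT-X-MEDIA-SEQUENCE:0\n");
        sb.append("#EXT-X-PLAYLIST-TYPE:VOD\n");
        for (int i = 0; i < segmentUris.length; i++) {
            // Locale.ROOT keeps the decimal separator a dot on every machine.
            sb.append(String.format(Locale.ROOT, "#EXTINF:%.3f,\n", durationsSeconds[i]));
            sb.append(segmentUris[i]).append('\n');
        }
        // ENDLIST tells the player that no further segments will be appended.
        sb.append("#EXT-X-ENDLIST\n");
        return sb.toString();
    }
}
```

For example, buildVod(new String[] {"seg0.ts", "seg1.ts"}, new double[] {10.0, 7.5}) yields a playlist with a target duration of 10 and two EXTINF entries. A live playlist would omit the PLAYLIST-TYPE and ENDLIST tags and advance the media sequence number as old segments drop off.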

5. Advanced Streaming with Adaptive Bitrate

Next, a session manager tracks each viewer's playback position and selected quality, and hands out the master playlist for the requested video.

import java.util.concurrent.ConcurrentHashMap;

public class StreamingSessionManager {

    private final ConcurrentHashMap<String, StreamingSession> sessions;
    private final HLSServer hlsServer;

    public StreamingSessionManager(String mediaDirectory) {
        this.sessions = new ConcurrentHashMap<>();
        this.hlsServer = new HLSServer(mediaDirectory + "/hls");
    }

    public StreamingSession createSession(String sessionId, String videoPath) {
        StreamingSession session = new StreamingSession(sessionId, videoPath, hlsServer);
        sessions.put(sessionId, session);
        return session;
    }

    public StreamingSession getSession(String sessionId) {
        return sessions.get(sessionId);
    }

    public void removeSession(String sessionId) {
        sessions.remove(sessionId);
    }

    public static class StreamingSession {

        private final String sessionId;
        private final String videoPath;
        private final HLSServer hlsServer;
        // volatile: a session may be updated and read on different threads
        private volatile long position;
        private volatile String quality;

        public StreamingSession(String sessionId, String videoPath, HLSServer hlsServer) {
            this.sessionId = sessionId;
            this.videoPath = videoPath;
            this.hlsServer = hlsServer;
            this.position = 0;
            this.quality = "auto";
        }

        public String getPlaylist() {
            if (videoPath.endsWith(".m3u8")) {
                // Strip the extension that was just checked to obtain the video ID
                String videoId = videoPath.substring(0, videoPath.length() - ".m3u8".length());
                return hlsServer.getMasterPlaylist(videoId);
            }
            return null;
        }

        public void updatePosition(long newPosition) {
            this.position = newPosition;
        }

        public void setQuality(String quality) {
            this.quality = quality;
        }

        // Getters
        public long getPosition() { return position; }
        public String getQuality() { return quality; }
        public String getSessionId() { return sessionId; }
    }
}
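With HLS the player usually picks the rendition itself, so nothing above actually decides what the "auto" quality means. If the server ever needs to suggest one, a common heuristic is to choose the highest rendition whose bandwidth fits within a fraction of the measured throughput. The sketch below assumes the same three-step ladder as the master playlist; VariantSelector, the safety factor, and the way throughput is measured are all assumptions.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: picks a rendition name for a measured throughput. */
final class VariantSelector {

    static final class Variant {
        final String name;
        final long bandwidthBps;

        Variant(String name, long bandwidthBps) {
            this.name = name;
            this.bandwidthBps = bandwidthBps;
        }
    }

    // Sorted from lowest to highest bandwidth, mirroring the master playlist.
    static final List<Variant> LADDER = Arrays.asList(
        new Variant("360p", 800_000L),
        new Variant("480p", 1_400_000L),
        new Variant("720p", 2_800_000L));

    private VariantSelector() {}

    /**
     * @param measuredBps  recent throughput estimate in bits per second
     * @param safetyFactor fraction of the estimate to spend, e.g. 0.8
     */
    static String select(long measuredBps, double safetyFactor) {
        long budget = (long) (measuredBps * safetyFactor);
        Variant chosen = LADDER.get(0); // the lowest step is the fallback
        for (Variant v : LADDER) {
            if (v.bandwidthBps <= budget) {
                chosen = v; // keep climbing while the step still fits the budget
            }
        }
        return chosen.name;
    }
}
```

With a safety factor of 0.8, a 5 Mbit/s estimate selects 720p and a 2 Mbit/s estimate selects 480p. The result could be passed to setQuality on a session whose quality is still "auto".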

6. WebSocket for Live Streaming

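For live streaming, a WebSocket handler can accept control messages as text frames and incoming stream data as binary frames. The skeleton below leaves both handlers as stubs, and it still has to be added to the channel pipeline (typically behind Netty's WebSocketServerProtocolHandler) before it receives any frames: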

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.*;

public class WebSocketStreamHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final StreamingSessionManager sessionManager;

    public WebSocketStreamHandler(StreamingSessionManager sessionManager) {
        this.sessionManager = sessionManager;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        if (frame instanceof TextWebSocketFrame) {
            // Handle control messages (play, pause, seek, quality change)
            String text = ((TextWebSocketFrame) frame).text();
            handleControlMessage(ctx, text);
        } else if (frame instanceof BinaryWebSocketFrame) {
            // Handle binary data (for live streaming input)
            handleBinaryData(ctx, (BinaryWebSocketFrame) frame);
        } else if (frame instanceof CloseWebSocketFrame) {
            ctx.close();
        }
    }

    private void handleControlMessage(ChannelHandlerContext ctx, String message) {
        // Parse JSON control messages
        // Example: {"action": "play", "sessionId": "123", "position": 1000, "quality": "720p"}
        // Implement control logic here
    }

    private void handleBinaryData(ChannelHandlerContext ctx, BinaryWebSocketFrame frame) {
        // Handle incoming live stream data
        // This would be used for broadcasting live content
    }
}
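The control logic is left open above. One way it might look, once a JSON library of your choice has turned the message into a flat map of strings, is a small dispatcher that applies each action to the playback state. Everything here is illustrative: ControlDispatcher, PlaybackState, the "seek" and "quality" action names, and the status strings are assumptions rather than part of the handler.

```java
import java.util.Map;

/** Hypothetical dispatcher for already-parsed control messages. */
final class ControlDispatcher {

    /** Minimal stand-in for the per-session playback state. */
    static final class PlaybackState {
        long position;
        String quality = "auto";
        boolean playing;
    }

    private ControlDispatcher() {}

    /** Applies one control message and returns a short status for the reply frame. */
    static String apply(Map<String, String> message, PlaybackState state) {
        String action = message.get("action");
        if (action == null) {
            return "error: missing action";
        }
        switch (action) {
            case "play":
                state.playing = true;
                return "ok";
            case "pause":
                state.playing = false;
                return "ok";
            case "seek":
                try {
                    // parseLong also rejects a missing (null) position
                    long position = Long.parseLong(message.get("position"));
                    if (position < 0) {
                        return "error: negative position";
                    }
                    state.position = position;
                    return "ok";
                } catch (NumberFormatException e) {
                    return "error: bad position";
                }
            case "quality":
                String quality = message.get("quality");
                if (quality == null || quality.isEmpty()) {
                    return "error: missing quality";
                }
                state.quality = quality;
                return "ok";
            default:
                return "error: unknown action " + action;
        }
    }
}
```

In the handler, the returned status could be written back to the client as a TextWebSocketFrame, with the state looked up through the session manager using the message's sessionId.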

7. Configuration and Main Application

// StreamingServerConfig.java
import java.util.Properties;

public class StreamingServerConfig {

    private final int port;
    private final String mediaDirectory;
    private final int maxChunkSize;
    private final boolean enableHls;
    private final boolean enableWebSocket;

    public StreamingServerConfig(Properties props) {
        this.port = Integer.parseInt(props.getProperty("server.port", "8080"));
        this.mediaDirectory = props.getProperty("media.directory", "./media");
        this.maxChunkSize = Integer.parseInt(props.getProperty("max.chunk.size", "8192"));
        this.enableHls = Boolean.parseBoolean(props.getProperty("enable.hls", "true"));
        this.enableWebSocket = Boolean.parseBoolean(props.getProperty("enable.websocket", "true"));
    }

    // Getters
    public int getPort() { return port; }
    public String getMediaDirectory() { return mediaDirectory; }
    public int getMaxChunkSize() { return maxChunkSize; }
    public boolean isEnableHls() { return enableHls; }
    public boolean isEnableWebSocket() { return enableWebSocket; }
}

// StreamingMediaServer.java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class StreamingMediaServer {

    public static void main(String[] args) throws Exception {
        Properties configProps = new Properties();
        // try-with-resources closes the file once the properties are loaded
        try (InputStream in = Files.newInputStream(Paths.get("streaming-server.properties"))) {
            configProps.load(in);
        }
        StreamingServerConfig config = new StreamingServerConfig(configProps);
        // Created here so handlers can share it; BasicStreamingServer does not use it yet
        StreamingSessionManager sessionManager = new StreamingSessionManager(config.getMediaDirectory());
        // Start the server
        BasicStreamingServer server = new BasicStreamingServer(config.getPort(), config.getMediaDirectory());
        server.run();
    }
}

8. Sample Configuration File

Create streaming-server.properties:

server.port=8080
media.directory=./media
max.chunk.size=8192
enable.hls=true
enable.websocket=true
hls.segment.duration=10
max.connections=1000
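The sample file defines hls.segment.duration and max.connections, which StreamingServerConfig does not read. If you decide to use them, it helps to validate the values at startup rather than fail later on a bad number. The following is only a sketch: the class name ValidatedSettings and the allowed ranges are assumptions, and the defaults simply repeat the sample values.

```java
import java.util.Properties;

/** Hypothetical companion to StreamingServerConfig with range-checked values. */
final class ValidatedSettings {
    final int segmentDurationSeconds;
    final int maxConnections;

    ValidatedSettings(Properties props) {
        // Bounds are illustrative; pick limits that suit your deployment.
        this.segmentDurationSeconds = intInRange(props, "hls.segment.duration", 10, 1, 60);
        this.maxConnections = intInRange(props, "max.connections", 1000, 1, 1_000_000);
    }

    /** Reads an integer property, falling back to a default and enforcing bounds. */
    static int intInRange(Properties props, String key, int defaultValue, int min, int max) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " must be an integer, got '" + raw + "'", e);
        }
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                key + " must be between " + min + " and " + max + ", got " + value);
        }
        return value;
    }
}
```

Constructing ValidatedSettings from the same Properties object right after loading the file makes a typo such as max.connections=10O0 fail immediately with a message that names the offending key.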

Best Practices for Production

  1. Content Delivery Network (CDN): Integrate with CDN for global distribution
  2. Caching Strategy: Implement proper cache headers for media segments
  3. Load Balancing: Use multiple server instances behind a load balancer
  4. Monitoring: Add metrics for bandwidth, concurrent connections, and error rates
  5. Security: Implement authentication, CORS, and rate limiting
  6. Adaptive Bitrate: Use tools like FFmpeg to create multiple quality versions
  7. Containerization: Dockerize for easy deployment
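To make the caching item more concrete: playlists and segments usually want different cache lifetimes, because a live playlist keeps changing while a finished segment does not. A minimal sketch of such a policy follows. The class name CachePolicy and the specific max-age values are assumptions, and the long segment lifetime only makes sense if segment file names are never reused for different content.

```java
import java.util.Locale;

/** Hypothetical helper: chooses a Cache-Control value by file type. */
final class CachePolicy {
    private CachePolicy() {}

    /**
     * @param fileName name of the file being served
     * @param live     true if the playlist is still being appended to
     */
    static String cacheControlFor(String fileName, boolean live) {
        String name = fileName.toLowerCase(Locale.ROOT);
        if (name.endsWith(".m3u8")) {
            // A live playlist must be revalidated on every request;
            // a finished one can be cached briefly.
            return live ? "no-cache" : "public, max-age=300";
        }
        if (name.endsWith(".ts")) {
            // Segments never change once written, so caches may keep them.
            return "public, max-age=31536000, immutable";
        }
        // Everything else, such as progressive MP4 files, gets a moderate lifetime.
        return "public, max-age=3600";
    }
}
```

In MediaStreamHandler the returned string could be set as the Cache-Control header next to the content type, which lets a CDN in front of the server absorb most segment requests.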

Conclusion

This Java streaming server provides a solid foundation for building robust media delivery solutions. The architecture supports:

  • Progressive download for MP4 files
  • Adaptive bitrate streaming via HLS
  • A WebSocket handler skeleton as the starting point for live streaming and playback control
  • Efficient chunked transfer for large files
  • Session management for tracking user playback state

While this implementation covers the core functionality, production systems would benefit from integrating with specialized media processing tools like FFmpeg for transcoding and incorporating cloud storage solutions for scalable media asset management.

The modular design allows for easy extension with additional features like DRM, analytics, or advanced adaptive streaming algorithms.
