Janus Gateway Integration in Java: WebRTC Media Server Communication

Janus Gateway is a general-purpose WebRTC server that enables real-time communication. This comprehensive guide covers Java integration for building WebRTC applications, video conferencing, and streaming solutions.

Project Setup and Dependencies

Maven Configuration

<!-- pom.xml -->
<properties>
<java.websocket.version>1.5.3</java.websocket.version>
<gson.version>2.10.1</gson.version>
<okhttp.version>4.11.0</okhttp.version>
</properties>
<dependencies>
<!-- WebSocket client for Janus API -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>${java.websocket.version}</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<!-- HTTP client for REST API -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>

Core Janus Client Implementation

WebSocket Connection Handler

package com.janus.client;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * WebSocket transport for the Janus API.
 *
 * <p>Sends JSON requests, matches responses to requests through their
 * {@code transaction} field, and forwards every message that carries a
 * {@code janus} type to the configured {@link JanusMessageHandler}.
 *
 * <p>The WebSocket callbacks and {@link #sendRequest(JsonObject)} may run on
 * different threads, so shared state lives in concurrent collections and a
 * volatile flag.
 */
public class JanusWebSocketClient extends WebSocketClient {
    private static final Gson gson = new Gson();
    /** How long a request may stay unanswered before its future fails. */
    private static final long REQUEST_TIMEOUT_SECONDS = 10;
    /** Delay before the first keepalive and period between keepalives. */
    private static final long KEEPALIVE_INTERVAL_SECONDS = 30;

    /** Futures of requests that have been sent but not yet answered, keyed by transaction id. */
    private final ConcurrentHashMap<String, CompletableFuture<JsonObject>> pendingTransactions;
    /**
     * Transaction ids of pending plugin "message" requests. For these an "ack" is only an
     * interim reply; the real answer arrives later with the same transaction id.
     */
    private final java.util.Set<String> pluginMessageTransactions;
    private final ScheduledExecutorService heartbeatExecutor;
    private final JanusMessageHandler messageHandler;
    // Written by the WebSocket callbacks, read by application threads.
    private volatile boolean connected = false;

    /**
     * Creates a client for the given server; the connection is not opened here.
     *
     * @param serverUri      WebSocket URI of the Janus server
     * @param messageHandler receiver for connection and Janus events; may be {@code null}
     */
    public JanusWebSocketClient(URI serverUri, JanusMessageHandler messageHandler) {
        super(serverUri);
        this.pendingTransactions = new ConcurrentHashMap<>();
        this.pluginMessageTransactions = ConcurrentHashMap.newKeySet();
        this.heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
        this.messageHandler = messageHandler;
    }

    /** Marks the client connected, starts the heartbeat and notifies the handler. */
    @Override
    public void onOpen(ServerHandshake handshake) {
        System.out.println("Connected to Janus server");
        this.connected = true;
        // The executor is shut down on close; scheduling on it afterwards would throw.
        if (!heartbeatExecutor.isShutdown()) {
            heartbeatExecutor.scheduleAtFixedRate(
                    this::sendKeepAlive,
                    KEEPALIVE_INTERVAL_SECONDS,
                    KEEPALIVE_INTERVAL_SECONDS,
                    TimeUnit.SECONDS);
        }
        if (messageHandler != null) {
            messageHandler.onConnected();
        }
    }

    /**
     * Parses an incoming message, completes the matching pending request (if any) and
     * dispatches the message to the handler according to its {@code janus} type.
     */
    @Override
    public void onMessage(String message) {
        System.out.println("Received: " + message);
        try {
            JsonObject jsonResponse = JsonParser.parseString(message).getAsJsonObject();
            String janusType = jsonResponse.has("janus")
                    ? jsonResponse.get("janus").getAsString()
                    : null;

            // Handle transaction responses
            if (jsonResponse.has("transaction")) {
                String transactionId = jsonResponse.get("transaction").getAsString();
                // An "ack" to a plugin message only confirms receipt; keep the request
                // pending so the later event with the same transaction completes it.
                boolean interimAck = "ack".equals(janusType)
                        && pluginMessageTransactions.contains(transactionId);
                if (!interimAck) {
                    pluginMessageTransactions.remove(transactionId);
                    CompletableFuture<JsonObject> future = pendingTransactions.remove(transactionId);
                    if (future != null) {
                        future.complete(jsonResponse);
                    }
                }
            }

            // Handle events
            if (janusType != null) {
                handleJanusEvent(janusType, jsonResponse);
            }
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

    /**
     * Stops the heartbeat, fails every request that is still waiting for a response and
     * notifies the handler.
     */
    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("Connection closed: " + reason + " (code: " + code + ")");
        this.connected = false;
        heartbeatExecutor.shutdown();
        // No response can arrive any more; fail fast instead of letting callers hit the timeout.
        failPendingTransactions(
                new IllegalStateException("WebSocket closed: " + reason + " (code: " + code + ")"));
        if (messageHandler != null) {
            messageHandler.onDisconnected(code, reason);
        }
    }

    /** Logs the transport error and forwards it to the handler. */
    @Override
    public void onError(Exception ex) {
        System.err.println("WebSocket error: " + ex.getMessage());
        if (messageHandler != null) {
            messageHandler.onError(ex);
        }
    }

    /** Routes a message to the handler callback that matches its {@code janus} type. */
    private void handleJanusEvent(String janusType, JsonObject message) {
        if (messageHandler == null) {
            return;
        }
        switch (janusType) {
            case "event":
                messageHandler.onEvent(message);
                break;
            case "media":
                messageHandler.onMediaEvent(message);
                break;
            case "webrtc-up":
                messageHandler.onWebRTCUp(message);
                break;
            case "hangup":
                messageHandler.onHangup(message);
                break;
            case "detached":
                messageHandler.onDetached(message);
                break;
            case "error":
                messageHandler.onError(message);
                break;
            default:
                messageHandler.onUnknownEvent(janusType, message);
        }
    }

    /**
     * Heartbeat task: sends a bare {@code keepalive} message while the socket is open.
     *
     * <p>NOTE(review): this message carries neither {@code session_id} nor
     * {@code transaction}. The Janus API documents keepalive as a per-session request, so
     * this probably does not extend session lifetimes; use
     * {@code JanusSession.sendKeepAlive()} for that. Confirm against the Janus version in use.
     */
    private void sendKeepAlive() {
        try {
            if (isOpen()) {
                JsonObject keepAlive = new JsonObject();
                keepAlive.addProperty("janus", "keepalive");
                send(keepAlive.toString());
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task cancels all further runs.
            System.err.println("Failed to send keepalive: " + e.getMessage());
        }
    }

    /** Completes every pending request exceptionally with the given cause. */
    private void failPendingTransactions(Throwable cause) {
        for (String transactionId : pendingTransactions.keySet()) {
            CompletableFuture<JsonObject> future = pendingTransactions.remove(transactionId);
            if (future != null) {
                future.completeExceptionally(cause);
            }
        }
        pluginMessageTransactions.clear();
    }

    /**
     * Sends a request and returns a future for its response.
     *
     * <p>A {@code transaction} id is added to {@code request} when it has none. The future
     * completes with the first message carrying that id, except that the interim
     * {@code ack} of a plugin {@code message} request is skipped in favour of the event
     * that follows it. The future completes exceptionally when the socket is not open,
     * when sending fails, when the connection closes, or when no response arrives within
     * {@value #REQUEST_TIMEOUT_SECONDS} seconds.
     *
     * @param request the Janus request; modified in place if a transaction id is added
     * @return future holding the response message
     */
    public CompletableFuture<JsonObject> sendRequest(JsonObject request) {
        if (!isOpen()) {
            return CompletableFuture.failedFuture(new IllegalStateException("WebSocket not connected"));
        }
        // Generate transaction ID if not present; a random UUID cannot collide the way
        // two timestamps taken in the same instant can.
        if (!request.has("transaction")) {
            request.addProperty("transaction", "txn_" + java.util.UUID.randomUUID());
        }
        String transactionId = request.get("transaction").getAsString();
        CompletableFuture<JsonObject> future = new CompletableFuture<>();
        pendingTransactions.put(transactionId, future);
        if (request.has("janus") && "message".equals(request.get("janus").getAsString())) {
            pluginMessageTransactions.add(transactionId);
        }

        // Propagate timeouts and other failures to the caller instead of turning them
        // into a null result, and drop the bookkeeping for the failed request.
        CompletableFuture<JsonObject> tracked = future
                .orTimeout(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .whenComplete((response, failure) -> {
                    if (failure != null) {
                        pendingTransactions.remove(transactionId, future);
                        pluginMessageTransactions.remove(transactionId);
                    }
                });

        try {
            send(request.toString());
        } catch (RuntimeException e) {
            // The socket may have closed between the isOpen() check and the send.
            future.completeExceptionally(e);
        }
        return tracked;
    }

    /** Returns {@code true} when the open callback has fired and the socket is still open. */
    public boolean isConnected() {
        return connected && isOpen();
    }

    /** Stops the heartbeat and closes the WebSocket. */
    public void disconnect() {
        heartbeatExecutor.shutdown();
        close();
    }
}

Message Handler Interface

package com.janus.client;
import com.google.gson.JsonObject;
/**
 * Callback contract for everything a Janus connection can report: connection
 * lifecycle changes, typed Janus messages, and errors.
 */
public interface JanusMessageHandler {

    // ---- Connection lifecycle ----

    /** Called when the connection to the server has been opened. */
    void onConnected();

    /**
     * Called when the connection has been closed.
     *
     * @param code   close code reported for the connection
     * @param reason close reason reported for the connection
     */
    void onDisconnected(int code, String reason);

    // ---- Errors ----

    /** Called when the transport reports an exception. */
    void onError(Exception ex);

    /** Called for a Janus message whose type is {@code "error"}. */
    void onError(JsonObject error);

    // ---- Typed Janus messages ----

    /** Called for a Janus message whose type is {@code "event"}. */
    void onEvent(JsonObject event);

    /** Called for a Janus message whose type is {@code "media"}. */
    void onMediaEvent(JsonObject mediaEvent);

    /** Called for a Janus message whose type is {@code "webrtc-up"}. */
    void onWebRTCUp(JsonObject event);

    /** Called for a Janus message whose type is {@code "hangup"}. */
    void onHangup(JsonObject event);

    /** Called for a Janus message whose type is {@code "detached"}. */
    void onDetached(JsonObject event);

    /**
     * Called for a Janus message whose type matches none of the callbacks above.
     *
     * @param eventType the unrecognised message type
     * @param event     the full message
     */
    void onUnknownEvent(String eventType, JsonObject event);

    // ---- Optional hooks (no-ops unless overridden) ----

    /** Optional hook for a local stream; presumably raised by a WebRTC layer (confirm with caller). */
    default void onLocalStream(String streamId) {}

    /** Optional hook for a remote stream; presumably raised by a WebRTC layer (confirm with caller). */
    default void onRemoteStream(String streamId) {}

    /** Optional hook for an ICE candidate described by its SDP line index and media id. */
    default void onIceCandidate(String candidate, int sdpMLineIndex, String sdpMid) {}
}

Janus Session and Plugin Management

Session Management

package com.janus.client;
import com.google.gson.JsonObject;
import com.google.gson.JsonArray;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
 * A Janus session: owns the plugin handles attached through it and issues
 * session-level requests (attach, keepalive, destroy) over a shared
 * {@link JanusWebSocketClient}.
 */
public class JanusSession {
    private final JanusWebSocketClient client;
    private final long sessionId;
    private final ConcurrentHashMap<Long, JanusPluginHandle> pluginHandles;
    // Set from the response callback, read by application threads.
    private volatile boolean destroyed = false;

    /**
     * @param client    transport used for all requests of this session
     * @param sessionId identifier assigned to the session by Janus
     */
    public JanusSession(JanusWebSocketClient client, long sessionId) {
        this.client = client;
        this.sessionId = sessionId;
        this.pluginHandles = new ConcurrentHashMap<>();
    }

    /**
     * Attaches to a plugin and registers the resulting handle with this session.
     *
     * @param plugin plugin package name, e.g. {@code "janus.plugin.videoroom"}
     * @return future holding the new handle; fails with {@link IllegalStateException}
     *         when the session is already destroyed or Janus does not return a handle id
     */
    public CompletableFuture<JanusPluginHandle> attachPlugin(String plugin) {
        if (destroyed) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Session " + sessionId + " has been destroyed"));
        }
        JsonObject request = new JsonObject();
        request.addProperty("janus", "attach");
        request.addProperty("session_id", sessionId);
        request.addProperty("plugin", plugin);
        return client.sendRequest(request).thenApply(response -> {
            JsonObject data = requireData(response, "attach to plugin " + plugin);
            long handleId = data.get("id").getAsLong();
            JanusPluginHandle pluginHandle = new JanusPluginHandle(client, sessionId, handleId, plugin);
            pluginHandles.put(handleId, pluginHandle);
            return pluginHandle;
        });
    }

    /**
     * Sends a keep-alive request for this session.
     *
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> sendKeepAlive() {
        JsonObject request = new JsonObject();
        request.addProperty("janus", "keepalive");
        request.addProperty("session_id", sessionId);
        return client.sendRequest(request);
    }

    /**
     * Destroys the session on the server and forgets all of its plugin handles.
     *
     * @return future holding the raw response, or an already-completed future holding
     *         {@code null} when the session was destroyed earlier
     */
    public CompletableFuture<JsonObject> destroy() {
        if (destroyed) {
            return CompletableFuture.completedFuture(null);
        }
        JsonObject request = new JsonObject();
        request.addProperty("janus", "destroy");
        request.addProperty("session_id", sessionId);
        return client.sendRequest(request).thenApply(response -> {
            destroyed = true;
            pluginHandles.clear();
            return response;
        });
    }

    /** Returns the identifier assigned to this session by Janus. */
    public long getSessionId() {
        return sessionId;
    }

    /** Returns {@code true} once a destroy request has been answered. */
    public boolean isDestroyed() {
        return destroyed;
    }

    /**
     * Looks up a handle attached through this session.
     *
     * @return the handle, or {@code null} when no handle with that id is registered
     */
    public JanusPluginHandle getPluginHandle(long handleId) {
        return pluginHandles.get(handleId);
    }

    /**
     * Extracts the {@code data} object of a response, rejecting missing, error and
     * malformed responses with a descriptive exception instead of a bare NPE.
     */
    private static JsonObject requireData(JsonObject response, String action) {
        if (response == null) {
            throw new IllegalStateException("No response from Janus while trying to " + action);
        }
        if (response.has("error")) {
            throw new IllegalStateException(
                    "Janus refused to " + action + ": " + response.get("error"));
        }
        JsonObject data = response.has("data") && response.get("data").isJsonObject()
                ? response.getAsJsonObject("data")
                : null;
        if (data == null || !data.has("id")) {
            throw new IllegalStateException(
                    "Unexpected Janus response while trying to " + action + ": " + response);
        }
        return data;
    }
}

Plugin Handle Implementation

package com.janus.client;
import com.google.gson.JsonObject;
import com.google.gson.JsonArray;
import java.util.concurrent.CompletableFuture;
/**
 * Handle to one plugin instance attached within a Janus session. Every request built
 * here is addressed with the session id and handle id given at construction.
 */
public class JanusPluginHandle {
    private final JanusWebSocketClient client;
    private final long sessionId;
    private final long handleId;
    private final String plugin;
    // Set from the response callback, read by application threads.
    private volatile boolean detached = false;

    /**
     * @param client    transport used for all requests of this handle
     * @param sessionId id of the session the handle belongs to
     * @param handleId  id assigned to the handle by Janus
     * @param plugin    plugin package name the handle is attached to
     */
    public JanusPluginHandle(JanusWebSocketClient client, long sessionId, long handleId, String plugin) {
        this.client = client;
        this.sessionId = sessionId;
        this.handleId = handleId;
        this.plugin = plugin;
    }

    /**
     * Sends a message to the plugin without a JSEP payload.
     *
     * @param body plugin-specific request body
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> sendMessage(JsonObject body) {
        return sendMessage(body, null);
    }

    /**
     * Sends a message to the plugin, optionally with a JSEP (SDP offer/answer) payload.
     *
     * @param body plugin-specific request body
     * @param jsep JSEP object to attach, or {@code null} to send none
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> sendMessage(JsonObject body, JsonObject jsep) {
        JsonObject request = newRequest("message");
        request.add("body", body);
        if (jsep != null) {
            request.add("jsep", jsep);
        }
        return client.sendRequest(request);
    }

    /**
     * Sends one trickled ICE candidate.
     *
     * @param candidate     candidate line
     * @param sdpMLineIndex index of the media line the candidate belongs to
     * @param sdpMid        media id the candidate belongs to
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> sendTrickleCandidate(String candidate, int sdpMLineIndex, String sdpMid) {
        JsonObject candidateObj = new JsonObject();
        candidateObj.addProperty("candidate", candidate);
        candidateObj.addProperty("sdpMLineIndex", sdpMLineIndex);
        candidateObj.addProperty("sdpMid", sdpMid);
        JsonObject request = newRequest("trickle");
        request.add("candidate", candidateObj);
        return client.sendRequest(request);
    }

    /**
     * Signals that no more ICE candidates will be trickled.
     *
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> sendTrickleComplete() {
        JsonObject completed = new JsonObject();
        completed.addProperty("completed", true);
        JsonObject request = newRequest("trickle");
        request.add("candidate", completed);
        return client.sendRequest(request);
    }

    /**
     * Detaches the handle from the plugin and marks it detached once answered.
     *
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> detach() {
        return client.sendRequest(newRequest("detach")).thenApply(response -> {
            detached = true;
            return response;
        });
    }

    /** Returns the id assigned to this handle by Janus. */
    public long getHandleId() {
        return handleId;
    }

    /** Returns the plugin package name this handle is attached to. */
    public String getPlugin() {
        return plugin;
    }

    /** Returns {@code true} once a detach request has been answered. */
    public boolean isDetached() {
        return detached;
    }

    /** Builds the envelope shared by all handle-level requests. */
    private JsonObject newRequest(String janusType) {
        JsonObject request = new JsonObject();
        request.addProperty("janus", janusType);
        request.addProperty("session_id", sessionId);
        request.addProperty("handle_id", handleId);
        return request;
    }
}

VideoRoom Plugin Implementation

VideoRoom Client

package com.janus.plugins;
import com.janus.client.JanusPluginHandle;
import com.janus.client.JanusSession;
import com.google.gson.JsonObject;
import com.google.gson.JsonArray;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
/**
 * Typed wrapper around a plugin handle attached to the Janus VideoRoom plugin.
 *
 * <p>Responses are validated before they are parsed, so a rejected request or an
 * unexpected response shape fails the returned future with an
 * {@link IllegalStateException} that names the request, instead of a bare NPE.
 */
public class VideoRoomPlugin {
    /** Maximum number of publishers requested for rooms created by this class. */
    private static final int MAX_PUBLISHERS = 10;

    private final JanusPluginHandle handle;

    /** @param handle handle attached to the VideoRoom plugin */
    public VideoRoomPlugin(JanusPluginHandle handle) {
        this.handle = handle;
    }

    /**
     * Creates a new video room.
     *
     * @param roomName    used as the room description when {@code description} is
     *                    {@code null}; otherwise not sent (the request carries no name field)
     * @param description room description
     * @param bitrate     bitrate cap sent as the room's {@code bitrate}
     * @return future holding the id of the created room
     */
    public CompletableFuture<Long> createRoom(String roomName, String description, int bitrate) {
        String roomDescription = (description != null) ? description : roomName;
        JsonObject body = new JsonObject();
        body.addProperty("request", "create");
        body.addProperty("description", roomDescription);
        body.addProperty("bitrate", bitrate);
        body.addProperty("publishers", MAX_PUBLISHERS);
        // NOTE(review): "config" is kept for compatibility with the original request;
        // confirm whether the VideoRoom plugin reads it at all.
        JsonObject config = new JsonObject();
        config.addProperty("description", roomDescription);
        body.add("config", config);
        return handle.sendMessage(body).thenApply(response ->
                requireLong(pluginData(response, "create"), "room", "create"));
    }

    /**
     * Lists the rooms the plugin reports.
     *
     * @return future holding one {@link RoomInfo} per listed room (empty when none)
     */
    public CompletableFuture<List<RoomInfo>> listRooms() {
        JsonObject body = new JsonObject();
        body.addProperty("request", "list");
        return handle.sendMessage(body).thenApply(response -> {
            JsonObject data = pluginData(response, "list");
            List<RoomInfo> roomList = new ArrayList<>();
            JsonArray rooms = data.getAsJsonArray("list");
            if (rooms == null) {
                return roomList;
            }
            for (JsonElement element : rooms) {
                JsonObject room = element.getAsJsonObject();
                int participants = room.has("num_participants")
                        ? room.get("num_participants").getAsInt()
                        : 0;
                roomList.add(new RoomInfo(
                        requireLong(room, "room", "list"),
                        optString(room, "description"),
                        participants));
            }
            return roomList;
        });
    }

    /**
     * Joins a room as publisher.
     *
     * @param roomId      room to join
     * @param displayName display name to announce
     * @return future holding the own participant id and the publishers already in the
     *         room; a publisher without a display name gets a {@code null} name
     */
    public CompletableFuture<JoinResult> joinAsPublisher(long roomId, String displayName) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "join");
        body.addProperty("room", roomId);
        body.addProperty("ptype", "publisher");
        body.addProperty("display", displayName);
        return handle.sendMessage(body).thenApply(response -> {
            JsonObject data = pluginData(response, "join as publisher");
            List<PublisherInfo> publisherList = new ArrayList<>();
            JsonArray publishers = data.getAsJsonArray("publishers");
            if (publishers != null) {
                for (JsonElement element : publishers) {
                    JsonObject publisher = element.getAsJsonObject();
                    publisherList.add(new PublisherInfo(
                            requireLong(publisher, "id", "join as publisher"),
                            optString(publisher, "display")));
                }
            }
            return new JoinResult(
                    requireLong(data, "id", "join as publisher"),
                    publisherList);
        });
    }

    /**
     * Joins a room as subscriber to one feed.
     *
     * @param roomId      room to join
     * @param feedId      feed (publisher id) to subscribe to
     * @param displayName display name to announce
     * @return future holding a {@link JoinResult} with an empty publisher list, the id
     *         reported in the response (or {@code feedId} when none is reported), and
     *         the JSEP object of the response if it carried one
     */
    public CompletableFuture<JoinResult> joinAsSubscriber(long roomId, long feedId, String displayName) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "join");
        body.addProperty("room", roomId);
        body.addProperty("ptype", "subscriber");
        body.addProperty("feed", feedId);
        body.addProperty("display", displayName);
        return handle.sendMessage(body).thenApply(response -> {
            JsonObject data = pluginData(response, "join as subscriber");
            long id = data.has("id") && !data.get("id").isJsonNull()
                    ? data.get("id").getAsLong()
                    : feedId;
            JsonObject jsep = response.has("jsep") && response.get("jsep").isJsonObject()
                    ? response.getAsJsonObject("jsep")
                    : null;
            return new JoinResult(id, new ArrayList<PublisherInfo>(), jsep);
        });
    }

    /**
     * Publishes audio and video by sending an SDP offer.
     *
     * @param jsep JSEP offer
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> publish(JsonObject jsep) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "publish");
        body.addProperty("audio", true);
        body.addProperty("video", true);
        return handle.sendMessage(body, jsep);
    }

    /**
     * Starts a subscription by sending a {@code start} request with the given JSEP.
     *
     * @param feedId feed the subscription refers to
     * @param jsep   JSEP payload to attach
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> subscribe(long feedId, JsonObject jsep) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "start");
        body.addProperty("feed", feedId);
        return handle.sendMessage(body, jsep);
    }

    /**
     * Leaves the current room.
     *
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> leave() {
        JsonObject body = new JsonObject();
        body.addProperty("request", "leave");
        return handle.sendMessage(body);
    }

    /** Returns {@code plugindata.data} of a response, rejecting missing or error payloads. */
    private static JsonObject pluginData(JsonObject response, String action) {
        if (response == null) {
            throw new IllegalStateException("No response to VideoRoom request '" + action + "'");
        }
        JsonObject wrapper = response.has("plugindata") && response.get("plugindata").isJsonObject()
                ? response.getAsJsonObject("plugindata")
                : null;
        JsonObject data = wrapper != null && wrapper.has("data") && wrapper.get("data").isJsonObject()
                ? wrapper.getAsJsonObject("data")
                : null;
        if (data == null) {
            throw new IllegalStateException(
                    "Unexpected response to VideoRoom request '" + action + "': " + response);
        }
        if (data.has("error") || data.has("error_code")) {
            throw new IllegalStateException("VideoRoom request '" + action + "' failed: " + data);
        }
        return data;
    }

    /** Reads a mandatory numeric member, failing with a descriptive message when absent. */
    private static long requireLong(JsonObject object, String key, String action) {
        if (!object.has(key) || object.get(key).isJsonNull()) {
            throw new IllegalStateException(
                    "Response to VideoRoom request '" + action + "' lacks '" + key + "': " + object);
        }
        return object.get(key).getAsLong();
    }

    /** Reads an optional string member; {@code null} when absent or JSON null. */
    private static String optString(JsonObject object, String key) {
        return object.has(key) && !object.get(key).isJsonNull()
                ? object.get(key).getAsString()
                : null;
    }

    /** Summary of one room as returned by {@link #listRooms()}. */
    public static class RoomInfo {
        public final long roomId;
        public final String description;
        public final int participantCount;

        public RoomInfo(long roomId, String description, int participantCount) {
            this.roomId = roomId;
            this.description = description;
            this.participantCount = participantCount;
        }
    }

    /** Outcome of joining a room. */
    public static class JoinResult {
        public final long participantId;
        public final List<PublisherInfo> publishers;
        /** JSEP object that accompanied the join response, or {@code null} when there was none. */
        public final JsonObject jsep;

        public JoinResult(long participantId, List<PublisherInfo> publishers) {
            this(participantId, publishers, null);
        }

        public JoinResult(long participantId, List<PublisherInfo> publishers, JsonObject jsep) {
            this.participantId = participantId;
            this.publishers = publishers;
            this.jsep = jsep;
        }
    }

    /** One publisher present in a room. */
    public static class PublisherInfo {
        public final long feedId;
        /** Display name, or {@code null} when the publisher announced none. */
        public final String displayName;

        public PublisherInfo(long feedId, String displayName) {
            this.feedId = feedId;
            this.displayName = displayName;
        }
    }
}

Streaming Plugin Implementation

Streaming Client

package com.janus.plugins;
import com.janus.client.JanusPluginHandle;
import com.google.gson.JsonObject;
import com.google.gson.JsonArray;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
/**
 * Typed wrapper around a plugin handle attached to the Janus Streaming plugin.
 *
 * <p>Parsed responses are validated first, so a rejected request fails the returned
 * future with an {@link IllegalStateException} naming the request instead of an NPE.
 */
public class StreamingPlugin {
    private final JanusPluginHandle handle;

    /** @param handle handle attached to the Streaming plugin */
    public StreamingPlugin(JanusPluginHandle handle) {
        this.handle = handle;
    }

    /**
     * Lists the streams (mountpoints) the plugin reports.
     *
     * @return future holding one {@link StreamInfo} per listed stream (empty when none)
     */
    public CompletableFuture<List<StreamInfo>> listStreams() {
        JsonObject body = new JsonObject();
        body.addProperty("request", "list");
        return handle.sendMessage(body).thenApply(response -> {
            JsonObject data = pluginData(response, "list");
            List<StreamInfo> streamList = new ArrayList<>();
            JsonArray streams = data.getAsJsonArray("list");
            if (streams == null) {
                return streamList;
            }
            for (JsonElement element : streams) {
                JsonObject stream = element.getAsJsonObject();
                streamList.add(new StreamInfo(
                        requireLong(stream, "id", "list"),
                        optString(stream, "description"),
                        optString(stream, "type")));
            }
            return streamList;
        });
    }

    /**
     * Requests to watch a stream.
     *
     * @param streamId id of the stream to watch
     * @param jsep     JSEP payload to attach, or {@code null} to send none
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> watch(long streamId, JsonObject jsep) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "watch");
        body.addProperty("id", streamId);
        return handle.sendMessage(body, jsep);
    }

    /**
     * Starts delivery of a watched stream, normally carrying the SDP answer.
     *
     * <p>The request name is {@code start}; the Streaming plugin has no {@code publish}
     * request, so the previous value was rejected by the server.
     *
     * @param streamId id of the stream being started
     * @param jsep     JSEP payload to attach, or {@code null} to send none
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> startStream(long streamId, JsonObject jsep) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "start");
        body.addProperty("id", streamId);
        return handle.sendMessage(body, jsep);
    }

    /**
     * Stops watching the current stream.
     *
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> stop() {
        JsonObject body = new JsonObject();
        body.addProperty("request", "stop");
        return handle.sendMessage(body);
    }

    /**
     * Creates a new stream (mountpoint).
     *
     * @param description stream description
     * @param type        stream type as understood by the plugin
     * @param audio       whether the stream carries audio
     * @param video       whether the stream carries video
     * @return future holding the id of the created stream
     */
    public CompletableFuture<Long> createStream(String description, String type, boolean audio, boolean video) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "create");
        body.addProperty("description", description);
        body.addProperty("type", type);
        body.addProperty("audio", audio);
        body.addProperty("video", video);
        // NOTE(review): "config" is kept for compatibility with the original request;
        // confirm whether the Streaming plugin reads it at all.
        JsonObject config = new JsonObject();
        config.addProperty("description", description);
        body.add("config", config);
        return handle.sendMessage(body).thenApply(response -> {
            JsonObject data = pluginData(response, "create");
            // The plugin documents the new id under a nested "stream" object; a
            // top-level "id" is still accepted as a fallback.
            JsonObject created = data.has("stream") && data.get("stream").isJsonObject()
                    ? data.getAsJsonObject("stream")
                    : data;
            return requireLong(created, "id", "create");
        });
    }

    /** Returns {@code plugindata.data} of a response, rejecting missing or error payloads. */
    private static JsonObject pluginData(JsonObject response, String action) {
        if (response == null) {
            throw new IllegalStateException("No response to Streaming request '" + action + "'");
        }
        JsonObject wrapper = response.has("plugindata") && response.get("plugindata").isJsonObject()
                ? response.getAsJsonObject("plugindata")
                : null;
        JsonObject data = wrapper != null && wrapper.has("data") && wrapper.get("data").isJsonObject()
                ? wrapper.getAsJsonObject("data")
                : null;
        if (data == null) {
            throw new IllegalStateException(
                    "Unexpected response to Streaming request '" + action + "': " + response);
        }
        if (data.has("error") || data.has("error_code")) {
            throw new IllegalStateException("Streaming request '" + action + "' failed: " + data);
        }
        return data;
    }

    /** Reads a mandatory numeric member, failing with a descriptive message when absent. */
    private static long requireLong(JsonObject object, String key, String action) {
        if (!object.has(key) || object.get(key).isJsonNull()) {
            throw new IllegalStateException(
                    "Response to Streaming request '" + action + "' lacks '" + key + "': " + object);
        }
        return object.get(key).getAsLong();
    }

    /** Reads an optional string member; {@code null} when absent or JSON null. */
    private static String optString(JsonObject object, String key) {
        return object.has(key) && !object.get(key).isJsonNull()
                ? object.get(key).getAsString()
                : null;
    }

    /** Summary of one stream as returned by {@link #listStreams()}. */
    public static class StreamInfo {
        public final long streamId;
        public final String description;
        public final String type;

        public StreamInfo(long streamId, String description, String type) {
            this.streamId = streamId;
            this.description = description;
            this.type = type;
        }
    }
}

SIP Plugin Implementation

SIP Client

package com.janus.plugins;
import com.janus.client.JanusPluginHandle;
import com.google.gson.JsonObject;
import java.util.concurrent.CompletableFuture;
/**
 * Typed wrapper around a plugin handle attached to the Janus SIP plugin. Remembers the
 * {@code call_id} reported for the most recent outgoing call.
 */
public class SIPPlugin {
    private final JanusPluginHandle handle;
    // Written from response callbacks, read by application threads.
    private volatile String callId;

    /** @param handle handle attached to the SIP plugin */
    public SIPPlugin(JanusPluginHandle handle) {
        this.handle = handle;
    }

    /**
     * Registers with a SIP server.
     *
     * @param username value sent as {@code username}
     * @param password value sent as {@code secret}
     * @param server   value sent as {@code proxy}
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> register(String username, String password, String server) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "register");
        body.addProperty("username", username);
        body.addProperty("secret", password);
        body.addProperty("proxy", server);
        return handle.sendMessage(body);
    }

    /**
     * Places a call and stores the {@code call_id} from the response, if present.
     *
     * @param uri  SIP URI to call
     * @param jsep JSEP offer
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> call(String uri, JsonObject jsep) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "call");
        body.addProperty("uri", uri);
        return handle.sendMessage(body, jsep).thenApply(response -> {
            // Store call ID for later reference; tolerate responses without plugin data.
            if (response != null && response.has("plugindata") && response.get("plugindata").isJsonObject()) {
                JsonObject pluginData = response.getAsJsonObject("plugindata");
                JsonObject data = pluginData.has("data") && pluginData.get("data").isJsonObject()
                        ? pluginData.getAsJsonObject("data")
                        : null;
                if (data != null && data.has("call_id") && !data.get("call_id").isJsonNull()) {
                    this.callId = data.get("call_id").getAsString();
                }
            }
            return response;
        });
    }

    /**
     * Accepts an incoming call.
     *
     * @param jsep JSEP answer
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> accept(JsonObject jsep) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "accept");
        return handle.sendMessage(body, jsep);
    }

    /**
     * Hangs up the current call and clears the stored call id once answered.
     *
     * @return future holding the raw response
     */
    public CompletableFuture<JsonObject> hangup() {
        JsonObject body = new JsonObject();
        body.addProperty("request", "hangup");
        return handle.sendMessage(body).thenApply(response -> {
            this.callId = null;
            return response;
        });
    }

    /**
     * Sends DTMF tones, one {@code dtmf_info} request per character, in order. Each
     * request is sent only after the previous one has been answered.
     *
     * @param digits the tones to send, e.g. {@code "123#"}
     * @return future holding the response to the last tone; fails with
     *         {@link IllegalArgumentException} when {@code digits} is {@code null} or
     *         empty, or with the first failure of any single request
     */
    public CompletableFuture<JsonObject> sendDTMF(String digits) {
        if (digits == null || digits.isEmpty()) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("digits must not be null or empty"));
        }
        CompletableFuture<JsonObject> chain = sendDigit(digits.charAt(0));
        for (int i = 1; i < digits.length(); i++) {
            final char digit = digits.charAt(i);
            chain = chain.thenCompose(previous -> sendDigit(digit));
        }
        return chain;
    }

    /** Returns the stored call id, or {@code null} when no call is being tracked. */
    public String getCallId() {
        return callId;
    }

    /** Sends a single tone; the plugin's {@code dtmf_info} request takes one digit at a time. */
    private CompletableFuture<JsonObject> sendDigit(char digit) {
        JsonObject body = new JsonObject();
        body.addProperty("request", "dtmf_info");
        body.addProperty("digit", String.valueOf(digit));
        return handle.sendMessage(body);
    }
}

Main Janus Client Class

Comprehensive Janus Client

package com.janus.client;
import com.google.gson.JsonObject;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Entry point for talking to a Janus server: opens the WebSocket connection, creates
 * sessions and forwards all connection and Janus events to an application-supplied
 * {@link JanusMessageHandler}.
 */
public class JanusClient {
    private final String serverUrl;
    // Assigned by connect(), read by request methods that may run on other threads.
    private volatile JanusWebSocketClient webSocketClient;
    private final ConcurrentHashMap<Long, JanusSession> sessions;
    // May be replaced at any time through setMessageHandler().
    private volatile JanusMessageHandler messageHandler;

    /** @param serverUrl WebSocket URL of the Janus server */
    public JanusClient(String serverUrl) {
        this.serverUrl = serverUrl;
        this.sessions = new ConcurrentHashMap<>();
    }

    /**
     * @param serverUrl      WebSocket URL of the Janus server
     * @param messageHandler receiver for connection and Janus events; may be {@code null}
     */
    public JanusClient(String serverUrl, JanusMessageHandler messageHandler) {
        this(serverUrl);
        this.messageHandler = messageHandler;
    }

    /**
     * Connects to the Janus server on a background thread.
     *
     * @return future completing with {@code true} when the connection was established
     *         and {@code false} when the attempt failed; it completes exceptionally
     *         when the URL is invalid or the connecting thread is interrupted
     */
    public CompletableFuture<Boolean> connect() {
        final JanusWebSocketClient socket;
        try {
            socket = new JanusWebSocketClient(new URI(serverUrl), new InternalMessageHandler());
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        this.webSocketClient = socket;

        CompletableFuture<Boolean> connectionFuture = new CompletableFuture<>();
        new Thread(() -> {
            try {
                // connectBlocking() reports success as its return value; passing it on
                // keeps a refused connection from being reported as connected.
                connectionFuture.complete(socket.connectBlocking());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                connectionFuture.completeExceptionally(e);
            } catch (RuntimeException e) {
                connectionFuture.completeExceptionally(e);
            }
        }, "janus-connect").start();
        return connectionFuture;
    }

    /**
     * Creates a new session on the server and registers it with this client.
     *
     * @return future holding the session; fails with {@link IllegalStateException} when
     *         {@link #connect()} was never called or Janus returns no session id
     */
    public CompletableFuture<JanusSession> createSession() {
        JanusWebSocketClient socket = webSocketClient;
        if (socket == null) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Not connected: call connect() first"));
        }
        JsonObject request = new JsonObject();
        request.addProperty("janus", "create");
        return socket.sendRequest(request).thenApply(response -> {
            JsonObject data = response != null && response.has("data") && response.get("data").isJsonObject()
                    ? response.getAsJsonObject("data")
                    : null;
            if (data == null || !data.has("id")) {
                throw new IllegalStateException("Janus did not return a session id: " + response);
            }
            long sessionId = data.get("id").getAsLong();
            JanusSession session = new JanusSession(socket, sessionId);
            sessions.put(sessionId, session);
            return session;
        });
    }

    /**
     * Requests server information.
     *
     * @return future holding the raw response; fails with {@link IllegalStateException}
     *         when {@link #connect()} was never called
     */
    public CompletableFuture<JsonObject> getServerInfo() {
        JanusWebSocketClient socket = webSocketClient;
        if (socket == null) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Not connected: call connect() first"));
        }
        JsonObject request = new JsonObject();
        request.addProperty("janus", "info");
        return socket.sendRequest(request);
    }

    /**
     * Requests destruction of every session that is not yet destroyed, forgets all
     * sessions and closes the connection. The destroy requests are not awaited.
     */
    public void disconnect() {
        JanusWebSocketClient socket = webSocketClient;
        if (socket != null) {
            // Destroy all sessions
            sessions.values().forEach(session -> {
                if (!session.isDestroyed()) {
                    session.destroy();
                }
            });
            sessions.clear();
            socket.disconnect();
        }
    }

    /** Returns {@code true} when a connection exists and reports itself connected. */
    public boolean isConnected() {
        JanusWebSocketClient socket = webSocketClient;
        return socket != null && socket.isConnected();
    }

    /** Replaces the handler that receives events; {@code null} disables delivery. */
    public void setMessageHandler(JanusMessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    /**
     * Internal message handler that delegates to the external handler. The external
     * handler is read once per callback so that a concurrent
     * {@link #setMessageHandler(JanusMessageHandler)} cannot cause an NPE between the
     * null check and the call.
     */
    private class InternalMessageHandler implements JanusMessageHandler {
        @Override
        public void onConnected() {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onConnected();
            }
        }

        @Override
        public void onDisconnected(int code, String reason) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onDisconnected(code, reason);
            }
        }

        @Override
        public void onError(Exception ex) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onError(ex);
            }
        }

        @Override
        public void onEvent(JsonObject event) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onEvent(event);
            }
        }

        @Override
        public void onMediaEvent(JsonObject mediaEvent) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onMediaEvent(mediaEvent);
            }
        }

        @Override
        public void onWebRTCUp(JsonObject event) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onWebRTCUp(event);
            }
        }

        @Override
        public void onHangup(JsonObject event) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onHangup(event);
            }
        }

        @Override
        public void onDetached(JsonObject event) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onDetached(event);
            }
        }

        @Override
        public void onError(JsonObject error) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onError(error);
            }
        }

        @Override
        public void onUnknownEvent(String eventType, JsonObject event) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onUnknownEvent(eventType, event);
            }
        }

        @Override
        public void onLocalStream(String streamId) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onLocalStream(streamId);
            }
        }

        @Override
        public void onRemoteStream(String streamId) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onRemoteStream(streamId);
            }
        }

        @Override
        public void onIceCandidate(String candidate, int sdpMLineIndex, String sdpMid) {
            JanusMessageHandler target = messageHandler;
            if (target != null) {
                target.onIceCandidate(candidate, sdpMLineIndex, sdpMid);
            }
        }
    }
}

Usage Examples

Video Conference Example

package com.janus.examples;
import com.janus.client.*;
import com.janus.plugins.VideoRoomPlugin;
import com.google.gson.JsonObject;
import java.util.concurrent.CompletableFuture;
/**
 * Example: create or join a VideoRoom conference, stay in it for a while, then leave.
 *
 * <p>The fields are assigned inside asynchronous callbacks and read by
 * {@link #leaveConference()} on the caller's thread, hence {@code volatile}.
 */
public class VideoConferenceExample {
    private volatile JanusClient janusClient;
    private volatile JanusSession session;
    private volatile VideoRoomPlugin videoRoom;
    private volatile long roomId;

    /**
     * Connects, creates a session, attaches to the VideoRoom plugin, creates a room and
     * joins it as publisher.
     *
     * @param serverUrl WebSocket URL of the Janus server
     * @param roomName  name passed to the room creation request
     * @return future completing with {@code true} once the room has been joined
     */
    public CompletableFuture<Boolean> startConference(String serverUrl, String roomName) {
        janusClient = new JanusClient(serverUrl, new ConferenceMessageHandler());
        return janusClient.connect()
                .thenCompose(connected -> {
                    if (!connected) {
                        throw new RuntimeException("Failed to connect to Janus");
                    }
                    return janusClient.createSession();
                })
                .thenCompose(createdSession -> {
                    this.session = createdSession;
                    return createdSession.attachPlugin("janus.plugin.videoroom");
                })
                .thenCompose(handle -> {
                    this.videoRoom = new VideoRoomPlugin(handle);
                    return videoRoom.createRoom(roomName, "Video Conference Room", 128000);
                })
                .thenCompose(createdRoomId -> {
                    this.roomId = createdRoomId;
                    System.out.println("Created room: " + createdRoomId);
                    return videoRoom.joinAsPublisher(createdRoomId, "Conference Host");
                })
                .thenApply(joinResult -> {
                    System.out.println("Joined room as publisher with ID: " + joinResult.participantId);
                    return true;
                });
    }

    /**
     * Connects, creates a session, attaches to the VideoRoom plugin and joins an
     * existing room as publisher.
     *
     * @param serverUrl   WebSocket URL of the Janus server
     * @param roomId      id of the room to join
     * @param displayName display name to announce
     * @return future completing with {@code true} once the room has been joined
     */
    public CompletableFuture<Boolean> joinConference(String serverUrl, long roomId, String displayName) {
        janusClient = new JanusClient(serverUrl, new ConferenceMessageHandler());
        return janusClient.connect()
                .thenCompose(connected -> {
                    if (!connected) {
                        throw new RuntimeException("Failed to connect to Janus");
                    }
                    return janusClient.createSession();
                })
                .thenCompose(createdSession -> {
                    // Keep the session so leaveConference() can release it.
                    this.session = createdSession;
                    return createdSession.attachPlugin("janus.plugin.videoroom");
                })
                .thenCompose(handle -> {
                    this.videoRoom = new VideoRoomPlugin(handle);
                    this.roomId = roomId;
                    return videoRoom.joinAsPublisher(roomId, displayName);
                })
                .thenApply(joinResult -> {
                    System.out.println("Joined conference as: " + displayName);
                    return true;
                });
    }

    /**
     * Leaves the room and releases the session and connection. Safe to call when the
     * conference was never (fully) started.
     */
    public void leaveConference() {
        VideoRoomPlugin room = videoRoom;
        if (room != null) {
            room.leave();
        }
        JanusClient client = janusClient;
        if (client != null) {
            // disconnect() already destroys every open session of the client, so a
            // separate session.destroy() here would send the request twice.
            client.disconnect();
        } else if (session != null) {
            session.destroy();
        }
        videoRoom = null;
        session = null;
        janusClient = null;
    }

    /** Handler that logs the events relevant to the conference example. */
    private static class ConferenceMessageHandler implements JanusMessageHandler {
        @Override
        public void onConnected() {
            System.out.println("Connected to Janus server");
        }

        @Override
        public void onDisconnected(int code, String reason) {
            System.out.println("Disconnected: " + reason);
        }

        @Override
        public void onError(Exception ex) {
            System.err.println("Error: " + ex.getMessage());
        }

        @Override
        public void onEvent(JsonObject event) {
            System.out.println("Conference event: " + event);
        }

        @Override
        public void onMediaEvent(JsonObject mediaEvent) {
            System.out.println("Media event: " + mediaEvent);
        }

        @Override
        public void onWebRTCUp(JsonObject event) {
            System.out.println("WebRTC connection established");
        }

        @Override
        public void onHangup(JsonObject event) {
            System.out.println("Call hung up");
        }

        // Remaining callbacks are intentionally ignored by this example.
        @Override public void onDetached(JsonObject event) {}
        @Override public void onError(JsonObject error) {}
        @Override public void onUnknownEvent(String eventType, JsonObject event) {}
    }

    /** Starts a conference against a local Janus instance, waits 30 seconds, then leaves. */
    public static void main(String[] args) {
        VideoConferenceExample conference = new VideoConferenceExample();
        // Create a conference room
        conference.startConference("ws://localhost:8188/", "Java Conference")
                .thenRun(() -> System.out.println("Conference started successfully"))
                .exceptionally(throwable -> {
                    System.err.println("Failed to start conference: " + throwable.getMessage());
                    return null;
                });
        // Keep the application running
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        conference.leaveConference();
    }
}

Streaming Example

package com.janus.examples;
import com.janus.client.*;
import com.janus.plugins.StreamingPlugin;
import com.google.gson.JsonObject;
import java.util.concurrent.CompletableFuture;
public class StreamingExample {
private JanusClient janusClient;
private JanusSession session;
private StreamingPlugin streaming;
public CompletableFuture<Void> watchStream(String serverUrl, long streamId) {
janusClient = new JanusClient(serverUrl, new StreamingMessageHandler());
return janusClient.connect()
.thenCompose(connected -> janusClient.createSession())
.thenCompose(session -> {
this.session = session;
return session.attachPlugin("janus.plugin.streaming");
})
.thenCompose(handle -> {
this.streaming = new StreamingPlugin(handle);
return streaming.listStreams();
})
.thenCompose(streams -> {
System.out.println("Available streams:");
streams.forEach(stream -> 
System.out.println("  " + stream.streamId + ": " + stream.description));
// Watch the specified stream
JsonObject jsep = createOfferSDP(); // You need to implement WebRTC offer creation
return streaming.watch(streamId, jsep);
})
.thenAccept(response -> {
System.out.println("Watching stream: " + streamId);
});
}
private JsonObject createOfferSDP() {
// This would typically come from a WebRTC implementation
JsonObject jsep = new JsonObject();
jsep.addProperty("type", "offer");
jsep.addProperty("sdp", "v=0\r\no=- 123456 2 IN IP4 127.0.0.1\r\n...");
return jsep;
}
private static class StreamingMessageHandler implements JanusMessageHandler {
@Override
public void onConnected() {
System.out.println("Connected to streaming server");
}
@Override
public void onMediaEvent(JsonObject mediaEvent) {
System.out.println("Streaming media event received");
}
@Override
public void onWebRTCUp(JsonObject event) {
System.out.println("Streaming WebRTC connection up");
}
// Implement other required methods...
@Override public void onDisconnected(int code, String reason) {}
@Override public void onError(Exception ex) {}
@Override public void onEvent(JsonObject event) {}
@Override public void onHangup(JsonObject event) {}
@Override public void onDetached(JsonObject event) {}
@Override public void onError(JsonObject error) {}
@Override public void onUnknownEvent(String eventType, JsonObject event) {}
}
}

Error Handling and Recovery

Robust Janus Client with Retry Logic

package com.janus.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Wraps {@link JanusClient} with bounded, exponentially backed-off connection retries
 * and automatic reconnection after an established connection is lost.
 *
 * <p>Retries are scheduled asynchronously; no thread is blocked while waiting between
 * attempts.
 */
public class RobustJanusClient {
    /** Delay before the first retry; doubled for every further attempt. */
    private static final long BASE_DELAY_MILLIS = 1000;
    /** Upper bound for the delay between two attempts. */
    private static final long MAX_DELAY_MILLIS = 30000;

    private final String serverUrl;
    // Replaced by retry callbacks that may run on different threads.
    private volatile JanusClient janusClient;
    /** Number of the attempt currently in progress; 0 after a success. */
    private final AtomicInteger reconnectAttempts = new AtomicInteger(0);
    private final int maxReconnectAttempts = 5;

    /** @param serverUrl WebSocket URL of the Janus server */
    public RobustJanusClient(String serverUrl) {
        this.serverUrl = serverUrl;
    }

    /**
     * Connects and creates a session, retrying failed attempts with exponential backoff.
     *
     * @return future holding the session; fails with {@link RuntimeException} once the
     *         maximum number of attempts has been used up
     */
    public CompletableFuture<JanusSession> connectWithRetry() {
        return connectInternal(0);
    }

    /**
     * Runs attempt number {@code attempt} (zero-based) and, on failure, schedules the
     * next attempt after the backoff delay.
     */
    private CompletableFuture<JanusSession> connectInternal(int attempt) {
        if (attempt >= maxReconnectAttempts) {
            return CompletableFuture.failedFuture(
                    new RuntimeException("Max reconnection attempts reached"));
        }
        reconnectAttempts.set(attempt + 1);

        final JanusClient client;
        final CompletableFuture<Boolean> connection;
        JanusClient existing = janusClient;
        if (existing != null && existing.isConnected()) {
            // Reuse a live connection; connecting again would open a second socket
            // and abandon the first one.
            client = existing;
            connection = CompletableFuture.completedFuture(true);
        } else {
            client = new JanusClient(serverUrl, new ReconnectionHandler());
            janusClient = client;
            connection = client.connect();
        }

        CompletableFuture<JanusSession> sessionAttempt = connection.thenCompose(connected -> {
            if (!connected) {
                throw new RuntimeException("Connection failed");
            }
            return client.createSession();
        });

        return sessionAttempt
                .<CompletableFuture<JanusSession>>handle((session, failure) -> {
                    if (failure == null) {
                        reconnectAttempts.set(0);
                        return CompletableFuture.completedFuture(session);
                    }
                    System.err.println("Connection attempt " + (attempt + 1) + " failed: "
                            + failure.getMessage());
                    // Wait before retry (exponential backoff) without blocking this thread.
                    long delay = Math.min(BASE_DELAY_MILLIS << attempt, MAX_DELAY_MILLIS);
                    return CompletableFuture
                            .runAsync(() -> { }, CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS))
                            .thenCompose(ignored -> connectInternal(attempt + 1));
                })
                .thenCompose(next -> next);
    }

    /**
     * Handler that restarts the retry sequence when an established connection drops.
     * One instance is created per underlying client.
     */
    private class ReconnectionHandler implements JanusMessageHandler {
        /** True between the open and close callbacks of this handler's connection. */
        private final AtomicBoolean established = new AtomicBoolean(false);

        @Override
        public void onConnected() {
            established.set(true);
        }

        @Override
        public void onDisconnected(int code, String reason) {
            // A close that follows a failed connection attempt is already covered by the
            // retry sequence that made the attempt; reacting to it as well would start a
            // second, competing sequence for every failure.
            if (!established.getAndSet(false)) {
                return;
            }
            System.out.println("Disconnected, attempting reconnection...");
            reconnectAttempts.set(0);
            connectWithRetry().exceptionally(failure -> {
                System.err.println("Reconnection failed: " + failure.getMessage());
                return null;
            });
        }

        @Override
        public void onError(Exception ex) {
            System.err.println("Connection error: " + ex.getMessage());
        }

        // Remaining callbacks are not needed for reconnection.
        @Override public void onEvent(JsonObject event) {}
        @Override public void onMediaEvent(JsonObject mediaEvent) {}
        @Override public void onWebRTCUp(JsonObject event) {}
        @Override public void onHangup(JsonObject event) {}
        @Override public void onDetached(JsonObject event) {}
        @Override public void onError(JsonObject error) {}
        @Override public void onUnknownEvent(String eventType, JsonObject event) {}
    }
}

Conclusion

This Java Janus Gateway integration provides:

Key Features:

  • WebSocket-based communication with Janus server
  • Session and plugin management for different use cases
  • VideoRoom plugin for video conferencing
  • Streaming plugin for live streaming
  • SIP plugin for VoIP integration
  • Comprehensive error handling and reconnection logic

Use Cases:

  • Video conferencing applications
  • Live streaming platforms
  • VoIP systems integration
  • Real-time communication apps
  • WebRTC-based services

Best Practices:

  1. Connection Management: Implement reconnection logic for production use
  2. Error Handling: Handle WebRTC negotiation failures gracefully
  3. Resource Cleanup: Always destroy sessions and close connections
  4. Message Processing: Use transaction IDs for request-response mapping
  5. Performance: Consider connection pooling for multiple sessions

Integration Notes:

  • Janus uses JSON over WebSocket for control messages
  • WebRTC media is handled separately through ICE candidates
  • Different plugins provide specialized functionality
  • The API is extensible for custom plugin development

This implementation provides a solid foundation for building Java applications that leverage Janus Gateway's powerful WebRTC capabilities.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper