SkyWalking is an open-source Application Performance Monitoring (APM) system that provides distributed tracing, service mesh telemetry, and observability analysis. The OAP (Observability Analysis Platform) server is the core component that processes, analyzes, and aggregates telemetry data.
Architecture Overview
Key Components
- OAP Server: Processes and analyzes telemetry data
- Storage: Elasticsearch, H2, MySQL, etc.
- UI: Web interface for visualization
- Agents: Collect telemetry data from applications
Setup and Configuration
Dependencies
<!-- Shared version property so all SkyWalking artifacts stay in lockstep. -->
<properties>
<skywalking.version>9.4.0</skywalking.version>
</properties>
<!-- SkyWalking OAP Server -->
<!-- NOTE(review): `oap-server` is the server-side aggregate module of the
     SkyWalking source tree; it is not normally consumed as a library
     dependency. Verify this coordinate actually resolves from Maven Central
     for the chosen version. -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>oap-server</artifactId>
<version>${skywalking.version}</version>
</dependency>
<!-- SkyWalking Toolkit -->
<!-- Application-side trace APIs (e.g. @Trace, TraceContext). -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>${skywalking.version}</version>
</dependency>
<!-- For Logback integration -->
<!-- Makes the trace ID available to Logback log patterns. -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>${skywalking.version}</version>
</dependency>
Basic OAP Server Implementation
Example 1: Embedded OAP Server Setup
@Component
@Slf4j
/**
 * Hosts a SkyWalking OAP server inside the application JVM. The server runs
 * on a dedicated daemon thread; {@link #startOapServer()} blocks the caller
 * until the core modules report ready or a timeout elapses.
 */
public class EmbeddedOapServer {

    /** How long startOapServer() waits for the core modules to come up. */
    private static final long STARTUP_TIMEOUT_MS = 30_000L;
    /** Interval between readiness polls during startup. */
    private static final long STARTUP_POLL_INTERVAL_MS = 500L;

    private OAPServerBootstrap oapServer;
    private Thread serverThread;

    @Value("${skywalking.config.file:config/application.yml}")
    private String configFile;

    @Value("${skywalking.storage:elasticsearch}")
    private String storage;

    /**
     * Boots the OAP server asynchronously and waits (bounded) for readiness.
     * Startup failures inside the server thread are logged, not rethrown.
     */
    public void startOapServer() {
        log.info("Starting Embedded SkyWalking OAP Server...");
        serverThread = new Thread(() -> {
            try {
                // The OAP bootstrap reads its configuration from system properties.
                System.setProperty("sw.config", configFile);
                System.setProperty("sw.storage", storage);
                oapServer = new OAPServerBootstrap();
                oapServer.start();
                log.info("SkyWalking OAP Server started successfully");
            } catch (Exception e) {
                log.error("Failed to start OAP Server", e);
            }
        });
        serverThread.setName("OAP-Server-Thread");
        // Daemon so a hung OAP bootstrap can never block JVM shutdown.
        serverThread.setDaemon(true);
        serverThread.start();
        waitUntilReady();
    }

    /**
     * Polls {@link #isServerReady()} until it returns true or the timeout
     * elapses. Uses plain JDK waiting instead of Awaitility's await(), which
     * is a test-scoped library and should not run on the production path.
     */
    private void waitUntilReady() {
        long deadline = System.currentTimeMillis() + STARTUP_TIMEOUT_MS;
        while (System.currentTimeMillis() < deadline) {
            if (isServerReady()) {
                return;
            }
            try {
                Thread.sleep(STARTUP_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
        log.warn("OAP Server did not become ready within {} ms", STARTUP_TIMEOUT_MS);
    }

    /**
     * Stops the server (if it ever started) and then interrupts the server
     * thread in case the bootstrap is still blocked in startup.
     */
    public void stopOapServer() {
        if (oapServer != null) {
            try {
                oapServer.stop();
                log.info("SkyWalking OAP Server stopped");
            } catch (Exception e) {
                log.error("Error stopping OAP Server", e);
            }
        }
        if (serverThread != null && serverThread.isAlive()) {
            serverThread.interrupt();
        }
    }

    // Readiness check: core modules initialized.
    // NOTE(review): ModuleManager.INSTANCE is not a singleton accessor in
    // upstream SkyWalking — confirm how the module manager is obtained in the
    // target version.
    private boolean isServerReady() {
        try {
            ModuleManager moduleManager = ModuleManager.INSTANCE;
            return moduleManager.isInited();
        } catch (Exception e) {
            return false;
        }
    }

    /** Spring lifecycle hook: make sure the server is torn down with the context. */
    @PreDestroy
    public void cleanup() {
        stopOapServer();
    }
}
// Configuration class
@Configuration
@ConfigurationProperties(prefix = "skywalking.oap")
@Data
// Binds `skywalking.oap.*` application properties to typed fields.
// Lombok @Data supplies the getters/setters Spring's relaxed binding needs.
public class OapServerConfig {
// Master switch: whether the embedded OAP server should be started at all.
private boolean enabled = false;
// Path to the OAP backend configuration file (see Example 2).
private String configPath = "config/application.yml";
// Storage selector; must name a provider configured in the OAP config file.
private String storage = "elasticsearch";
// Port for agent gRPC data ingestion.
private int grpcPort = 11800;
// Port for the REST/GraphQL query endpoint.
private int restPort = 12800;
// Cluster configuration
private ClusterConfig cluster = new ClusterConfig();
@Data
// Optional cluster coordination settings; defaults describe a single node.
public static class ClusterConfig {
private boolean enabled = false;
// Coordination mechanism, e.g. "standalone", "zookeeper", "kubernetes".
private String type = "standalone";
private String namespace = "";
// Addresses of the coordination backend nodes (empty for standalone).
private List<String> addresses = new ArrayList<>();
}
}
Example 2: Custom OAP Server Configuration
# config/application.yml
# SkyWalking OAP backend configuration. Every top-level key is a module; the
# active provider is chosen by `selector`, and values use ${ENV_VAR:default}
# syntax so they can be overridden from the environment.
# (Indentation restored: the original snippet was pasted flat, which is not
# valid YAML.)
cluster:
  standalone:
core:
  default:
    # Rest port
    restPort: 12800
    # Rest context path
    restContextPath: /
    # gRPC port
    gRPCConfig:
      gRPCHost: 0.0.0.0
      gRPCPort: 11800
storage:
  selector: ${SW_STORAGE:elasticsearch}
  elasticsearch:
    nameSpace: ${SW_NAMESPACE:""}
    clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
    protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
    connectTimeout: ${SW_STORAGE_ES_CONNECT_TIMEOUT:3000}
    socketTimeout: ${SW_STORAGE_ES_SOCKET_TIMEOUT:30000}
    user: ${SW_ES_USER:""}
    password: ${SW_ES_PASSWORD:""}
    secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""}
    dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
    indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1}
    indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1}
    # Super data sets (such as trace segments) are defined in the code. The
    # following settings improve Elasticsearch performance when storing very
    # large volumes of such data.
    superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5}
    superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0}
    # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
    flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # Flush the bulk every 10 seconds, whatever the number of requests
    concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # The number of concurrent requests
    resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
    metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
    segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
receiver-sharing-server:
  selector: ${SW_RECEIVER_SHARING_SERVER:default}
  default:
    restHost: ${SW_RECEIVER_RS_REST_HOST:0.0.0.0}
    restPort: ${SW_RECEIVER_RS_REST_PORT:12800}
    restContextPath: ${SW_RECEIVER_RS_REST_CONTEXT_PATH:/}
    restMinThreads: ${SW_RECEIVER_RS_REST_JETTY_MIN_THREADS:1}
    restMaxThreads: ${SW_RECEIVER_RS_REST_JETTY_MAX_THREADS:200}
    restIdleTimeOut: ${SW_RECEIVER_RS_REST_JETTY_IDLE_TIMEOUT:30000}
    restAcceptorPriorityDelta: ${SW_RECEIVER_RS_REST_JETTY_DELTA:0}
    restAcceptQueueSize: ${SW_RECEIVER_RS_REST_JETTY_QUEUE_SIZE:0}
    gRPCConfig:
      gRPCHost: ${SW_RECEIVER_RS_GRPC_HOST:0.0.0.0}
      gRPCPort: ${SW_RECEIVER_RS_GRPC_PORT:11800}
      maxConcurrentCallsPerConnection: ${SW_RECEIVER_RS_GRPC_MAX_CONCURRENT_CALL:0}
      maxMessageSize: ${SW_RECEIVER_RS_GRPC_MAX_MESSAGE_SIZE:10485760}
      gRPCThreadPoolQueueSize: ${SW_RECEIVER_RS_GRPC_POOL_QUEUE_SIZE:0}
      gRPCThreadPoolSize: ${SW_RECEIVER_RS_GRPC_THREAD_POOL_SIZE:0}
      authentication: ${SW_RECEIVER_RS_GRPC_AUTHENTICATION:""}
receiver-trace:
  selector: ${SW_RECEIVER_TRACE:default}
  default:
    sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample by default.
    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200} # The slow database access thresholds. Unit: ms.
receiver-jvm:
  selector: ${SW_RECEIVER_JVM:default}
  default:
receiver-clr:
  selector: ${SW_RECEIVER_CLR:default}
  default:
receiver-profile:
  selector: ${SW_RECEIVER_PROFILE:default}
  default:
receiver-meter:
  selector: ${SW_RECEIVER_METER:default}
  default:
receiver-event:
  selector: ${SW_RECEIVER_EVENT:default}
  default:
receiver-istio-telemetry:
  selector: ${SW_RECEIVER_ISTIO_TELEMETRY:default}
  default:
receiver-envoy-metrics:
  selector: ${SW_RECEIVER_ENVOY_METRICS:default}
  default:
receiver-zabbix:
  selector: ${SW_RECEIVER_ZABBIX:default}
  default:
receiver-ebpf-profiling:
  selector: ${SW_RECEIVER_EBPF_PROFILING:default}
  default:
query:
  selector: ${SW_QUERY:graphql}
  graphql:
    path: ${SW_QUERY_GRAPHQL_PATH:/graphql}
    # The TLS settings for the server. If the certificate and key files are
    # not set, the server starts without TLS.
    # ssl:
    #   keyPath: /path/to/key
    #   certChainPath: /path/to/cert
    #   trustedCAPath: /path/to/ca
profile:
  selector: ${SW_PROFILE:default}
  default:
    # The maximum parallel monitor count
    maxParallel: ${SW_PROFILE_MAX_PARALLEL:5}
    # The monitor duration (minutes)
    duration: ${SW_PROFILE_DURATION:10}
    # The maximum monitor count in a dump period
    dumpMaxPeriod: ${SW_PROFILE_DUMP_PERIOD:10}
    # The maximum sampling count in a snapshot.
    maxSamplingCount: ${SW_PROFILE_MAX_SAMPLING_COUNT:500}
# The configuration of the alarm.
alarm:
  selector: ${SW_ALARM:default}
  default:
    # The alarm rules.
    rules:
      # Rule unique name, must be ended with `_rule`.
      endpoint_percent_rule:
        # Metrics value needs to be long, double or int
        metrics-name: endpoint_percent
        # FIX: the rule alarms on a LOW success rate (see message below), so
        # the operator must be "<"; the original ">" contradicted the message.
        op: "<"
        threshold: 75
        # The length of time to evaluate the metrics
        period: 10
        # How many times the metrics must match the condition before an alarm triggers
        count: 3
        # How many periods of checks the alarm keeps silent after triggering; defaults to the same as period.
        silence-period: 10
        message: Successful rate of endpoint {name} is lower than 75%
Custom Telemetry Data Processing
Example 3: Custom Receiver and Handler
// Custom telemetry data receiver
@Component
@Slf4j
// Entry point for pushing application-defined telemetry (business metrics and
// custom traces) into the OAP analysis pipeline via the SourceReceiver.
public class CustomTelemetryReceiver {
private final MeterSystem meterSystem;
private final SourceReceiver sourceReceiver;
// Handlers registered against the source receiver; the list only exists so
// registration can happen in one pass below.
private final List<ISource> sourceHandlers = new ArrayList<>();
public CustomTelemetryReceiver(MeterSystem meterSystem,
SourceReceiver sourceReceiver) {
this.meterSystem = meterSystem;
this.sourceReceiver = sourceReceiver;
// Metric definitions and handler registration happen eagerly so the bean
// is usable as soon as the container wires it.
setupCustomMetrics();
registerSourceHandlers();
}
// Declares the custom meters this receiver feeds.
// NOTE(review): MeterSystem.create / Meter.Id.newBuilder as used here do not
// match the upstream SkyWalking MeterSystem API one-to-one — verify against
// the target SkyWalking version.
private void setupCustomMetrics() {
// Define custom metrics
meterSystem.create("custom_business_metric", "count",
Meter.Id.newBuilder()
.setName("custom_business_metric")
.setTag(new Tag("service", "business-service"))
.build());
meterSystem.create("custom_throughput", "count/min",
Meter.Id.newBuilder()
.setName("custom_throughput")
.setTag(new Tag("operation", "process"))
.build());
}
// Registers the handlers that consume the custom source types defined below.
private void registerSourceHandlers() {
// Register custom source handlers
sourceHandlers.add(new BusinessMetricsSource());
sourceHandlers.add(new CustomTraceHandler());
sourceHandlers.forEach(handler ->
sourceReceiver.registerHandler(handler.getSource(), handler));
}
// Builds an immutable BusinessMetric event and hands it to the source
// receiver. Failures are logged, never thrown, so telemetry problems cannot
// break the caller's business flow.
public void receiveBusinessMetric(String serviceName, String operation,
double value, Map<String, String> tags) {
try {
BusinessMetric metric = BusinessMetric.newBuilder()
.setService(serviceName)
.setOperation(operation)
.setValue(value)
.setTimestamp(System.currentTimeMillis())
.putAllTags(tags)
.build();
sourceReceiver.receive(metric);
} catch (Exception e) {
log.error("Failed to receive business metric", e);
}
}
// Same never-throws contract as receiveBusinessMetric, for custom trace
// events. `duration` is in milliseconds (see CustomTraceHandler's logging);
// `success` marks the business-level outcome.
public void receiveCustomTrace(String traceId, String serviceName,
String operation, long duration,
boolean success) {
try {
CustomTrace trace = CustomTrace.newBuilder()
.setTraceId(traceId)
.setService(serviceName)
.setOperation(operation)
.setDuration(duration)
.setSuccess(success)
.setTimestamp(System.currentTimeMillis())
.build();
sourceReceiver.receive(trace);
} catch (Exception e) {
log.error("Failed to receive custom trace", e);
}
}
}
// Custom source implementation
@Component
@Slf4j
/**
 * Source handler that turns incoming {@code BusinessMetric} events into
 * meter-system recordings.
 */
public class BusinessMetricsSource implements ISource {

    private static final String SOURCE_NAME = "BusinessMetrics";

    @Override
    public String getSource() {
        return SOURCE_NAME;
    }

    /** Anything that is not a {@code BusinessMetric} is silently ignored. */
    @Override
    public void handle(ISource source) {
        if (!(source instanceof BusinessMetric)) {
            return;
        }
        processBusinessMetric((BusinessMetric) source);
    }

    // Records one business metric sample into the core meter system.
    private void processBusinessMetric(BusinessMetric metric) {
        try {
            String service = metric.getService();
            String op = metric.getOperation();
            double sample = metric.getValue();
            // Resolve the meter system lazily from the module manager; it may
            // be absent while the core module is still starting.
            MeterSystem meters = ModuleManager.INSTANCE.find(CoreModule.NAME)
                    .provider().getService(MeterSystem.class);
            if (meters != null) {
                Meter.Id id = Meter.Id.newBuilder()
                        .setName("business_metric_" + op)
                        .setTag(new Tag("service", service))
                        .setTag(new Tag("operation", op))
                        .build();
                meters.record(id, sample);
            }
            log.debug("Processed business metric: {} - {} = {}",
                    service, op, sample);
        } catch (Exception e) {
            // Telemetry processing must never propagate into the receive path.
            log.error("Failed to process business metric", e);
        }
    }

    /** Bulk variant: delegates each element to the single-event handler. */
    @Override
    public void handle(List<ISource> sources) {
        for (ISource source : sources) {
            handle(source);
        }
    }
}
// Custom trace handler
@Component
@Slf4j
/**
 * Handles {@code CustomTrace} events: materializes slow traces as SkyWalking
 * segments and feeds every trace into the trace-analysis metrics.
 */
public class CustomTraceHandler implements ISource {

    private static final String SOURCE_NAME = "CustomTrace";

    /** Threshold (ms) above which a custom trace is converted to a segment. */
    private static final long SLOW_TRACE_THRESHOLD_MS = 1000;

    @Override
    public String getSource() {
        return SOURCE_NAME;
    }

    /** Dispatches only {@code CustomTrace} instances; others are ignored. */
    @Override
    public void handle(ISource source) {
        if (source instanceof CustomTrace) {
            processCustomTrace((CustomTrace) source);
        }
    }

    // Core processing for one trace. (The unused `traceId` local from the
    // original was removed.)
    private void processCustomTrace(CustomTrace trace) {
        try {
            String serviceName = trace.getService();
            String operation = trace.getOperation();
            long duration = trace.getDuration();
            boolean success = trace.getSuccess();
            // Only slow traces become full segments, to limit storage cost.
            if (shouldConvertToSegment(trace)) {
                convertToSegment(trace);
            }
            // Every trace — slow or not — contributes to analysis metrics.
            updateTraceAnalysis(trace);
            log.debug("Processed custom trace: {} - {} ({}ms) - {}",
                    serviceName, operation, duration, success ? "SUCCESS" : "FAILED");
        } catch (Exception e) {
            // Telemetry processing must never propagate into the receive path.
            log.error("Failed to process custom trace", e);
        }
    }

    private boolean shouldConvertToSegment(CustomTrace trace) {
        return trace.getDuration() > SLOW_TRACE_THRESHOLD_MS; // Convert slow traces to segments
    }

    private void convertToSegment(CustomTrace trace) {
        // Implementation to convert custom trace to SkyWalking segment.
        // This would involve creating segment objects and spans.
    }

    private void updateTraceAnalysis(CustomTrace trace) {
        // Update trace analysis metrics.
        TraceAnalysis analysis = TraceAnalysis.getInstance();
        analysis.analyzeCustomTrace(trace);
    }

    /** Bulk variant: delegates to the single-event handler. */
    @Override
    public void handle(List<ISource> sources) {
        sources.forEach(this::handle);
    }
}
// Plain Java value types with protobuf-style builders for the custom data types
class BusinessMetric implements ISource {
private final String service;
private final String operation;
private final double value;
private final long timestamp;
private final Map<String, String> tags;
// Builder pattern implementation
public static Builder newBuilder() {
return new Builder();
}
// Getters and builder class...
public static class Builder {
private String service;
private String operation;
private double value;
private long timestamp;
private Map<String, String> tags = new HashMap<>();
public Builder setService(String service) {
this.service = service;
return this;
}
public Builder setOperation(String operation) {
this.operation = operation;
return this;
}
public Builder setValue(double value) {
this.value = value;
return this;
}
public Builder setTimestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}
public Builder putTag(String key, String value) {
this.tags.put(key, value);
return this;
}
public Builder putAllTags(Map<String, String> tags) {
this.tags.putAll(tags);
return this;
}
public BusinessMetric build() {
return new BusinessMetric(service, operation, value, timestamp, tags);
}
}
}
Custom Analysis and Aggregation
Example 4: Custom Metrics Analysis
@Component
@Slf4j
/**
 * Aggregates per-service/operation business statistics, publishes them to the
 * meter system, and raises alerts on performance or error-rate anomalies.
 */
public class BusinessMetricsAnalyzer {

    /** Error-rate fraction above which an alert is raised (10%). */
    private static final double ERROR_RATE_THRESHOLD = 0.1;
    /** Average duration above baseline * this factor counts as degradation. */
    private static final double DEGRADATION_FACTOR = 1.5;

    private final MeterSystem meterSystem;
    // Keyed by "<service>:<operation>". Concurrent because operations may be
    // reported from multiple request threads.
    private final Map<String, BusinessMetricStats> metricStats = new ConcurrentHashMap<>();

    public BusinessMetricsAnalyzer(MeterSystem meterSystem) {
        this.meterSystem = meterSystem;
        setupMetricsAggregation();
    }

    // Registers the aggregation function for each published meter.
    private void setupMetricsAggregation() {
        meterSystem.register("business_operation_count", "count",
                BusinessMetricsAnalyzer::aggregateOperationCount);
        meterSystem.register("business_operation_duration", "histogram",
                BusinessMetricsAnalyzer::aggregateOperationDuration);
        meterSystem.register("business_error_rate", "percentage",
                BusinessMetricsAnalyzer::aggregateErrorRate);
    }

    /**
     * Records one completed business operation and re-evaluates anomaly rules.
     *
     * @param duration operation latency in milliseconds
     * @param success  whether the operation completed successfully
     */
    public void analyzeBusinessOperation(String serviceName, String operation,
                                         long duration, boolean success) {
        String key = serviceName + ":" + operation;
        BusinessMetricStats stats = metricStats.computeIfAbsent(key,
                k -> new BusinessMetricStats());
        stats.recordOperation(duration, success);
        updateMeterSystem(serviceName, operation, stats);
        checkForAnomalies(serviceName, operation, stats);
    }

    // Builds the tagged meter id shared by all three business meters.
    private static Meter.Id meterId(String name, String serviceName, String operation) {
        return Meter.Id.newBuilder()
                .setName(name)
                .setTag(new Tag("service", serviceName))
                .setTag(new Tag("operation", operation))
                .build();
    }

    // Publishes the current aggregates for count, duration and error rate.
    private void updateMeterSystem(String serviceName, String operation,
                                   BusinessMetricStats stats) {
        meterSystem.record(meterId("business_operation_count", serviceName, operation),
                stats.getTotalCount());
        meterSystem.record(meterId("business_operation_duration", serviceName, operation),
                stats.getAverageDuration());
        meterSystem.record(meterId("business_error_rate", serviceName, operation),
                stats.getErrorRate());
    }

    // Evaluates the two anomaly rules against the latest window stats.
    private void checkForAnomalies(String serviceName, String operation,
                                   BusinessMetricStats stats) {
        // Performance degradation: average latency well above the baseline.
        if (stats.getAverageDuration() > stats.getBaselineDuration() * DEGRADATION_FACTOR) {
            log.warn("Performance degradation detected for {}:{} - average duration: {}ms",
                    serviceName, operation, stats.getAverageDuration());
            triggerPerformanceAlert(serviceName, operation, stats);
        }
        // Error-rate rule.
        if (stats.getErrorRate() > ERROR_RATE_THRESHOLD) {
            log.warn("High error rate detected for {}:{} - error rate: {}%",
                    serviceName, operation, stats.getErrorRate() * 100);
            triggerErrorRateAlert(serviceName, operation, stats);
        }
    }

    private void triggerPerformanceAlert(String serviceName, String operation,
                                         BusinessMetricStats stats) {
        // FIX: the original formatted getAverageDuration() (a double) with %d,
        // which throws IllegalFormatConversionException at runtime.
        triggerAlert(serviceName, "performance_degradation", String.format(
                "Performance degradation detected for operation %s in service %s. " +
                        "Average duration: %.0fms (baseline: %dms)",
                operation, serviceName, stats.getAverageDuration(), stats.getBaselineDuration()));
    }

    private void triggerErrorRateAlert(String serviceName, String operation,
                                       BusinessMetricStats stats) {
        triggerAlert(serviceName, "high_error_rate", String.format(
                "High error rate detected for operation %s in service %s. " +
                        "Error rate: %.2f%%",
                operation, serviceName, stats.getErrorRate() * 100));
    }

    // Shared alert dispatch: the two alert kinds only differ by condition and
    // message, so the module lookup and builder wiring live here once.
    private void triggerAlert(String serviceName, String condition, String message) {
        AlertService alertService = ModuleManager.INSTANCE.find(CoreModule.NAME)
                .provider().getService(AlertService.class);
        if (alertService != null) {
            Alert alert = Alert.newBuilder()
                    .setScope(AlertScope.Service)
                    .setTarget(serviceName)
                    .setCondition(condition)
                    .setMessage(message)
                    .setTimestamp(System.currentTimeMillis())
                    .build();
            alertService.trigger(alert);
        }
    }

    // --- Static aggregation functions used by the meter system ---

    private static double aggregateOperationCount(List<MeterData> data) {
        return data.stream().mapToDouble(MeterData::getValue).sum();
    }

    private static double aggregateOperationDuration(List<MeterData> data) {
        return data.stream().mapToDouble(MeterData::getValue).average().orElse(0.0);
    }

    // Derives an error fraction from paired "total"/"errors" tagged samples.
    private static double aggregateErrorRate(List<MeterData> data) {
        long total = (long) data.stream().filter(d -> d.getTags().containsKey("total")).findFirst()
                .map(MeterData::getValue).orElse(0.0);
        long errors = (long) data.stream().filter(d -> d.getTags().containsKey("errors")).findFirst()
                .map(MeterData::getValue).orElse(0.0);
        return total > 0 ? (double) errors / total : 0.0;
    }
}
// Statistics holder for business metrics
/**
 * Rolling-window statistics for one service/operation pair.
 * Lombok @Data was replaced with explicit accessors so the class has no
 * dependency beyond the JDK; all previously generated getters/setters that
 * callers use are provided explicitly.
 */
class BusinessMetricStats {
    private long totalCount = 0;
    private long errorCount = 0;
    private long totalDuration = 0;
    private long baselineDuration = 1000; // 1 second baseline
    private long windowStartTime = System.currentTimeMillis();
    private static final long WINDOW_DURATION = 5 * 60 * 1000; // 5 minutes

    /** Folds one operation (latency in ms, success flag) into the window. */
    public void recordOperation(long duration, boolean success) {
        totalCount++;
        totalDuration += duration;
        if (!success) {
            errorCount++;
        }
        // Reset window if expired
        if (System.currentTimeMillis() - windowStartTime > WINDOW_DURATION) {
            resetWindow();
        }
    }

    public double getAverageDuration() {
        return totalCount > 0 ? (double) totalDuration / totalCount : 0.0;
    }

    public double getErrorRate() {
        return totalCount > 0 ? (double) errorCount / totalCount : 0.0;
    }

    private void resetWindow() {
        // FIX: capture the closing window's average BEFORE clearing counters.
        // The original zeroed the counters first, so getAverageDuration()
        // always returned 0 and the baseline was stuck at the 100 ms floor.
        long closingAverage = (long) getAverageDuration();
        totalCount = 0;
        errorCount = 0;
        totalDuration = 0;
        windowStartTime = System.currentTimeMillis();
        // Update baseline from the previous window (with a 100 ms floor).
        baselineDuration = Math.max(100, (long) (closingAverage * 0.8));
    }

    // Explicit accessors replacing the Lombok-generated ones.
    public long getTotalCount() { return totalCount; }
    public void setTotalCount(long totalCount) { this.totalCount = totalCount; }
    public long getErrorCount() { return errorCount; }
    public void setErrorCount(long errorCount) { this.errorCount = errorCount; }
    public long getTotalDuration() { return totalDuration; }
    public void setTotalDuration(long totalDuration) { this.totalDuration = totalDuration; }
    public long getBaselineDuration() { return baselineDuration; }
    public void setBaselineDuration(long baselineDuration) { this.baselineDuration = baselineDuration; }
    public long getWindowStartTime() { return windowStartTime; }
    public void setWindowStartTime(long windowStartTime) { this.windowStartTime = windowStartTime; }
}
Custom Storage Implementation
Example 5: Custom Storage Plugin
@Component
@Slf4j
// Example storage provider that plugs a custom backend into SkyWalking's
// storage module. Lifecycle is prepare() -> start() -> queries -> stop().
public class CustomStoragePlugin implements StoragePlugin {
// Provider name that `storage.selector` must reference to activate this plugin.
private static final String STORAGE_NAME = "custom-storage";
private CustomStorageClient storageClient;
private StorageConfig config;
@Override
public String name() {
return STORAGE_NAME;
}
// Resolves configuration and opens the storage connection; runs before start().
@Override
public void prepare() throws StorageException {
log.info("Preparing custom storage plugin");
try {
this.config = getModuleConfig();
this.storageClient = new CustomStorageClient(config);
storageClient.connect();
log.info("Custom storage plugin prepared successfully");
} catch (Exception e) {
// Wrap with context; the cause is preserved for the module manager.
throw new StorageException("Failed to prepare custom storage", e);
}
}
// Initializes the schema and creates the indices SkyWalking expects.
@Override
public void start() throws StorageException {
log.info("Starting custom storage plugin");
try {
storageClient.initialize();
createRequiredIndices();
log.info("Custom storage plugin started successfully");
} catch (Exception e) {
throw new StorageException("Failed to start custom storage", e);
}
}
// Releases the connection; tolerates never having been prepared (null client).
@Override
public void stop() throws StorageException {
log.info("Stopping custom storage plugin");
try {
if (storageClient != null) {
storageClient.close();
}
log.info("Custom storage plugin stopped successfully");
} catch (Exception e) {
throw new StorageException("Failed to stop custom storage", e);
}
}
// --- Query DAO factories: one per query interface the OAP core requires. ---
@Override
public IRawMetricsQuery rawMetricsQuery() {
return new CustomRawMetricsQuery(storageClient);
}
@Override
public ITraceQuery traceQuery() {
return new CustomTraceQuery(storageClient);
}
@Override
public ILogQuery logQuery() {
return new CustomLogQuery(storageClient);
}
@Override
public IMetricsQuery metricsQuery() {
return new CustomMetricsQuery(storageClient);
}
@Override
public IMetadataQuery metadataQuery() {
return new CustomMetadataQuery(storageClient);
}
@Override
public IAlarmQuery alarmQuery() {
return new CustomAlarmQuery(storageClient);
}
@Override
public IBrowserLogQuery browserLogQuery() {
return new CustomBrowserLogQuery(storageClient);
}
@Override
public IEventQuery eventQuery() {
return new CustomEventQuery(storageClient);
}
@Override
public IProfileTaskQuery profileTaskQuery() {
return new CustomProfileTaskQuery(storageClient);
}
@Override
public IProfileThreadSnapshotQuery profileThreadSnapshotQuery() {
return new CustomProfileThreadSnapshotQuery(storageClient);
}
@Override
public IProfileAnalyzeQuery profileAnalyzeQuery() {
return new CustomProfileAnalyzeQuery(storageClient);
}
// Looks up this module's configuration from the module manager.
// NOTE(review): ModuleManager.INSTANCE and newConfigCreator() do not match
// the upstream SkyWalking module API exactly — verify against the target
// version.
private StorageConfig getModuleConfig() {
ModuleProvider provider = ModuleManager.INSTANCE.find(StorageModule.NAME)
.getModuleProvider();
return provider.newConfigCreator().create();
}
// Creates the index set backing SkyWalking's core record types.
private void createRequiredIndices() {
// Create required indices for SkyWalking data
storageClient.createIndex("service_traffic", getServiceTrafficMapping());
storageClient.createIndex("endpoint_traffic", getEndpointTrafficMapping());
storageClient.createIndex("service_instance_traffic", getServiceInstanceTrafficMapping());
storageClient.createIndex("segment", getSegmentMapping());
storageClient.createIndex("log", getLogMapping());
storageClient.createIndex("metrics", getMetricsMapping());
log.info("Created required indices for SkyWalking data");
}
// Elasticsearch-style field mapping for the service_traffic index.
private Map<String, Object> getServiceTrafficMapping() {
return Map.of(
"properties", Map.of(
"service_id", Map.of("type", "keyword"),
"name", Map.of("type", "keyword"),
"timestamp", Map.of("type", "long"),
"call_type", Map.of("type", "keyword"),
"latency", Map.of("type", "long"),
"status", Map.of("type", "boolean")
)
);
}
// Other mapping definitions...
}
// Custom storage client implementation
@Slf4j
/**
 * Thin client abstraction over the custom storage backend. All data-plane
 * methods here are stubs that log their inputs; replace them with real I/O.
 */
public class CustomStorageClient {

    private final StorageConfig config;
    // Guards initialize() from running before connect() has succeeded.
    private boolean connected = false;

    public CustomStorageClient(StorageConfig config) {
        this.config = config;
    }

    /** Opens the connection to the backing store. */
    public void connect() throws StorageException {
        try {
            // Implementation to connect to custom storage — this could be a
            // database, file system, or other storage system.
            log.info("Connected to custom storage: {}", config.getProperties());
            connected = true;
        } catch (Exception e) {
            throw new StorageException("Failed to connect to custom storage", e);
        }
    }

    /** Creates schema/tables; requires a prior successful {@link #connect()}. */
    public void initialize() throws StorageException {
        if (!connected) {
            throw new StorageException("Storage client not connected");
        }
        try {
            // Initialize storage schema, tables, etc.
            log.info("Initialized custom storage");
        } catch (Exception e) {
            throw new StorageException("Failed to initialize custom storage", e);
        }
    }

    /** Creates one index with the given field mapping. */
    public void createIndex(String indexName, Map<String, Object> mapping) {
        log.debug("Creating index: {} with mapping: {}", indexName, mapping);
    }

    public void saveMetrics(String indexName, List<MetricData> metrics) {
        metrics.forEach(metric ->
                log.debug("Saving metric to {}: {}", indexName, metric));
    }

    public void saveTraces(String indexName, List<TraceData> traces) {
        traces.forEach(trace ->
                log.debug("Saving trace to {}: {}", indexName, trace));
    }

    public void saveLogs(String indexName, List<LogData> logs) {
        // FIX: the original lambda parameter was named `log`, shadowing the
        // @Slf4j logger field, so `log.debug(...)` resolved against LogData
        // and did not compile.
        logs.forEach(entry ->
                log.debug("Saving log to {}: {}", indexName, entry));
    }

    public List<MetricData> queryMetrics(String indexName, MetricQuery query) {
        log.debug("Querying metrics from {}: {}", indexName, query);
        return List.of();
    }

    public List<TraceData> queryTraces(String indexName, TraceQuery query) {
        log.debug("Querying traces from {}: {}", indexName, query);
        return List.of();
    }

    /** Closes the backend connection; safe to call more than once. */
    public void close() throws StorageException {
        try {
            // Implementation to close storage connection.
            connected = false;
            log.info("Closed custom storage connection");
        } catch (Exception e) {
            throw new StorageException("Failed to close custom storage", e);
        }
    }
}
Integration with Spring Boot
Example 6: Spring Boot Integration
@Configuration
@EnableConfigurationProperties(SkyWalkingProperties.class)
@Slf4j
// Spring Boot auto-configuration wiring the SkyWalking integration beans.
// All beans are conditional so applications can opt out or supply their own.
public class SkyWalkingAutoConfiguration {
@Bean
@ConditionalOnProperty(name = "skywalking.enabled", havingValue = "true")
// NOTE(review): EmbeddedOapServer as shown in Example 1 declares no
// constructor taking SkyWalkingProperties — it injects @Value fields instead.
// As written, these two examples disagree; confirm the intended construction
// style.
public EmbeddedOapServer embeddedOapServer(SkyWalkingProperties properties) {
return new EmbeddedOapServer(properties);
}
@Bean
@ConditionalOnMissingBean
// Bridges application telemetry into the OAP source receiver (Example 3).
public CustomTelemetryReceiver customTelemetryReceiver(MeterSystem meterSystem,
SourceReceiver sourceReceiver) {
return new CustomTelemetryReceiver(meterSystem, sourceReceiver);
}
@Bean
@ConditionalOnMissingBean
// Aggregation and anomaly detection for business operations (Example 4).
public BusinessMetricsAnalyzer businessMetricsAnalyzer(MeterSystem meterSystem) {
return new BusinessMetricsAnalyzer(meterSystem);
}
@Bean
@ConditionalOnMissingBean
public SkyWalkingTracingFilter skyWalkingTracingFilter() {
return new SkyWalkingTracingFilter();
}
}
// Configuration properties
@ConfigurationProperties(prefix = "skywalking")
@Data
// Typed view of all `skywalking.*` application properties. The nested classes
// mirror the property tree (skywalking.oap.*, skywalking.agent.*, ...).
public class SkyWalkingProperties {
// Master switch for the whole integration.
private boolean enabled = false;
private OapServer oap = new OapServer();
private Agent agent = new Agent();
private Storage storage = new Storage();
@Data
// Embedded OAP server settings (skywalking.oap.*).
public static class OapServer {
private String configFile = "config/application.yml";
private String storageType = "elasticsearch";
private int grpcPort = 11800;
private int restPort = 12800;
private Cluster cluster = new Cluster();
}
@Data
// Agent-side settings (skywalking.agent.*).
public static class Agent {
private String serviceName = "default-service";
// host:port of the OAP gRPC endpoint the agent reports to.
private String backendService = "127.0.0.1:11800";
private boolean enableLogging = true;
private int loggingBufferSize = 10000;
}
@Data
// Storage backend selection and per-backend settings (skywalking.storage.*).
public static class Storage {
private String type = "elasticsearch";
private Elasticsearch elasticsearch = new Elasticsearch();
private H2 h2 = new H2();
}
@Data
// Elasticsearch connection and index settings.
public static class Elasticsearch {
private String clusterNodes = "localhost:9200";
private String protocol = "http";
private String username = "";
private String password = "";
private int indexShardsNumber = 1;
private int indexReplicasNumber = 1;
}
@Data
// In-memory H2 settings, mainly for demos and tests.
public static class H2 {
private String url = "jdbc:h2:mem:skywalking-oap-db";
private String user = "sa";
private String password = "";
}
@Data
// Cluster coordination settings for the embedded OAP server.
public static class Cluster {
private boolean enabled = false;
private String type = "standalone";
private List<String> addresses = new ArrayList<>();
}
}
// REST Controller for custom metrics
@RestController
@RequestMapping("/api/skywalking")
@Slf4j
/**
 * REST facade for feeding custom telemetry into the OAP pipeline and reading
 * back a snapshot of the custom metrics.
 */
public class SkyWalkingMetricsController {

    private final CustomTelemetryReceiver telemetryReceiver;
    private final BusinessMetricsAnalyzer metricsAnalyzer;

    public SkyWalkingMetricsController(CustomTelemetryReceiver telemetryReceiver,
                                       BusinessMetricsAnalyzer metricsAnalyzer) {
        this.telemetryReceiver = telemetryReceiver;
        this.metricsAnalyzer = metricsAnalyzer;
    }

    /** Records one business metric sample via the telemetry receiver. */
    @PostMapping("/business-metric")
    public ResponseEntity<String> recordBusinessMetric(@RequestBody BusinessMetricRequest request) {
        try {
            telemetryReceiver.receiveBusinessMetric(
                    request.getServiceName(),
                    request.getOperation(),
                    request.getValue(),
                    request.getTags());
            return ResponseEntity.ok("Business metric recorded successfully");
        } catch (Exception e) {
            log.error("Failed to record business metric", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Failed to record business metric: " + e.getMessage());
        }
    }

    /** Records one completed business operation for aggregation/alerting. */
    @PostMapping("/business-operation")
    public ResponseEntity<String> recordBusinessOperation(@RequestBody BusinessOperationRequest request) {
        try {
            metricsAnalyzer.analyzeBusinessOperation(
                    request.getServiceName(),
                    request.getOperation(),
                    request.getDuration(),
                    request.isSuccess());
            return ResponseEntity.ok("Business operation recorded successfully");
        } catch (Exception e) {
            log.error("Failed to record business operation", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Failed to record business operation: " + e.getMessage());
        }
    }

    /** Returns a timestamped snapshot of the custom metrics. */
    @GetMapping("/metrics")
    public ResponseEntity<Map<String, Object>> getCustomMetrics() {
        try {
            Map<String, Object> body = new HashMap<>();
            body.put("timestamp", System.currentTimeMillis());
            body.put("custom_metrics", getCurrentMetrics());
            return ResponseEntity.ok(body);
        } catch (Exception e) {
            log.error("Failed to get custom metrics", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    // Placeholder snapshot; replace with real lookups into the meter system.
    private Map<String, Object> getCurrentMetrics() {
        return Map.of(
                "business_operations_total", 1500,
                "average_operation_duration", 245,
                "error_rate", 0.02);
    }
}
@Data
// Request payload for POST /api/skywalking/business-metric.
class BusinessMetricRequest {
private String serviceName;
private String operation;
// Metric sample value; semantics depend on the metric being reported.
private double value;
// Optional free-form dimension tags.
private Map<String, String> tags = new HashMap<>();
}
@Data
// Request payload for POST /api/skywalking/business-operation.
class BusinessOperationRequest {
private String serviceName;
private String operation;
// Operation latency in milliseconds.
private long duration;
private boolean success;
}
Monitoring and Management
Example 7: OAP Server Monitoring
@Component
@Slf4j
/**
 * Publishes OAP server health metrics (module status, ingested data volume,
 * processing latency) to a Micrometer registry.
 */
public class OapServerMonitor {

    private final MeterRegistry meterRegistry;
    // FIX: Micrometer Gauge has no set() method — a gauge samples a backing
    // value. Status is therefore held in an AtomicInteger the gauge reads on
    // each scrape. Values: >0 = active module count, 0 = down, -1 = error.
    private final AtomicInteger moduleStatus = new AtomicInteger(0);
    private final Counter receivedDataCounter;
    private final Timer processingTimer;
    // Kept as a field so it can be shut down on bean destruction (the
    // original leaked the executor).
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Last cumulative receiver total; each poll adds only the delta instead
    // of re-adding the whole running total every 10 seconds.
    private long lastReceivedCount = 0;

    public OapServerMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        Gauge.builder("oap.server.modules.status", moduleStatus, AtomicInteger::get)
                .description("Status of OAP server modules")
                .register(meterRegistry);
        this.receivedDataCounter = Counter.builder("oap.server.data.received")
                .description("Amount of data received by OAP server")
                .register(meterRegistry);
        this.processingTimer = Timer.builder("oap.server.processing.duration")
                .description("Data processing duration in OAP server")
                .register(meterRegistry);
        startMonitoring();
    }

    private void startMonitoring() {
        // Monitor module status every 30s and data processing every 10s.
        scheduler.scheduleAtFixedRate(this::checkModuleStatus, 0, 30, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(this::monitorDataProcessing, 0, 10, TimeUnit.SECONDS);
    }

    private void checkModuleStatus() {
        try {
            ModuleManager moduleManager = ModuleManager.INSTANCE;
            if (moduleManager != null && moduleManager.isInited()) {
                moduleStatus.set(moduleManager.getLoadedModules().size());
            } else {
                moduleStatus.set(0);
            }
        } catch (Exception e) {
            log.warn("Failed to check module status", e);
            moduleStatus.set(-1);
        }
    }

    private void monitorDataProcessing() {
        try {
            // Time the actual polling work; the original recorded a
            // Thread.sleep(100) stub, which also swallowed interrupts.
            Timer.Sample sample = Timer.start(meterRegistry);
            ReceiverManager receiverManager = ReceiverManager.INSTANCE;
            if (receiverManager != null) {
                // NOTE(review): assumes getReceivedCount() is a cumulative
                // total — confirm; the original re-added it every poll, which
                // double-counts.
                long current = receiverManager.getReceivedCount();
                long delta = current - lastReceivedCount;
                if (delta > 0) {
                    receivedDataCounter.increment(delta);
                }
                lastReceivedCount = current;
            }
            sample.stop(processingTimer);
        } catch (Exception e) {
            log.warn("Failed to monitor data processing", e);
        }
    }

    /** Mirrors server lifecycle events into the status gauge. */
    @EventListener
    public void handleOapEvent(OapServerEvent event) {
        switch (event.getType()) {
            case STARTED:
                log.info("OAP Server started - updating metrics");
                moduleStatus.set(1);
                break;
            case STOPPED:
                log.info("OAP Server stopped - updating metrics");
                moduleStatus.set(0);
                break;
            case ERROR:
                log.error("OAP Server error - updating metrics");
                moduleStatus.set(-1);
                break;
        }
    }

    /** Stops the polling tasks when the Spring context shuts down. */
    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
    }
}
// Health indicator for OAP server
@Component
@Slf4j
public class OapServerHealthIndicator implements HealthIndicator {

    /**
     * Reports OAP server health: DOWN when the module manager is missing or
     * uninitialized, or when storage is unreachable; UP otherwise. Module and
     * storage details are attached either way.
     */
    @Override
    public Health health() {
        try {
            ModuleManager manager = ModuleManager.INSTANCE;
            if (manager == null || !manager.isInited()) {
                return Health.down()
                    .withDetail("reason", "Module manager not initialized")
                    .build();
            }
            Map<String, Object> details = new HashMap<>();
            details.put("loadedModules", manager.getLoadedModules().size());
            details.put("initialized", manager.isInited());
            details.put("timestamp", System.currentTimeMillis());
            boolean storageConnected = checkStorageConnectivity();
            details.put("storageConnected", storageConnected);
            if (storageConnected) {
                return Health.up().withDetails(details).build();
            }
            return Health.down()
                .withDetails(details)
                .withDetail("storageError", "Storage connection failed")
                .build();
        } catch (Exception e) {
            log.error("OAP Server health check failed", e);
            return Health.down(e).build();
        }
    }

    // Probes the storage layer through the storage module's DAO; any lookup
    // failure is logged and reported as "not connected".
    private boolean checkStorageConnectivity() {
        try {
            StorageModule storageModule = ModuleManager.INSTANCE.find(StorageModule.NAME);
            if (storageModule == null) {
                return false;
            }
            IStorageDAO dao = storageModule.getService(IStorageDAO.class);
            return dao != null && dao.isConnected();
        } catch (Exception e) {
            log.warn("Storage connectivity check failed", e);
            return false;
        }
    }
}
Testing SkyWalking OAP Integration
Example 8: Integration Testing
@SpringBootTest
@Testcontainers
@Slf4j
public class SkyWalkingOapIntegrationTest {
@Container
private static final GenericContainer<?> elasticsearch =
new GenericContainer<>("docker.elastic.co/elasticsearch/elasticsearch:7.17.0")
.withEnv("discovery.type", "single-node")
.withEnv("xpack.security.enabled", "false")
.withExposedPorts(9200);
@Container
private static final GenericContainer<?> oapServer =
new GenericContainer<>("apache/skywalking-oap-server:9.4.0")
.withEnv("SW_STORAGE", "elasticsearch")
.withEnv("SW_STORAGE_ES_CLUSTER_NODES",
elasticsearch.getHost() + ":" + elasticsearch.getMappedPort(9200))
.withExposedPorts(12800, 11800)
.dependsOn(elasticsearch);
@Autowired
private CustomTelemetryReceiver telemetryReceiver;
@Autowired
private BusinessMetricsAnalyzer metricsAnalyzer;
@Test
void testBusinessMetricsCollection() {
// Given
String serviceName = "test-service";
String operation = "test-operation";
double metricValue = 42.5;
// When
telemetryReceiver.receiveBusinessMetric(serviceName, operation, metricValue,
Map.of("environment", "test"));
// Then - verify metric was processed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> {
// Verify metric was stored or processed
// This would involve querying the storage or checking internal state
});
}
@Test
void testTraceProcessing() {
// Given
String traceId = UUID.randomUUID().toString();
String serviceName = "test-service";
String operation = "test-operation";
long duration = 150L;
// When
telemetryReceiver.receiveCustomTrace(traceId, serviceName, operation, duration, true);
// Then - verify trace was processed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> {
// Verify trace was stored or analyzed
});
}
@Test
void testPerformanceAlerting() {
// Given - simulate slow operations
String serviceName = "alert-service";
String operation = "slow-operation";
// When - record slow operations
for (int i = 0; i < 10; i++) {
metricsAnalyzer.analyzeBusinessOperation(serviceName, operation, 2000L, true);
}
// Then - verify alert was triggered
await().atMost(15, TimeUnit.SECONDS)
.untilAsserted(() -> {
// Check if performance alert was triggered
});
}
}
Best Practices
Configuration Management
# application-skywalking.yml
skywalking:
enabled: true
oap:
config-file: "classpath:skywalking/application.yml"
storage-type: "elasticsearch"
cluster:
enabled: false
agent:
service-name: "${spring.application.name:unknown-service}"
backend-service: "localhost:11800"
enable-logging: true
storage:
elasticsearch:
cluster-nodes: "localhost:9200"
username: ""
password: ""
index-shards-number: 2
index-replicas-number: 1
# Custom business metrics configuration
business-metrics:
enabled: true
retention-days: 30
alert-thresholds:
performance-degradation: 1.5 # 50% increase from baseline
error-rate: 0.1 # 10% error rate
throughput-drop: 0.5 # 50% drop in throughput
Conclusion
SkyWalking OAP Server provides a powerful platform for distributed tracing and application performance monitoring. Key integration points include:
Core Integration Areas:
- Custom Telemetry Data: Extend beyond standard traces and metrics
- Business Metrics: Track domain-specific performance indicators
- Custom Storage: Support for alternative storage backends
- Advanced Analysis: Implement custom aggregation and alerting
Implementation Patterns:
- Embedded OAP Server: Run OAP within your application
- Custom Receivers: Process domain-specific telemetry data
- Business Metrics: Track application-specific KPIs
- Custom Storage: Integrate with preferred storage systems
- Spring Integration: Seamless Spring Boot configuration
Operational Considerations:
- Monitor OAP server health and performance
- Configure appropriate retention policies
- Set up meaningful alerts and dashboards
- Test thoroughly with realistic data volumes
- Plan for scalability and high availability
SkyWalking OAP Server's extensible architecture makes it suitable for complex monitoring scenarios beyond basic APM, enabling comprehensive observability across your entire system landscape.