基于 Pinecone 向量检索构建 SkyWalking 异常链路的语义分析管道


生产环境的告警总是让人措手不及。当一个全新的、描述晦涩的异常抛出时,传统的监控手段往往只能束手无策。我们依赖 SkyWalking 进行链路追踪,它在定位服务间调用关系和性能瓶颈方面表现出色。但面对一个复杂的业务异常,比如由一长串嵌套调用和特定业务数据触发的 NullPointerException,SkyWalking UI 只能告诉我们“哪里”出错了,却无法回答“为什么”,更无法关联到“历史上是否发生过类似的问题”。

通过 serviceNameendpointName 或者 traceId 进行精确查找是有效的,但前提是你已经知道了要找什么。而通过日志标签进行模糊的关键词搜索,在面对成千上万种异常堆栈变体时,效果微乎其微。一个常见的错误是,团队花费数小时去排查一个“新”问题,最后才发现它只是上个月另一个问题的变种,由于日志文本的细微差异而未能被搜索到。问题的核心在于,我们需要一种能理解“语义”而非“字面”的检索能力。

我们的初步构想是,如果能将每一条异常链路的关键信息——例如异常堆栈、关键业务标签——转换成一个数学向量,那么“相似的问题”就变成了“空间中相近的向量”。这正是向量数据库的用武之地。

技术选型决策相对直接:

  1. 链路追踪: 我们已经深度使用 Apache SkyWalking,其 OAP (Observability Analysis Platform) 提供了灵活的插件化机制,特别是 ReceiverExporter,这为我们注入自定义逻辑提供了完美的切入点。我们不需要替换现有系统,只需扩展它。
  2. 向量数据库: Pinecone 是一个成熟的托管方案。在真实项目中,我们没有时间和精力去维护一个自建的向量索引服务,比如 Faiss 或 Milvus。我们需要的是一个稳定、高性能、API 友好的服务,Pinecone 完全符合要求。
  3. 容器化: 整个方案包含一个定制版的 SkyWalking OAP,可能还有一个独立的 Embedding 服务。为了保证开发、测试、生产环境的一致性,并简化部署流程,Docker 和 Docker Compose 是不二之选。

我们的目标是实现一个自定义的 SkyWalking Receiver,它能在 OAP 内部接收到 trace 数据后,实时地将其中的异常信息提取、向量化,并存入 Pinecone。这样,当新的故障发生时,运维人员可以拿着异常信息,通过一个简单的查询工具,在 Pinecone 中找到历史上最相似的 N 个 traceId,从而极大缩短故障根因分析时间。

步骤一:构建自定义 OAP Receiver 项目

首先,我们需要创建一个 Maven 项目来开发自定义的 Receiver。这需要引入 SkyWalking OAP 的核心依赖。这里的坑在于,必须保证依赖的版本与你将要部署的 SkyWalking OAP 版本严格一致,否则会出现各种 ClassNotFound 或 MethodNotFound 的运行时错误。

pom.xml 的核心依赖部分如下:

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- 确保此版本与你的目标 OAP 版本一致 -->
    <skywalking.version>9.6.0</skywalking.version>
    <pinecone.version>0.2.0</pinecone.version>
    <!-- embedding模型库,这里选用一个轻量级的Java库作为示例 -->
    <djl.version>0.26.0</djl.version>
</properties>

<dependencies>
    <!-- SkyWalking OAP 核心模块,提供插件化接口 -->
    <dependency>
        <groupId>org.apache.skywalking</groupId>
        <artifactId>oap-server</artifactId>
        <version>${skywalking.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Pinecone Java 客户端 -->
    <dependency>
        <groupId>io.pinecone</groupId>
        <artifactId>pinecone-client</artifactId>
        <version>${pinecone.version}</version>
    </dependency>
    
    <!-- 用于文本 Embedding 的库,这里使用 DJL (Deep Java Library) -->
    <!-- 在生产环境中,更推荐调用一个独立的、可扩展的 Embedding 微服务 -->
    <dependency>
        <groupId>ai.djl</groupId>
        <artifactId>api</artifactId>
        <version>${djl.version}</version>
    </dependency>
    <dependency>
        <groupId>ai.djl.sentencepiece</groupId>
        <artifactId>sentencepiece</artifactId>
        <version>0.1.99</version>
    </dependency>
    <dependency>
        <groupId>ai.djl.pytorch</groupId>
        <artifactId>pytorch-engine</artifactId>
        <version>${djl.version}</version>
    </dependency>
    <dependency>
        <groupId>ai.djl.pytorch</groupId>
        <artifactId>pytorch-jni</artifactId>
        <version>2.1.2-0.26.0</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>ai.djl.model-zoo</groupId>
        <artifactId>model-zoo</artifactId>
        <version>${djl.version}</version>
    </dependency>

    <!-- 日志库,与OAP保持一致 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

使用 maven-shade-plugin 是为了将所有依赖(除了 provided 的 SkyWalking 依赖)打包到一个 fat JAR 中,方便后续部署。

步骤二:实现核心处理逻辑

接下来是编写代码。我们需要实现 SkyWalking 的 ModuleProviderReceiverProvider 接口。

1. 模块定义 (PineconeVectorModule.java)

这个类是插件的入口,定义了模块的名称和它提供的服务。

package org.skywalking.custom.receiver.pinecone;

import org.apache.skywalking.oap.server.library.module.ModuleDefine;

public class PineconeVectorModule extends ModuleDefine {
    public static final String NAME = "pinecone-vector";

    public PineconeVectorModule() {
        super(NAME);
    }

    @Override
    public Class<?>[] services() {
        // 这个模块不向外提供服务,所以返回空数组
        return new Class[0];
    }
}

2. 模块提供者 (PineconeVectorModuleProvider.java)

这是模块的启动器,负责初始化配置、连接 Pinecone、加载 Embedding 模型,并启动我们的自定义 Receiver。

package org.skywalking.custom.receiver.pinecone;

import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.receiver.ReceiverCreator;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PineconeVectorModuleProvider extends ModuleProvider {
    private static final Logger logger = LoggerFactory.getLogger(PineconeVectorModuleProvider.class);

    private PineconeVectorReceiverConfig config;
    private PineconeTraceVectorHandler traceVectorHandler;

    @Override
    public String name() {
        return "default";
    }

    @Override
    public Class<PineconeVectorModule> module() {
        return PineconeVectorModule.class;
    }

    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
        this.config = new PineconeVectorReceiverConfig();
        return this.config;
    }

    @Override
    public void prepare() throws ServiceNotProvidedException {
        logger.info("Preparing Pinecone Vector Module.");
        try {
            this.traceVectorHandler = new PineconeTraceVectorHandler(config);
            this.getManager().find(CoreModule.NAME)
                    .provider()
                    .getService(GRPCHandlerRegister.class)
                    .addHandler(traceVectorHandler);
        } catch (Exception e) {
            // 这里的错误处理至关重要,如果初始化失败,必须阻止 OAP 启动
            logger.error("Failed to prepare Pinecone Vector Module.", e);
            throw new ServiceNotProvidedException("Failed to initialize Pinecone client or Embedding model.", e);
        }
    }

    @Override
    public void start() {
        logger.info("Pinecone Vector Module started.");
    }

    @Override
    public void notifyAfterCompleted() {
        // 模块启动完成后的通知
    }

    @Override
    public String[] requiredModules() {
        // 依赖 OAP 的核心模块
        return new String[]{CoreModule.NAME};
    }
}

3. 配置类 (PineconeVectorReceiverConfig.java)

用于从 SkyWalking 的 application.yml 中读取我们的自定义配置。

package org.skywalking.custom.receiver.pinecone;

import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;

@Getter
@Setter
public class PineconeVectorReceiverConfig extends ModuleConfig {
    // Pinecone API Key
    private String apiKey;
    // Pinecone Environment
    private String environment;
    // Pinecone Project ID
    private String projectId;
    // 要写入的 Pinecone 索引名称
    private String indexName;
    // Embedding 模型的名称或路径
    private String embeddingModelName = "sentence-transformers/all-MiniLM-L6-v2";
    // 向量维度,必须与模型和Pinecone索引的配置一致
    private int dimension = 384;
    // 批处理大小,攒够这么多trace再统一处理
    private int batchSize = 10;
    // 哪些服务的异常trace需要被索引
    private String servicesToWatch = ".*"; // 默认监控所有服务
}

4. 核心处理器 (PineconeTraceVectorHandler.java)

这是整个方案最核心的部分。它接收 SkyWalking agent 发送的 TraceSegmentObject,解析其中的 Span,提取异常信息,调用模型生成向量,最后写入 Pinecone。

package org.skywalking.custom.receiver.pinecone;

import ai.djl.Application;
import ai.djl.ModelException;
import ai.djl.inference.Predictor;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ZooModel;
import ai.djl.training.util.ProgressBar;
import ai.djl.translate.TranslateException;
import com.google.common.collect.Lists;
import io.grpc.stub.StreamObserver;
import io.pinecone.clients.Pinecone;
import io.pinecone.unsigned_indices_model.QueryResponse;
import io.pinecone.unsigned_indices_model.Vector;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.Log;
import org.apache.skywalking.apm.network.language.agent.v3.TraceSegmentReportServiceGrpc;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessageV3;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PineconeTraceVectorHandler extends TraceSegmentReportServiceGrpc.TraceSegmentReportServiceImplBase implements GRPCHandler {
    private static final Logger logger = LoggerFactory.getLogger(PineconeTraceVectorHandler.class);

    private final Pinecone pinecone;
    private final ZooModel<String, float[]> embeddingModel;
    private final Predictor<String, float[]> predictor;
    private final PineconeVectorReceiverConfig config;
    private final Pattern servicePattern;

    // 使用阻塞队列作为缓冲区,实现异步批处理,避免阻塞gRPC线程
    private final BlockingQueue<TraceInfo> traceQueue = new LinkedBlockingQueue<>(10000);

    public PineconeTraceVectorHandler(PineconeVectorReceiverConfig config) throws ModelException, IOException {
        this.config = config;
        this.servicePattern = Pattern.compile(config.getServicesToWatch());

        logger.info("Initializing Pinecone client with environment: {}", config.getEnvironment());
        this.pinecone = new Pinecone.Builder(config.getApiKey())
            .withEnvironment(config.getEnvironment())
            .withProjectName(config.getProjectId())
            .build();
        
        logger.info("Loading embedding model: {}", config.getEmbeddingModelName());
        Criteria<String, float[]> criteria = Criteria.builder()
                .optApplication(Application.NLP.TEXT_EMBEDDING)
                .setTypes(String.class, float[].class)
                .optModelUrls("djl://ai.djl.huggingface.pytorch/" + config.getEmbeddingModelName())
                .optProgress(new ProgressBar())
                .build();
        this.embeddingModel = criteria.loadModel();
        this.predictor = embeddingModel.newPredictor();

        // 启动一个后台线程来消费队列中的数据
        Executors.newSingleThreadExecutor().submit(this::processQueue);
        logger.info("Pinecone trace vector handler initialized successfully.");
    }
    
    @Override
    public StreamObserver<SegmentObject> collect(StreamObserver<GeneratedMessageV3> responseObserver) {
        return new StreamObserver<>() {
            @Override
            public void onNext(SegmentObject segment) {
                // 只处理我们关心的服务的trace
                if (!servicePattern.matcher(segment.getService()).matches()) {
                    return;
                }

                // 筛选出包含异常的Trace
                boolean hasError = segment.getSpansList().stream().anyMatch(SpanObject::getIsError);
                if (hasError) {
                    try {
                        traceQueue.put(new TraceInfo(segment.getTraceId(), segment));
                    } catch (InterruptedException e) {
                        logger.error("Failed to add trace to queue.", e);
                        Thread.currentThread().interrupt();
                    }
                }
            }

            @Override
            public void onError(Throwable throwable) {
                logger.error("Error in receiving trace segments", throwable);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance());
                responseObserver.onCompleted();
            }
        };
    }

    private void processQueue() {
        List<TraceInfo> batch = Lists.newArrayListWithCapacity(config.getBatchSize());
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 批量从队列中获取数据
                int drained = traceQueue.drainTo(batch, config.getBatchSize());
                if (drained == 0) {
                    // 如果队列为空,等待一小段时间
                    TimeUnit.SECONDS.sleep(1);
                    continue;
                }
                
                if (!batch.isEmpty()) {
                    processBatch(batch);
                    batch.clear();
                }
            } catch (InterruptedException e) {
                logger.error("Queue processing thread interrupted.", e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // 捕获所有异常,防止线程意外终止
                logger.error("Error processing trace batch.", e);
            }
        }
    }

    private void processBatch(List<TraceInfo> batch) throws TranslateException {
        logger.debug("Processing a batch of {} traces.", batch.size());
        
        List<String> textsToEmbed = batch.stream()
                .map(this::extractTextFromTrace)
                .collect(Collectors.toList());

        // 批量生成向量
        float[][] embeddings = predictor.batchPredict(textsToEmbed).toArray(new float[0][]);

        List<Vector> vectorsToUpsert = Lists.newArrayList();
        for (int i = 0; i < batch.size(); i++) {
            TraceInfo info = batch.get(i);
            float[] embedding = embeddings[i];
            
            // 构建Pinecone的Vector对象
            Vector vector = new Vector();
            vector.setId(info.getTraceId());
            vector.setValues(Lists.newArrayList(
                java.util.stream.IntStream.range(0, embedding.length)
                                          .mapToObj(idx -> embedding[idx])
                                          .collect(Collectors.toList())
            ));

            // 添加元数据,用于后续的过滤查询
            Map<String, Descriptors.Descriptor> metadata = Map.of(
                "service", info.getSegment().getService(),
                "instance", info.getSegment().getServiceInstance()
            );
            vector.setMetadata(metadata);
            vectorsToUpsert.add(vector);
        }

        // 批量写入Pinecone
        if (!vectorsToUpsert.isEmpty()) {
            try {
                pinecone.upsert(config.getIndexName(), vectorsToUpsert);
                logger.info("Successfully upserted {} vectors to Pinecone index '{}'.", vectorsToUpsert.size(), config.getIndexName());
            } catch(Exception e) {
                logger.error("Failed to upsert vectors to Pinecone.", e);
            }
        }
    }
    
    // 从Trace Segment中提取关键文本信息用于向量化
    private String extractTextFromTrace(TraceInfo traceInfo) {
        StringBuilder sb = new StringBuilder();
        SegmentObject segment = traceInfo.getSegment();
        sb.append("Service: ").append(segment.getService()).append(". ");
        
        for (SpanObject span : segment.getSpansList()) {
            if (span.getIsError()) {
                sb.append("Error in endpoint: ").append(span.getOperationName()).append(". ");
                for (Log log : span.getLogsList()) {
                    log.getDataList().forEach(kv -> {
                        if ("stack".equals(kv.getKey()) || "error.kind".equals(kv.getKey())) {
                            sb.append(kv.getValue()).append(" ");
                        }
                    });
                }
            }
            // 也可以提取一些关键的业务Tag
            span.getTagsList().forEach(tag -> {
                if (tag.getKey().startsWith("biz.")) { // 自定义业务标签前缀
                    sb.append(tag.getKey()).append(":").append(tag.getValue()).append(". ");
                }
            });
        }
        return sb.toString().trim().replaceAll("\\s+", " ");
    }
    
    // 用于在队列中传递信息的内部类
    private static class TraceInfo {
        private final String traceId;
        private final SegmentObject segment;
        
        public TraceInfo(String traceId, SegmentObject segment) {
            this.traceId = traceId;
            this.segment = segment;
        }

        public String getTraceId() { return traceId; }
        public SegmentObject getSegment() { return segment; }
    }
}

这段代码中的关键设计:

  • 异步批处理: gRPC 的 onNext 方法必须快速返回,不能执行耗时操作。因此,我们使用 BlockingQueue 将接收到的 Trace 暂存,由一个独立的后台线程进行消费、向量化和上传。这是一个在真实项目中非常重要的模式。
  • 信息提取: extractTextFromTrace 方法是可定制的。当前实现提取了服务名、异常端点、堆栈信息和自定义业务标签。在实际应用中,应根据业务场景精确提取最有价值的文本。
  • 错误处理: 在初始化、队列操作和 Pinecone API 调用等关键位置都加入了详尽的日志和异常处理,确保插件的健壮性。

步骤三:打包与配置

  1. 打包: 运行 mvn clean package,会生成一个 ...-shaded.jar 文件。

  2. 部署:

    • 将这个 fat JAR 复制到 SkyWalking OAP 发行版的 oap-libs 目录下。

    • config/alarm-settings.yml 同级目录下,创建一个新的配置文件 application.yml 的激活文件 config/receiver-pinecone-vector/provider.yml

      config/receiver-pinecone-vector/provider.yml:

      default:

      这个文件告诉 SkyWalking 加载我们这个名为 pinecone-vector 的模块,并使用 default 这个 provider。

  3. 配置 OAP: 修改 config/application.yml,添加我们的自定义配置,并激活模块。

# 在文件末尾添加我们的模块
# ... SkyWalking原有配置 ...

# 激活我们的自定义模块
core:
  default:
    # ...
    # 在 activeModules 列表中加入我们的模块名
    activeModules: ${SW_CORE_ACTIVE_MODULES:...,pinecone-vector} 

# 添加我们模块的专属配置
receiver-pinecone-vector:
  default:
    apiKey: ${SW_PINECONE_API_KEY:"YOUR_PINECONE_API_KEY"}
    environment: ${SW_PINECONE_ENV:"YOUR_PINECONE_ENVIRONMENT"}
    projectId: ${SW_PINECONE_PROJECT_ID:"YOUR_PINECONE_PROJECT_ID"}
    indexName: "skywalking-traces"
    # 要监控的服务名,使用正则表达式,例如 "service-a|service-b"
    servicesToWatch: ".*" 

注意,使用环境变量 (${...}) 来传递敏感信息(如 API Key)是生产环境的最佳实践。

步骤四:Docker化与部署

为了让整个系统一键启动,我们使用 Docker Compose。

1. 自定义 OAP 的 Dockerfile

# 使用官方的 SkyWalking OAP 镜像作为基础
FROM apache/skywalking-oap-server:9.6.0

# 作者信息
LABEL maintainer="[email protected]"

# 将我们构建的自定义 receiver fat JAR 复制到 oap-libs 目录
COPY ./target/skywalking-pinecone-receiver-1.0.0-shaded.jar /skywalking/oap-libs/

# 复制我们自定义的 provider.yml 配置文件
COPY ./provider.yml /skywalking/config/receiver-pinecone-vector/provider.yml

这里的 provider.yml 就是前面创建的那个空内容的 default: 文件。

2. docker-compose.yml

这个文件将启动 Elasticsearch (SkyWalking 的默认存储)、我们定制的 OAP 和 SkyWalking UI。

version: '3.8'

services:
  elasticsearch:
    image: elasticsearch:7.17.10
    container_name: es-for-skywalking
    ports:
      - "9200:9200"
    healthcheck:
      test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"

  oap:
    # 使用我们上面Dockerfile构建的镜像
    build: .
    container_name: skywalking-oap-custom
    depends_on:
      elasticsearch:
        condition: service_healthy
    ports:
      - "11800:11800"
      - "12800:12800"
    healthcheck:
      test: ["CMD-SHELL", "/skywalking/bin/swctl ch"]
      interval: 30s
      timeout: 10s
      retries: 3
    environment:
      - SW_STORAGE=elasticsearch
      - SW_STORAGE_ES_CLUSTER_NODES=elasticsearch:9200
      # 通过环境变量传入Pinecone的配置
      - SW_PINECONE_API_KEY=YOUR_PINECONE_API_KEY
      - SW_PINECONE_ENV=YOUR_PINECONE_ENVIRONMENT
      - SW_PINECONE_PROJECT_ID=YOUR_PINECONE_PROJECT_ID
      - JAVA_OPTS=-Xms2g -Xmx2g

  ui:
    image: apache/skywalking-ui:9.6.0
    container_name: skywalking-ui
    depends_on:
      oap:
        condition: service_healthy
    ports:
      - "8080:8080"
    environment:
      - SW_OAP_ADDRESS=http://oap:12800

networks:
  default:
    driver: bridge

现在,在项目根目录运行 docker-compose up --build,整个链路分析管道就启动了。

成果验证与查询

当系统运行后,任何被监控的应用产生的异常链路都会被处理并存入 Pinecone。要验证其效果,我们可以编写一个简单的 Python 脚本来模拟故障排查过程。

query_similar_traces.py:

import os
import pinecone
from sentence_transformers import SentenceTransformer

# 1. 初始化
pinecone.init(
    api_key=os.environ.get("PINECONE_API_KEY"),
    environment=os.environ.get("PINECONE_ENVIRONMENT")
)
index = pinecone.Index("skywalking-traces")

model = SentenceTransformer('all-MiniLM-L6-v2')

# 2. 模拟一个新的、未见过的异常日志
new_error_log = """
Service: order-service. Error in endpoint: /orders/create. 
NullPointerException at com.example.orderservice.service.OrderServiceImpl.createOrder(OrderServiceImpl.java:123)
biz.userId:user-abc biz.productId:prod-xyz
"""

# 3. 将新日志向量化
query_vector = model.encode(new_error_log).tolist()

# 4. 在 Pinecone 中查询最相似的向量
results = index.query(
    vector=query_vector,
    top_k=5, # 找出最相似的5个历史trace
    include_metadata=True
)

# 5. 输出结果
print("Found similar historical traces:")
for match in results['matches']:
    trace_id = match['id']
    score = match['score']
    service = match['metadata'].get('service', 'N/A')
    print(f"  - Trace ID: {trace_id}")
    print(f"    Similarity Score: {score:.4f}")
    print(f"    Service: {service}")
    print(f"    SkyWalking UI Link: http://localhost:8080/?traceId={trace_id}\n")

当运行这个脚本时,即使 new_error_log 的文本与历史日志不完全一样,只要语义上接近,Pinecone 就能返回相似度最高的历史 traceId。运维人员拿到这些 traceId,可以直接在 SkyWalking UI 中打开,查看完整的调用链上下文,从而快速获得解决思路。

graph TD
    subgraph Monitored Application
        A[Java Agent] --> B{gRPC};
    end

    subgraph Custom SkyWalking OAP in Docker
        B --> C[OAP gRPC Receiver];
        C --> D[Trace Analysis Core];
        D -- TraceSegment --> E[PineconeVectorHandler];
        E -- hasError? --> F{Filter};
        F -- Yes --> G[Async Queue];
    end
    
    subgraph Background Processing
        G --> H[Batch Processor];
        H -- Text --> I[Embedding Model];
        I -- Vector --> J[Pinecone Client];
    end
    
    subgraph Cloud Service
        J -- Upsert --> K[(Pinecone Index)];
    end

    subgraph SRE Workflow
        L[New Incident Log] --> M[Query Script];
        M -- Vector --> N[Pinecone Client];
        N -- Query --> K;
        K -- Top-K traceIds --> M;
        M --> O{List of similar traceIds};
        O --> P[SkyWalking UI];
    end

当前方案的局限性在于 Embedding 模型的选择。我们使用的是一个通用的文本模型,对于理解 Java 堆栈这类高度结构化的文本可能不是最优的。未来的一个重要优化方向是,使用公司内部的代码和历史错误日志,对一个基础模型进行微调(Fine-tuning),使其能更精准地理解代码层面的语义相似性。此外,当前的实现是实时处理所有异常链路,对于流量极高的系统,可能会带来显著的成本。可以考虑增加采样率配置,或者只处理特定类型的、更严重的异常,以在成本和洞察力之间找到平衡。


  目录