在 Ruby on Rails 中构建基于 HBase 和 OAuth 2.0 的多租户向量检索服务


我们的一个核心系统中,所有文档的元数据和部分非结构化内容都存储在 HBase 集群里。数据量已经达到百亿行级别,HBase 的横向扩展能力和对稀疏数据的支持一直表现良好。最近,业务方提出了一个新需求:实现基于文档内容的语义相似度搜索,而不仅仅是关键词匹配。这意味着我们需要引入向量检索。团队的第一反应是引入专用的向量数据库,如 Milvus 或 Weaviate。但在评估了运维成本、数据同步的复杂性以及现有团队对新组件的学习曲线后,我们决定探索一个更具挑战性的路径:能否在现有的 HBase 基础设施之上,构建一个“足够好”的向量检索服务?

这个决策的核心驱动力是成本和风险控制。引入一个全新的、有状态的存储系统对于我们这个规模的团队来说,是一个重大的运维承诺。如果能利用 HBase 解决 80% 的问题,即便性能不是最优,也能为业务快速提供价值,并为未来是否迁移到专用方案提供数据支撑。

我们的技术栈是 Ruby on Rails,负责提供 API 服务。同时,系统是多租户的,数据隔离是第一要务。因此,整个方案必须整合现有的 OAuth 2.0 认证体系,确保任何检索请求都严格限制在租户自己的数据范围内。

第一步:HBase 中的向量数据建模

在 HBase 中存储向量,首先要解决的就是 Schema 设计。HBase 是一个面向列的 Key-Value 存储,它没有原生的高维向量索引能力。所有的查询效率都取决于 Row Key 的设计。

一个文档向量通常包含三部分信息:唯一的文档 ID、所属租户 ID、以及向量本身。

Row Key 设计:
为了实现高效的租户数据隔离和点查,Row Key 的设计至关重要。我们采用 tenant_id 作为前缀,后面跟上 document_id。为了避免热点问题,tenant_id 最好是经过哈希处理的字符串,但为了演示清晰,我们这里使用原始 ID。

[hashed_tenant_id]:[document_id]

例如:tnt_abcde_hash:doc_12345

Column Family 设计:
我们将数据划分为两个列族:

  1. meta:存放文档的元数据,如标题、创建时间等。
  2. vec:专门存放向量数据。

vec 列族中,我们只使用一个列 v 来存储完整的向量。直接存储浮点数数组会非常低效,最佳实践是将其序列化为二进制格式。MessagePack 是一个不错的选择,它比 JSON 更紧凑,解析速度也更快。

在 HBase Shell 中创建表的 DDL 如下:

# vectors_documents table definition
create 'vectors_documents', {NAME => 'meta', VERSIONS => 1}, {NAME => 'vec', VERSIONS => 1, COMPRESSION => 'SNAPPY'}

我们为 vec 列族启用了 Snappy 压缩,因为向量数据通常有冗余,压缩能有效减少存储空间。

第二步:Rails 服务的向量写入实现

接下来是在 Rails 应用中实现向量的写入逻辑。我们使用 hbase-jruby 这个 gem,它通过 JRuby 调用原生的 HBase Java 客户端,性能比纯 Ruby 的 Thrift 客户端要好得多。

首先,需要一个 HBase 的连接初始化器。

# config/initializers/hbase_client.rb

require 'hbase-jruby'

# 在生产环境中,配置应该从 Rails secrets 或环境变量中读取
hbase_config = {
  zookeeper_quorum: ENV.fetch('HBASE_ZOOKEEPER_QUORUM', 'localhost:2181'),
  # Kerberos acls for production
  # 'hbase.security.authentication' => 'kerberos',
  # 'hbase.master.kerberos.principal' => 'hbase/[email protected]'
}

# 使用连接池来管理 HBase 连接
# HBase::Client.new(...) 是线程安全的,但为每个请求创建新连接开销大
$hbase_connection_pool = ConnectionPool.new(size: 10, timeout: 5) do
  HBase.client(hbase_config)
end

# 确保在应用退出时关闭所有连接
at_exit do
  $hbase_connection_pool.shutdown { |conn| conn.close }
end

然后,我们创建一个服务类 VectorIngestionService 来封装写入逻辑。

# app/services/vector_ingestion_service.rb

require 'msgpack'

class VectorIngestionService
  TABLE_NAME = 'vectors_documents'.freeze
  META_CF = 'meta'.freeze
  VECTOR_CF = 'vec'.freeze

  # @param tenant_id [String] The ID of the tenant
  # @param document_id [String] The ID of the document
  # @param vector [Array<Float>] The vector embedding
  # @param metadata [Hash] Additional metadata
  # @return [Boolean] Success or failure
  def self.call(tenant_id:, document_id:, vector:, metadata: {})
    raise ArgumentError, 'tenant_id is required' if tenant_id.blank?
    raise ArgumentError, 'document_id is required' if document_id.blank?
    raise ArgumentError, 'vector must be an array of floats' unless vector.is_a?(Array) && vector.all? { |v| v.is_a?(Float) }

    row_key = "#{tenant_id}:#{document_id}"
    
    # 将向量序列化为二进制 MessagePack 格式
    # 这比存储为字符串或 JSON 高效得多
    packed_vector = vector.to_msgpack

    $hbase_connection_pool.with do |hbase_client|
      table = hbase_client.table(TABLE_NAME)
      
      # HBase put 操作是原子的
      table.put(
        row_key,
        "#{VECTOR_CF}:v" => packed_vector,
        "#{META_CF}:doc_title" => metadata[:title].to_s,
        "#{META_CF}:created_at" => Time.now.utc.iso8601
      )
    end
    
    true
  rescue => e
    # 在生产环境中,这里应该是结构化的日志记录
    Rails.logger.error "Failed to ingest vector for doc #{document_id}: #{e.message}"
    false
  end
end

这段代码处理了参数校验、Row Key 构建、向量序列化和实际的 HBase put 操作。使用连接池是生产环境中的标准实践。

第三步:最朴素的检索实现 - 全表扫描

有了数据写入,我们来尝试实现最简单的检索方式:全表扫描。对于一个给定的查询向量,我们遍历指定租户下的所有文档,在应用层计算它们与查询向量的余弦相似度,然后返回最相似的 Top K 个。

# app/services/vector_search_service.rb

class VectorSearchService
  TABLE_NAME = 'vectors_documents'.freeze
  VECTOR_CF = 'vec'.freeze

  # 一个简单的余弦相似度计算辅助模块
  module CosineSimilarity
    def self.call(vec_a, vec_b)
      dot_product = vec_a.zip(vec_b).map { |a, b| a * b }.sum
      norm_a = Math.sqrt(vec_a.map { |x| x**2 }.sum)
      norm_b = Math.sqrt(vec_b.map { |x| x**2 }.sum)
      
      # 避免除以零
      return 0.0 if norm_a.zero? || norm_b.zero?
      
      dot_product / (norm_a * norm_b)
    end
  end

  # @param tenant_id [String]
  # @param query_vector [Array<Float>]
  # @param top_k [Integer]
  # @return [Array<Hash>] an array of {document_id:, score:}
  def self.brute_force_search(tenant_id:, query_vector:, top_k: 10)
    results = []

    $hbase_connection_pool.with do |hbase_client|
      table = hbase_client.table(TABLE_NAME)
      
      # 使用 tenant_id 前缀进行扫描,这是性能关键
      # scan 会返回一个 Enumerator,避免一次性加载所有数据到内存
      scanner = table.scan(start: "#{tenant_id}:", stop: "#{tenant_id};")

      scanner.each do |row|
        packed_vector = row.cells["#{VECTOR_CF}:v"]
        next unless packed_vector

        # 反序列化向量
        stored_vector = MessagePack.unpack(packed_vector)
        document_id = row.key.split(':').last

        score = CosineSimilarity.call(query_vector, stored_vector)
        
        results << { document_id: document_id, score: score }
      end
    end

    # 在内存中排序并取 Top K
    results.sort_by! { |r| -r[:score] }.first(top_k)
  rescue => e
    Rails.logger.error "Brute force search failed for tenant #{tenant_id}: #{e.message}"
    []
  end
end

这种方法的致命弱点是性能。虽然 HBase 的 scan 操作本身是高效的,但我们需要将一个租户下的所有向量数据从 HBase 拉到 Rails 应用内存中,然后进行计算。当一个租户有百万级文档时,网络 I/O 和应用服务器的 CPU 将会是巨大的瓶颈。这种方法只能用于原型验证或数据量极小的场景。

第四步:引入近似最近邻(ANN)索引 - 伪 LSH 实现

为了避免全表扫描,我们需要一种索引结构。既然 HBase 本身不支持,我们就在应用层和 HBase 的数据模型之上构建一个。局部敏感哈希(Locality-Sensitive Hashing, LSH)是一种适合这种场景的 ANN 算法。

LSH 的核心思想是设计一组哈希函数,使得原始空间中距离相近的点,经过哈希后有很大概率落入同一个“桶”中。查询时,我们只需要计算查询向量的哈希,然后只与落在相同桶里的候选向量进行精确的距离计算。

我们将构建一个 LSH 索引表。

HBase LSH 索引表 vectors_lsh_index 的设计:

  • Row Key: [tenant_id]:[hash_band_id]:[lsh_hash_value]
  • Column Family: idx
  • Column: d:[document_id]
  • Value: 1 (或空)

这里的 hash_band_id 是 LSH 中 “Bands” 的概念,用于提高召回率。一个查询向量会被哈希到多个 band 的桶中。查询时,我们会取出所有这些桶中的文档 ID 作为候选集。

graph TD
    A[Query Vector] --> B{LSH Hashing};
    B --> C1[Bucket 1 in Band A];
    B --> C2[Bucket 2 in Band B];
    B --> C3[Bucket 3 in Band C];
    subgraph HBase LSH Index Table
        C1 --> D1[Candidate Docs from Bucket 1];
        C2 --> D2[Candidate Docs from Bucket 2];
        C3 --> D3[Candidate Docs from Bucket 3];
    end
    subgraph Rails Application
        E[Union of Candidate Docs] --> F{Fetch Full Vectors};
        F --> G[Exact Similarity Calculation];
        G --> H[Re-rank & Top K];
    end
    D1 & D2 & D3 --> E;

首先,我们需要一个 LSH 的实现。这里我们用一个简化的版本,基于随机投影。

# lib/lsh_hasher.rb
# 这是一个非常简化的 LSH 实现,用于演示目的。
# 生产环境中需要更严谨的实现和参数调优。
class LshHasher
  attr_reader :num_bands, :num_hashes_per_band, :dim

  def initialize(dim:, num_bands: 8, num_hashes_per_band: 4)
    @dim = dim
    @num_bands = num_bands
    @num_hashes_per_band = num_hashes_per_band
    @num_hashes = num_bands * num_hashes_per_band
    
    # 生成随机投影平面
    # 为了每次应用重启都保持一致,需要持久化这些平面或使用固定的随机种子
    @planes = Array.new(@num_hashes) { Array.new(dim) { rand * 2 - 1 } }
  end

  def hash_vector(vector)
    # 计算哈希签名
    signature = @planes.map do |plane|
      dot_product = vector.zip(plane).map { |v, p| v * p }.sum
      dot_product > 0 ? '1' : '0'
    end.join

    # 将签名切分为多个 band
    bands = signature.chars.each_slice(num_hashes_per_band).map(&:join)
    
    # 为每个 band 计算一个最终的哈希值(这里用简单的字符串哈希)
    bands.map.with_index do |band_signature, i|
      { band_id: i, hash: Zlib.crc32(band_signature).to_s }
    end
  end
end

# 在初始化器中创建全局实例
# config/initializers/lsh_hasher.rb
# 假设我们的向量维度是 768
VECTOR_DIMENSION = 768
$lsh_hasher = LshHasher.new(dim: VECTOR_DIMENSION, num_bands: 16, num_hashes_per_band: 8)

现在,更新我们的写入服务,在写入原始向量的同时,也写入 LSH 索引。

# app/services/vector_ingestion_service.rb (Modified)
class VectorIngestionService
  INDEX_TABLE_NAME = 'vectors_lsh_index'.freeze
  INDEX_CF = 'idx'.freeze

  def self.call(...) # 参数不变
    # ... (前面的代码不变) ...

    lsh_hashes = $lsh_hasher.hash_vector(vector)

    $hbase_connection_pool.with do |hbase_client|
      # 写入原始数据
      data_table = hbase_client.table(TABLE_NAME)
      data_table.put(...)

      # 写入 LSH 索引数据
      index_table = hbase_client.table(INDEX_TABLE_NAME)
      lsh_hashes.each do |h|
        index_row_key = "#{tenant_id}:#{h[:band_id]}:#{h[:hash]}"
        # 列名包含 document_id,方便一次性获取所有 doc id
        index_table.put(index_row_key, "#{INDEX_CF}:d:#{document_id}" => '1')
      end
    end
    
    true
  # ... (错误处理不变) ...
  end
end

有了索引,检索逻辑也需要重写。

# app/services/vector_search_service.rb (Modified)

class VectorSearchService
  INDEX_TABLE_NAME = 'vectors_lsh_index'.freeze
  INDEX_CF = 'idx'.freeze

  def self.lsh_search(tenant_id:, query_vector:, top_k: 10, candidate_multiplier: 10)
    # 1. 计算查询向量的 LSH 哈希
    query_hashes = $lsh_hasher.hash_vector(query_vector)
    
    candidate_doc_ids = Set.new
    
    $hbase_connection_pool.with do |hbase_client|
      # 2. 从索引表中获取所有候选 document_id
      index_table = hbase_client.table(INDEX_TABLE_NAME)
      index_keys = query_hashes.map { |h| "#{tenant_id}:#{h[:band_id]}:#{h[:hash]}" }
      
      # 使用 batch get 高效获取所有候选桶的内容
      index_table.get(index_keys).each do |key, row|
        next if row.nil?
        row.cells.keys.each do |col_name|
          # 列名格式: 'idx:d:doc_id_123'
          candidate_doc_ids.add(col_name.split(':').last)
        end
      end

      # 如果候选集太小,可以考虑查询相邻的桶,这里简化处理
      return [] if candidate_doc_ids.empty?

      # 3. 批量获取候选向量的完整数据
      data_table = hbase_client.table(TABLE_NAME)
      candidate_row_keys = candidate_doc_ids.map { |doc_id| "#{tenant_id}:#{doc_id}" }
      
      full_vectors = {}
      data_table.get(candidate_row_keys).each do |key, row|
        next if row.nil?
        doc_id = key.split(':').last
        packed_vector = row.cells["#{VECTOR_CF}:v"]
        full_vectors[doc_id] = MessagePack.unpack(packed_vector) if packed_vector
      end
      
      # 4. 在内存中对候选集进行精确计算和重排
      results = full_vectors.map do |doc_id, vector|
        {
          document_id: doc_id,
          score: CosineSimilarity.call(query_vector, vector)
        }
      end

      results.sort_by! { |r| -r[:score] }.first(top_k)
    end
  rescue => e
    Rails.logger.error "LSH search failed for tenant #{tenant_id}: #{e.message}"
    []
  end
end

这个 LSH 方案将原先的 O(N) 扫描,变成了两次高效的 HBase batch get 操作。第一次获取候选 ID,第二次获取候选向量。计算量被大大缩小,从租户全量数据减少到了一个较小的候选集。

第五步:API 接口与 OAuth 2.0 安全集成

最后一步是暴露 API 接口,并用 OAuth 2.0 保护起来,确保严格的多租户隔离。我们使用 doorkeeper gem 作为 OAuth 2.0 provider。

首先定义路由:

# config/routes.rb
namespace :api do
  namespace :v1 do
    # 定义需要 OAuth 2.0 保护的范围
    scope_options = { scope: 'vector.search', as: 'vector_search' }
    post 'vectors/search', to: 'vectors#search', **scope_options
  end
end

然后是 Controller 的实现。这里的关键是 doorkeeper_authorize!,它会校验请求头中的 Authorization: Bearer <token>。校验通过后,我们可以通过 doorkeeper_token 对象获取到与该 token 关联的资源所有者(用户或应用),并从中提取出 tenant_id

# app/controllers/api/v1/vectors_controller.rb

module Api
  module V1
    class VectorsController < ApplicationController
      # doorkeeper_authorize! 会处理所有 Token 验证、过期、权限范围检查
      before_action -> { doorkeeper_authorize! :vector_search }

      # 在生产环境中,应该有更健壮的错误处理机制
      rescue_from ArgumentError, with: :render_bad_request

      def search
        # 参数校验
        query_vector = params.require(:vector)
        top_k = params.fetch(:top_k, 10).to_i
        
        unless query_vector.is_a?(Array) && query_vector.all? { |v| v.is_a?(Float) }
          raise ArgumentError, 'vector must be an array of floats'
        end

        # 核心安全逻辑:从 token 中获取租户 ID
        # 假设我们的 OAuth application 或 user 模型有一个 tenant_id 字段
        # 这是防止数据泄露的关键防线
        tenant_id = resolve_tenant_id_from_token
        unless tenant_id
          render json: { error: 'Unauthorized', error_description: 'Cannot resolve tenant from token' }, status: :unauthorized
          return
        end

        # 调用我们的检索服务
        results = VectorSearchService.lsh_search(
          tenant_id: tenant_id,
          query_vector: query_vector,
          top_k: top_k
        )

        render json: { data: results }, status: :ok
      end

      private

      def resolve_tenant_id_from_token
        # doorkeeper_token 是 doorkeeper 提供的 helper
        return nil unless doorkeeper_token

        # 场景1:Client Credentials Flow,token 属于一个应用
        if doorkeeper_token.resource_owner_id.nil?
          application = Doorkeeper::Application.find(doorkeeper_token.application_id)
          application&.tenant_id # 假设 Application 模型有关联的 tenant_id
        # 场景2:Authorization Code Flow,token 属于一个用户
        else
          user = User.find(doorkeeper_token.resource_owner_id)
          user&.tenant_id # 假设 User 模型有关联的 tenant_id
        end
      end
      
      def render_bad_request(exception)
        render json: { error: 'bad_request', error_description: exception.message }, status: :bad_request
      end
    end
  end
end

这段代码确保了 API 调用者必须提供一个有效的、具备 vector.search 范围的 Bearer Token。更重要的是,我们从 Token 自身绑定的实体(应用或用户)中解析出 tenant_id,并将其强制注入到后续的 HBase 查询中。这从根本上杜绝了用户 A 越权查询用户 B 数据的可能性。

方案的局限性与未来展望

我们成功地在现有的 HBase 和 Rails 技术栈上,构建了一个具备多租户安全隔离的向量检索服务。它避免了引入新存储系统带来的运维复杂性,并能快速响应业务需求。

然而,这个方案并非银弹,它存在明显的局限性:

  1. 性能与精度权衡:LSH 是一种“近似”算法。它的性能(召回率)高度依赖于参数(num_bands, num_hashes_per_band)的调优,这是一个复杂的、需要反复实验的过程。其性能和精度通常无法与基于图索引(如 HNSW)的专用向量数据库相媲美。
  2. 索引维护成本:我们实际上维护了两份数据:原始向量表和 LSH 索引表。数据写入的延迟增加,且需要保证两者之间的一致性。删除和更新操作也变得更加复杂,需要同时操作两个表。
  3. 计算下推的缺失:所有的相似度计算都在 Rails 应用层完成。更理想的架构是利用 HBase 的协处理器(Coprocessor),将部分计算逻辑(例如对候选集的精确计算)下推到 HBase RegionServer 上执行,从而大幅减少网络传输的数据量。这是一个重要的优化方向,但会增加 Java 开发和部署的复杂性。

总而言之,这个基于 HBase 的向量检索方案是一个典型的工程权衡产物。它是在特定约束(不引入新存储)下,为解决实际问题而设计的务实方案。对于中等规模的向量数据和对延迟不是极端敏感的场景,它是一个可行的起点。当业务发展到一定阶段,数据量和 QPS 要求都急剧增长时,积累的经验和数据将为我们迁移到更专业的向量数据库(如使用 OpenSearch、Elasticsearch 的向量插件,或 Milvus)提供坚实的基础和明确的动力。


  目录