源码解析|数据表监听

编辑
更新时间: 2024-09-18 · 连文湧

数据表结构要求

以应用级服务发现元数据表为例

param type description
data_center String local data center
revision String revision
app_name String appName
client_version String clientVersion
base_params String base_params
service_params String service_params
deleted boolean delete or no
gmt_create Date create time
gmt_modified Date last update time

写入

写入缓存

cachedExecutor.execute 执行缓存写入操作防止无法瞬间处理大量相同数据写入操作。防止大量节点上报相同的应用级服务发现元数据。

public V execute(K key, Callable<V> callable) throws Exception {
  V v = cache.getIfPresent(key);
  if (v != null) {
      //发现元素,命中+1
    hitCount.increment();
    return v;
  }
  return cache.get(
      key,
      () -> {
       //未发现元素,未命中+1
        missingCount.increment();
        onMiss(key);
        return callable.call();
      });
}

二参数的 execute 统计修订号revision的命中和未命中次数统计 metrics,在通讯数据压缩暴露到 prometheus

写入数据

protected void refreshEntryToStorage(AppRevisionDomain entry) {
  try {
    cachedExecutor.execute(
        entry.getRevision(),
        () -> {
            //判断是否执行replace写入数据
          if     (appRevisionMapper.heartbeat(entry.getDataCenter(), entry.getRevision()) == 0) {
            appRevisionMapper.replace(entry);
          }
         //省略日志操作
  }
}

cachedExecutor 默认指定 silentMs=10s,当缓存项在指定的时间段内没有更新就会被回收(移除 key),需要等待获取新值才会返回。10s 内没有更新说明数据量不大,也不需要进行写入缓存的操作。

通过 hearbeat() 底层通过 update 原子判断数据是否存在。刷新 gmt_modified 字段时间防止被误删。

 update app_revision set gmt_modified=CURRENT_TIMESTAMP  where data_center = #{dataCenter}
    and revision=#{revision}
    and deleted = '0'

update 没有命中的时候,使用 replace,保证能生成一个新的 id,用于后续的 watch 方法监听表获取元素更新变化,并刷新 gmt_modified 防止字段超时被删除。

replace into app_revision(
       data_center,
       revision,
       app_name,
       client_version,
       base_params,
       service_params,
       deleted,
       gmt_create,
       gmt_modified
   )
  values (
       #{dataCenter},
       #{revision},
       #{appName},
       #{clientVersion},
       #{baseParams},
       #{serviceParams},
       #{deleted},
       CURRENT_TIMESTAMP,
       CURRENT_TIMESTAMP
   )

获取数据增量改变

Watch

数据库没有提供订阅的操作,watch 方法缓存最新 id 值,增量读取数据库中更新的 id 值并更新最新的 id 值保证 lastLoadId 一直保持最新的状态

private void watch() {
  syncStart();
  try {
    long start = lastLoadId;
    if (listStableEntries(start, 1).size() == 0) {
      return;
    }
    logger.info("start watch from {}", start);
    long maxId =
        listToTail(
            (T entry) -> {
              container.onEntry(entry);
              logger.info("watch received entry: {}", entry);
            },
            start,
            100);
    logger.info("end watch to {}", maxId);
    lastLoadId = maxId;
  } finally {
    syncEnd();
  }
}

listStableEntries 提供从数据库获取最新数据的 id 值的方法,写入数据的方法底层通过 replace 写入,因此一定会有新的 id 生成。

机制存在问题:如果数据中间出现不连续的间断,无法得到更新后存在间隔的 id 值。

此方法主要列出所有创建完成 1s 后更新的元素。

if (entry.getGmtCreate().getTime() >= now.getTime() - DB_INSERT_DELAY_MS) {
  break;
}

其中 DB_INSERT_DELAY_MS=1s

为什么是 1s?

内部是分布式数据库,表内数据会被拆分到多个机器上,每台机器批量获取 id 属性,此时如果大量并发插入,可能产生 id 值高的已经入库,而低 id 还没有完全写入,这时 watch 方式会出现问题,漏掉低 id 值的数据,直到 list 调用才能被重新填入。而这种问题产生的间隔很短,因此 1s 的间隔能保证id值较低的数据已经被填入。

listToTail 方法返回当前最大可靠 id 值

private long listToTail(EntryCallable<T> callable, final long start, final int page) {
  long curStart = start;
  while (true) {
    List<T> entries = listStableEntries(curStart, page);
    if (CollectionUtils.isEmpty(entries)) {
      break;
    }
    for (T entry : entries) {
      callable.onEntry(entry);
      curStart = Math.max(curStart, entry.getId());
    }
    ConcurrentUtils.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
  }
  return curStart;
}

线程沉睡 10ms 为了防止某一时刻在进行读操作时有大量数据写入,提前先将数据放到 entries 进行更新。为下一个 watch() 减少数据的开销

List

提供定时的全量修正

private void list() {
  syncStart();
  try {
    C newContainer = containerFactory();
    long maxId = listToTail(newContainer::onEntry, 0, 1000);
    logger.info("end list to {}", maxId);
    preList(newContainer);
    this.container = newContainer;
    lastLoadId = maxId;
  } finally {
    syncEnd();
  }
}

preList() 将内存中全量数据和数据库中的数据进行一次对比,弥补了 Watch 对于非连续数据检测机制的不足。

ListLoop 周期:在 15-30 分钟之间产生一个随机的时间

private final class ListLoop extends WakeUpLoopRunnable {
  @Override
  public int getWaitingMillis() {
    int base = listLoopIntervalMs / 2;
    return (int) (base + Math.random() * base);
  }

因为进行一次 preList() 的全量数据比较需要较长时间,并且发生外部操作使数据的 id 值中断的概率比较小,只是一种检测意外事件发生的机制,因此间隔远大于 watchLoop 的间隔周期。

清理失效数据

正常存活的数据定期刷新 gmt_modified 延长存活周期。

public List<AppRevision> getExpired(Date beforeTime, int limit) {
  List<AppRevisionDomain> expired =
      appRevisionMapper.getExpired(
          defaultCommonConfig.getClusterId(tableName()), beforeTime, limit);
  return AppRevisionDomainConvertor.convert2Revisions(expired);
}

如果一个数据长时间得不到刷新,可以判断这个数据已经失效,更改 deleted=‘1’watch 会立刻感知删除事件。

后续会定期清理 deleted=‘1’ 和指定时间之前的数据。

<delete id="cleanDeleted">
    <![CDATA[
    delete from app_revision where data_center=#{dataCenter} and gmt_modified < #{beforeTime} and deleted='1'
    limit #{limit}
    ]]>
</delete>

通过 watch 机制自动感知实时到期失效数据, gmt_modified 时间以及 deleted 的值共同判断,可以准确删除大量失效的数据,保证数据库的存储资源得到释放。

总结

SOFARegistry 内部的部分配置的更新需要及时感知,比如应用级服务发现的元数据变更,常见数据库并没有数据表变化通知的接口,SOFARegistry 实现了对于数据表更新实时watch的机制。

watch 通过更新缓存id实现实时检测增量变化,实时感知失效数据。list 提供定时全量修正机制,补足 watch 对于的不足,缓存机制能防止大量节点同时上传大量相同数据造成可能的宕机。