天机阁

tRPC-Go 分布式互斥定时器

2022-05-15 · 3 min read
Go

一、前言

在业务开发中,我们经常使用定时任务去处理一些任务,比如定时更新数据等。本文将在 tRPC-Go 的定时任务教程基础上,补充说明如果有多个定时任务,如何共用一个定时策略。

二、tRPC-Go 搭建定时器服务

0. tRPC-Go 搭建定时器官方教程

tRPC-Go 定时器插件提供了一套定时器开发框架,可以方便的实现可配置的本地定时器和分布式互斥定时器服务。tRPC-Go搭建定时器服务

三、多个定时任务共用定时策略

1. 注册两个定时服务

假设我们有两个定时任务方法,分别是 TaskA 和 TaskB。
我们首先将这两个定时任务方法TaskA、TaskB分别绑定到两个服务 ServiceA、ServiceB上。
然后在 main 方法中注册这两个服务:

import 	"git.code.oa.com/trpc-go/trpc-database/timer"

timer.RegisterHandlerService(s.Service("ServiceA"), TaskA)
timer.RegisterHandlerService(s.Service("ServiceB"), TaskB)

2. 注册两个分布式调度策略

根据官方教程 tRPC-Go搭建定时器服务,我们使用 Redis 存储竞争锁实现了分布式调度策略如下:

var (
	// 分布式锁的 Key,其中 %s 表示 serviceName
	// serviceName的名称为 trpc.timer.file_server.{registed_scheduler}
	lockKey = "timer:%s"
)

// DistributedScheduler 分布式任务调度器
type DistributedScheduler struct{}

// Schedule 分布式调度器的方法实现
func (s *DistributedScheduler) Schedule(serviceName string, newNode string, holdTime time.Duration) (string, error) {
	r := credis.GetRedisClient()
	// 抢占分布式锁
	key := fmt.Sprintf(lockKey, serviceName)
	val, err := r.Lock(key, 2*time.Hour, holdTime)
	log.Infof("[Schedule] lock key: %s, val: %s, err: %+v\n", key, val, err)
	// 如果 err != nil,则此节点不执行定时任务
	return newNode, err
}

在 main 方法中注册两个分布式调度策略 scheduleA、scheduleB:

timer.RegisterScheduler("scheduleA", &DistributedScheduler{})
timer.RegisterScheduler("scheduleB", &DistributedScheduler{})

其中 DistributedScheduler{} 即为分布式任务调度器的 struct。

3. 配置 trpc_go.yaml

在 trpc_go.yaml 文件中配置这两个定时任务
参考配置如下:

server: #服务端配置
  service: #业务服务提供的service,可以有多个
    - name: ServiceA      #service的路由名称
      ip: 127.0.0.1           #服务监听ip地址 可使用占位符 ${ip},ip和nic二选一,优先ip
      #nic: eth0
      port: 8002                #服务监听端口 可使用占位符 ${port}
      network: "0 0 14 * * * ?startAtOnce=0&scheduler=schedulerA"   # 每天14:00分执行
      protocol: timer           # 定时器
      timeout: 1000            #请求最长处理时间 单位 毫秒
    - name: ServiceB      #service的路由名称
      ip: 127.0.0.1            #服务监听ip地址 可使用占位符 ${ip},ip和nic二选一,优先ip
      #nic: eth0
      port: 8003               #服务监听端口 可使用占位符 ${port}
      network: "0 0 15 * * * ?startAtOnce=0&scheduler=schedulerB"   # 每天15:00分执行
      protocol: timer         # 定时器
      timeout: 1000           #请求最长处理时间 单位 毫秒