asynq 概述asynq 是由 Go 语言编写的简单、可靠、高效的分布式任务队列。 Go中简单、可靠、高效的分布式任务队列https://github.com/hibiken/asynqasynq 工作原理概述: 客户端将任务放入队列 服务器从队列中拉取任务并为每个任务启动一个工作协程 任务由多个 worker 同时处理任务队列用作跨多态机器分配工作的机制。一个系统 key 由多个工作服务器和代理组成,让位于高可用性和水平扩展。 特征 保证至少执行一次任务 任务调度 失败任务的重试 worker 崩溃时自动恢复任务 加权优先级队列 严格的优先队列 添加任务的低延迟,因为 Redis 中的写入速度很快 使用唯一选项对任务进行重复数据删除 允许每个任务超时和截止日期 允许聚合任务组以批处理多个连续操作 支持中间件的灵活处理程序接口 能够暂停队列以停止处理队列中的任务 定期任务 支持Redis Cluster 实现自动分片和高可用 支持Redis Sentinels 以实现高可用性 与 Prometheus 集成以收集和可视化队列指标 用于检查和远程控制队列和任务的 Web UI CLI 检查和远程控制队列和任务 asynq 的基本使用下面我将基于 asynq v0.24.0 版本,实现 asynq 的简单使用。 客户端:注册立即执行的任务 和 定时任务 服务端:处理注册的任务,完成业务逻辑等 代码目录结构如下: $ tree.├── client // 客户端服务│ ├── client.go // main函数,调用asynq完成注册│ └── tasks│ └── tasks.go // 客户端创建任务└── server // 服务端服务 ├── server.go // main函数,调用asynq监听任务并处理 └── tasks └── tasks.go // 服务端处理任务 客户端创建一个立即发送邮件的任务 和 定时生成报表的任务,分别使用asynq 的 NewClient 和 NewScheduler。 // test/asynq/client/tasks/tasks.go package tasksimport ( 'encoding/json' 'fmt' 'github.com/hibiken/asynq') const ( TypeEmailDelivery = 'asynq:email:delivery' // 立即发送邮件任务 TypeGenerateDataReport = 'asynq:generate:data:report' // 定时生成报表任务) // EmailDeliveryPayload 立即发送邮件负载type EmailDeliveryPayload struct { UserId string TemplateId string} // NewEmailDeliveryTask 创建一个立即发送邮件任务,为指定用户发送func NewEmailDeliveryTask(data EmailDeliveryPayload) (*asynq.Task, error) { payload, err := json.Marshal(data) if err != nil { return nil, err } // asynq.NewTask // 传入任务名称、序列化的负载信息以及opts配置项 return asynq.NewTask(TypeEmailDelivery, payload), nil} // NewGenerateDataReportTask 创建一个定时生成报表任务,处理所有用户func NewGenerateDataReportTask() (*asynq.Task, error) { return asynq.NewTask(TypeGenerateDataReport, nil), nil} // RegisterEmailDeliveryTask 立即发送邮件任务func RegisterEmailDeliveryTask(client *asynq.Client, data EmailDeliveryPayload) error { task, err := NewEmailDeliveryTask(data) if err != nil { return err } // Enqueue task to be processed immediately. taskInfo, err := client.Enqueue(task, asynq.MaxRetry(1)) if err != nil { return err } fmt.Printf('enqueued task id: %s,queue: %s\n', taskInfo.ID, taskInfo.Queue) return nil} // RegisterGenerateDataReportTask 注册生成报表任务func RegisterGenerateDataReportTask(scheduler *asynq.Scheduler, cronspec string) error { task, err := NewGenerateDataReportTask() if err != nil { return err } entryId, err := scheduler.Register(cronspec, task, asynq.MaxRetry(3)) if err != nil { return err } fmt.Printf('register entryId: %s\n', entryId) return nil} // test/asynq/client/client.go package main import ( 'fmt' 'github.com/hibiken/asynq' 'test/asynq/client/tasks') func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} // redis 集群方式 //redisOpt := asynq.RedisClusterClientOpt{ // Addrs: []string{'127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'}, //} // 1. 注册立即执行的任务 client := asynq.NewClient(redisOpt) err := tasks.RegisterEmailDeliveryTask(client, tasks.EmailDeliveryPayload{ UserId: 'user_id', TemplateId: 'template_id', }) if err != nil { return } defer client.Close() // 2. 注册定时任务 scheduler := asynq.NewScheduler(redisOpt, &asynq.SchedulerOpts{ LogLevel: asynq.InfoLevel, PostEnqueueFunc: handlePostEnqueue, }) cronspec := '* * * * *' //'1 2 * * *' // 每天 02:01:00 执行定时任务,最大重试次数3次. err = tasks.RegisterGenerateDataReportTask(scheduler, cronspec) if err != nil { return } err = scheduler.Run() // 这里需要调用run方法,别忘记了 if err != nil { fmt.Printf('scheduler run error: %v', err) return } defer scheduler.Shutdown()} func handlePostEnqueue(taskInfo *asynq.TaskInfo, err error) { fmt.Printf('task id: %s, queue: %s, err: %+v\n', taskInfo.ID, taskInfo.Queue, err)}服务端基于客户端注册的任务,服务端启动 handler 来处理这些任务。 // test/asynq/server/tasks/tasks.gopackage tasks import ( 'context' 'encoding/json' 'fmt' 'github.com/hibiken/asynq' 'time') const ( TypeEmailDelivery = 'asynq:email:delivery' // 立即发送邮件任务 TypeGenerateDataReport = 'asynq:generate:data:report' // 定时生成报表任务) // EmailDeliveryPayload 立即发送邮件负载type EmailDeliveryPayload struct { UserId string TemplateId string} // HandleEmailDeliveryTask 处理发送邮件任务func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var data EmailDeliveryPayload err := json.Unmarshal(t.Payload(), &data) if err != nil { return err } // do something fmt.Printf('email delivery data: %+v\n', data) return nil} // HandleGenerateDataReportTask 处理生成报表任务func HandleGenerateDataReportTask(ctx context.Context, t *asynq.Task) error { // do something fmt.Printf('generate data report, time: %s\n', time.Now().Format(time.RFC3339)) time.Sleep(time.Second*30) return nil} // test/asynq/server/server.gopackage main import ( 'context' 'fmt' 'github.com/hibiken/asynq' 'test/asynq/server/tasks') func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} // redis 集群方式 //redisOpt := asynq.RedisClusterClientOpt{ // Addrs: []string{'127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'}, //} config := asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, // Optionally specify multiple queues with different priority. Queues: map[string]int{ 'critical': 6, 'default': 3, 'low': 1, }, ErrorHandler: asynq.ErrorHandlerFunc(errHandler), } // mux maps a type to a handler mux := asynq.NewServeMux() RegisterTaskHandlers(mux) server := asynq.NewServer(redisOpt, config) err := server.Run(mux) if err != nil { fmt.Printf('async run error: %v', err) return }} func errHandler(ctx context.Context, task *asynq.Task, err error) { fmt.Printf('task type: %s, payload: %s, err: %+v\n', task.Type(), string(task.Payload()), err)} // RegisterTaskHandlers 注册任务func RegisterTaskHandlers(mux *asynq.ServeMux) { mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask) mux.HandleFunc(tasks.TypeGenerateDataReport, tasks.HandleGenerateDataReportTask)}分别启动客户端和服务端,运行结果如下: 命令行工具asynq 提供了命令行工具,具体安装如下: go install github.com/hibiken/asynq/tools/asynq@latest命令如下: $ asynq dash --helpDisplay interactive dashboard. USAGE asynq dash [flags] FLAGS --help Help for dash --refresh Interval between data refresh (default: 8s, min allowed: 1s) INHERITED FLAGS --cluster Connect to redis cluster --cluster_addrs List of comma-separated redis server addresses --config Config file to set flag defaut values (default is $HOME/.asynq.yaml) --db Redis database number (default is 0) --password Password to use when connecting to redis server --tls_server Server name for TLS validation --uri Redis server URI EXAMPLES $ asynq dash $ asynq dash --refresh=3s LEARN MORE Use 'asynq--help' for more information about a command. Web UIasynq 也提供了Web UI,支持Docker部署和继承到代码中,具体教程可见: Web UI for monitoring & admininstering Asynq task queuehttps://github.com/hibiken/asynqmon下面使用客户端嵌入 asynqmon 示例: // test/asynq/client/client.gopackage main import ( 'github.com/hibiken/asynq' 'github.com/hibiken/asynqmon' 'net/http') func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} startDashboard(redisOpt)} func startDashboard(r asynq.RedisConnOpt) { go func() { h := asynqmon.New(asynqmon.Options{ RootPath: '/dashboard', // RootPath specifies the root for asynqmon app RedisConnOpt: r, }) // Note: We need the tailing slash when using net/http.ServeMux. http.Handle(h.RootPath()+'/', h) http.ListenAndServe(':8090', nil) }()} 总结 asynq 作为一个分布式任务队列,如果有定时任务的需求,可以考虑使用它,用起来还是很简便的。 |
|