分享

Go 简单可靠高效的分布式任务队列 asynq

 F2967527 2023-03-31 发布于北京

asynq 概述asynq 是由 Go 语言编写的简单、可靠、高效的分布式任务队列。

Go中简单、可靠、高效的分布式任务队列https://github.com/hibiken/asynqasynq 工作原理概述:

客户端将任务放入队列

服务器从队列中拉取任务并为每个任务启动一个工作协程

任务由多个 worker 同时处理任务队列用作跨多态机器分配工作的机制。一个系统 key 由多个工作服务器和代理组成,让位于高可用性和水平扩展。

图片

特征

保证至少执行一次任务

任务调度

失败任务的重试

worker 崩溃时自动恢复任务

加权优先级队列

严格的优先队列

添加任务的低延迟,因为 Redis 中的写入速度很快

使用唯一选项对任务进行重复数据删除

允许每个任务超时和截止日期

允许聚合任务组以批处理多个连续操作

支持中间件的灵活处理程序接口

能够暂停队列以停止处理队列中的任务

定期任务

支持Redis Cluster 实现自动分片和高可用

支持Redis Sentinels 以实现高可用性

与 Prometheus 集成以收集和可视化队列指标

用于检查和远程控制队列和任务的 Web UI

CLI 检查和远程控制队列和任务

asynq 的基本使用下面我将基于 asynq v0.24.0 版本,实现 asynq 的简单使用。

客户端:注册立即执行的任务 和 定时任务

服务端:处理注册的任务,完成业务逻辑等

代码目录结构如下:

$ tree.├── client                // 客户端服务│   ├── client.go         // main函数,调用asynq完成注册│   └── tasks│       └── tasks.go      // 客户端创建任务└── server                // 服务端服务    ├── server.go         // main函数,调用asynq监听任务并处理 └── tasks        └── tasks.go      // 服务端处理任务

客户端创建一个立即发送邮件的任务 和 定时生成报表的任务,分别使用asynq 的 NewClient 和 NewScheduler。

// test/asynq/client/tasks/tasks.go

package tasksimport ( 'encoding/json' 'fmt' 'github.com/hibiken/asynq')

const ( TypeEmailDelivery = 'asynq:email:delivery' // 立即发送邮件任务 TypeGenerateDataReport = 'asynq:generate:data:report' // 定时生成报表任务)

// EmailDeliveryPayload 立即发送邮件负载type EmailDeliveryPayload struct { UserId string TemplateId string}

// NewEmailDeliveryTask 创建一个立即发送邮件任务,为指定用户发送func NewEmailDeliveryTask(data EmailDeliveryPayload) (*asynq.Task, error) { payload, err := json.Marshal(data) if err != nil { return nil, err }

// asynq.NewTask // 传入任务名称、序列化的负载信息以及opts配置项 return asynq.NewTask(TypeEmailDelivery, payload), nil}

// NewGenerateDataReportTask 创建一个定时生成报表任务,处理所有用户func NewGenerateDataReportTask() (*asynq.Task, error) { return asynq.NewTask(TypeGenerateDataReport, nil), nil}

// RegisterEmailDeliveryTask 立即发送邮件任务func RegisterEmailDeliveryTask(client *asynq.Client, data EmailDeliveryPayload) error { task, err := NewEmailDeliveryTask(data) if err != nil { return err }

// Enqueue task to be processed immediately. taskInfo, err := client.Enqueue(task, asynq.MaxRetry(1)) if err != nil { return err } fmt.Printf('enqueued task id: %s,queue: %s\n', taskInfo.ID, taskInfo.Queue)

return nil}

// RegisterGenerateDataReportTask 注册生成报表任务func RegisterGenerateDataReportTask(scheduler *asynq.Scheduler, cronspec string) error { task, err := NewGenerateDataReportTask() if err != nil { return err }

entryId, err := scheduler.Register(cronspec, task, asynq.MaxRetry(3)) if err != nil { return err } fmt.Printf('register entryId: %s\n', entryId)

return nil}

// test/asynq/client/client.go

package main

import ( 'fmt' 'github.com/hibiken/asynq' 'test/asynq/client/tasks')

func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} // redis 集群方式 //redisOpt := asynq.RedisClusterClientOpt{ // Addrs: []string{'127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'}, //}

// 1. 注册立即执行的任务 client := asynq.NewClient(redisOpt) err := tasks.RegisterEmailDeliveryTask(client, tasks.EmailDeliveryPayload{ UserId: 'user_id', TemplateId: 'template_id', }) if err != nil { return }  defer client.Close()

// 2. 注册定时任务 scheduler := asynq.NewScheduler(redisOpt, &asynq.SchedulerOpts{ LogLevel: asynq.InfoLevel, PostEnqueueFunc: handlePostEnqueue, })

cronspec := '* * * * *' //'1 2 * * *' // 每天 02:01:00 执行定时任务,最大重试次数3次. err = tasks.RegisterGenerateDataReportTask(scheduler, cronspec) if err != nil { return }

err = scheduler.Run() // 这里需要调用run方法,别忘记了 if err != nil { fmt.Printf('scheduler run error: %v', err) return } defer scheduler.Shutdown()}

func handlePostEnqueue(taskInfo *asynq.TaskInfo, err error) { fmt.Printf('task id: %s, queue: %s, err: %+v\n', taskInfo.ID, taskInfo.Queue, err)}服务端基于客户端注册的任务,服务端启动 handler 来处理这些任务。

// test/asynq/server/tasks/tasks.gopackage tasks

import ( 'context' 'encoding/json' 'fmt' 'github.com/hibiken/asynq' 'time')

const ( TypeEmailDelivery = 'asynq:email:delivery' // 立即发送邮件任务 TypeGenerateDataReport = 'asynq:generate:data:report' // 定时生成报表任务)

// EmailDeliveryPayload 立即发送邮件负载type EmailDeliveryPayload struct { UserId string TemplateId string}

// HandleEmailDeliveryTask 处理发送邮件任务func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var data EmailDeliveryPayload err := json.Unmarshal(t.Payload(), &data) if err != nil { return err }

// do something fmt.Printf('email delivery data: %+v\n', data) return nil}

// HandleGenerateDataReportTask 处理生成报表任务func HandleGenerateDataReportTask(ctx context.Context, t *asynq.Task) error { // do something fmt.Printf('generate data report, time: %s\n', time.Now().Format(time.RFC3339))  time.Sleep(time.Second*30) return nil}

// test/asynq/server/server.gopackage main

import ( 'context' 'fmt' 'github.com/hibiken/asynq' 'test/asynq/server/tasks')

func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} // redis 集群方式 //redisOpt := asynq.RedisClusterClientOpt{ // Addrs: []string{'127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'}, //} config := asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, // Optionally specify multiple queues with different priority. Queues: map[string]int{ 'critical': 6, 'default': 3, 'low': 1, }, ErrorHandler: asynq.ErrorHandlerFunc(errHandler), }

// mux maps a type to a handler mux := asynq.NewServeMux() RegisterTaskHandlers(mux)

server := asynq.NewServer(redisOpt, config) err := server.Run(mux) if err != nil { fmt.Printf('async run error: %v', err) return }}

func errHandler(ctx context.Context, task *asynq.Task, err error) { fmt.Printf('task type: %s, payload: %s, err: %+v\n', task.Type(), string(task.Payload()), err)}

// RegisterTaskHandlers 注册任务func RegisterTaskHandlers(mux *asynq.ServeMux) { mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask) mux.HandleFunc(tasks.TypeGenerateDataReport, tasks.HandleGenerateDataReportTask)}分别启动客户端和服务端,运行结果如下:

图片

图片

命令行工具asynq 提供了命令行工具,具体安装如下:

go install github.com/hibiken/asynq/tools/asynq@latest命令如下:

$ asynq dash --helpDisplay interactive dashboard.

USAGE asynq dash [flags]

FLAGS --help Help for dash --refresh Interval between data refresh (default: 8s, min allowed: 1s)

INHERITED FLAGS --cluster Connect to redis cluster --cluster_addrs List of comma-separated redis server addresses --config Config file to set flag defaut values (default is $HOME/.asynq.yaml) --db Redis database number (default is 0) --password Password to use when connecting to redis server --tls_server Server name for TLS validation --uri Redis server URI

EXAMPLES $ asynq dash $ asynq dash --refresh=3s

LEARN MORE Use 'asynq--help' for more information about a command.

图片

图片

Web UIasynq 也提供了Web UI,支持Docker部署和继承到代码中,具体教程可见:

Web UI for monitoring & admininstering Asynq task queuehttps://github.com/hibiken/asynqmon下面使用客户端嵌入 asynqmon 示例:

// test/asynq/client/client.gopackage main

import ( 'github.com/hibiken/asynq' 'github.com/hibiken/asynqmon' 'net/http')

func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'}  startDashboard(redisOpt)}

func startDashboard(r asynq.RedisConnOpt) { go func() { h := asynqmon.New(asynqmon.Options{ RootPath: '/dashboard', // RootPath specifies the root for asynqmon app RedisConnOpt: r, })

// Note: We need the tailing slash when using net/http.ServeMux. http.Handle(h.RootPath()+'/', h) http.ListenAndServe(':8090', nil) }()}

图片

图片

图片

总结

asynq 作为一个分布式任务队列,如果有定时任务的需求,可以考虑使用它,用起来还是很简便的。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多