RabbitMQ Golang教程(二)任务队列什么是任务队列 ?
首先,需要编写发送端的程序。该程序会将任务安排到我们的工作队列中,因此将其命名为new_task.gopackage mainimport ( "fmt" "github.com/streadway/amqp" "log" "os" "strings")func main() { // 1. 尝试连接RabbitMQ,建立连接 // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。 conn,err := amqp.Dial("amqp://admin:admin@xx.xxx.xxx.xxx:xxx/") if err != nil { fmt.Printf("connect to RabbitMQ failed, err:%v\n", err) return } defer conn.Close() // 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。 ch,err := conn.Channel() if err != nil { fmt.Printf("open a channel failed, err:%v\n", err) return } defer ch.Close() // 3. 要发送,我们必须声明要发送到的队列。 q, err := ch.QueueDeclare( "task_queue", // name true, // 持久的 false, // delete when unused false, // 独有的 false, // no-wait nil, // arguments ) if err != nil { fmt.Printf("declare a queue failed, err:%v\n", err) return } // 4. 然后我们可以将消息发布到声明的队列 body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // 立即 false, // 强制 amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久 ContentType: "text/plain", Body: []byte(body), }) if err != nil { fmt.Printf("publish a message failed, err:%v\n", err) return } log.Printf(" [x] Sent %s", body)}// bodyFrom 从命令行获取将要发送的消息内容func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s} 然后,需要编写接收端程序,它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此将其称为worker.gopackage mainimport ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time")func main() { conn,err := amqp.Dial("amqp://admin:admin@49.234.192.212:5672/") if err != nil { fmt.Printf("connect to RabbitMQ failed, err:%v\n", err) return } defer conn.Close() ch,err := conn.Channel() if err != nil { fmt.Printf("open a channel failed, err:%v\n", err) return } defer ch.Close() // 声明一个queue q, err := ch.QueueDeclare( "task_queue", // name true, // 声明为持久队列 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) if err != nil { fmt.Printf("ch.Qos() failed, err:%v\n", err) return } // 立即返回一个Delivery的通道 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // 注意这里传false,关闭自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { fmt.Printf("ch.Consume failed, err:%v\n", err) return } // 开启循环不断地消费消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dotCount := bytes.Count(d.Body, []byte(".")) t := time.Duration(dotCount) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) // 手动传递消息确认 } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever} 执行结果单个执行
循环调度使用任务队列的优点之一是能够轻松并行化工作。 首先,尝试同时运行两个worker.go脚本。它们都将从队列中获取消息。 需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。
|
|