Golang Simple Job Queue With Redis Streams

keith wachira
Software Engineer at Ndunyu.
May 4, 2021 11 min read

Sometimes you may need to send emails, notifications, or text messages to your users in the background. In a language like Python, this could be done using an external library such as Celery. I was facing this issue recently and, like many developers, I thought I needed Kafka or RabbitMQ to solve it. I did not want to go through the headache of installing and maintaining RabbitMQ or Kafka. That's when I started looking for simpler alternatives, found Redis Streams, and realized they could solve my problem without much effort.

In this tutorial I will show you how I ended up solving my issue with a simple Go task queue.

Creating a simple Go task queue to process our events

First I had to create a queue that would consume my events in a controlled manner. This is easy to do in Go, since we have channels and goroutines.

Let’s write the code first, and I will explain what is happening at each step:

You can also use the library I have written, go-taskq, which is available on GitHub.

package taskq

import (
	"log"
	"sync"
)

// JobCallBack works on the queued item
/*type JobCallBack interface {
	Process(interface{})
}*/
type JobCallBack func(job interface{})
type Queue struct {
	// Workers is the number of goroutines (workers, consumers) used to process the jobs
	Workers int
	// Capacity is the number of items the JobQueue channel can hold at a time before blocking,
	// i.e. the capacity of our JobQueue channel
	Capacity int
	// JobQueue is a buffered channel of capacity [Queue.Capacity] that temporarily holds
	// jobs before they are assigned to a worker
	JobQueue chan interface{}
	// Wg is used to make sure the program does not terminate before all of our goroutines
	// complete the jobs assigned to them
	Wg *sync.WaitGroup
	// QuitChan is used to stop all goroutines [Queue.Workers]
	QuitChan chan struct{}
	// JobCallBack is the function called when a job (event) is received;
	// it must have the JobCallBack signature, i.e. a function or method with only one parameter and no return value
	JobCallBack JobCallBack
}
// NewQueue creates a new job Queue
// and assigns all required parameters
func NewQueue(workers int, capacity int, jobCallBack JobCallBack) Queue {
	var wg sync.WaitGroup
	jobQueue := make(chan interface{}, capacity)
	quit := make(chan struct{})
	return Queue{
		Workers:     workers,
		Capacity:    capacity,
		JobQueue:    jobQueue,
		JobCallBack: jobCallBack,
		Wg:          &wg,
		QuitChan:    quit,
	}
}
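// Stop signals every running worker goroutine to return,
// which stops the queue from processing any more jobs.
// Closing QuitChan reaches all workers at once, whereas a single send would stop only one of them.
// Stop must be called at most once, because closing an already closed channel panics.
func (q *Queue) Stop() {
	close(q.QuitChan)
}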
// EnqueueJobNonBlocking queues a job without blocking,
// i.e. if [Queue.JobQueue] is full it returns immediately.
// It returns false if the buffer is full and true if the job was accepted.
// Use case: imagine you are receiving events and want to stop accepting
// more of them while the buffered channel is full; you can return an error
// to the user when this function returns false,
// although a better approach is to store a rejected event in Redis for
// later processing once [Queue.JobQueue] has space again, to prevent loss of events.
// Note: if you are submitting jobs from a for loop, it is better to use [Queue.EnqueueJobBlocking]
// to avoid a busy wait (continuous polling to check whether space is available),
// which can keep a CPU core almost fully busy.
func (q *Queue) EnqueueJobNonBlocking(job interface{}) bool {
	select {
	case q.JobQueue <- job:
		return true
	default:
		return false
	}
}
// EnqueueJobBlocking queues a job and blocks if [Queue.JobQueue] is full;
// once the channel has space again the job is accepted.
// When submitting jobs from a for loop this is kinder to your CPU than
// [Queue.EnqueueJobNonBlocking], since the loop stops and waits while the queue is full
// and only continues when there is space again.
func (q *Queue) EnqueueJobBlocking(job interface{}) {
	q.JobQueue <- job
}
// StartWorkers starts the worker goroutines and adds them to the wait group.
// The number of goroutines started is the number of [Queue.Workers] you specified for this queue;
// for example, if you specified 10 Workers, 10 goroutines will process the jobs.
// It blocks until every worker has returned.
func (q *Queue) StartWorkers() {
	for i := 0; i < q.Workers; i++ {
		// add the goroutine to the wait group so that Wait below
		// does not return before the goroutine does
		q.Wg.Add(1)
		go q.worker()
	}
	q.Wg.Wait()
}
func (q *Queue) worker() {
	defer q.Wg.Done()
	for {
		select {
		//terminate the goroutine
		case <-q.QuitChan:
			log.Println("closing the  workers")
			return
		case job := <-q.JobQueue:
			//a job has been received  call this function
			q.JobCallBack(job)
		}
	}
}
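
All the workers above wait on the same quit channel. The following is a minimal sketch, separate from the queue code, of why closing such a channel works as a stop signal for every worker: a receive on a closed channel returns immediately in each goroutine, while a single sent value is consumed by only one receiver. The function name stopAll and the counter are illustrative and not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n goroutines that wait on a shared quit channel,
// closes the channel once, and returns how many goroutines saw the signal.
// (Hypothetical helper, used only to illustrate the broadcast behaviour.)
func stopAll(n int) int {
	quit := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-quit // a receive on a closed channel returns immediately
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	close(quit) // one close is observed by every waiting goroutine
	wg.Wait()   // wait until all goroutines have returned
	return stopped
}

func main() {
	fmt.Println(stopAll(5)) // prints 5
}
```

Had the signal been a single send (quit <- struct{}{}), only one of the five goroutines would have received it and the others would keep waiting.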

Key Points from the job queue above

  1. You can pass either a function with one parameter or a method (of a struct) with one parameter as the JobCallBack. Passing a method lets you carry dependencies such as a Redis client, a database handle, or a logger.
  2. To add items to our channel (queue) we can use either EnqueueJobBlocking or EnqueueJobNonBlocking. If the queue is full, EnqueueJobBlocking blocks and waits for space, whereas EnqueueJobNonBlocking returns false immediately. You can use the latter, for example, to return an error to users when the job queue is full.
  3. If you are enqueueing events in a loop, it is better to use EnqueueJobBlocking to reduce CPU utilization, since retrying EnqueueJobNonBlocking turns into a busy wait when your workers process jobs slowly.
  4. Always call StartWorkers() in a new goroutine (i.e. go q.StartWorkers()), because it blocks until all workers return and would otherwise block your main goroutine.
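
The first two points can be sketched in a small standalone program. It does not import the library: the JobCallBack type is redeclared locally, and Mailer, tryEnqueue and demo are hypothetical names chosen for illustration. It shows a method value being used as the callback so that it carries a dependency, and a non-blocking enqueue that rejects a job once the buffer is full.

```go
package main

import "fmt"

// JobCallBack mirrors the callback type used by the queue.
type JobCallBack func(job interface{})

// Mailer is a hypothetical type that carries a dependency (here just a sender name).
type Mailer struct {
	From string
	Sent []string
}

// Send has the JobCallBack shape, so the method value m.Send can be used as a callback.
func (m *Mailer) Send(job interface{}) {
	m.Sent = append(m.Sent, fmt.Sprintf("%s -> %v", m.From, job))
}

// tryEnqueue reports false instead of waiting when the buffer is full.
func tryEnqueue(jobs chan<- interface{}, job interface{}) bool {
	select {
	case jobs <- job:
		return true
	default:
		return false
	}
}

// demo offers three jobs to a buffer of two, then drains the buffer through the callback.
func demo() (accepted, rejected int, sent []string) {
	m := &Mailer{From: "noreply"}
	var callback JobCallBack = m.Send // method value bound to m

	jobs := make(chan interface{}, 2)
	for i := 0; i < 3; i++ {
		if tryEnqueue(jobs, i) {
			accepted++
		} else {
			rejected++ // the third job does not fit
		}
	}
	close(jobs)
	for job := range jobs {
		callback(job)
	}
	return accepted, rejected, m.Sent
}

func main() {
	accepted, rejected, sent := demo()
	fmt.Println(accepted, rejected, sent)
}
```

In a real handler, the rejected case is where you would return an error to the user or park the event somewhere durable.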

Simple usage of our queue

Before using our job queue with Redis, let's see how we can use it to accomplish a small task: printing the numbers 0 to 9 (with three workers, the order of the output is not guaranteed):

package main

import (
	"log"
	"time"

	"github.com/keithwachira/go-taskq"
)

func main() {
	// create a new task queue that has 3 goroutines to process numbers
	// and whose channel can hold 4 items
	q := taskq.NewQueue(3, 4, func(number interface{}) {
		log.Println(number)
	})
	go q.StartWorkers()
	// we can now start sending the numbers we want processed to our queue
	// for our workers to pick up
	for i := 0; i < 10; i++ {
		q.EnqueueJobBlocking(i)
	}
	// sleep for 10 seconds to wait for printing to be done;
	// you should replace this with a channel that blocks until the goroutines complete all jobs
	time.Sleep(time.Second * 10)
}

Yes, creating a job queue is that simple.
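
The example above waits with a fixed sleep. One possible alternative, sketched here without the library, is to close the job channel when there is nothing left to send and wait on a sync.WaitGroup until the workers have drained it. This differs from the Queue type above, which stops its workers through a quit channel; the function name processAll and the summing workload are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll feeds the numbers 0..n-1 to a fixed pool of workers and
// returns their sum only after every job has been handled.
func processAll(workers, n int) int {
	jobs := make(chan int, 4)
	var wg sync.WaitGroup // counts workers that are still running
	var mu sync.Mutex
	sum := 0

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs { // ends once jobs is closed and drained
				mu.Lock()
				sum += job
				mu.Unlock()
			}
		}()
	}

	for i := 0; i < n; i++ {
		jobs <- i // blocks while the buffer is full
	}
	close(jobs) // tell the workers that no more jobs are coming
	wg.Wait()   // block until every worker has returned
	return sum
}

func main() {
	fmt.Println(processAll(3, 10)) // prints 45
}
```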

NB: Before reading this section, make sure you understand the basics of Redis Streams. You can learn more about Redis Streams here.

Using Redis Streams with our job queue to send emails to our users

Imagine you want to send an email to a user whenever they complete a transaction or an order. We can use Redis to store the messages we want to send and create a queue that consumes those messages. This makes sure we do not lose any message, and if for some reason our consumer crashes we can pick up from where it left off without losing data. Redis has many data structures, but the one that best fits our use case is streams.

You can learn more about redis streams here.

For today, we will focus on two stream commands:

  1. XADD - writes data to our stream. Each stream entry is stored as field-value pairs, e.g. {“email”:“[email protected]”,“message”:“you completed an order”} (like a Go map or a Redis hash).

  2. XREAD - reads data from one or more streams.
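
To make the read pattern concrete, here is a toy in-memory model of the two commands. It is not Redis and not the go-redis API: the Stream and Entry types and their methods are invented for this sketch, and the IDs are plain integers, whereas real stream IDs have the form milliseconds-sequence. What it illustrates is that reading does not remove entries, and that a reader asks for entries with an ID greater than the last one it saw.

```go
package main

import "fmt"

// Entry is one stream record: an ID plus field/value pairs.
type Entry struct {
	ID     int
	Values map[string]string
}

// Stream is a toy stand-in for a Redis stream (hypothetical, in memory only).
type Stream struct {
	entries []Entry
}

// Add appends an entry and returns its ID, loosely like XADD with an auto-generated ID.
func (s *Stream) Add(values map[string]string) int {
	id := len(s.entries) + 1
	s.entries = append(s.entries, Entry{ID: id, Values: values})
	return id
}

// ReadAfter returns up to count entries whose ID is greater than lastID,
// loosely like XREAD COUNT count STREAMS key lastID.
func (s *Stream) ReadAfter(lastID, count int) []Entry {
	var out []Entry
	for _, e := range s.entries {
		if e.ID > lastID && len(out) < count {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	s := &Stream{}
	s.Add(map[string]string{"message": "one"})
	s.Add(map[string]string{"message": "two"})
	s.Add(map[string]string{"message": "three"})

	lastID := 0
	for {
		batch := s.ReadAfter(lastID, 2)
		if len(batch) == 0 {
			break // the real XREAD can block here instead of returning
		}
		for _, e := range batch {
			fmt.Println(e.ID, e.Values["message"])
			lastID = e.ID // remember the last ID we saw
		}
	}
}
```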

We will also be using the Go Redis client go-redis, which can be downloaded from here.

Let’s write the code, and I will explain it at each step:

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	"github.com/go-redis/redis/v8"
	"github.com/keithwachira/go-taskq"
)

// this is the Redis key we want to use for our stream
var streamName = "send_order_emails"

func main() {
	// we start by creating a new go-redis client
	// that we will use to access our Redis instance
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0, // use default DB
	})
	// start processing any job received in Redis;
	// you can see the definition of this function below
	go StartProcessingEmails(rdb)
	// to mimic user behaviour we create a router with a single endpoint
	// that accepts email requests from our users
	handler := http.NewServeMux()
	s := Server{Redis: rdb}
	handler.HandleFunc("/api/order", s.NewOrderReceivedFromClient)
	// every time a request is sent to the endpoint ("/api/order")
	// we will mock sending an email
	err := http.ListenAndServe("0.0.0.0:8080", handler)
	if err != nil {
		log.Fatal(err)
	}
}

type Server struct {
	Redis *redis.Client
}
// NewOrderReceivedFromClient mocks an endpoint that users call to place an order.
// Once we receive an order here we register it in Redis;
// the workers will then pick it up and send an email to the user.
func (S *Server) NewOrderReceivedFromClient(w http.ResponseWriter, r *http.Request) {
	data := map[string]interface{}{"email": "[email protected]", "message": "We have received your order and we are working on it."}
	// we have received an order here, so we send it to Redis;
	// the Redis XADD command (XAdd in go-redis) adds the entry to our stream
	// (you can read more about it at the link shared above)
	err := S.Redis.XAdd(context.Background(), &redis.XAddArgs{
		// this is the name we want to give to our stream;
		// in our case we called it send_order_emails.
		// note that you can have as many streams as you need,
		// such as one for emails and another for notifications
		Stream:       streamName,
		MaxLen:       0,
		MaxLenApprox: 0,
		ID:           "",
		// Values is the data you want to send to the stream;
		// in our case we send a map with email and message keys
		Values: data,
	}).Err()
	if err != nil {
		http.Error(w, "something went wrong", http.StatusInternalServerError)
		return
	}
	fmt.Fprint(w, "We received your order")
}

// RedisStreamsProcessing holds the dependencies needed to process jobs.
// You can also pass a database here
// if you need it to process your work.
type RedisStreamsProcessing struct {
	Redis *redis.Client
	// other dependencies, e.g. a logger or a database, go here
}

// Process implements JobCallBack.
// It reads each queued stream entry and sends the email to our user;
// the logic to send the emails goes here.
func (r *RedisStreamsProcessing) Process(job interface{}) {
	// the go-redis client returns the Redis stream data as type [redis.XMessage]
	data, ok := job.(redis.XMessage)
	if !ok {
		log.Println("wrong type of data sent")
		return
	}
	// use checked type assertions so a malformed entry cannot panic the worker
	email, okEmail := data.Values["email"].(string)
	message, okMessage := data.Values["message"].(string)
	if !okEmail || !okMessage {
		log.Printf("skipping entry %v: missing email or message field", data.ID)
		return
	}
	fmt.Printf("I am sending an email to %v with message: %v\n", email, message)
	// here we can decide to delete each entry once it is processed;
	// in that case you can use the Redis XDEL command, i.e.:
	// r.Redis.XDel(context.Background(), streamName, data.ID).Err()
}

func StartProcessingEmails(rdb *redis.Client) {
	// create a new consumer instance to process the jobs
	// and pass it to our task queue
	redisStreams := RedisStreamsProcessing{
		Redis: rdb,
	}
	// in this case we start 5 goroutines, so at any moment we will
	// be sending a maximum of 5 emails.
	// you can adjust these parameters to increase or reduce throughput
	q := taskq.NewQueue(5, 10, redisStreams.Process)
	// call StartWorkers in a different goroutine, otherwise it will block
	go q.StartWorkers()
	// with our workers running we can start listening for new events from the Redis stream.
	// we start from id 0, i.e. the first item in the stream
	id := "0"
	ctx := context.Background()
	for {
		data, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{streamName, id},
			// Count is the number of entries we want to read from Redis at a time
			Count: 4,
			// we use the block option so that if no entry is found we wait
			// until an entry is found
			Block: 0,
		}).Result()
		if err != nil {
			log.Fatal(err)
		}
		// we have received the data; loop over it and queue the messages
		// so that our workers can start processing them
		for _, result := range data {
			for _, message := range result.Messages {
				// we use EnqueueJobBlocking to send our jobs to the workers
				q.EnqueueJobBlocking(message)
				// here we set a new start id because we don't want to process old emails again,
				// so we set the id to the last id we saw
				id = message.ID
			}
		}
	}
}

You can try sending a request to localhost:8080/api/order to see whether this implementation works.

This has a lot of applications: you can also use it to send a weekly newsletter, or for just about anything else you want.

If you want to use this as a library, you can find the taskq library on GitHub.