Let's write a soak tester!

May 29, 2017   #go  #concurrency 

What is soak testing?

I recently wrote some soak tests and I was surprised by how much fun it was. Soak testing works by sending a large number of concurrent requests to your system and checking whether it behaves correctly. The goal is not to bring the system down under heavy load, but to discover bugs. Soak tests can run for hours or even days.

Running for that long exercises a lot of random scenarios, which can uncover concurrency bugs or memory leaks that would otherwise go undetected, or take a long time to surface on a live system.

Here’s a web service

Here’s a very basic web service we’ll use as an example. You’ve created a new website which aspires to be the new Instagram for cats.

Users can create a new profile for their cat by sending a POST /catname/ request. This creates a new directory with that name on the server.

They can then upload pictures with a POST /catname/picturename request whose body contains the image. The picture will be stored in the specified directory and can be retrieved with GET /catname/picturename.

 1 package main
 2 
 3 import (
 4 	"flag"
 5 	"io"
 6 	"log"
 7 	"net/http"
 8 	"os"
 9 	"path"
10 
11 	"github.com/gorilla/mux"
12 )
13 
14 var uploadDir string
15 
16 func createProfile(w http.ResponseWriter, r *http.Request) {
17 	vars := mux.Vars(r)
18 	directory := vars["dir"]
19 	os.RemoveAll(path.Join(uploadDir, directory))
20 	if err := os.Mkdir(path.Join(uploadDir, directory), os.FileMode(0755)); err != nil {
21 		log.Println(err)
22 		w.WriteHeader(http.StatusInternalServerError)
23 		return
24 	}
25 }
26 func uploadPicture(w http.ResponseWriter, r *http.Request) {
27 	vars := mux.Vars(r)
28 	directory := vars["dir"]
29 	filename := vars["picture"]
30 	if _, err := os.Stat(path.Join(uploadDir, directory)); os.IsNotExist(err) {
31 		log.Println(err)
32 		w.WriteHeader(http.StatusBadRequest)
33 		return
34 	}
35 	out, err := os.Create(path.Join(uploadDir, directory, filename))
36 	if err != nil {
37 		log.Println(err)
38 		w.WriteHeader(http.StatusInternalServerError)
39 		return
40 	}
41 	defer out.Close()
42 	if _, err := io.Copy(out, r.Body); err != nil {
43 		log.Println(err)
44 		w.WriteHeader(http.StatusInternalServerError)
45 		return
46 	}
47 }
48 
49 func main() {
50 	flag.StringVar(&uploadDir, "uploadDir", "/tmp/uploads/", "Directory to store uploaded files in")
51 	flag.Parse()
52 	if err := os.MkdirAll(uploadDir, 0755); err != nil {
53 		log.Fatal(err)
54 	}
55 	r := mux.NewRouter()
56 	r.HandleFunc("/{dir:[[:alnum:]_-]+}/", createProfile).Methods("POST")
57 	r.HandleFunc("/{dir:[[:alnum:]_-]+}/{picture:[[:alnum:]_-]+}", uploadPicture).Methods("POST")
58 	r.PathPrefix("/").Handler(http.FileServer(http.Dir(uploadDir)))
59 
60 	log.Fatal(http.ListenAndServe(":8000", r))
61 }

You even tested it and it works!

$ go run server/main.go
$ curl -X POST http://localhost:8000/MissKitty/
$ curl -X POST http://localhost:8000/MissKitty/hai --data-binary @cat.jpg   

Let’s break it

Ok, now we’ll write a soak test. The soaker takes as input the number of tests we want to run and the number of goroutines we want to run concurrently. In lines 34-50 we start as many worker goroutines as the concurrency setting asks for; each one repeatedly reads a test scenario from a channel and runs it. We then push all the test scenarios into the channel and close it. Each worker stops once the channel is closed and drained, and wg.Wait() returns when they have all finished.

 1 package main
 2 
 3 import (
 4 	"flag"
 5 	"fmt"
 6 	"log"
 7 	"net/http"
 8 	"sync"
 9 )
10 
11 // Command-line flags, parsed in main.
12 var numTests, concurrency int
13 var server string
14 
15 func main() {
16 	flag.IntVar(&numTests, "n", 100000, "Number of tests")
17 	flag.IntVar(&concurrency, "c", 100, "Number of concurrent tests")
18 	flag.StringVar(&server, "server", "http://localhost:8000", "Server address")
19 	flag.Parse()
20 
21 	serverDef := CatServerDefinition{baseUrl: server, resourcesDir: "test_resources"}
22 	Soak(serverDef, numTests, concurrency)
23 }
24 
25 // Soak runs numTests scenarios from def on concurrency worker goroutines.
26 func Soak(def EndpointDefinition, numTests int, concurrency int) {
27 	client := &http.Client{
28 		Transport: &http.Transport{
29 			MaxIdleConns:        100,
30 			MaxIdleConnsPerHost: 100,
31 		},
32 	}
33 
34 	var wg sync.WaitGroup
35 	wg.Add(concurrency)
36 	queue := make(chan TestScenario, concurrency)
37 
38 	for i := 0; i < concurrency; i++ {
39 		go func() {
40 			defer wg.Done()
41 			for scenario := range queue {
42 				scenario.Run(client)
43 			}
44 		}()
45 	}
46 	for i := 0; i < numTests; i++ {
47 		queue <- def.NextTest()
48 	}
49 	close(queue)
50 	wg.Wait()
51 }
52 
53 type EndpointDefinition interface {
54 	NextTest() TestScenario
55 }
56 type TestScenario interface {
57 	Run(client *http.Client)
58 }
59 
60 type CatServerDefinition struct {
61 	baseUrl      string
62 	resourcesDir string
63 }
64 
65 func (csd CatServerDefinition) NextTest() TestScenario {
66 	return CreateProfileScenario{CatServerDefinition: csd, profileName: "MissKitty", pictureName: "hai"}
67 }
68 
69 type CreateProfileScenario struct {
70 	CatServerDefinition
71 	profileName string
72 	pictureName string
73 }
74 
75 func (cps CreateProfileScenario) Run(client *http.Client) {
76 	url := fmt.Sprintf("%s/%s/", cps.baseUrl, cps.profileName)
77 	req, err := http.NewRequest("POST", url, nil)
78 	if err != nil {
79 		log.Println(err)
80 		return
81 	}
82 	resp, err := client.Do(req)
83 	if err != nil {
84 		log.Println(err)
85 		return
86 	}
87 	defer resp.Body.Close()
88 	if resp.StatusCode != http.StatusOK {
89 		log.Println("POST ", url, "Expected response status 200, got ", resp.StatusCode)
90 		return
91 	}
92 }

In this example we have a single TestScenario, and all it does is create a cat profile by sending a POST request to /MissKitty/ and checking that it got a 200 status code back.

Running 1 test with 1 goroutine prints nothing, which means the check passed:

$ go run soaker/main.go  -n 1 -c 1

So far so good. Let’s run 10 tests with 10 goroutines:

$ go run soaker/main.go  -n 10 -c 10
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500

Oh no!

Let’s fix it

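You might have noticed it already, but there is a not-so-subtle race condition: multiple goroutines run the same code, which deletes and recreates the same directory, at the same time. If another goroutine recreates the directory between this goroutine’s os.RemoveAll and os.Mkdir calls, the os.Mkdir call on line 20 fails because the directory already exists, and the handler returns a 500.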

We want each goroutine to acquire a lock before deleting the directory and release it after recreating it. But we don’t want to lock more than necessary: goroutines that create directories at different paths don’t interfere with each other, so we want one lock per directory name. We create a lock struct that holds a map from each directory name to its own lock.

But there’s one more thing to worry about: Go’s maps are not safe for concurrent use. So we need one more Mutex in the struct to synchronise access to the map itself.

var locks = lock{locks: map[string]*sync.Mutex{}}

type lock struct {
	sync.Mutex
	locks map[string]*sync.Mutex
}

func (l *lock) getLock(name string) *sync.Mutex {
	l.Lock()
	defer l.Unlock()
	_, ok := l.locks[name]
	if !ok {
		l.locks[name] = &sync.Mutex{}
	}
	return l.locks[name]
}

func createProfile(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	directory := vars["dir"]
	mutex := locks.getLock(directory)
	mutex.Lock()
	defer mutex.Unlock()
	os.RemoveAll(path.Join(uploadDir, directory))
	if err := os.Mkdir(path.Join(uploadDir, directory), os.FileMode(0755)); err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
}
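If you’d rather not manage the outer Mutex by hand, sync.Map can serve as the lock table. This is a swapped-in alternative, not what the server above uses, and lockTable is a hypothetical name. LoadOrStore atomically returns the mutex already stored for a name, or stores the one passed in.

```go
package main

import (
	"fmt"
	"sync"
)

// lockTable (hypothetical) maps a directory name to its *sync.Mutex.
// sync.Map is safe for concurrent use, so no outer Mutex is needed.
type lockTable struct {
	m sync.Map // map[string]*sync.Mutex
}

func (t *lockTable) getLock(name string) *sync.Mutex {
	// If name is already present, the freshly allocated Mutex is discarded
	// and the stored one is returned.
	mu, _ := t.m.LoadOrStore(name, &sync.Mutex{})
	return mu.(*sync.Mutex)
}

func main() {
	var locks lockTable
	a := locks.getLock("MissKitty")
	b := locks.getLock("MissKitty")
	c := locks.getLock("MrWhiskers")
	fmt.Println(a == b, a == c) // prints "true false"
}
```

The trade-off is an allocation on every call, even when the lock already exists. Neither version ever deletes entries, so the table grows with every distinct profile name, which is the kind of slow leak a long soak test with many distinct names could expose.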

And now running the test with many goroutines succeeds:

$ go run soaker/main.go  -n 1000 -c 100

Note that in the example above the getLock method needs a pointer receiver. Changing (l *lock) to (l lock) would pass the lock struct by value: each call would lock its own copy of the embedded sync.Mutex, while the map (a reference type) would still be shared, so access to the map would no longer be synchronised. go vet’s copylocks check flags this kind of receiver, and we can also witness the problem at runtime by running the server with the -race flag:

$ go run -race server/main.go

Then, when running the test, we get messages like this:

==================
WARNING: DATA RACE
Read at 0x00c42007cc60 by goroutine 8:
  runtime.mapaccess2_faststr()
      /usr/local/go/src/runtime/hashmap_fast.go:317 +0x0
  main.lock.getLock()
      /Users/irene/projects/soaker/server/main.go:27 +0xfd
  main.createProfile()
      /Users/irene/projects/soaker/server/main.go:37 +0xfc
  net/http.HandlerFunc.ServeHTTP()
      /usr/local/go/src/net/http/server.go:1942 +0x51
  github.com/gorilla/mux.(*Router).ServeHTTP()
      /Users/irene/go/src/github.com/gorilla/mux/mux.go:114 +0x187
  net/http.serverHandler.ServeHTTP()
      /usr/local/go/src/net/http/server.go:2568 +0xbc
  net/http.(*conn).serve()
      /usr/local/go/src/net/http/server.go:1825 +0x71a

Previous write at 0x00c42007cc60 by goroutine 6:
  runtime.mapassign()
      /usr/local/go/src/runtime/hashmap.go:485 +0x0
  main.lock.getLock()
      /Users/irene/projects/soaker/server/main.go:29 +0x1de
  main.createProfile()
      /Users/irene/projects/soaker/server/main.go:37 +0xfc
  net/http.HandlerFunc.ServeHTTP()
      /usr/local/go/src/net/http/server.go:1942 +0x51
  github.com/gorilla/mux.(*Router).ServeHTTP()
      /Users/irene/go/src/github.com/gorilla/mux/mux.go:114 +0x187
  net/http.serverHandler.ServeHTTP()
      /usr/local/go/src/net/http/server.go:2568 +0xbc
  net/http.(*conn).serve()
      /usr/local/go/src/net/http/server.go:1825 +0x71a

It’s still broken

Let’s extend that test scenario to do more. After creating a cat profile we’ll upload a picture, which also needs the os and path packages imported.

func (cps CreateProfileScenario) Run(client *http.Client) {
	// Step 1: create (or recreate) the profile directory.
	url := fmt.Sprintf("%s/%s/", cps.baseUrl, cps.profileName)
	req, err := http.NewRequest("POST", url, nil)
	if err != nil {
		log.Println(err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		log.Println(err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Println("POST ", url, "Expected response status 200, got ", resp.StatusCode)
		return
	}

	// Step 2: upload a picture into the new profile.
	file, err := os.Open(path.Join(cps.resourcesDir, "cat.jpg"))
	if err != nil {
		log.Println(err)
		return
	}
	defer file.Close()

	url = fmt.Sprintf("%s/%s/%s", cps.baseUrl, cps.profileName, cps.pictureName)
	// Send the image as the request body (a nil body would upload an empty file).
	req, err = http.NewRequest("POST", url, file)
	if err != nil {
		log.Println(err)
		return
	}
	resp, err = client.Do(req)
	if err != nil {
		log.Println(err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Println("POST ", url, "Expected response status 200, got ", resp.StatusCode)
		return
	}
}

And running the test results in:

$ go run soaker/main.go  -n 1000 -c 100
POST  http://localhost:8000/MissKitty/ Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/hai Expected response status 200, got  500
POST  http://localhost:8000/MissKitty/hai Expected response status 200, got  400
POST  http://localhost:8000/MissKitty/hai Expected response status 200, got  400

Let’s fix it

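The uploadPicture endpoint also relies on the existence of the same directory. It checks that the directory exists and then creates a file inside it, but in the meantime createProfile may have deleted it. That explains the 400s (the os.Stat check finds no directory) and the 500s on picture uploads (os.Create fails because the directory vanished after the check). The 500 on POST /MissKitty/ is likely the same race seen from the other side: a file created while os.RemoveAll is running can leave the directory in place, so os.Mkdir fails.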

We’ll apply the same fix by adding these lines to the uploadPicture handler, before it touches the file system:

mutex := locks.getLock(directory)
mutex.Lock()
defer mutex.Unlock()

Everything works again:

$ go run soaker/main.go  -n 10000  -c 100

Conclusion

The soak test pointed us to two concurrency bugs within seconds of running, bugs we might otherwise not have thought about. They would not be caught by a typical single-request unit test and would not show up under low server load. They were also pretty fun to reason about and fix!

The full example is here.