Scaling gRPC With Kubernetes (Using Go)

By Noam Yadgar
#grpc #k8s #kubernetes #go #software-engineering #system-design

The Tech

gRPC is a strong player in microservices-based systems. It leverages Protocol Buffers for well-defined API contracts, fast serialization (roughly five times faster than JSON), and smaller payloads, and it supports streaming thanks to HTTP/2. It’s easy to see why it is a good choice for real-time communication between microservices.

HTTP/2

Unlike a typical REST API that’s built on top of HTTP/1.1, gRPC is built on top of HTTP/2. The HTTP/2 feature that matters most here is multiplexing: many independent, bidirectional streams share a single TCP connection. gRPC maps every call, unary or streaming, onto one of these streams, which is what separates it from a typical HTTP/1.1-based API. Because of that, a gRPC client keeps a long-lived TCP connection to the server and reuses it for all of its calls.

The Problem

One thing that makes REST APIs exceptionally good at scaling is the notion of being stateless. Each request can travel over a new TCP connection and be routed to any available replica of the server. This works perfectly with a Kubernetes service’s load-balancing, which picks a pod per connection.

Things are a bit different with gRPC. Because each client keeps its TCP connection open for its entire lifespan (unless configured otherwise), the load-balancer can only spread connections, not individual requests, across the servers.

flowchart LR
    c1(client-1):::pod ==> s[service/server]:::srv
    c2(client-2):::pod ==> s
    c3(client-3):::pod ==> s
    c4(client-4):::pod ==> s
    s ==> p1(server-1):::pod
    s ==> p2(server-2):::pod
    s ==> p3(server-3):::pod
linkStyle 0,3,4 stroke: orange;
linkStyle 1,5 stroke: blue;
linkStyle 6,2 stroke: red;
classDef srv fill: #a3e4d7, stroke:  #148f77 
classDef pod fill: #85c1e9, stroke: #2874a6

Figure 1: The first three clients are symmetrically routed to the server’s three pods. The 4th client starts the next cycle.
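
To make the picture concrete, here is a minimal sketch of connection-level balancing. It is a toy model, not how the Kubernetes service is implemented: the in-order cycling simply mirrors Figure 1, and the function names are made up for illustration.

```go
package main

import "fmt"

// assignConnections models a balancer that picks a backend once per
// connection, cycling through the backends in order (an assumption that
// mirrors Figure 1). It returns, for each client, the index of the backend
// that the client's single long-lived connection lands on.
func assignConnections(clients, backends int) []int {
	assigned := make([]int, clients)
	for c := 0; c < clients; c++ {
		assigned[c] = c % backends
	}
	return assigned
}

// requestsPerBackend counts how many requests each backend serves when every
// client sends the same number of requests over its one connection.
func requestsPerBackend(assigned []int, backends, requestsPerClient int) []int {
	counts := make([]int, backends)
	for _, b := range assigned {
		counts[b] += requestsPerClient
	}
	return counts
}

func main() {
	// Four clients and three server pods, as in Figure 1.
	assigned := assignConnections(4, 3)
	// Prints [0 1 2 0]
	fmt.Println(assigned)
	// Prints [6000 3000 3000]
	fmt.Println(requestsPerBackend(assigned, 3, 3000))
}
```

Because the backend is chosen per connection, the first pod ends up serving twice as many requests as the others, no matter how the requests are timed.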

Idle Pods

In Figure 1, we have more clients than servers, so even if we may not fully optimize the resource utilization of the servers, at least all of them are kept busy. But what happens if we have fewer clients than servers? The answer is that some pods will sit idle, doing no work while still consuming resources.

flowchart LR
    c1(client-1):::pod ==> s[service/server]:::srv
    s ==> p1(server-1):::pod
    s --> p2(server-2):::pod
    s --> p3(server-3):::pod
linkStyle 0,1 stroke: orange;
classDef srv fill: #a3e4d7, stroke:  #148f77 
classDef pod fill: #85c1e9, stroke: #2874a6

Figure 2: One client - Three servers. The client is making requests only to one server.

Autoscaling

Autoscaling the number of server replicas might look like the solution. The truth is, it’s not. Imagine a scenario where you’ve set an autoscaling rule (with a tool like KEDA) that increases the number of replicas whenever a pod reaches 90% memory utilization.

Since there’s no link between the number of clients and the number of servers, we can face a scenario in which one client is causing the autoscaling rule to be triggered, increasing the number of servers by one; however, it doesn’t use the new replica.

It wouldn’t make sense to scale the number of servers based on the number of clients either. Scaling based on resource utilization is a good rule, but we want to ensure that when a pod is too busy, it will share the load with new replicas that the autoscaling tool is adding.
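
The scenario can be sketched with a tiny simulation. This is only a toy model under the assumption that the client holds exactly one connection; the `cluster` type and its methods are hypothetical names, not part of any Kubernetes or gRPC API.

```go
package main

import "fmt"

// cluster is a toy model: load[i] is the number of requests served by pod i,
// and pinned is the pod that the client's single connection is attached to.
type cluster struct {
	load   []int
	pinned int
}

// send delivers n requests over the existing connection, so they all land on
// the pinned pod regardless of how many pods exist.
func (c *cluster) send(n int) {
	c.load[c.pinned] += n
}

// scaleUp adds one idle replica. Existing connections are not moved.
func (c *cluster) scaleUp() {
	c.load = append(c.load, 0)
}

func main() {
	c := &cluster{load: make([]int, 3), pinned: 0}
	// The busy pod trips the autoscaling rule.
	c.send(1500)
	// A fourth replica appears.
	c.scaleUp()
	// The client keeps using its old connection.
	c.send(1500)
	// Prints [3000 0 0 0]
	fmt.Println(c.load)
}
```

The new replica never receives a request, so the busy pod is just as busy as before.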

The Test

To illustrate the problem, I wrote simple gRPC apps in Go (a client and a server) based on this .proto:

syntax = "proto3";

option go_package = "internal/";

service Service {
  rpc Get(Req) returns (Res) {}
}

message Req {}

message Res {
  string id = 1;
}

The client sends 3000 messages via the Get method, and the server responds with a pre-generated UUID to reflect its unique identity. The returned value is then printed to stdout by the client. Here’s the client’s code:

package main

import (
	"context"
	"fmt"
	"os"

	"example/grpc_lb/internal"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const n = 3000

func main() {
	// TARGET holds the address of the server.
	target := os.Getenv("TARGET")
	if target == "" {
		panic("no target")
	}
	conn, err := grpc.NewClient(
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c := internal.NewServiceClient(conn)

	for i := 0; i < n; i++ {
		// The error is ignored on purpose: a failed call leaves res nil,
		// and the nil-safe getter then prints an empty line.
		res, _ := c.Get(context.Background(), &internal.Req{})
		fmt.Println(res.GetId())
	}
}
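
The server’s code isn’t shown here. As a rough idea of how a process could pre-generate an identity like that UUID at startup, here is a sketch that uses only the standard library. The `newUUID` helper is hypothetical; the actual server may well use a UUID package instead.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newUUID returns a random (version 4) UUID string built with crypto/rand.
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	// Set the version (4) and the RFC 4122 variant bits.
	b[6] = (b[6] & 0x0f) | 0x40
	b[8] = (b[8] & 0x3f) | 0x80
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	// Generated once at startup, then returned by every call the server handles.
	id, err := newUUID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

Since the ID is created once per process, every response from the same pod carries the same value, which is what makes the client’s output easy to count.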

Using minikube as my local Kubernetes cluster, I deployed three servers behind a service called server and ran one client as a job:

kubectl get pods
NAME                      READY   STATUS      RESTARTS   AGE
client-xm42p              0/1     Completed   0          7m51s
server-6565dd7765-7g67b   1/1     Running     0          7m44s
server-6565dd7765-hl7ww   1/1     Running     0          7m44s
server-6565dd7765-l7hrp   1/1     Running     0          7m44s

kubectl get svc
NAME         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
server       ClusterIP   10.104.44.221   <none>        9090/TCP   7m40s

Let’s look at the logs:

kubectl logs job/client | uniq -c
 3000 7b4fcf65-2c93-4c5c-8433-5950c91553a9

As you can see, although we have three servers, the client sent all of its requests to only one of them. Maybe that’s because our single client is making synchronous requests to the service. Let’s try to communicate concurrently:

func main() {
	target := os.Getenv("TARGET")
	if target == "" {
		panic("no target")
	}
	conn, err := grpc.NewClient(
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c := internal.NewServiceClient(conn)

	ch := make(chan string)
	for i := 0; i < n; i++ {
		go func() {
			res, _ := c.Get(context.Background(), &internal.Req{})
			ch <- res.GetId()
		}()
	}

	for i := 0; i < n; i++ {
		fmt.Println(<-ch)
	}
	close(ch)
}

kubectl logs job/client | uniq -c
 3000 7b4fcf65-2c93-4c5c-8433-5950c91553a9

The result is the same. You’ll still face the same results even if you try to create the connection on every iteration. As mentioned above, the load-balancer spreads connections, not requests, across pods, so a single client holding one connection will always talk to the same server.

The Solution

In this experiment, we’re trying to make our single client spread its messages (preferably evenly) across the different replicas of the server.

flowchart LR
    c1(client-1):::pod ==> s[service/server]:::srv
    s ==> p1(server-1):::pod
    s ==> p2(server-2):::pod
    s ==> p3(server-3):::pod
linkStyle 0,1,2,3 stroke: orange;
classDef srv fill: #a3e4d7, stroke:  #148f77 
classDef pod fill: #85c1e9, stroke: #2874a6

Figure 3: One client is sending requests to all server’s replicas.

Headless service

The trick is to bypass Kubernetes’ built-in load balancing. Kubernetes supports headless services. These services don’t get a cluster IP and, therefore, have no single endpoint at which load-balancing is performed. Instead, a DNS lookup of the service name returns the IPs of the pods behind the service, allowing other load-balancing/service-discovery implementations to replace the built-in one.

To turn a Kubernetes service into a headless service, we simply set clusterIP: None in the service definition:

apiVersion: v1
kind: Service
metadata:
  name: server
  namespace: default
  labels:
    app.kubernetes.io/name: server
spec:
  clusterIP: None
  ports:
    - name: grpc
      port: 9090
      targetPort: 9090
  selector:
    app.kubernetes.io/name: server
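
One way to check what a headless service exposes is to resolve its name from inside the cluster. The sketch below assumes the default cluster domain (cluster.local) and the default namespace from the manifest; `podAddrs` and `lookupFunc` are hypothetical helpers introduced only for illustration.

```go
package main

import (
	"fmt"
	"net"
	"sort"
)

// lookupFunc matches the signature of net.LookupHost, so a fake can stand in
// for real DNS when trying this outside a cluster.
type lookupFunc func(host string) ([]string, error)

// podAddrs resolves host and returns "ip:port" targets, sorted for stable output.
func podAddrs(lookup lookupFunc, host, port string) ([]string, error) {
	ips, err := lookup(host)
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(ips))
	for _, ip := range ips {
		addrs = append(addrs, net.JoinHostPort(ip, port))
	}
	sort.Strings(addrs)
	return addrs, nil
}

func main() {
	// The fully qualified name is an assumption about the cluster's DNS setup.
	addrs, err := podAddrs(net.LookupHost, "server.default.svc.cluster.local", "9090")
	if err != nil {
		fmt.Println("lookup failed (expected outside the cluster):", err)
		return
	}
	// With a headless service, this should list one address per ready pod.
	fmt.Println(addrs)
}
```

With a regular ClusterIP service, the same lookup would return the single service IP instead of the individual pod IPs.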

Client-side load-balancing

Now that our service is headless, we can set the client’s load-balancing policy to make requests in a round-robin fashion:

func main() {
	target := os.Getenv("TARGET")
	if target == "" {
		panic("no target")
	}
	conn, err := grpc.NewClient(
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Rotate calls across all resolved addresses
		// (the default policy, pick_first, sticks to one).
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)

	// ... rest of the code

If we run this version of the client against a headless service:

kubectl logs job/client | sort | uniq -c
 871 7b4fcf65-2c93-4c5c-8433-5950c91553a9
1258 afaeeea0-87c3-4473-b8ad-1c5b61407b57
 871 ca66dd65-c6df-44c4-bb3e-60c00f8776b5

Great! The 3000 messages were spread across all pods, though not quite evenly: one pod received about 42% of the messages and the others about 29% each. The reason is that our client sent all of its messages concurrently. If we send the messages synchronously:

kubectl logs job/client | sort | uniq -c
 999 7b4fcf65-2c93-4c5c-8433-5950c91553a9
1000 afaeeea0-87c3-4473-b8ad-1c5b61407b57
1001 ca66dd65-c6df-44c4-bb3e-60c00f8776b5

The client cycled through all three servers with an almost perfectly even spread.
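
The near-even split follows from how round-robin selection works. Here is an idealized sketch of the idea; it is a toy picker written for illustration, not grpc-go’s implementation, and the pod names are placeholders.

```go
package main

import "fmt"

// roundRobin hands out addresses in a fixed rotation. It is not safe for
// concurrent use; a real picker has to handle that.
type roundRobin struct {
	addrs []string
	next  int
}

// pick returns the next address and advances the rotation.
func (r *roundRobin) pick() string {
	addr := r.addrs[r.next%len(r.addrs)]
	r.next++
	return addr
}

func main() {
	rr := &roundRobin{addrs: []string{"pod-a", "pod-b", "pod-c"}}
	counts := map[string]int{}
	for i := 0; i < 3000; i++ {
		counts[rr.pick()]++
	}
	// Prints map[pod-a:1000 pod-b:1000 pod-c:1000]
	fmt.Println(counts)
}
```

In this idealized version, sequential requests split exactly evenly; the real run above is off by one request per pod.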

Discovery

We’re not done yet. Our client doesn’t have a mechanism for knowing about new pods during its runtime. When calling NewClient with the load-balancing settings, our client retrieves the list of available IPs and will never try to fetch it again by default.

To illustrate this point, I throttled the client by adding 50 milliseconds to each iteration, giving me 2.5 minutes to play with the number of replicas during the client’s runtime.

	ch := make(chan string)
	for i := 0; i < n; i++ {
		go func() {
			res, _ := c.Get(context.Background(), &internal.Req{})
			ch <- res.GetId()
		}()
		// Throttle: 3000 iterations x 50ms = 2.5 minutes (requires importing "time").
		time.Sleep(time.Millisecond * 50)
	}
	// ... rest of the code

I removed one replica during runtime and gradually added up to 5 replicas. Let’s see the results:

kubectl logs job/client | sort | uniq -c
1
1112 afaeeea0-87c3-4473-b8ad-1c5b61407b57
1887 b48de8b8-6a88-472f-8043-f2e72d22175f

Interestingly, we’ve lost one pod for good after the first change (removing one replica). In fact, a request (coincidentally its first) failed, which shows up as the line with a count of 1 and no ID, and our client’s load balancer discarded the pod for good. Notice that our client wasn’t aware that I’d added more replicas (up to 5) during its runtime.

To solve this, we need a DNS resolver that lets the client re-resolve the service name and pick up new IPs during its runtime. The google.golang.org/grpc module conveniently has a built-in DNS resolver. Using its resolver package, we can globally register the DNS resolver and select it with the dns:/// scheme in the target, as follows:

// Requires importing "google.golang.org/grpc/resolver".
func init() {
	resolver.Register(resolver.Get("dns"))
}

func main() {
	target := os.Getenv("TARGET")
	if target == "" {
		panic("no target")
	}
	conn, err := grpc.NewClient(
		// The dns:/// scheme selects the DNS resolver for this target.
		fmt.Sprintf("dns:///%s", target),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
	// ... rest of the code

On this attempt, I started with three replicas, then dropped one, added one, and finally dropped one. Let’s look at the results:

kubectl logs job/client | sort | uniq -c
 512 74025783-3545-4d2c-b1c9-1b20b5042b31
1169 b48de8b8-6a88-472f-8043-f2e72d22175f
1169 ca66dd65-c6df-44c4-bb3e-60c00f8776b5
 150 f17c9530-1970-4b53-a86b-5b66ce0e575e

Not a single miss. The pod with 150 messages is probably the first one I dropped, since I dropped it close to the start of the client’s job. Then I added one, perhaps the pod with the 512 messages. After I removed one replica again, I was left with two pods running until the job finished, which explains the two pods with an identical 1169 messages.
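
The shape of these numbers can be reproduced with a simplified model of a round-robin balancer whose address list gets refreshed. This is only a sketch of the idea under made-up phase lengths; the `balancer` type is hypothetical and says nothing about how or when grpc-go actually re-resolves.

```go
package main

import "fmt"

// balancer is a toy round-robin balancer whose address list can be replaced.
type balancer struct {
	addrs []string
	next  int
}

// refresh swaps in a new address list, as a resolver update would.
func (b *balancer) refresh(addrs []string) {
	b.addrs = addrs
	b.next = 0
}

// pick returns the next address in the rotation.
func (b *balancer) pick() string {
	addr := b.addrs[b.next%len(b.addrs)]
	b.next++
	return addr
}

// run sends n requests and records which address served each one.
func run(b *balancer, n int, counts map[string]int) {
	for i := 0; i < n; i++ {
		counts[b.pick()]++
	}
}

func main() {
	counts := map[string]int{}
	b := &balancer{addrs: []string{"pod-a", "pod-b", "pod-c"}}
	run(b, 300, counts)
	// pod-a is removed.
	b.refresh([]string{"pod-b", "pod-c"})
	run(b, 300, counts)
	// pod-d is added.
	b.refresh([]string{"pod-b", "pod-c", "pod-d"})
	run(b, 300, counts)
	// Prints map[pod-a:100 pod-b:350 pod-c:350 pod-d:100]
	fmt.Println(counts)
}
```

As in the real run, the two pods that stay up for the whole job end with identical counts, while the pods that come and go serve only a share of the requests sent during their lifetime.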

By understanding the benefits of headless services in Kubernetes and combining client-side load-balancing with a DNS resolver, we’ve managed to make a gRPC client in Go that knows how to spread its messages in a round-robin manner while being aware of new available servers during runtime.