PolarSPARC



Bhaskar S 12/04/2020


Overview

In Part 1 of this series, we provided a high-level overview of gRPC, installed the necessary software, setup the environment, and finally demonstrated the Unary RPC communication in both Go and Java.

In this part, we will continue the journey to the next RPC communication pattern - Server Streaming.

Server Streaming RPC

The following diagram illustrates the high-level architecture of Server Streaming communication pattern:

Server Streaming Architecture
Figure-6

In the Server Streaming RPC mode, the client sends a request to the server and the server responds with sequence (or stream) of messages back to the client.

For the Server Streaming RPC demonstration, we will implement a fictitious Currency Rate service, where the client sends the 'from' currency and the 'to' currency as the request and the server responds with a stream of 'rates' from different 'agents'.

We will first demonstrate the Currency Rate service using the Go programming language.

In the $GOPATH directory, create the project directory hierarchy by executing the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc

$ mkdir -p serverstream serverstream/currencypb serverstream/server serverstream/client

The following are the contents of the file currency.proto located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/currencypb as shown below:


currency.proto
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   04 Dec 2020
*/

syntax = "proto3";

package serverstream;

option go_package = "polarsparc.com/grpc/serverstream/currencypb";

option java_multiple_files = true;
option java_package = "com.polarsparc.gss";

message CurrencyRateRequest {
  string from = 1;
  string to = 2;
}

message CurrencyRateResponse {
  string agent = 1;
  string from = 2;
  string to = 3;
  double rate = 4;
}

service CurrencyService {
  rpc getCurrencyRate(CurrencyRateRequest) returns (stream CurrencyRateResponse) {};
}

The request message is defined as CurrencyRateRequest and the response message is defined as CurrencyRateResponse. The service interface is defined as CurrencyService with an RPC method getCurrencyRate that takes in a CurrencyRateRequest as an input and returns a sequence (or stream) of CurrencyRateResponse objects.

To compile the currency.proto file, execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/serverstream

$ protoc currencypb/currency.proto --go_out=plugins=grpc:$GOPATH/src

On success, this will generate the Go code file called currency.pb.go located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/currencypb.

From the file currency.pb.go, we see the CurrencyServiceServer interface, as shown below, that the server needs to implements:


currency.pb.go
.
.
.
type CurrencyServiceServer interface {
	GetCurrencyRate(*CurrencyRateRequest, CurrencyService_GetCurrencyRateServer) error
}
.
.
.

The following are the contents of the file currency_provider.go that simulates an in-memory store for initializing and returning currency rates from fictitious agents and is located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/server as shown below:


currency_provider.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package main

import (
  "fmt"
  "github.com/pkg/errors"
  "log"
  "strings"
)

type CurrencyRate struct {
  Agent string
  Rate float64
}

type RatesCache map[string][]CurrencyRate

type server struct {
  cache RatesCache
}

func (s *server) Init() {
  l1 := []CurrencyRate{{Agent: "Alice", Rate: 1.30}, {Agent: "Bob", Rate: 1.302},{Agent: "Dave", Rate: 1.31}}
  s.cache["USD:CAD"] = l1
  l2 := []CurrencyRate{{Agent: "Alice", Rate: 0.85}, {Agent: "Charlie", Rate: 0.84}}
  s.cache["USD:EUR"] = l2
  l3 := []CurrencyRate{{Agent: "Bob", Rate: 0.75}, {Agent: "Charlie", Rate: 0.751},{Agent: "Eve", Rate: 0.74}}
  s.cache["USD:GBP"] = l3
}

func (s *server) GetAgentRates(from string, to string) ([]CurrencyRate, error) {
  key := strings.ToUpper(from + ":" + to)

  log.Printf("Currency rate request for key: %s\n", key)

  rates := s.cache[key]
  if rates == nil {
    return nil, errors.New(fmt.Sprintf("No rate for currency from: %s, to: %s", from, to))
  }

  log.Printf("Currency rates for key: %s = %v", key, rates)

  return rates, nil
}

The following are the contents of the file server.go for the Server Streaming RPC server that implements the CurrencyServiceServer interface and is located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/server as shown below:


server.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package main

import (
  "google.golang.org/grpc"
  "log"
  "net"
  "polarsparc.com/grpc/serverstream/currencypb" // [1]
  "time"
)

func (s *server) GetCurrencyRate(req *currencypb.CurrencyRateRequest,
  stream currencypb.CurrencyService_GetCurrencyRateServer) error { // [2]
  log.Printf("Received a CurrencyRate request with req: %v\n", req)

  from := req.From
  to := req.To

  rates, err := s.GetAgentRates(from, to)
  if err == nil {
    log.Printf("Rates from agents: %v\n", rates)
    for _, r := range rates {
      res := ¤cypb.CurrencyRateResponse{Agent: r.Agent, From: from, To: to, Rate: r.Rate}
      stream.Send(res) // [3]
      time.Sleep(250 * time.Millisecond)
    }
    return nil
  }

  return err
}

func main()  {
  cs := &server{
    cache: RatesCache{},
  }
  cs.Init() // [4]

  log.Println("Ready to start the CurrencyRate server...")

  lis, err := net.Listen("tcp", "localhost:20002")
  if err != nil {
    log.Fatalf("Failed to create listener on localhost:20002")
  }

  srv := grpc.NewServer() // [5]

  currencypb.RegisterCurrencyServiceServer(srv, cs) // [6]

  if err = srv.Serve(lis); err != nil {
    log.Fatalf("Failed to start server: %v", err)
  }
}

The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:

The following are the contents of the file client.go that implements the Server Streaming RPC client for the CurrencyServiceServer located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/client as shown below:


client.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package main

import (
  "golang.org/x/net/context"
  "google.golang.org/grpc"
  "io"
  "log"
  "polarsparc.com/grpc/serverstream/currencypb"
)

func main() {
  log.Println("Ready to start the CurrencyRate client...")

  conn, err := grpc.Dial("localhost:20002", grpc.WithInsecure())
  if err != nil {
    log.Fatalf("Failed to connect to localhost:20002")
  }
  defer conn.Close()

  cl := currencypb.NewCurrencyServiceClient(conn) // [1]

  // Success
  req := ¤cypb.CurrencyRateRequest{From: "usd",
    To: "eur"} // [2]
  stream, err := cl.GetCurrencyRate(context.Background(), req) // [3]
  if err != nil {
    log.Fatalf("[1] Failed to send CurrencyRate request to localhost:20002")
  }
  for {
    res, err := stream.Recv() // [4]
    if err == io.EOF {
      break
    }
    if err != nil {
      log.Fatalf("[1] Received and error from CurrencyRate at localhost:20002: %v", err)
    }
    log.Printf("[1] ===> Agent: %s, Rate: %.03f\n", res.Agent, res.Rate)
  }

  // Error
  req2 := ¤cypb.CurrencyRateRequest{From: "usd",
    To: "jpy"}
  stream2, err := cl.GetCurrencyRate(context.Background(), req2)
  if err != nil {
    log.Fatalf("[2] Failed to send CurrencyRate request to localhost:20002")
  }
  for {
    res, err := stream2.Recv()
    if err == io.EOF {
      break
    }
    if err != nil {
      log.Fatalf("[2] Received and error from CurrencyRate at localhost:20002: %v", err)
    }
    log.Printf("[2] ===> Agent: %s, Rate: %.03f\n", res.Agent, res.Rate)
  }
}

The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:

Open two Terminal windows - one for the server and one for the client.

In the server Terminal, execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/serverstream/server

$ go run server.go currency_provider.go

The following would be the typical output:

Output.5

2020/12/04 21:03:37 Ready to start the CurrencyRate server...

In the client Terminal, execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/serverstream/client

$ go run client.go

The following would be the typical output:

Output.6

2020/12/04 21:04:31 Ready to start the CurrencyRate client...
2020/12/04 21:04:31 [1] ===> Agent: Alice, Rate: 0.850
2020/12/04 21:04:31 [1] ===> Agent: Charlie, Rate: 0.840
2020/12/04 21:04:32 [2] Received and error from CurrencyRate at localhost:20002: rpc error: code = Unknown desc = No rate for currency from: usd, to: jpy
exit status 1

EXCELLENT !!! We have successfully demonstrated the Server Streaming gRPC communication style using the Go language.

Copy the file currency.proto listed above to the directory $HOME/java/grpc/src/main/proto.

To compile the currency.proto file, execute the following commands:

$ cd $HOME/java/grpc

$ mvn compile

On success, this will generate some files in the directory $HOME/java/grpc/target/generated-sources/protobuf/java/com/polarsparc/gss.

The following diagram illustrates the contents of the directory $HOME/java/grpc/target/generated-sources :

generated-sources Directory
Figure-7

The following are the contents of the file CurrencyRate.java that acts a holder object for storing the fictitious agent and the currency rate they offer and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:


CurrencyRate.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package com.polarsparc.gss.server;

public class CurrencyRate {
    private final String agent;
    private final Double rate;

    public CurrencyRate(String agent, Double rate) {
        this.agent = agent;
        this.rate = rate;
    }

    public String getAgent() {
        return this.agent;
    }

    public Double getRate() {
        return this.rate;
    }

    @Override
    public String toString() {
        return "CurrencyRate{" +
                "agent='" + agent + '\'' +
                ", rate=" + rate +
                '}';
    }
}

The following are the contents of the file CurrencyRateProvider.java that simulates an in-memory store for initializing and returning currency rates from fictitious agents and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:


CurrencyRateProvider.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package com.polarsparc.gss.server;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CurrencyRateProvider {
    private final static Logger LOGGER = Logger.getLogger(CurrencyRateProvider.class.getName());

    private final static Map<String, List<CurrencyRate>>> ratesTable = new HashMap<>();

    static {
        LOGGER.setLevel(Level.INFO);

        ratesTable.put("USD:CAD", Arrays.asList(new CurrencyRate("Alice", 1.30),
                new CurrencyRate("Bob", 1.302),
                new CurrencyRate("Dave", 1.31)));
        ratesTable.put("USD:EUR", Arrays.asList(new CurrencyRate("Alice", 0.85),
                new CurrencyRate("Charlie", 0.84)));
        ratesTable.put("USD:GBP", Arrays.asList(new CurrencyRate("Bob", 0.75),
                new CurrencyRate("Charlie", 0.751),
                new CurrencyRate("Eve", 0.74)));
    }

    private CurrencyRateProvider() {
    }

    public static List<CurrencyRate>> getCurrencyRate(String from, String to) {
        String key = (from + ":" + to).toUpperCase();

        LOGGER.info(String.format("Currency rate request for key: %s", key));

        if (!ratesTable.containsKey(key)) {
            throw new RuntimeException(String.format("No rate for currency from: %s, to: %s", from, to));
        }

        List<CurrencyRate>> rates = ratesTable.get(key);

        LOGGER.info(String.format("Currency rates for key: %s = %s", key, rates));

        return rates;
    }
}

The following are the contents of the Java program called CurrencyRateService.java that implements the Server Streaming gRPC service CurrencyService located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:


CurrencyRateService.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package com.polarsparc.gss.server;

import com.polarsparc.gss.CurrencyRateRequest;
import com.polarsparc.gss.CurrencyRateResponse;
import com.polarsparc.gss.CurrencyServiceGrpc;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CurrencyRateService extends CurrencyServiceGrpc.CurrencyServiceImplBase { // [1]
    private final static Logger LOGGER = Logger.getLogger(CurrencyRateService.class.getName());

    static {
        LOGGER.setLevel(Level.INFO);
    }

    @Override
    public void getCurrencyRate(CurrencyRateRequest request, 
      StreamObserver<CurrencyRateResponse> responseObserver) { // [2]
        String from = request.getFrom();
        String to = request.getTo();

        List<CurrencyRate> rates;
        try {
            rates = CurrencyRateProvider.getCurrencyRate(from, to);
        }
        catch (RuntimeException ex) {
            Status status = Status.FAILED_PRECONDITION.withDescription(ex.getMessage());
            responseObserver.onError(status.asRuntimeException());
            return;
        }

        if (rates != null) {
            LOGGER.info(String.format("Rates from agents: %s", rates));

            rates.forEach(r -> {
                CurrencyRateResponse response = CurrencyRateResponse.newBuilder()
                        .setAgent(r.getAgent())
                        .setFrom(from)
                        .setTo(to)
                        .setRate(r.getRate())
                        .build();
                responseObserver.onNext(response); // [3]
                try {
                    Thread.sleep(250);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            responseObserver.onCompleted(); // [4]
        }
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

The following are the contents of the Java program called CurrencyRateServer.java that registers the Server Streaming RPC service CurrencyRateService as a gRPC server and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:


CurrencyRateServer.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package com.polarsparc.gss.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class CurrencyRateServer {
    public static void main(String[] args) {
        Server server = ServerBuilder.forPort(20002) // [1]
                .addService(new CurrencyRateService()) // [2]
                .build();

        try {
            server.start(); // [3]
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.print("Started the gRPC CurrencyRateService on 20002 ...\n");

        try {
            server.awaitTermination(); // [4]
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

To receive responses from the server in an asynchronous fashion, a client needs to implement the interface StreamObserver and register it as a callback handler on the client stub. The following are the contents of the Java program called CurrencyRateStreamObserver.java that implements the required interface for the asynchronous callback and located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gss/client as shown below:


CurrencyRateStreamObserver.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package com.polarsparc.gss.client;

import com.polarsparc.gss.CurrencyRateResponse;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;

public class CurrencyRateStreamObserver implements StreamObserver<CurrencyRateResponse> {
    private final CountDownLatch latch;

    public CurrencyRateStreamObserver(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onNext(CurrencyRateResponse response) { // [1]
        System.out.printf("Agent: %s, Rate: %.03f\n", response.getAgent(), response.getRate());
    }

    @Override
    public void onError(Throwable ex) { // [2]
        System.out.println("Exception: " + ex.getMessage());
    }

    @Override
    public void onCompleted() { // [3]
        System.out.println("Done !!!");
        latch.countDown();
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

The following are the contents of the Java program called CurrencyRateClientTest.java that implements the Server Streaming RPC client for CurrencyService located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gss/client as shown below:


CurrencyRateClientTest.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   04 Dec 2020
*/

package com.polarsparc.gss.client;

import com.polarsparc.gss.CurrencyRateRequest;
import com.polarsparc.gss.CurrencyRateResponse;
import com.polarsparc.gss.CurrencyServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CurrencyRateClientTest {
    private CurrencyServiceGrpc.CurrencyServiceBlockingStub blockingStub;
    private CurrencyServiceGrpc.CurrencyServiceStub stub;

    @BeforeAll
    public void setup() {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 20002) // [1]
                .usePlaintext() // [2]
                .build();
        this.blockingStub = CurrencyServiceGrpc.newBlockingStub(channel); // [3]
        this.stub = CurrencyServiceGrpc.newStub(channel); // [4]
    }

    @Test
    public void currencyRateBlockingTestOne() {
        CurrencyRateRequest request = CurrencyRateRequest.newBuilder()
                .setFrom("usd")
                .setTo("eur")
                .build(); // [5]
        Iterator<CurrencyRateResponse> response = this.blockingStub.getCurrencyRate(request); // [6]
        response.forEachRemaining(res -> System.out.printf("Agent: %s, Rate: %.03f\n", res.getAgent(), res.getRate()));
    }

    @Test
    public void currencyRateBlockingTestTwo() {
        CurrencyRateRequest request = CurrencyRateRequest.newBuilder()
                .setFrom("eur")
                .setTo("jpy")
                .build(); // [5]
        Iterator<CurrencyRateResponse> response = this.blockingStub.getCurrencyRate(request); // [6]
        Assertions.assertThrows(io.grpc.StatusRuntimeException.class, () -> response.forEachRemaining(res ->
                System.out.printf("Agent: %s, Rate: %.03f\n", res.getAgent(), res.getRate())));
    }

    @Test
    public void currencyRateAsyncTestOne() {
        CountDownLatch latch = new CountDownLatch(1);
        CurrencyRateRequest request = CurrencyRateRequest.newBuilder()
                .setFrom("usd")
                .setTo("cad")
                .build(); // [5]
        this.stub.getCurrencyRate(request, new CurrencyRateStreamObserver(latch)); // [7]
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

Open two Terminal windows - one for the server and one for the client.

In the server Terminal, execute the following commands:

$ cd $HOME/java/grpc

$ mvn exec:java -Dexec.mainClass=com.polarsparc.gss.server.CurrencyRateServer

The following would be the typical output:

Output.7

Started the gRPC CurrencyRateService on 20002 ...

In the client Terminal, execute the following commands:

$ cd $HOME/java/grpc

$ mvn test -Dtest=com.polarsparc.gss.client.CurrencyRateClientTest

The following would be the typical output:

Output.8

Running com.polarsparc.gss.client.CurrencyRateClientTest
Agent: Alice, Rate: 1.300
Agent: Bob, Rate: 1.302
Agent: Dave, Rate: 1.310
Done !!!
Agent: Alice, Rate: 0.850
Agent: Charlie, Rate: 0.840
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.849 s

One could also test with the Go server running and using the Java client and vice versa.

References

Introduction to gRPC - Part 1

Introduction to Google Protocol Buffers

gRPC Go Documentation

gRPC Java Documentation


© PolarSPARC