PolarSPARC



Bhaskar S 12/19/2020


Overview

Thus far in this series:

In this FINAL part, we will continue the journey to the next RPC communication pattern - Bidirectional Streaming and also show how to use deadlines (or timeouts). A deadline is the total end-to-end time a client is willing to wait from the time of the request to the time of response from the server (even if the server makes other calls behind the covers)

Bidirectional Streaming RPC

The following diagram illustrates the high-level architecture of Bidirectional Streaming communication pattern:

Bidirectional Streaming Architecture
Figure-9

In the Bidirectional Streaming RPC mode, the client sends a sequence (or stream) of requests to the server and the server responds with a sequence (or stream) of responses back to the client. One *IMPORTANT* fact - the stream of requests and the stream of responses are independent of eath other.

For the Bidirectional Streaming RPC demonstration, we will implement a fictitious Book Recommendation service, where the client sends requests on their preferred 'topics' to the server and the server sends responses back to the client with the recommended books (with their 'title' and 'isbn').

We will first demonstrate the Book Recommendation using the Go programming language.

In the $GOPATH directory, create the project directory hierarchy by executing the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc

$ mkdir -p bidirstream bidirstream/bookspb bidirstream/server bidirstream/client

The following are the contents of the file recommend.proto located in the directory $GOPATH/src/polarsparc.com/grpc/bidirstream/bookspb as shown below:


recommend.proto
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   19 Dec 2020
*/

syntax = "proto3";

package recommend;

option go_package = "polarsparc.com/grpc/bidirstream/bookspb";

option java_multiple_files = true;
option java_package = "com.polarsparc.gbd";

message BookRecommendRequest {
  string topic = 1;
}

message BookRecommendResponse {
  string topic = 1;
  string title = 2;
  string isbn = 3;
}

service BookRecommendService {
  rpc recommendedBooks(stream BookRecommendRequest) returns (stream BookRecommendResponse);
}

The request message is defined as BookRecommendRequest and the response message is defined as BookRecommendResponse. The service interface is defined as BookRecommendService with an RPC method recommendedBooks that takes in a sequence (or stream) of BookRecommendRequest objects and returns a sequence (or stream) BookRecommendResponse objects.

To compile the recommend.proto file, execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/bidirstream

$ protoc bookspb/recommend.proto --go_out=plugins=grpc:$GOPATH/src

On success, this will generate the Go code file called recommend.pb.go located in the directory $GOPATH/src/polarsparc.com/grpc/bidirstream/bookspb.

From the file recommend.pb.go, we see the BookRecommendService interface, as shown below, that the server needs to implements:


recommend.pb.go
.
.
.
type BookRecommendServiceServer interface {
	RecommendedBooks(BookRecommendService_RecommendedBooksServer) error
}
.
.
.

The following are the contents of the file books_catalog.go that simulates an in-memory store for initializing and returning the recommended books for various topics and is located in the directory $GOPATH/src/polarsparc.com/grpc/bidirstream/server as shown below:


books_catalog.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package main

import (
  "fmt"
  "github.com/pkg/errors"
  "log"
  "strings"
)

type Book struct {
  Title string
  ISBN string
}

type BooksCatalog map[string][]Book

type server struct {
  timeout int
  catalog BooksCatalog
}

func (s *server) Init() {
  l1 := []Book{{":The C++ Programming Language", "11111"},
    {"C++ Primer", "22222"},
    {"A Tour of C++", "33333"}}
  s.catalog["C++"] = l1
  l2 := []Book{{":The Go Programming Language", "44444"},
    {"Go in Practice", "55555"},
    {"Black Hat Go", "66666"}}
  s.catalog["GO"] = l2
  l3 := []Book{{":Effective Java", "77777"},
    {"Modern Java in Action", "88888"},
    {"Java: The Complete Reference", "99999"}}
  s.catalog["JAVA"] = l3
  l4 := []Book{{":Python Crash Course", "12121"},
    {"Learning Python", "34343"},
    {"Effective Python", "56565"}}
  s.catalog["PYTHON"] = l4
}

func (s *server) SetTimeout(t int) {
  s.timeout = t
}

func (s *server) GetBooks(topic string) ([]Book, error) {
  key := strings.ToUpper(topic)

  log.Printf("Books request for topic: %s\n", key)

  books := s.catalog[key]
  if books == nil {
    return nil, errors.New(fmt.Sprintf("No books for topic: %s", topic))
  }

  log.Printf("Books for key: %s = %v", key, books)

  return books, nil
}

The following are the contents of the file server.go for the Bidirectional Streaming RPC server that implements the BookRecommendServiceServer interface and is located in the directory $GOPATH/src/polarsparc.com/grpc/bidirstream/server as shown below:


server.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package main

import (
  "google.golang.org/grpc"
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "io"
  "log"
  "net"
  "os"
  "polarsparc.com/grpc/bidirstream/bookspb" // [1]
  "strconv"
  "time"
)

func (s *server) RecommendedBooks(stream bookspb.BookRecommendService_RecommendedBooksServer) error { // [2]
  for {
    req, err := stream.Recv() // [3]
    if err == nil {
      books, err := s.GetBooks(req.Topic) // [4]
      if err == nil {
        for _, bk := range books {
          res := &bookspb.BookRecommendResponse{Topic: req.Topic, Title: bk.Title, Isbn: bk.ISBN}
          err = stream.Send(res) // [5]
          if err != nil {
            code := status.Code(err)
            if code == codes.Canceled { // [6]
              log.Println("ERROR - Deadline breached by BookRecommendService at localhost:20004")
            } else {
              log.Printf("Failed to send from BookRecommendService at localhost:20004: %v", err)
            }
            return err
          }
          time.Sleep(time.Duration(s.timeout) * time.Millisecond) // [7]
        }
      } else {
        log.Printf("Encountered an error on the server: %v", err)
        return status.Errorf(codes.InvalidArgument, err.Error()) // [8]
      }
    } else if err == io.EOF {
      return nil
    } else {
      code := status.Code(err)
      if code == codes.Canceled {
        log.Println("ERROR - Deadline breached by BookRecommendService at localhost:20004")
      } else {
        log.Printf("Encountered an error for BookRecommendService at localhost:20004: %v\n", err)
      }
      return err
    }
  }
}

func main()  {
  tm := 100
  if len(os.Args) == 2 {
    tm, _ = strconv.Atoi(os.Args[1])
  }
  cs := &server{
    catalog: BooksCatalog{},
  }
  cs.Init()
  cs.SetTimeout(tm)

  log.Printf("Ready to start the BookRecommendService server with timeout %d...\n", cs.timeout)

  lis, err := net.Listen("tcp", "localhost:20004")
  if err != nil {
    log.Fatalf("Failed to create listener on localhost:20004")
  }

  srv := grpc.NewServer()

  bookspb.RegisterBookRecommendServiceServer(srv, cs)

  if err = srv.Serve(lis); err != nil {
    log.Fatalf("Failed to start server: %v", err)
  }
}

The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:

The following are the contents of the file client.go that implements the Bidirectional Streaming RPC client for the BookRecommendService located in the directory $GOPATH/src/polarsparc.com/grpc/bidirstream/client as shown below:


client.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/status"
  "io"
  "log"
  "polarsparc.com/grpc/bidirstream/bookspb"
  "time"
)

func main() {
  log.Println("Ready to start the BookRecommend client...")

  conn, err := grpc.Dial("localhost:20004", grpc.WithInsecure())
  if err != nil {
    log.Fatalf("Failed to connect to localhost:20004")
  }
  defer conn.Close()

  cl := bookspb.NewBookRecommendServiceClient(conn) // [1]

  wc := make(chan bool) // [2]

  // Test Case-1

  stream, err := cl.RecommendedBooks(context.Background()) // [3]
  if err != nil {
    log.Fatalf("[1] Failed to create client stub to localhost:20004: %v", err)
  }

  go func() { // [4]
    err = stream.Send(&bookspb.BookRecommendRequest{
      Topic: "go",
    })
    if err != nil {
      log.Fatalf("[1] Failed to send request to localhost:20004: %v", err)
    }

    err = stream.Send(&bookspb.BookRecommendRequest{
      Topic: "Python",
    })
    if err != nil {
      log.Fatalf("[1] Failed to send request to localhost:20004: %v", err)
    }

    err = stream.CloseSend()
    if err != nil {
      log.Fatalf("[1] Failed to close send stream: %v", err)
    }
  }()

  go func() { // [5]
    for {
      res, err := stream.Recv()
      if err == nil {
        log.Printf("[1] Recommended book => topic: %s, title: %s, isbn: %s\n",
          res.Topic, res.Title, res.Isbn)
      } else if err == io.EOF {
        wc <-true
        break
      } else {
        log.Fatalf("[1] Encountered an error on receive from server: %v", err)
      }
    }
  }()

  // Test Case-1 Wait
  <-wc // [6]

  // Test Case-2 - Error case

  stream, err = cl.RecommendedBooks(context.Background()) // [3]
  if err != nil {
    log.Fatalf("[2] Failed to create client stub to localhost:20004: %v", err)
  }

  go func() { // [4]
    err = stream.Send(&bookspb.BookRecommendRequest{
      Topic: "java",
    })
    if err != nil {
      log.Fatalf("[2] Failed to send request to localhost:20004: %v", err)
    }

    err = stream.Send(&bookspb.BookRecommendRequest{
      Topic: "rust",
    })
    if err != nil {
      log.Fatalf("[2] Failed to send request to localhost:20004: %v", err)
    }

    err = stream.CloseSend()
    if err != nil {
      log.Fatalf("[2] Failed to close send stream: %v", err)
    }
  }()

  go func() { // [5]
    for {
      res, err := stream.Recv()
      if err == nil {
        log.Printf("[2] Recommended book => topic: %s, title: %s, isbn: %s\n",
          res.Topic, res.Title, res.Isbn)
      } else if err == io.EOF {
        wc <-true
        break
      } else {
        st, ok := status.FromError(err)
        if ok {
          log.Printf("[2] Error - %s\n", st.Message())
        } else {
          log.Fatalf("[2] Unexpected failure from localhost:20004: %v", err)
        }
        wc <-true
        break
      }
    }
  }()

  // Test Case-2 Wait
  <-wc // [6]

  // Test Case-3 - Deadline/Timeout case

  duration := time.Now().Add(550 * time.Millisecond)
  ctx, cancel := context.WithDeadline(context.Background(), duration) // [7]
  defer cancel()

  stream, err = cl.RecommendedBooks(ctx)
  if err != nil {
    log.Fatalf("[3] Failed to create client stub to localhost:20004: %v", err)
  }

  go func() { // [4]
    err = stream.Send(&bookspb.BookRecommendRequest{
      Topic: "java",
    })
    if err != nil {
      log.Fatalf("[3] Failed to send request to localhost:20004: %v", err)
    }

    err = stream.Send(&bookspb.BookRecommendRequest{
      Topic: "python",
    })
    if err != nil {
      log.Fatalf("[3] Failed to send request to localhost:20004: %v", err)
    }

    err = stream.CloseSend()
    if err != nil {
      log.Fatalf("[3] Failed to close send stream: %v", err)
    }
  }()

  go func() { // [5]
    for {
      res, err := stream.Recv()
      if err == nil {
        log.Printf("[3] Recommended book => topic: %s, title: %s, isbn: %s\n",
          res.Topic, res.Title, res.Isbn)
      } else if err == io.EOF {
        wc <-true
        break
      } else {
        st, ok := status.FromError(err)
        if ok {
          log.Printf("[3] Error - %s\n", st.Message())
        } else {
          log.Fatalf("[3] Unexpected failure from localhost:20004: %v", err)
        }
        wc <-true
        break
      }
    }
  }()

  // Test Case-3 Wait
  <-wc // [6]
}

The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:

For this demonstration, we will run 2 tests - one within the client specified deadline and the other that violates the client deadline.

Open two Terminal windows - one for the server and one for the client.

Test - 1

In the server Terminal, execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/bidirstream/server

$ go run server.go books_catalog.go 30

Notice we have specified a timeout of 30 ms. The following would be the typical output:

Output.15

2020/12/19 09:04:01 Ready to start the BookRecommendService server with timeout 30...

In the client Terminal, execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/bidirstream/client

$ go run client.go

The following would be the typical output:

Output.16

2020/12/19 09:04:31 Ready to start the BookRecommend client...
2020/12/19 09:04:31 [1] Recommended book => topic: go, title: :The Go Programming Language, isbn: 44444
2020/12/19 09:04:31 [1] Recommended book => topic: go, title: Go in Practice, isbn: 55555
2020/12/19 09:04:31 [1] Recommended book => topic: go, title: Black Hat Go, isbn: 66666
2020/12/19 09:04:31 [1] Recommended book => topic: Python, title: :Python Crash Course, isbn: 12121
2020/12/19 09:04:31 [1] Recommended book => topic: Python, title: Learning Python, isbn: 34343
2020/12/19 09:04:31 [1] Recommended book => topic: Python, title: Effective Python, isbn: 56565
2020/12/19 09:04:31 [2] Recommended book => topic: java, title: :Effective Java, isbn: 77777
2020/12/19 09:04:31 [2] Recommended book => topic: java, title: Modern Java in Action, isbn: 88888
2020/12/19 09:04:31 [2] Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999
2020/12/19 09:04:31 [2] Error - No books for topic: rust
2020/12/19 09:04:31 [3] Recommended book => topic: java, title: :Effective Java, isbn: 77777
2020/12/19 09:04:31 [3] Recommended book => topic: java, title: Modern Java in Action, isbn: 88888
2020/12/19 09:04:31 [3] Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999
2020/12/19 09:04:31 [3] Recommended book => topic: python, title: :Python Crash Course, isbn: 12121
2020/12/19 09:04:31 [3] Recommended book => topic: python, title: Learning Python, isbn: 34343
2020/12/19 09:04:31 [3] Recommended book => topic: python, title: Effective Python, isbn: 56565

The following would be the additional output on the Terminal running the server:

Output.17

2020/12/19 09:04:31 Books request for topic: GO
2020/12/19 09:04:31 Books for key: GO = [{:The Go Programming Language 44444} {Go in Practice 55555} {Black Hat Go 66666}]
2020/12/19 09:04:31 Books request for topic: PYTHON
2020/12/19 09:04:31 Books for key: PYTHON = [{:Python Crash Course 12121} {Learning Python 34343} {Effective Python 56565}]
2020/12/19 09:04:31 Books request for topic: JAVA
2020/12/19 09:04:31 Books for key: JAVA = [{:Effective Java 77777} {Modern Java in Action 88888} {Java: The Complete Reference 99999}]
2020/12/19 09:04:31 Books request for topic: RUST
2020/12/19 09:04:31 Encountered an error on the server: No books for topic: rust
2020/12/19 09:04:31 Books request for topic: JAVA
2020/12/19 09:04:31 Books for key: JAVA = [{:Effective Java 77777} {Modern Java in Action 88888} {Java: The Complete Reference 99999}]
2020/12/19 09:04:31 Books request for topic: PYTHON
2020/12/19 09:04:31 Books for key: PYTHON = [{:Python Crash Course 12121} {Learning Python 34343} {Effective Python 56565}]
Test - 2

In the server Terminal, terminate the currently running server by pressing CTRL-C and execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/bidirstream/server

$ go run server.go books_catalog.go 100

Now we have specified a longer timeout of 100 ms. The following would be the typical output:

Output.18

2020/12/19 09:08:25 Ready to start the BookRecommendService server with timeout 100...

In the client Terminal, re-execute the following commands:

$ cd $GOPATH/src/polarsparc.com/grpc/bidirstream/client

$ go run client.go

The following would be the typical output:

Output.19

2020/12/19 09:09:23 Ready to start the BookRecommend client...
2020/12/19 09:09:23 [1] Recommended book => topic: go, title: :The Go Programming Language, isbn: 44444
2020/12/19 09:09:23 [1] Recommended book => topic: go, title: Go in Practice, isbn: 55555
2020/12/19 09:09:23 [1] Recommended book => topic: go, title: Black Hat Go, isbn: 66666
2020/12/19 09:09:23 [1] Recommended book => topic: Python, title: :Python Crash Course, isbn: 12121
2020/12/19 09:09:23 [1] Recommended book => topic: Python, title: Learning Python, isbn: 34343
2020/12/19 09:09:23 [1] Recommended book => topic: Python, title: Effective Python, isbn: 56565
2020/12/19 09:09:23 [2] Recommended book => topic: java, title: :Effective Java, isbn: 77777
2020/12/19 09:09:23 [2] Recommended book => topic: java, title: Modern Java in Action, isbn: 88888
2020/12/19 09:09:24 [2] Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999
2020/12/19 09:09:24 [2] Error - No books for topic: rust
2020/12/19 09:09:24 [3] Recommended book => topic: java, title: :Effective Java, isbn: 77777
2020/12/19 09:09:24 [3] Recommended book => topic: java, title: Modern Java in Action, isbn: 88888
2020/12/19 09:09:24 [3] Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999
2020/12/19 09:09:24 [3] Recommended book => topic: python, title: :Python Crash Course, isbn: 12121
2020/12/19 09:09:24 [3] Recommended book => topic: python, title: Learning Python, isbn: 34343
2020/12/19 09:09:24 [3] Recommended book => topic: python, title: Effective Python, isbn: 56565
2020/12/19 09:09:24 [3] Error - context deadline exceeded

In this case, the client specified deadline exceeded resulting in a gRPC error.

The following would be the additional output on the Terminal running the server:

Output.20

2020/12/19 09:09:23 Books request for topic: GO
2020/12/19 09:09:23 Books for key: GO = [{:The Go Programming Language 44444} {Go in Practice 55555} {Black Hat Go 66666}]
2020/12/19 09:09:23 Books request for topic: PYTHON
2020/12/19 09:09:23 Books for key: PYTHON = [{:Python Crash Course 12121} {Learning Python 34343} {Effective Python 56565}]
2020/12/19 09:09:23 Books request for topic: JAVA
2020/12/19 09:09:23 Books for key: JAVA = [{:Effective Java 77777} {Modern Java in Action 88888} {Java: The Complete Reference 99999}]
2020/12/19 09:09:24 Books request for topic: RUST
2020/12/19 09:09:24 Encountered an error on the server: No books for topic: rust
2020/12/19 09:09:24 Books request for topic: JAVA
2020/12/19 09:09:24 Books for key: JAVA = [{:Effective Java 77777} {Modern Java in Action 88888} {Java: The Complete Reference 99999}]
2020/12/19 09:09:24 Books request for topic: PYTHON
2020/12/19 09:09:24 Books for key: PYTHON = [{:Python Crash Course 12121} {Learning Python 34343} {Effective Python 56565}]
2020/12/19 09:09:24 ERROR - Deadline breached by BookRecommendService at localhost:20004

PERFECTO !!! We have successfully demonstrated the Client Streaming gRPC communication style using the Go language.

Copy the file recommend.proto listed above to the directory $HOME/java/grpc/src/main/proto.

To compile the recommend.proto file, execute the following commands:

$ cd $HOME/java/grpc

$ mvn compile

On success, this will generate some files in the directory $HOME/java/grpc/target/generated-sources/protobuf/java/com/polarsparc/gbd.

The following are the contents of the file Book.java that represents a holder object for storing the book informartion such as the title and the ISBN. It is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gbd/server as shown below:


Book.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.server;

public class Book {
    private final String title;

    private final String ISBN;

    public Book(String title, String isbn) {
        this.title = title;
        this.ISBN = isbn;
    }

    public String getTitle() {
        return title;
    }

    public String getISBN() {
        return ISBN;
    }

    @Override
    public String toString() {
        return "Book{" +
                "title='" + title + '\'' +
                ", ISBN='" + ISBN + '\'' +
                '}';
    }
}

The following are the contents of the file BooksCatalog.java that simulates an in-memory store for initializing and returning a list of recommended books for a given topic and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gbd/server as shown below:


BooksCatalog.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.server;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BooksCatalog {
    private final static Logger LOGGER = Logger.getLogger(BooksCatalog.class.getName());

    private final static Map<String, List<Book>> catalog = new HashMap<>();

    static {
        LOGGER.setLevel(Level.INFO);

        catalog.put("C++", Arrays.asList(
                new Book(":The C++ Programming Language", "11111"),
                new Book("C++ Primer", "22222"),
                new Book("A Tour of C++", "33333")
        ));

        catalog.put("GO", Arrays.asList(
                new Book(":The Go Programming Language", "44444"),
                new Book("Go in Practice", "55555"),
                new Book("Black Hat Go", "66666")
        ));

        catalog.put("JAVA", Arrays.asList(
                new Book(":Effective Java", "77777"),
                new Book("Modern Java in Action", "88888"),
                new Book("Java: The Complete Reference", "99999")
        ));

        catalog.put("PYTHON", Arrays.asList(
                new Book(":Python Crash Course", "12121"),
                new Book("Learning Python", "34343"),
                new Book("Effective Python", "56565")
        ));
    }

    private BooksCatalog() {
    }

    public static List<Book> getBooks(String topic) {
        String key = topic.toUpperCase();

        LOGGER.info(String.format("Books request for topic: %s", key));

        if (!catalog.containsKey(key)) {
            LOGGER.severe(String.format("No books for topic: %s", key));
            throw new RuntimeException(String.format("No books for topic: %s", key));
        }

        List<Book> books = catalog.get(key);

        LOGGER.info(String.format("Books for key: %s = %s", key, books));

        return books;
    }
}

To receive the sequence of requests from the client, the server needs to return a stub handler object to the client that implements the interface StreamObserver. The following are the contents of the Java program called BookRecommendRequestStreamObserver.java that implements the required interface and located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gbd/server as shown below:


BookRecommendRequestStreamObserver.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.server;

import com.polarsparc.gbd.BookRecommendRequest;
import com.polarsparc.gbd.BookRecommendResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BookRecommendRequestStreamObserver implements StreamObserver<BookRecommendRequest> {
    private final static Logger LOGGER = Logger.getLogger(BookRecommendRequestStreamObserver.class.getName());

    private final StreamObserver<BookRecommendResponse> response;
    private final int timeout;
    private boolean flag = true; // [1]

    public BookRecommendRequestStreamObserver(int timeout, StreamObserver<BookRecommendResponse> response) {
        LOGGER.setLevel(Level.INFO);

        this.timeout = timeout;
        this.response = response;

        LOGGER.info(String.format("Timeout value: %d\n", timeout));
    }

    @Override
    public void onNext(BookRecommendRequest request) {
        try {
            List<Book> books = BooksCatalog.getBooks(request.getTopic());
            for (Book bk : books) {
                BookRecommendResponse res = BookRecommendResponse.newBuilder()
                        .setTopic(request.getTopic())
                        .setTitle(bk.getTitle())
                        .setIsbn(bk.getISBN())
                        .build();
                response.onNext(res);
                try {
                    Thread.sleep(timeout);
                }
                catch (InterruptedException ignored) {
                }
            }
        }
        catch (RuntimeException ex) {
            onError(ex); // [3]
        }
    }

    @Override
    public void onError(Throwable ex) { // [4]
        Status status = Status.fromThrowable(ex);
        if (status.getCode() == Status.CANCELLED.getCode()) {
            LOGGER.severe("ERROR - Deadline breached by BookRecommendService at localhost:20004");
        } else {
            LOGGER.severe(ex.getMessage());
        }
        response.onError(status.asRuntimeException());
        flag = false;
    }

    @Override
    public void onCompleted() { // [5]
        if (flag) {
            response.onCompleted();
        }
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

The following are the contents of the Java program called BookRecommendService.java that implements the Bidirectional Streaming gRPC service BookRecommendService located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gbd/server as shown below:


BookRecommendService.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.server;

import com.polarsparc.gbd.BookRecommendRequest;
import com.polarsparc.gbd.BookRecommendResponse;
import com.polarsparc.gbd.BookRecommendServiceGrpc;
import io.grpc.stub.StreamObserver;

public class BookRecommendService extends BookRecommendServiceGrpc.BookRecommendServiceImplBase { // [1]
    private final int timeout;

    public BookRecommendService(int timeout) {
        this.timeout = timeout;
    }

    @Override
    public StreamObserver<BookRecommendRequest> recommendedBooks(StreamObserver<BookRecommendResponse> responseObserver) {
        return new BookRecommendRequestStreamObserver(timeout, responseObserver);
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

The following are the contents of the Java program called BookRecommendServer.java that registers the Bidirectional Streaming RPC service BookRecommendService as a gRPC server and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gbd/server as shown below:


BookRecommendServer.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class BookRecommendServer {
    public static void main(String[] args) {
        int timeout = 100;
        if (args.length == 1) {
            timeout = Integer.parseInt(args[0]); // [1]
        }

        Server server = ServerBuilder.forPort(20004) // [2]
                .addService(new BookRecommendService(timeout)) // [3]
                .build();

        try {
            server.start(); // [4]
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.printf("Started the gRPC BookRecommendService on 20004 with timeout %d...\n", timeout);

        try {
            server.awaitTermination(); // [5]
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

To receive responses from the server in an asynchronous fashion, a client needs to implement the interface StreamObserver and register it as a callback handler on the client stub. The following are the contents of the Java program called BookRecommendStreamObserver.java that implements required interface for the asynchronous callback and located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gbd/client as shown below:


BookRecommendStreamObserver.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.client;

import com.polarsparc.gbd.BookRecommendResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BookRecommendStreamObserver implements StreamObserver<BookRecommendResponse> {
    private final static Logger LOGGER = Logger.getLogger(BookRecommendStreamObserver.class.getName());

    private final CountDownLatch latch;

    static {
        LOGGER.setLevel(Level.INFO);
    }

    public BookRecommendStreamObserver(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onNext(BookRecommendResponse response) { // [1]
        LOGGER.info(String.format("Recommended book => topic: %s, title: %s, isbn: %s\n",
                response.getTopic(), response.getTitle(), response.getIsbn()));
    }

    @Override
    public void onError(Throwable ex) { // [2]
        Status status = Status.fromThrowable(ex); // [3]
        LOGGER.severe(String.format("Error status: code - %s, description - %s\n", status.getCode(), status.getDescription()));
        latch.countDown();
    }

    @Override
    public void onCompleted() { // [4]
        latch.countDown();
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

The following are the contents of the Java program called BookRecommendClientTest.java that implements the Client Streaming RPC client for BookRecommendService and is located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gbd/client as shown below:


BookRecommendClientTest.java
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   19 Dec 2020
*/

package com.polarsparc.gbd.client;

import com.polarsparc.gbd.BookRecommendRequest;
import com.polarsparc.gbd.BookRecommendServiceGrpc;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BookRecommendClientTest {
    private ManagedChannel channel;

    @BeforeAll
    public void setup() {
        channel = ManagedChannelBuilder.forAddress("localhost", 20004) // [1]
                .usePlaintext() // [2]
                .build();
    }

    @Test
    public void bookRecommendAsyncTestOne() {
        CountDownLatch latch = new CountDownLatch(1);
        BookRecommendServiceGrpc.BookRecommendServiceStub stub = BookRecommendServiceGrpc.newStub(channel); // [3]
        StreamObserver<BookRecommendRequest> requestObserver = stub.recommendedBooks(
                new BookRecommendStreamObserver(latch)); // [5]
        BookRecommendRequest req1 = BookRecommendRequest.newBuilder().setTopic("go").build(); // [6]
        BookRecommendRequest req2 = BookRecommendRequest.newBuilder().setTopic("Python").build(); // [6]
        requestObserver.onNext(req1); // [7]
        requestObserver.onNext(req2); // [7]
        requestObserver.onCompleted(); // [8]
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void bookRecommendAsyncTestTwo() {
        CountDownLatch latch = new CountDownLatch(1);
        BookRecommendServiceGrpc.BookRecommendServiceStub stub = BookRecommendServiceGrpc.newStub(channel); // [3]
        StreamObserver<BookRecommendRequest> requestObserver = stub.recommendedBooks(
                new BookRecommendStreamObserver(latch)); // [5]
        BookRecommendRequest req1 = BookRecommendRequest.newBuilder().setTopic("java").build(); // [6]
        BookRecommendRequest req2 = BookRecommendRequest.newBuilder().setTopic("rust").build(); // [6]
        requestObserver.onNext(req1); // [7]
        requestObserver.onNext(req2); // [7]
        requestObserver.onCompleted(); // [8]
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void bookRecommendAsyncTestThree() {
        CountDownLatch latch = new CountDownLatch(1);
        BookRecommendServiceGrpc.BookRecommendServiceStub stub =
                BookRecommendServiceGrpc.newStub(channel)
                .withDeadline(Deadline.after(550, TimeUnit.MILLISECONDS)); // [4]
        StreamObserver<BookRecommendRequest> requestObserver = stub.recommendedBooks(
                new BookRecommendStreamObserver(latch)); // [5]
        BookRecommendRequest req1 = BookRecommendRequest.newBuilder().setTopic("java").build(); // [6]
        BookRecommendRequest req2 = BookRecommendRequest.newBuilder().setTopic("python").build(); // [6]
        requestObserver.onNext(req1); // [7]
        requestObserver.onNext(req2); // [7]
        requestObserver.onCompleted(); // [8]
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:

As was done previously, we will run 2 tests - one within the client specified deadline and the other that violates the client deadline.

Open two Terminal windows - one for the server and one for the client.

Test - 1

In the server Terminal, execute the following commands:

$ cd $HOME/java/grpc

$ mvn exec:java -Dexec.mainClass=com.polarsparc.gbd.server.BookRecommendServer -Dexec.args="30"

Notice we have specified a timeout of 30 ms. The following would be the typical output:

The following would be the typical output:

Output.21

Started the gRPC BookRecommendService on 20004 with timeout 30...

In the client Terminal, execute the following commands:

$ cd $HOME/java/grpc

$ mvn test -Dtest=com.polarsparc.gbd.client.BookRecommendClientTest

The following would be the typical output:

Output.22

Running com.polarsparc.gbd.client.BookRecommendClientTest
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: go, title: :The Go Programming Language, isbn: 44444

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: go, title: Go in Practice, isbn: 55555

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: go, title: Black Hat Go, isbn: 66666

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: Python, title: :Python Crash Course, isbn: 12121

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: Python, title: Learning Python, isbn: 34343

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: Python, title: Effective Python, isbn: 56565

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: :Effective Java, isbn: 77777

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Modern Java in Action, isbn: 88888

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onError
SEVERE: Error status: code - UNKNOWN, description - null

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: :Effective Java, isbn: 77777

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Modern Java in Action, isbn: 88888

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: python, title: :Python Crash Course, isbn: 12121

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: python, title: Learning Python, isbn: 34343

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: python, title: Effective Python, isbn: 56565
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.022 s

The following would be the additional output on the Terminal running the server:

Output.23

Dec 19, 2020 9:21:33 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver <init>
INFO: Timeout value: 30

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: GO
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: GO = [Book{title=':The Go Programming Language', ISBN='44444'}, Book{title='Go in Practice', ISBN='55555'}, Book{title='Black Hat Go', ISBN='66666'}]
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: PYTHON
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: PYTHON = [Book{title=':Python Crash Course', ISBN='12121'}, Book{title='Learning Python', ISBN='34343'}, Book{title='Effective Python', ISBN='56565'}]
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver <init>
INFO: Timeout value: 30

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: JAVA
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: JAVA = [Book{title=':Effective Java', ISBN='77777'}, Book{title='Modern Java in Action', ISBN='88888'}, Book{title='Java: The Complete Reference', ISBN='99999'}]
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: RUST
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
SEVERE: No books for topic: RUST
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver onError
SEVERE: No books for topic: RUST
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver <init>
INFO: Timeout value: 30

Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: JAVA
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: JAVA = [Book{title=':Effective Java', ISBN='77777'}, Book{title='Modern Java in Action', ISBN='88888'}, Book{title='Java: The Complete Reference', ISBN='99999'}]
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: PYTHON
Dec 19, 2020 9:21:34 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: PYTHON = [Book{title=':Python Crash Course', ISBN='12121'}, Book{title='Learning Python', ISBN='34343'}, Book{title='Effective Python', ISBN='56565'}]
Test - 2

In the server Terminal, terminate the currently running server by pressing CTRL-C and execute the following commands:

$ cd $HOME/java/grpc

$ mvn exec:java -Dexec.mainClass=com.polarsparc.gbd.server.BookRecommendServer -Dexec.args="100"

Now we have specified a longer timeout of 100 ms. The following would be the typical output:

The following would be the typical output:

Output.24

Started the gRPC BookRecommendService on 20004 with timeout 100...

In the client Terminal, re-execute the following commands:

$ cd $HOME/java/grpc

$ mvn test -Dtest=com.polarsparc.gbd.client.BookRecommendClientTest

The following would be the typical output:

Output.25

Running com.polarsparc.gbd.client.BookRecommendClientTest
Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: go, title: :The Go Programming Language, isbn: 44444

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: go, title: Go in Practice, isbn: 55555

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: go, title: Black Hat Go, isbn: 66666

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: Python, title: :Python Crash Course, isbn: 12121

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: Python, title: Learning Python, isbn: 34343

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: Python, title: Effective Python, isbn: 56565

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: :Effective Java, isbn: 77777

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Modern Java in Action, isbn: 88888

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onError
SEVERE: Error status: code - UNKNOWN, description - null

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: :Effective Java, isbn: 77777

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Modern Java in Action, isbn: 88888

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: java, title: Java: The Complete Reference, isbn: 99999

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: python, title: :Python Crash Course, isbn: 12121

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: python, title: Learning Python, isbn: 34343

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onNext
INFO: Recommended book => topic: python, title: Effective Python, isbn: 56565

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.client.BookRecommendStreamObserver onError
SEVERE: Error status: code - DEADLINE_EXCEEDED, description - deadline exceeded after 0.549538451s. [remote_addr=localhost/127.0.0.1:20004]
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.023 s

In this case, the client specified deadline exceeded resulting in a gRPC error.

The following would be the additional output on the Terminal running the server:

Output.26

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver <init>
INFO: Timeout value: 100

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: GO
Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: GO = [Book{title=':The Go Programming Language', ISBN='44444'}, Book{title='Go in Practice', ISBN='55555'}, Book{title='Black Hat Go', ISBN='66666'}]
Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: PYTHON
Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: PYTHON = [Book{title=':Python Crash Course', ISBN='12121'}, Book{title='Learning Python', ISBN='34343'}, Book{title='Effective Python', ISBN='56565'}]
Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver <init>
INFO: Timeout value: 100

Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: JAVA
Dec 19, 2020 9:28:38 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: JAVA = [Book{title=':Effective Java', ISBN='77777'}, Book{title='Modern Java in Action', ISBN='88888'}, Book{title='Java: The Complete Reference', ISBN='99999'}]
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: RUST
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BooksCatalog getBooks
SEVERE: No books for topic: RUST
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver onError
SEVERE: No books for topic: RUST
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver <init>
INFO: Timeout value: 100

Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: JAVA
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: JAVA = [Book{title=':Effective Java', ISBN='77777'}, Book{title='Modern Java in Action', ISBN='88888'}, Book{title='Java: The Complete Reference', ISBN='99999'}]
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books request for topic: PYTHON
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BooksCatalog getBooks
INFO: Books for key: PYTHON = [Book{title=':Python Crash Course', ISBN='12121'}, Book{title='Learning Python', ISBN='34343'}, Book{title='Effective Python', ISBN='56565'}]
Dec 19, 2020 9:28:39 AM com.polarsparc.gbd.server.BookRecommendRequestStreamObserver onError
SEVERE: ERROR - Deadline breached by BookRecommendService at localhost:20004

One could also test with the Go server running and using the Java client and vice versa.

References

Introduction to gRPC - Part 3

Introduction to gRPC - Part 2

Introduction to gRPC - Part 1

Introduction to Google Protocol Buffers

gRPC Go Documentation

gRPC Java Documentation

gRPC Standard Error Model

gRPC and Deadlines

GitHub - gRPC Go

GitHub - gRPC Java


© PolarSPARC