CHAT-SERVER using gRPC Bidirectional Streaming

In this post, we will cover how to create a basic chat server for one-to-one chatting using gRPC bidirectional streaming. We will use Go for both the gRPC server and the gRPC client.

Protocol Buffers (protobuf) is an efficient data serialization format that is commonly used with gRPC. First, we will create a protobuf file whose messages define the format of communication between the gRPC server and the client. The same protobuf file also defines the Remote Procedure Call (RPC) service.

Create a protobuf file chat.proto

Let's first create a chat.proto file in the project root directory and copy the following lines.

// path: ./chat.proto

syntax = "proto3";

package chatserver;

message FromClient {
    string name = 1;
    string body = 2;
}

message FromServer {
    string name = 1;
    string body = 2;
}

service Services {

    rpc ChatService(stream FromClient) returns (stream FromServer){};

}

The line syntax = "proto3" specifies that we are using the proto3 syntax (the default version while writing this post is proto2). We have also specified chatserver as the package name. Then, we have defined two messages: FromClient for client-to-server communication and FromServer for server-to-client communication. The fields of the two messages are identical in this case: name holds the user's name and body holds the user's message.

To generate the service interface, we need to define RPC methods in the chat.proto file. In this example, we have defined only one RPC method, ChatService. Clients call the ChatService method to set up a bidirectional gRPC stream between client and server.
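
To make the shape of a bidirectional stream concrete, here is a minimal sketch in plain Go with no gRPC dependency. The FromClient and FromServer structs are simplified stand-ins for the generated message types, and serverStream, fakeStream and echo are hypothetical names introduced only for this illustration. The sketch assumes that the server side of the stream exposes a Recv and a Send method, and that Recv reports io.EOF once the client has finished sending.

```go
package main

import (
	"fmt"
	"io"
)

// Simplified stand-ins for the generated message types (the real generated
// structs carry additional protobuf bookkeeping).
type FromClient struct {
	Name string
	Body string
}

type FromServer struct {
	Name string
	Body string
}

// serverStream is a hypothetical interface with the two methods a server
// handler needs on a bidirectional stream.
type serverStream interface {
	Recv() (*FromClient, error)
	Send(*FromServer) error
}

// fakeStream is an in-memory stream used only for this illustration.
type fakeStream struct {
	incoming []*FromClient
	sent     []*FromServer
}

func (f *fakeStream) Recv() (*FromClient, error) {
	if len(f.incoming) == 0 {
		return nil, io.EOF // the client has no more messages
	}
	msg := f.incoming[0]
	f.incoming = f.incoming[1:]
	return msg, nil
}

func (f *fakeStream) Send(m *FromServer) error {
	f.sent = append(f.sent, m)
	return nil
}

// echo reads until the client side is done and sends every message back.
func echo(s serverStream) error {
	for {
		in, err := s.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := s.Send(&FromServer{Name: in.Name, Body: in.Body}); err != nil {
			return err
		}
	}
}

func main() {
	s := &fakeStream{incoming: []*FromClient{{Name: "alice", Body: "hi"}}}
	if err := echo(s); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.sent[0].Name, s.sent[0].Body) // prints "alice hi"
}
```

The chat server built in this post follows the same Recv/Send pattern, except that received messages are queued for the other client instead of being echoed back.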

Compile protobuf file chat.proto

Now that our chat.proto file is ready, the next step is to compile it to generate the *.pb.go file containing the interfaces that we will use to send and receive messages.

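Before you proceed, please ensure that the following are installed on your system, and that both protoc and protoc-gen-go are in your $PATH:

  • protobuf compiler
  • Golang Protobuf plugin (protoc-gen-go). The plugins=grpc option used in the command below is only understood by the older plugin from github.com/golang/protobuf/protoc-gen-go. The newer google.golang.org/protobuf/cmd/protoc-gen-go rejects that option and expects the gRPC code to be generated by the separate protoc-gen-go-grpc plugin.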

Now, run the command below in your preferred shell from the project root directory to generate Go code from the chat.proto file. The output directory chatserver must already exist, so create it first if needed.

$ protoc --go_out=plugins=grpc:chatserver chat.proto

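The above command generates a file chatserver.pb.go inside the ./chatserver/ directory. (protoc derives the output file name from the .proto file name, so depending on your setup the file may be named chat.pb.go instead.)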

Create a file server.go under package main

Let's set up our project as a Go module for dependency management by executing the following command in the project root directory. This creates a go.mod file that identifies our project as a module.

$ go mod init grpcChatServer

Now, create a file named server.go in the project root directory. We define a listener bound to a port that is read from the PORT environment variable. If PORT is not set, we fall back to 5000 as the default port for our gRPC server.

package main

import (
    "log"
    "net"
    "os"
)

func main() {
    // read the port from the environment; default to 5000 if PORT is not set
    Port := os.Getenv("PORT")
    if Port == "" {
        Port = "5000"
    }

    // initiate listener; net.Listen expects "host:port", so prefix the port with ":"
    listen, err := net.Listen("tcp", ":"+Port)
    if err != nil {
        log.Fatalf("Could not listen @ %v :: %v", Port, err)
    }
    log.Println("Listening @ " + Port)

    // ...
}
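
The port handling above can also be isolated in a small helper, which makes the fallback easy to check on its own. This is only a sketch: listenAddr is a hypothetical name that does not appear in the original project, and it assumes the server should listen on all interfaces.

```go
package main

import (
	"fmt"
	"net"
)

// listenAddr builds the address string for net.Listen from a port value,
// falling back to defaultPort when the value is empty.
// (Hypothetical helper, not part of the original project.)
func listenAddr(port, defaultPort string) string {
	if port == "" {
		port = defaultPort
	}
	// With an empty host, JoinHostPort yields ":<port>", i.e. all interfaces.
	return net.JoinHostPort("", port)
}

func main() {
	fmt.Println(listenAddr("", "5000"))     // prints ":5000"
	fmt.Println(listenAddr("8080", "5000")) // prints ":8080"
}
```

In server.go, the result could be passed straight to net.Listen("tcp", ...), with os.Getenv("PORT") as the first argument.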

Next, create a gRPC server instance that listens and serves on the assigned port.

    // requires the import "google.golang.org/grpc"
    grpcServer := grpc.NewServer()

    // Serve blocks until the server stops or fails
    err = grpcServer.Serve(listen)
    if err != nil {
        log.Fatalf("Failed to start gRPC server :: %v", err)
    }

Although we can now create and run the gRPC server, it does not do anything useful yet. Let's create a chatserver.go file that implements the interfaces defined in the chatserver.pb.go file, which will let us establish a bidirectional stream between server and client.

Create chatserver.go file under package chatserver

Let's create a new file named chatserver.go inside the chatserver directory (the same directory that holds the chatserver.pb.go file generated earlier by the proto compiler).

Now, define a struct messageUnit for handling a message in the server. We also create another struct, messageQue, which holds a slice MQue of type messageUnit and a variable mu of type sync.Mutex. Go's sync.Mutex provides Lock() and Unlock(), which are used to avoid race conditions when multiple goroutines access shared variables concurrently.

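package chatserver

// imports used by the code in this file
import (
    "log"
    "math/rand"
    "sync"
    "time"
)

type messageUnit struct {
    ClientName        string
    MessageBody       string
    MessageUniqueCode int
    ClientUniqueCode  int
}

type messageQue struct {
    MQue []messageUnit
    mu   sync.Mutex // guards MQue
}

var messageQueObject = messageQue{}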
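
One way to keep the locking discipline in a single place is to wrap the slice in push and pop methods. The sketch below is a hypothetical variant, not the code used in the rest of this post: the names message, queue, push, pop and size are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a reduced stand-in for messageUnit.
type message struct {
	Sender string
	Body   string
}

// queue is a hypothetical mutex-protected FIFO of messages.
type queue struct {
	mu    sync.Mutex
	items []message
}

// push appends a message while holding the lock.
func (q *queue) push(m message) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, m)
}

// pop removes and returns the oldest message; ok is false when the queue is empty.
func (q *queue) pop() (m message, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return message{}, false
	}
	m = q.items[0]
	q.items = q.items[1:]
	return m, true
}

// size reports the number of queued messages.
func (q *queue) size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

func main() {
	var q queue
	var wg sync.WaitGroup

	// 100 goroutines push concurrently; the mutex keeps the slice consistent.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			q.push(message{Sender: "client", Body: fmt.Sprint(n)})
		}(i)
	}
	wg.Wait()

	fmt.Println(q.size()) // prints 100
}
```

Because every access goes through a method that takes the lock, callers cannot forget to lock or unlock around a read of the slice.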

In addition to the above, let's define a struct type named ChatServer that implements the ChatService method declared in the chat.proto file. The ChatService method takes an argument of type Services_ChatServiceServer (defined in the chatserver.pb.go file) and returns an error.

type ChatServer struct {
}

//ChatService 
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {

// ...

}

Now, inside our ChatService method we need to call two functions: one for receiving messages from the stream and one for sending messages to the stream. In recieveFromStream, we read each message from the client and append it to our messageQue object.

// receive from stream
func recieveFromStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int) {

    for {
        req, err := csi_.Recv()
        if err != nil {
            // Recv returns io.EOF once the client closes its side of the stream
            log.Printf("Error receiving request from client :: %v", err)
            break
        }

        msg := messageUnit{
            ClientName:        req.Name,
            MessageBody:       req.Body,
            MessageUniqueCode: rand.Intn(1e8),
            ClientUniqueCode:  clientUniqueCode_,
        }

        messageQueObject.mu.Lock()
        messageQueObject.MQue = append(messageQueObject.MQue, msg)
        messageQueObject.mu.Unlock()

        // log the local copy; reading the shared queue without the lock would be a data race
        log.Printf("%v", msg)
    }

}

In sendToStream, we take the message at the head of messageQue and send it to the designated client, that is, the client that did not write it.

//send to stream
func sendToStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {

    for {

        for {
            time.Sleep(500 * time.Millisecond)
            messageQueObject.mu.Lock()
            if len(messageQueObject.MQue) == 0 {
                messageQueObject.mu.Unlock()
                break
            }
            senderUniqueCode := messageQueObject.MQue[0].ClientUniqueCode
            senderName4client := messageQueObject.MQue[0].ClientName
            message4client := messageQueObject.MQue[0].MessageBody
            messageQueObject.mu.Unlock()
            if senderUniqueCode != clientUniqueCode_ {
                err := csi_.Send(&FromServer{Name: senderName4client, Body: message4client})

                if err != nil {
                    errch_ <- err
                    return // stop sending; ChatService returns this error
                }
                messageQueObject.mu.Lock()
                if len(messageQueObject.MQue) >= 2 {
                    messageQueObject.MQue = messageQueObject.MQue[1:] // if send success > delete message
                } else {
                    messageQueObject.MQue = []messageUnit{}
                }
                messageQueObject.mu.Unlock()

            }

        }

        time.Sleep(1 * time.Second)

    }

}
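
The delivery rule inside the loop above can be read in isolation: a message at the head of the queue is handed only to a client other than its sender, and it is removed once delivered. The following sketch expresses that rule as a pure function, under the assumption of exactly two clients as in this post. deliverNext is a hypothetical name and the struct is a reduced copy of messageUnit.

```go
package main

import "fmt"

// messageUnit is a reduced copy of the struct used by the server.
type messageUnit struct {
	ClientName       string
	MessageBody      string
	ClientUniqueCode int
}

// deliverNext (hypothetical helper) returns the head of the queue and the
// remaining queue if the head was written by someone other than receiverCode.
// Otherwise it returns ok=false and leaves the queue unchanged.
func deliverNext(queue []messageUnit, receiverCode int) (msg messageUnit, rest []messageUnit, ok bool) {
	if len(queue) == 0 || queue[0].ClientUniqueCode == receiverCode {
		return messageUnit{}, queue, false
	}
	return queue[0], queue[1:], true
}

func main() {
	queue := []messageUnit{
		{ClientName: "alice", MessageBody: "hello", ClientUniqueCode: 1},
		{ClientName: "bob", MessageBody: "hi", ClientUniqueCode: 2},
	}

	// Alice's own stream skips the head because she wrote it.
	_, queue, ok := deliverNext(queue, 1)
	fmt.Println(ok, len(queue)) // prints "false 2"

	// Bob's stream receives Alice's message and the queue shrinks.
	msg, queue, ok := deliverNext(queue, 2)
	fmt.Println(ok, msg.ClientName, msg.MessageBody, len(queue)) // prints "true alice hello 1"
}
```

With more than two clients this rule would deliver each message to only one of the other clients, so a group chat would need a different design, such as one queue per client.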

Let's call both recieveFromStream and sendToStream from the ChatService method as goroutines. We create a channel of type error that carries any error raised while sending a message to the stream. ChatService returns that error, which reports it to the client and terminates the connection from the server side. Every time a client connects to the server, two goroutines are spawned, one for receiving and one for sending messages. Because multiple goroutines access the shared message queue concurrently, the mutex locking shown above is used to avoid race conditions.

// ChatService  
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {

    clientUniqueCode := rand.Intn(1e3)

    // recieve request <<< client
    go recieveFromStream(csi, clientUniqueCode)
    //stream >>> client
    errch := make(chan error)
    go sendToStream(csi, clientUniqueCode, errch)

    return <-errch
}
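
The goroutine-plus-error-channel pattern used by ChatService can be sketched without gRPC. In the version below, which is an illustration rather than the code from this post, every worker reports its result, so the caller also returns when all workers finish cleanly. firstError and errStreamClosed are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamClosed is a made-up error used only in this example.
var errStreamClosed = errors.New("stream closed")

// firstError runs each worker in its own goroutine and returns the first
// non-nil error, or nil once every worker has finished without error.
func firstError(workers ...func() error) error {
	// Buffered so that workers finishing after the first error do not block.
	errch := make(chan error, len(workers))
	for _, w := range workers {
		go func(run func() error) {
			errch <- run()
		}(w)
	}
	for range workers {
		if err := <-errch; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	receive := func() error { return nil }          // stands in for the receive loop
	send := func() error { return errStreamClosed } // stands in for the send loop
	fmt.Println(firstError(receive, send))          // prints "stream closed"
}
```

Buffering the channel to the number of workers is the design choice that matters here: a worker that fails after the handler has already returned can still write its error and exit instead of leaking.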

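Now, we need to register ChatService by adding the following two lines to the server.go file, after grpc.NewServer() and before the call to grpcServer.Serve.

    // register ChatService
    // requires the import "grpcChatServer/chatserver" (module name + package directory)
    cs := chatserver.ChatServer{}
    chatserver.RegisterServicesServer(grpcServer, &cs)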

With this, we have finished our server-side code for gRPC bidirectional streaming. We can choose any language that supports gRPC for creating the client stub. For simplicity, in this post let's write the client-side program in Go as well. The chatserver.pb.go file also includes the interfaces for the client.

Create client.go for client side stubs

Let's create a client.go file in our project root directory. The goal is to have two clients that first establish bidirectional streaming connections with the server and then chat with each other. In this post, we will use the terminal console for posting and receiving messages.

For the sake of simplicity, we have two clients who exchange one-to-one messages. Each client has a name and can send and receive messages. Let's define a struct clientHandle with two fields: clientName and stream. clientHandle implements clientConfig for reading the user's name, sendMessage for sending messages, and receiveMessage for receiving messages.

type clientHandle struct {
    stream     chatserver.Services_ChatServiceClient
    clientName string
}

// Assign name
func (ch *clientHandle) clientConfig() {

    reader := bufio.NewReader(os.Stdin)
    fmt.Printf("Your Name : ")
    msg, err := reader.ReadString('\n')
    if err != nil {
        log.Fatalf("Failed to read from console :: %v", err)

    }
    ch.clientName = strings.TrimRight(msg, "\r\n")

}

// send Message
func (ch *clientHandle) sendMessage() {

    // create the reader once so that buffered input is not lost between iterations
    reader := bufio.NewReader(os.Stdin)

    for {
        clientMessage, err := reader.ReadString('\n')
        if err != nil {
            // stdin was closed or failed; retrying would loop forever
            log.Printf("Failed to read from console :: %v", err)
            return
        }
        clientMessage = strings.TrimRight(clientMessage, "\r\n")

        clientMessageBox := &chatserver.FromClient{
            Name: ch.clientName,
            Body: clientMessage,
        }

        err = ch.stream.Send(clientMessageBox)

        if err != nil {
            log.Printf("Error while sending to server :: %v", err)
        }

    }

}

// receive message
func (ch *clientHandle) receiveMessage() {

    for {
        resp, err := ch.stream.Recv()
        if err != nil {
            log.Fatalf("can not receive %v", err)
        }
        log.Printf("%s : %s", resp.Name, resp.Body)
    }
}
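
The console handling in these methods boils down to reading lines and stripping the line ending. As a small illustration, the sketch below uses bufio.Scanner on an arbitrary io.Reader and additionally skips blank lines, which the client above does not do. readMessages and fromString are hypothetical helpers introduced for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readMessages (hypothetical helper) reads newline-terminated messages from r,
// trims surrounding whitespace and skips blank lines.
func readMessages(r io.Reader) []string {
	var out []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line != "" {
			out = append(out, line)
		}
	}
	return out
}

// fromString feeds a fixed string through readMessages, which is convenient
// for trying the helper without a terminal.
func fromString(s string) []string {
	return readMessages(strings.NewReader(s))
}

func main() {
	msgs := fromString("hello\r\n\n  how are you? \n")
	fmt.Println(len(msgs)) // prints 2
	fmt.Println(msgs[0])   // prints "hello"
	fmt.Println(msgs[1])   // prints "how are you?"
}
```

In the client, os.Stdin could be passed as the reader, and skipping blank lines would avoid sending empty messages when the user presses Enter by accident.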

In the main function, the client dials the gRPC server at the address and port the server is listening on. The client.ChatService method is called with a context to open a bidirectional stream between the client and the gRPC server. Next, we run the sendMessage and receiveMessage loops in separate goroutines. The main function is then blocked by reading from a dummy channel that never receives a value.

func main() {

    const serverID = "localhost:5000"

    log.Println("Connecting : " + serverID)
    conn, err := grpc.Dial(serverID, grpc.WithInsecure())

    if err != nil {
        log.Fatalf("Failed to connect gRPC server :: %v", err)
    }
    defer conn.Close()

    client := chatserver.NewServicesClient(conn)

    stream, err := client.ChatService(context.Background())
    if err != nil {
        log.Fatalf("Failed to get response from gRPC server :: %v", err)
    }

    ch := clientHandle{stream: stream}
    ch.clientConfig()
    go ch.sendMessage()
    go ch.receiveMessage()

    // block main
    bl := make(chan bool)
    <-bl

}

Now we are done! Let's type the following commands on the terminal to run our gRPC chatserver and clients.

# Run server (Terminal-1)

$ env PORT=5000 go run server.go

# Run client.go in two separate terminals (Terminal-2 and Terminal-3)

$ go run client.go

[Images: server.png, clients.png]

I hope you enjoyed this post. I would appreciate your feedback, suggestions, and comments in the comment section below.

Thanks.

https://github.com/rrrCode9/gRPC-Bidirectional-Streaming-ChatServer.git

protobuf compiler, grpc.io
