In this post, we will cover how to create a basic chat server for one-to-one chatting using gRPC bidirectional streaming. We are going to use Go language for both gRPC server and gRPC client programs.
Protocol-buffer or protobuf is an efficient format of data transfer that is preferably used with gRPC. First, we will create a protobuf file with messages that define the format of communication between gRPC server and client and, we also define Remote Procedure Call (RPC) services in the same protobuf file.
Create a protobuf file chat.proto
Let's first create a chat.proto file in the project root directory and copy the following lines.
// path: ./chat.proto
syntax = "proto3";
package chatserver;
message FromClient {
string name = 1;
string body = 2;
}
message FromServer {
string name = 1;
string body = 2;
}
service Services {
rpc ChatService(stream FromClient) returns (stream FromServer){};
}
The syntax = 'proto3'
specifies that we are using proto3
version syntax (the default version while writing this post is proto2
). We have also specified a package name as chatserver
. Then, we have defined two messages, one for clients to server communication FromClient
and one for the server to client communication FromServer
. The fields in the message are identical in this case though; the name
is for the user's name and the body is for the user's message.
To generate the service interface, we need to define RPC methods in the chat.proto
file. In this example, we have defined only one RPC method i.e.ChatService
. The ChatService
method will be called by clients to set up bidirectional gRPC streams between client and server.
Compile protobuf file chat.proto
Now that we have our chat.proto
file is ready. Next step is to compile the chat.proto
file to generate the compiled *.pb.go
file with Interfaces that we will be using to send and receive messages.
Before you proceed, please ensure that you have installed the following in your system, and both protoc and protoc-gen-go are in system $PATH:
- protobuf compiler
- Golang Protobuf plugin:
go install google.golang.org/protobuf/cmd/protoc-gen-go
Now, run the command below in your preferred shell with PWD as your project root directory to generate golang code referring to chat.proto
file.
$ protoc --go_out=plugins=grpc:chatserver chat.proto
The above command will generate a file chatserver.pb.go
inside a directory ./chatserver/
.
Create a file server.go
under package main
Let's create our project as a Go module for dependency management by executing the following command in our project root directory. This will create a go.mod
file that would identify our project as a module.
$ go mod init grpcChatServer
Now, create a file named as server.go
in our project root directory. We define a listener associated with a port which can be referred from environment variables. In case PORT
is not set as a system environment, then let's assign 5000 (some port) as the default Port for our gRPC server.
func main() {
// assign port
Port := os.Getenv("PORT")
if Port == "" {
Port = "5000"// port default : 5000 if in env port is not set
}
// initiate listener
listen, err := net.Listen("tcp", Port)
if err != nil {
log.Fatalf("Could not listen @ %v :: %v", Port, err)
}
log.Println("Listening @ "+Port)
// ...
}
Create a gRPC server instance that would listen and server through assigned Port
.
grpcServer := grpc.NewServer()
err = grpcServer.Serve(listen)
if err != nil {
log.Fatalf("Failed to start gRPC server :: %v", err)
}
Though we have created and also can run the gRPC server, it will not do anything impressive. Let's create a chatserver.go
file that invokes interfaces defined in chatserver.pb.go
file which will help to establish bidirectional stream between server and client.
Create chatserver.go file under package chatserver
Let's create a new file named aschatserver.go
inside chatserver directory (this is the same directory that has chatserver.pb.go
file generated earlier through the proto compiler).
Now, define a struct messageUnit
for handling the message in the server. We also create another struct messageQue
that basically a slice MQue
of type messageUnit
and variable mu
of type sync.Muex
. Go sync.Mutex
implements memory Lock()
and UnLock()
which are used to avoid race conditions while accessing global variables asynchronously through multiple threads.
type messageUnit struct {
ClientName string
MessageBody string
MessageUniqueCode int
ClientUniqueCode int
}
type messageQue struct {
MQue []messageUnit
mu sync.Mutex
}
var messageQueObject = messageQue{}
In addition to the above, lets define a type struct with name ChatServer
which implemets a method ChatService
as defined in chat.proto file. ChatService
method takes an argument of type Services_ChatServiceServer
(defined in chatserver.pb.go file) and returns error
.
type ChatServer struct {
}
//ChatService
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {
// ...
}
Now, inside our chatservice
method we need to call two methods - one for receiving messages from stream and one for sending messages to stream. With recieveFromStream
method, we parse the message from the client and append it to our messageQue
object.
// recieve from stream
func recieveFromStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int) {
for {
req, err := csi_.Recv()
if err != nil {
log.Printf("Error reciving request from client :: %v", err)
break
} else {
messageQueObject.mu.Lock()
messageQueObject.MQue = append(messageQueObject.MQue, messageUnit{ClientName: req.Name, MessageBody: req.Body, MessageUniqueCode: rand.Intn(1e8), ClientUniqueCode: clientUniqueCode_})
messageQueObject.mu.Unlock()
log.Printf("%v", messageQueObject.MQue[len(messageQueObject.MQue)-1])
}
}
}
With sendToStream
method, we serialize the message from messageQue
and send it to the designated client.
//send to stream
func sendToStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {
for {
for {
time.Sleep(500 * time.Millisecond)
messageQueObject.mu.Lock()
if len(messageQueObject.MQue) == 0 {
messageQueObject.mu.Unlock()
break
}
senderUniqueCode := messageQueObject.MQue[0].ClientUniqueCode
senderName4client := messageQueObject.MQue[0].ClientName
message4client := messageQueObject.MQue[0].MessageBody
messageQueObject.mu.Unlock()
if senderUniqueCode != clientUniqueCode_ {
err := csi_.Send(&FromServer{Name: senderName4client, Body: message4client})
if err != nil {
errch_ <- err
}
messageQueObject.mu.Lock()
if len(messageQueObject.MQue) >= 2 {
messageQueObject.MQue = messageQueObject.MQue[1:] // if send success > delete message
} else {
messageQueObject.MQue = []messageUnit{}
}
messageQueObject.mu.Unlock()
}
}
time.Sleep(1 * time.Second)
}
}
Let's call both recieveFromStream
and sendToStream
method from ChatService
method through go
routines. We have created one channel of type error
which is used to pass error, if any, to the client during sending message to stream; the same error also would lead to termination of the connection from server-side. Every time a client connects to the server, two go
routines would be spawned for recieving and sending messages. Whenever multiple Go-routines asynchronously access a variable (memory location), mutex blocking is used above to avoid Race condition.
// ChatService
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {
clientUniqueCode := rand.Intn(1e3)
// recieve request <<< client
go recieveFromStream(csi, clientUniqueCode)
//stream >>> client
errch := make(chan error)
go sendToStream(csi, clientUniqueCode, errch)
return <-errch
}
Now, we need to register ChatService
by adding the following two lines in server.go file.
// register ChatService
cs := chatserver.ChatServer{}
chatserver.RegisterServicesServer(grpcServer, &cs)
with this, we have finished our serverside code for gRPC bidirectional streaming. We can choose any language that supports gRPC for creating client stub. For simplicity, in this post let's create our client-side program in Golang. The chatserver.pb.go
file also includes interfaces for the client.
Create client.go
for client side stubs
let's create a client.go file in our project root directory. The main intention now is to have two clients that would first establish bidirectional streaming connections with the server and then chat with each other. In this post, we will be using the console in the terminal for posting and receiving messages.
For the sake of simplicity, in this post, we have two clients who communicate through one-to-one message. Clients shall have a name and would be able to send and receive messages. Let's define a struct clientHandle
which basically has two fields one is clientName
and the second one is the stream
. clientHandle
implements two methods one for sending message sendMessage
and one for receiving message receiveMessage
.
type clientHandle struct {
stream chatserver.Services_ChatServiceClient
clientName string
}
// Assign name
func (ch *clientHandle) clientConfig() {
reader := bufio.NewReader(os.Stdin)
fmt.Printf("Your Name : ")
msg, err := reader.ReadString('\n')
if err != nil {
log.Fatalf("Failed to read from console :: %v", err)
}
ch.clientName = strings.TrimRight(msg, "\r\n")
}
// send Message
func (ch *clientHandle) sendMessage() {
for {
reader := bufio.NewReader(os.Stdin)
clientMessage, err := reader.ReadString('\n')
clientMessage = strings.TrimRight(clientMessage, "\r\n")
if err != nil {
log.Printf("Failed to read from console :: %v", err)
continue
}
clientMessageBox := &chatserver.FromClient{
Name: ch.clientName,
Body: clientMessage,
}
err = ch.stream.Send(clientMessageBox)
if err != nil {
log.Printf("Error while sending to server :: %v", err)
}
}
}
// receive message
func (ch *clientHandle) receiveMessage() {
for {
resp, err := ch.stream.Recv()
if err != nil {
log.Fatalf("can not receive %v", err)
}
log.Printf("%s : %s", resp.Name, resp.Body)
}
}
In the main function, the client dial gRPC server addressing the port that the server is listening to. The client.ChatService
method is called with a context
to initiate a bidirectional stream between the client and the gRPC server. Next, we are running sendMessage
and receiveMessage
loops in separate go-routines. The main method is blocked by a dummy chan
.
func main() {
const serverID = "localhost:5000"
log.Println("Connecting : " + serverID)
conn, err := grpc.Dial(serverID, grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect gRPC server :: %v", err)
}
defer conn.Close()
client := chatserver.NewServicesClient(conn)
stream, err := client.ChatService(context.Background())
if err != nil {
log.Fatalf("Failed to get response from gRPC server :: %v", err)
}
ch := clientHandle{stream: stream}
ch.clientConfig()
go ch.sendMessage()
go ch.receiveMessage()
// block main
bl := make(chan bool)
<-bl
}
Now we are done! Let's type the following commands on the terminal to run our gRPC chatserver and clients.
# run server (Termina-1)
$ env PORT=5000 go run server.go
# Run client.go file in two separate terminals (Terminal -2 and Terminal -3)
$ go run server.go
I hope, you enjoyed this post. I would appreciate your feedback/suggestions/comments in the comment section below.
Thanks.
Follow Me :
Link to GitHub :
https://github.com/rrrCode9/gRPC-Bidirectional-Streaming-ChatServer.git
Useful Links :
#gRPC #BidirectionalStreaming #Golang #chatServer