ScalaPB(3): gRPC streaming
接着上期讨论的gRPC unary服务我们跟着介绍gRPC streaming,包括: Server-Streaming, Client-Streaming及Bidirectional-Streaming。我们首先在.proto文件里用IDL描述Server-Streaming服务:
/*
* responding stream of increment results
*/
service SumOneToMany {
rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
}
message SumRequest {
int32 toAdd = 1;
}
message SumResponse {
int32 currentResult = 1;
}
SumOneToMany服务中AddOneToMany函数接受一个SumRequest然后返回stream SumResponse,就这么简单。经过编译后产生了SumOneToManyGrpc.scala文件,在这个文件里提供了有关RPC操作的api。我们看看protoc把IDL描述的服务函数变成了什么样的scala函数:
def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit
调用scala函数addOneToMany需要传入参数SumRequest和StreamObserver[SumResponse],也就是说用户需要准备这两个入参数。在调用addOneToMany函数时用户事先构建这个StreamObserver传给server,由server把结果通过这个结构传回用户。gRPC是通过StreamObserver类型实例来实现数据streaming的。这个类型的构建例子如下:
val responseObserver = new StreamObserver[SumResponse] {
def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
def onCompleted(): Unit = println("ON_COMPLETED")
def onNext(value: SumResponse): Unit =
println(s"ON_NEXT: Current sum: ${value.currentResult}")
}
server端通过onNext把结果不断传回给client端,因为这个responseObserver是在client端构建的。下面是SumManyToMany的实现:
class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
val currentSum: AtomicInt = Atomic(0)
(1 to request.toAdd).map { _ =>
responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
}
Thread.sleep(1000) //delay and then finish
responseObserver.onCompleted()
}
}
这个addOneToMany服务函数把 1-request.toAdd之间的数字逐个通过responseObserver返还调用方。 在客户端如下调用服务:
// get asyn stub
val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
// prepare stream observer
val streamObserver = new StreamObserver[SumResponse] {
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done incrementing !!!")
override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
}
// call service with stream observer
client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
Client-Streaming服务的IDL如下:
/*
* responding a result from a request of stream of numbers
*/
service SumManyToOne {
rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
}
传入stream SumRequest, 返回SumResponse。scalaPB自动产生scala代码中的addManyToOne函数款式如下:
def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
调用方提供StreamObserver[SumResponse]用作返回结果,函数返回客方需要的StreamObserver[SumRequest]用以传递request流。注意:虽然在.proto文件中AddManyToOne的返回结果是单个SumResponse,但产生的scala函数则提供了一个StreamObserver[SumResponse]类型,所以需要谨记只能调用一次onNext。下面是这个服务的实现代码:
class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
val currentSum: AtomicInt = Atomic(0)
override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
new StreamObserver[SumRequest] {
val currentSum: AtomicInt = Atomic(0)
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done summing!")
override def onNext(value: SumRequest): Unit = {
//only allow one response
if (value.toAdd > 0)
currentSum.add(value.toAdd)
else
responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
}
}
}
客户方调用示范如下:
//pass to server for result
val respStreamObserver = new StreamObserver[SumResponse] {
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done responding!")
override def onNext(value: SumResponse): Unit =
println(s"Result: ${value.currentResult}")
}
//get async stub
val client = SumManyToOneGrpc.stub(channel)
//get request stream observer from server
val reqStreamObserver = client.addManyToOne(respStreamObserver)
List(2,5,8,4,0).map { n =>
reqStreamObserver.onNext(SumRequest(n))
}
Bidirectional-Streaming的IDL描述如下:
/*
* Sums up numbers received from the client and returns the current result after each received request.
*/
service SumInter {
rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
}
这个service SumInter 描述了stream SumRequest 及 stream SumResponse运算模式。产生的对应scala函数如下:
def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
这个函数的款式与Client-Streaming服务函数是一样的。但是,我们可以通过responseObserver传递多个SumResponse。这个服务的实现代码是这样的:
class Many2ManyService extends SumInterGrpc.SumInter {
override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
new StreamObserver[SumRequest] {
val currentSum: AtomicInt = Atomic(0)
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done requesting!")
override def onNext(value: SumRequest): Unit = {
responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
}
}
}
我们可以多次调用responseObserver.onNext。客户端源代码如下:
//create stream observer for result stream
val responseObserver = new StreamObserver[SumResponse] {
def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
def onCompleted(): Unit = println("ON_COMPLETED")
def onNext(value: SumResponse): Unit =
println(s"ON_NEXT: Current sum: ${value.currentResult}")
}
//get request container
val requestObserver = client.addInter(responseObserver)
scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
val toBeAdded = Random.nextInt(11)
println(s"Adding number: $toBeAdded")
requestObserver.onNext(SumRequest(toBeAdded))
}
下面是本次示范的源代码:
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"
build.sbt
import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion
name := "learn-gRPC"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
"io.grpc" % "grpc-netty" % grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
"io.monix" %% "monix" % "2.3.0"
)
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
src/main/protobuf/sum.proto
syntax = "proto3";
package learn.grpc.services;
/*
* responding stream of increment results
*/
service SumOneToMany {
rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
}
/*
* responding a result from a request of stream of numbers
*/
service SumManyToOne {
rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
}
/*
* Sums up numbers received from the client and returns the current result after each received request.
*/
service SumInter {
rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
}
message SumRequest {
int32 toAdd = 1;
}
message SumResponse {
int32 currentResult = 1;
}
gRPCServer.scala
package learn.grpc.server
import io.grpc.{ServerBuilder,ServerServiceDefinition}
trait gRPCServer {
def runServer(service: ServerServiceDefinition): Unit = {
val server = ServerBuilder
.forPort(50051)
.addService(service)
.build
.start
// make sure our server is stopped when jvm is shut down
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = server.shutdown()
})
server.awaitTermination()
}
}
OneToManyServer.scala
package learn.grpc.sum.one2many.server
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import monix.execution.atomic.{Atomic,AtomicInt}
import learn.grpc.server.gRPCServer
object One2ManyServer extends gRPCServer {
class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
val currentSum: AtomicInt = Atomic(0)
(1 to request.toAdd).map { _ =>
responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
}
Thread.sleep(1000) //delay and then finish
responseObserver.onCompleted()
}
}
def main(args: Array[String]) = {
val svc = SumOneToManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global)
runServer(svc)
}
}
OneToManyClient.scala
package learn.grpc.sum.one2many.client
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
object One2ManyClient {
def main(args: Array[String]): Unit = {
//build connection channel
val channel = io.grpc.ManagedChannelBuilder
.forAddress("LocalHost",50051)
.usePlaintext(true)
.build()
// get asyn stub
val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
// prepare stream observer
val streamObserver = new StreamObserver[SumResponse] {
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done incrementing !!!")
override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
}
// call service with stream observer
client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
// wait for async execution
scala.io.StdIn.readLine()
}
}
ManyToOneServer.scala
package learn.grpc.sum.many2one.server
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic,AtomicInt}
object Many2OneServer extends gRPCServer {
class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
val currentSum: AtomicInt = Atomic(0)
override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
new StreamObserver[SumRequest] {
val currentSum: AtomicInt = Atomic(0)
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done summing!")
override def onNext(value: SumRequest): Unit = {
//only allow one response
if (value.toAdd > 0)
currentSum.add(value.toAdd)
else
responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
}
}
}
def main(args: Array[String]): Unit = {
val svc = SumManyToOneGrpc.bindService(new Many2OneService,scala.concurrent.ExecutionContext.global)
runServer(svc)
}
}
ManyToOneClient.scala
package learn.grpc.sum.many2one.client
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
object Many2OneClient {
def main(args: Array[String]): Unit = {
//build channel
val channel = io.grpc.ManagedChannelBuilder
.forAddress("LocalHost",50051)
.usePlaintext(true)
.build()
//pass to server for result
val respStreamObserver = new StreamObserver[SumResponse] {
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done responding!")
override def onNext(value: SumResponse): Unit =
println(s"Result: ${value.currentResult}")
}
//get async stub
val client = SumManyToOneGrpc.stub(channel)
//get request stream observer from server
val reqStreamObserver = client.addManyToOne(respStreamObserver)
List(2,5,8,4,0).map { n =>
reqStreamObserver.onNext(SumRequest(n))
}
scala.io.StdIn.readLine()
}
}
ManyToManyServer.scala
package learn.grpc.sum.many2many.server
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic,AtomicInt}
object Many2ManyServer extends gRPCServer {
class Many2ManyService extends SumInterGrpc.SumInter {
override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
new StreamObserver[SumRequest] {
val currentSum: AtomicInt = Atomic(0)
override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
override def onCompleted(): Unit = println("Done requesting!")
override def onNext(value: SumRequest): Unit = {
responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
}
}
}
def main(args: Array[String]): Unit = {
val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global)
runServer(svc)
}
}
ManyToManyClient.scala
package learn.grpc.sum.many2many.client
import monix.execution.Scheduler.{global => scheduler}
import learn.grpc.services.sum._
import scala.concurrent.duration._
import scala.util.Random
import io.grpc._
import io.grpc.stub.StreamObserver
object Many2ManyClient {
def main(args: Array[String]): Unit = {
val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build
val client = SumInterGrpc.stub(channel)
//create stream observer for result stream
val responseObserver = new StreamObserver[SumResponse] {
def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
def onCompleted(): Unit = println("ON_COMPLETED")
def onNext(value: SumResponse): Unit =
println(s"ON_NEXT: Current sum: ${value.currentResult}")
}
//get request container
val requestObserver = client.addInter(responseObserver)
scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
val toBeAdded = Random.nextInt(11)
println(s"Adding number: $toBeAdded")
requestObserver.onNext(SumRequest(toBeAdded))
}
scala.io.StdIn.readLine()
}
}