定义消息和服务
syntax = "proto3";option java_multiple_files = true;package greeter;message HelloRequest {string name = 1;}message HelloReply {string message = 1;}service GreeterService {rpc SayHello (HelloRequest) returns (HelloReply) {}rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}}
这里定义了两个消息:HelloRequest、HelloReply和GreeterService服务,GreeterService定义了4个服务方法,分别是:
SayHello:经典的请求-响应服务,发送一个请求获得一个响应;
ItKeepsTalking:持续不断的发送多个请求,在请求停止后获得一个响应;
ItKeepsReplying:发送一个请求,获得持续不断的多个响应;
streamHellos:持续不断的发送响应的同时也可获得持续不断的响应,可以通过Source.queue来获得可发送数据的Queue和获得响应数据的Source。
实现 gRPC 服务
class GreeterServiceImpl()(implicit system: ActorSystem[_]) extends GreeterService {import system.executionContextoverride def sayHello(in: HelloRequest): Future[HelloReply] = {Future.successful(HelloReply(s"Hello, ${in.name}."))}override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {in.runWith(Sink.seq).map(ins => HelloReply("Hello, " + ins.map(_.name).mkString("", ", ", ".")))}override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {Source.fromIterator(() => Iterator.from(1)).map(n => HelloReply(s"Hello, ${in.name}; this is $n times."))}override def streamHellos(ins: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {ins.map(in => HelloReply(s"Hello, ${in.name}."))}}
Akka gRPC提供了基于 Akka Streams 的API,更多 Akka Streams 的内容请参阅:Akka 流(Streams)。
itKeepsTalking服务从客户端持续接收HelloRequest消息流,直到客户端主动停止(服务端也可以停止这个流,但这个服务语义上并未体现这一点)。这里收集客户端发送的所有元素并通过Sink.seq汇聚成一个序列,再构造HelloReply消息发送回客户端。
itKeepsReplying服务从客户端接收一个请求,持续不断的向客户端发送响应(一直到客户端主动终止)。这可以用来实现某些实时监控业务,当服务端收到对某个指标的监控请求后,服务端按一定的时间间隔持续不断的向客户端发送监控指标:
def sendMetrics(in: MetricRequest): Source[MetricItem, NotUsed] = {val (queue, source) =Source.queue[MetricItem](16, OverflowStrategy.backpressure).preMaterialize()Source.tick(1.seconds, 1.seconds, MetricItem()).runForeach(metric => queue.offer(metric))source}
sendMetrics服务模拟了一个监控指标发送,每隔1秒钟向客户端发送一个指标数据。
streamHellos服务从客户端获得持续的请求,同时可异步的向客户端返回持续的响应。我们可以基于它来实现心跳。
def heartbeat(in: Source[Heartbeat, NotUsed]): Source[HeartbeatAck, NotUsed] = {val ref: ActorRef[Heartbeat] = getClientManager(in.clientId) //in.map { req =>ref ! reqHeartbeatAck()}}
heartbeat收到心跳请求后马上就像客户端返回一个HeartbeatAck的心跳确认请求,因为这里心跳只用于保持连接,返回一个空响应即可。而ref ! req将心跳请求发送给ref指代的一个客户端Manager业务处理actor,由actor实现心跳超时监控,可以通过配置actor的ReceiveTimeout来实现心跳超时判断。
测试 gRPC 服务
使用 Scalatest 对实现的4个gRPC服务进行测试,下面是单元测试代码:
"sayHello" in {greeterClient.sayHello(HelloRequest("Scala")).futureValue should ===(HelloReply("Hello, Scala."))}"itKeepsReplying" in {greeterClient.itKeepsReplying(HelloRequest("Scala")).take(5).runWith(Sink.seq).futureValue should ===(Seq(HelloReply("Hello, Scala; this is 1 times."),HelloReply("Hello, Scala; this is 2 times."),HelloReply("Hello, Scala; this is 3 times."),HelloReply("Hello, Scala; this is 4 times."),HelloReply("Hello, Scala; this is 5 times.")))}"itKeepsTalking" in {val (queue, in) =Source.queue[HelloRequest](16, OverflowStrategy.backpressure).preMaterialize()val f = greeterClient.itKeepsTalking(in)Seq("Scala", "Java", "Groovy", "Kotlin").foreach(program =>queue.offer(HelloRequest(program)))TimeUnit.SECONDS.sleep(1)queue.complete()f.futureValue should ===(HelloReply("Hello, Scala, Java, Groovy, Kotlin."))}"streamHellos" in {val (queue, in) =Source.queue[HelloRequest](16, OverflowStrategy.backpressure).preMaterialize()val f = greeterClient.streamHellos(in).runWith(Sink.seq)Seq("Scala", "Java", "Groovy", "Kotlin").foreach(item =>queue.offer(HelloRequest(item)))TimeUnit.SECONDS.sleep(1)queue.complete()f.futureValue should ===(Seq(HelloReply("Hello, Scala."),HelloReply("Hello, Java."),HelloReply("Hello, Groovy."),HelloReply("Hello, Kotlin.")))}
在运行测试前需要先启动gRPC服务,在 Scalatest 的beforeAll函数内启动gRPC HTTP 2服务:
override protected def beforeAll(): Unit = {super.beforeAll()val handler = GreeterServiceHandler(new GreeterServiceImpl())Http().bindAndHandleAsync(handler, "localhost", 8000)greeterClient = GreeterServiceClient(GrpcClientSettings.fromConfig(GreeterService.name))}
在构造 GreeterServiceClient gRCP客户端时需要提供GrpcClientSettings设置选项,这里通过调用fromConfig函数来从 HOCON 配置文件里读取gRPC服务选项,相应的application-test.conf配置文件内容如下:
akka.http.server.preview.enable-http2 = onakka.grpc.client {"greeter.GreeterService" {host = "localhost"port = 8000use-tls = false}}
其中use-tls设置gRPC客户端不使用HTTPs建立连接,因为我们这个单元测试启动的gRPC HTTP服务不未启动SSL/TLS。
小结
本文节选自《Akka Cookbook》,更多内容请访问《Akka Cookbook》:
https://www.yangbajing.me/akka-cookbook/grpc/grpc.html




