1. 概述
1.1 服务定义
向其它的RPC服务一样,GPRC的基础是服务的定义。服务定义远程调用方法的名称、传入参数和返回参数。GRPC默认使用 Protobuf描述服务,protobuf的信息见这篇博客Protobuf3 的第一个Java demo
GRPC一共定义4种服务方法:
- 一元RPC(Unary RPCs ):这是最简单的定义,客户端发送一个请求,服务端返回一个结果
- 服务器流RPC(Server streaming RPCs):客户端发送一个请求,服务端返回一个流给客户端,客户从流中读取一系列消息,直到读取所有小心
- 客户端流RPC(Client streaming RPCs ):客户端通过流向服务端发送一系列消息,然后等待服务端读取完数据并返回处理结果
双向流RPC(Bidirectional streaming RPCs):客户端和服务端都可以独立向对方发送或接受一系列的消息。客户端和服务端读写的顺序是任意。
以上的服务方法定义在proto文件,如下:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.hry.spring.grpc.mystream";
option java_outer_classname = "HelloStreamEntity";
service HelloStream {
// A Unary RPC.
rpc simpleRpc(Simple) returns (SimpleFeature) {}
// A server-to-client streaming RPC.
rpc server2ClientRpc(SimpleList) returns (stream SimpleFeature) {}
// A client-to-server streaming RPC.
rpc client2ServerRpc(stream Simple) returns (SimpleSummary) {}
// A Bidirectional streaming RPC.
rpc bindirectionalStreamRpc(stream Simple) returns (stream Simple) {}
}
message Simple {
int32 num = 1;
string name = 2;
}
message SimpleList {
repeated Simple simpleList = 1;
}
message SimpleFeature {
string name = 1;
Simple location = 2;
}
message SimpleSummary {
int32 feature_count = 2;
}
// 测试类
message SimpleFeatureDatabase {
repeated SimpleFeature feature = 1;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
1.2 同步RPC和异步RPC
GRPC 同时支持同步RPC和异步RPC。
同步RPC调用服务方法只支持流RPC(Server streaming RPCs)和一元RPC(Unary RPCs )。异步RPC调用服务方法支持4种方法。
1.3 生成基础代码 ###
参考的Grpc系列一 第一个hello world 例子生成protobuf基础类和HelloStreamGrpc,详细见这里代码
2. 测试代码
这里实现服务端和客户端代码
2.1 服务端
服务端类的实现,通过继承HelloStreamGrpc.HelloStreamImplBase实现,具体服务接口实现见下面:
/**
* 服务端类的实现
*
*/
private static class HelloStreamService extends HelloStreamGrpc.HelloStreamImplBase {
private final List<SimpleFeature> features;
public HelloStreamService(List<SimpleFeature> features) {
this.features = features;
}
...
@Override
public void simpleRpc(Simple request, StreamObserver<SimpleFeature> responseObserver) {
SimpleFeature rtn = SimpleFeature.newBuilder().setName(request.getName() + "simpleRpc").setLocation(request)
.build();
logger.info("recevier simpleRpc : {}", request);
responseObserver.onNext(rtn);
responseObserver.onCompleted();
}
- 服务器流RPC(Server streaming RPCs)服务端实现:
@Override
public void server2ClientRpc(SimpleList request, StreamObserver<SimpleFeature> responseObserver) {
logger.info("recevier server2ClientRpc : {}", request);
for (SimpleFeature feature : this.features) {
Simple simpleLocation = feature.getLocation();
for (Simple o : request.getSimpleListList()) {
if (o.getNum() == simpleLocation.getNum()) {
// 推送记录
responseObserver.onNext(feature);
}
}
}
responseObserver.onCompleted();
}
- 客户端流RPC(Client streaming RPCs )服务端实现:
/**
* 接收完所有的请求后,才返回一个对象
*/
@Override
public StreamObserver<Simple> client2ServerRpc(StreamObserver<SimpleSummary> responseObserver) {
return new StreamObserver<Simple>() {
int feature_count = 0;
@Override
public void onNext(Simple value) {
// 接收请求
logger.info("num={}, client2ServerRpc, content={} ", feature_count, value);
feature_count++;
}
@Override
public void onError(Throwable t) {
logger.error("Simple cancelled, e={}", t);
}
@Override
public void onCompleted() {
logger.info("onCompleted");
// 接收所有请求后,返回总数
SimpleSummary summary = SimpleSummary.newBuilder().setFeatureCount(feature_count).build();
responseObserver.onNext(summary);
// 结束请求
responseObserver.onCompleted();
}
};
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 双向流RPC(Bidirectional streaming RPCs)服务端实现:
/**
* 每接收一个请求,立即返回一个对象
*/
@Override
public StreamObserver<Simple> bindirectionalStreamRpc(StreamObserver<Simple> responseObserver) {
return new StreamObserver<Simple>() {
@Override
public void onNext(Simple value) {
logger.info("bindirectionalStreamRpc receive {}", value);
for (SimpleFeature feature : features) {
Simple simpleLocation = feature.getLocation();
if (value.getNum() == simpleLocation.getNum()) {
// 接收请求后,马上推送记录
Simple rtn = Simple.newBuilder().setName(feature.getName() + "rtn")
.setNum(feature.getLocation().getNum()).build();
responseObserver.onNext(rtn);
}
}
}
@Override
public void onError(Throwable t) {
logger.error("bindirectionalStreamRpc cancelled, e={}", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
启动服务端main方法:
private static final Logger logger = LoggerFactory.getLogger(HelloStreamServer.class);
private final int port;
private final Server server;
public HelloStreamServer(int port) throws IOException {
this.port = port;
this.server = ServerBuilder.forPort(port).addService(new HelloStreamService(HelloUtil.parseFeatures())).build();
}
// 启动服务
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloStreamServer.this.stop();
System.err.println("*** server shut down");
}
});
}
// 启动服务
public void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* Await termination on the main thread since the grpc library uses daemon
* threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws Exception {
java.util.logging.Logger.getGlobal().setLevel(java.util.logging.Level.OFF);
HelloStreamServer server = new HelloStreamServer(8980);
server.start();
server.blockUntilShutdown();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
2.2 客户端
客户端实现代码:创建客户端stub类
HelloStreamBlockingStub blockingStub:阻塞客户端,支持简单一元服务和流输出调用服务
HelloStreamStub asyncStub:异步客户端,支持所有类型调用
public class HelloStreamClient {
private static final Logger logger = LoggerFactory.getLogger(HelloStreamClient.class);
private final ManagedChannel channel;
private final HelloStreamBlockingStub blockingStub;
private final HelloStreamStub asyncStub;
private Random random = new Random();
public HelloStreamClient(String host, int port) {
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true);
channel = channelBuilder.build();
// 创建一个阻塞客户端,支持简单一元服务和流输出调用服务
blockingStub = HelloStreamGrpc.newBlockingStub(channel);
// 创建一个异步客户端,支持所有类型调用
asyncStub = HelloStreamGrpc.newStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
......
// 创建Simple对象
private Simple newSimple(int num) {
return Simple.newBuilder().setName("simple" + num).setNum(num).build();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
/**
* 一元服务调用
*/
public void simpleRpc(int num) {
logger.info("request simpleRpc: num={}", num);
Simple simple = Simple.newBuilder().setName("simpleRpc").setNum(num).build();
SimpleFeature feature;
try {
feature = blockingStub.simpleRpc(simple);
} catch (StatusRuntimeException e) {
logger.info("RPC failed: {0}", e.getStatus());
return;
}
logger.info("simpleRpc end called {}", feature);
}
- 服务器流RPC(Server streaming RPCs)客户端实现:
/**
* 阻塞服务器流
*/
public void server2ClientRpc(int num1, int num2) {
logger.info("request server2ClientRpc num1={}, num2={}", num1, num2);
Simple simple = Simple.newBuilder().setName("simple" + num1).setNum(num1).build();
Simple simple2 = Simple.newBuilder().setName("simple" + num2).setNum(num2).build();
SimpleList simpleList = SimpleList.newBuilder().addSimpleList(simple).addSimpleList(simple2).build();
Iterator<SimpleFeature> simpleFeatureIter = blockingStub.server2ClientRpc(simpleList);
for (int i = 1; simpleFeatureIter.hasNext(); i++) {
SimpleFeature feature = simpleFeatureIter.next();
logger.info("Result {} : {}", i, feature);
}
}
- 客户端流RPC(Client streaming RPCs )客户端实现:
/**
* 异步客户端流
*
*/
public void client2ServerRpc(int count) throws InterruptedException {
logger.info("request client2ServerRpc {}", count);
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<SimpleSummary> responseObserver = new StreamObserver<SimpleSummary>() {
@Override
public void onNext(SimpleSummary value) {
// 返回SimpleSummary
logger.info("client2ServerRpc onNext : {}", value);
}
@Override
public void onError(Throwable t) {
logger.error("client2ServerRpc error : {}", t);
finishLatch.countDown();
}
@Override
public void onCompleted() {
logger.error("client2ServerRpc finish");
finishLatch.countDown();
}
};
StreamObserver<Simple> requestObserver = asyncStub.client2ServerRpc(responseObserver);
try {
for (int i = 0; i < count; i++) {
logger.info("simple : {}", i);
Simple simple = Simple.newBuilder().setName("client2ServerRpc" + i).setNum(i).build();
requestObserver.onNext(simple);
Thread.sleep(random.nextInt(200) + 50);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// 结束请求
requestObserver.onCompleted();
// Receiving happens asynchronously
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.error("client2ServerRpc can not finish within 1 minutes");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 双向流RPC(Bidirectional streaming RPCs)客户端实现:
/**
* 双向流
*
* @throws InterruptedException
*/
public void bindirectionalStreamRpc() throws InterruptedException {
logger.info("request bindirectionalStreamRpc");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<Simple> requestObserver = asyncStub.bindirectionalStreamRpc(new StreamObserver<Simple>() {
@Override
public void onNext(Simple value) {
logger.info("bindirectionalStreamRpc receive message : {}", value);
}
@Override
public void onError(Throwable t) {
logger.error("bindirectionalStreamRpc Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
logger.info("Finished bindirectionalStreamRpc");
finishLatch.countDown();
}
});
try {
Simple[] requests = { newSimple(1), newSimple(2), newSimple(3), newSimple(4) };
for (Simple request : requests) {
logger.info("Sending message {}", request);
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.error("routeChat can not finish within 1 minutes");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
启动客户端
public static void main(String[] args) throws InterruptedException {
HelloStreamClient client = new HelloStreamClient("localhost", 8980);
try {
// simple rpc
client.simpleRpc(1);
// server2ClientRpc
client.server2ClientRpc(1, 2);
// client2ServerRpc
client.client2ServerRpc(2);
// bindirectionalStreamRpc
client.bindirectionalStreamRpc();
} finally {
client.shutdown();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
3. 代码
详细代码见这里Github
|