分享

Grpc系列二 Grpc4种服务方法的定义和实现

 WindySky 2017-08-01

1. 概述

1.1 服务定义

向其它的RPC服务一样,GPRC的基础是服务的定义。服务定义远程调用方法的名称、传入参数和返回参数。GRPC默认使用 Protobuf描述服务,protobuf的信息见这篇博客Protobuf3 的第一个Java demo

GRPC一共定义4种服务方法:

  1. 一元RPC(Unary RPCs ):这是最简单的定义,客户端发送一个请求,服务端返回一个结果
  2. 服务器流RPC(Server streaming RPCs):客户端发送一个请求,服务端返回一个流给客户端,客户从流中读取一系列消息,直到读取所有小心
  3. 客户端流RPC(Client streaming RPCs ):客户端通过流向服务端发送一系列消息,然后等待服务端读取完数据并返回处理结果
  4. 双向流RPC(Bidirectional streaming RPCs):客户端和服务端都可以独立向对方发送或接受一系列的消息。客户端和服务端读写的顺序是任意。

    以上的服务方法定义在proto文件,如下:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.hry.spring.grpc.mystream";
option java_outer_classname = "HelloStreamEntity";

service HelloStream {
  // A Unary RPC.
  rpc simpleRpc(Simple) returns (SimpleFeature) {}

  // A server-to-client streaming RPC.
  rpc server2ClientRpc(SimpleList) returns (stream SimpleFeature) {}

  // A client-to-server streaming RPC.
  rpc client2ServerRpc(stream Simple) returns (SimpleSummary) {}

  // A Bidirectional streaming RPC.
  rpc bindirectionalStreamRpc(stream Simple) returns (stream Simple) {}
}


message Simple {
  int32 num = 1;
  string name = 2;
}

message SimpleList {
  repeated Simple simpleList = 1;
}

message SimpleFeature {
  string name = 1;
  Simple location = 2;
}

message SimpleSummary {
  int32 feature_count = 2;
}

// 测试类
message SimpleFeatureDatabase {
  repeated SimpleFeature feature = 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

1.2 同步RPC和异步RPC

GRPC 同时支持同步RPC和异步RPC。
同步RPC调用服务方法只支持流RPC(Server streaming RPCs)和一元RPC(Unary RPCs )。异步RPC调用服务方法支持4种方法。

1.3 生成基础代码 ###

参考的Grpc系列一 第一个hello world 例子生成protobuf基础类和HelloStreamGrpc,详细见这里代码

2. 测试代码

这里实现服务端和客户端代码

2.1 服务端

服务端类的实现,通过继承HelloStreamGrpc.HelloStreamImplBase实现,具体服务接口实现见下面:

/**
     * 服务端类的实现
     *
     */
    private static class HelloStreamService extends HelloStreamGrpc.HelloStreamImplBase {
        private final List<SimpleFeature> features;

        public HelloStreamService(List<SimpleFeature> features) {
            this.features = features;
        }
    ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 一元RPC(Unary RPCs )服务端实现:
        @Override
        public void simpleRpc(Simple request, StreamObserver<SimpleFeature> responseObserver) {
            SimpleFeature rtn = SimpleFeature.newBuilder().setName(request.getName() + "simpleRpc").setLocation(request)
                    .build();
            logger.info("recevier simpleRpc : {}", request);
            responseObserver.onNext(rtn);
            responseObserver.onCompleted();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 服务器流RPC(Server streaming RPCs)服务端实现:
        @Override
        public void server2ClientRpc(SimpleList request, StreamObserver<SimpleFeature> responseObserver) {
            logger.info("recevier server2ClientRpc : {}", request);
            for (SimpleFeature feature : this.features) {
                Simple simpleLocation = feature.getLocation();
                for (Simple o : request.getSimpleListList()) {
                    if (o.getNum() == simpleLocation.getNum()) {
                        // 推送记录
                        responseObserver.onNext(feature);
                    }
                }
            }
            responseObserver.onCompleted();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 客户端流RPC(Client streaming RPCs )服务端实现:
/**
         * 接收完所有的请求后,才返回一个对象
         */
        @Override
        public StreamObserver<Simple> client2ServerRpc(StreamObserver<SimpleSummary> responseObserver) {

            return new StreamObserver<Simple>() {
                int feature_count = 0;

                @Override
                public void onNext(Simple value) {
                    // 接收请求
                    logger.info("num={}, client2ServerRpc, content={} ", feature_count, value);
                    feature_count++;
                }

                @Override
                public void onError(Throwable t) {
                    logger.error("Simple cancelled, e={}", t);
                }

                @Override
                public void onCompleted() {
                    logger.info("onCompleted");
                    // 接收所有请求后,返回总数
                    SimpleSummary summary = SimpleSummary.newBuilder().setFeatureCount(feature_count).build();
                    responseObserver.onNext(summary);
                    // 结束请求
                    responseObserver.onCompleted();
                }
            };
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 双向流RPC(Bidirectional streaming RPCs)服务端实现:
        /**
         * 每接收一个请求,立即返回一个对象
         */
        @Override
        public StreamObserver<Simple> bindirectionalStreamRpc(StreamObserver<Simple> responseObserver) {
            return new StreamObserver<Simple>() {
                @Override
                public void onNext(Simple value) {
                    logger.info("bindirectionalStreamRpc receive {}", value);
                    for (SimpleFeature feature : features) {
                        Simple simpleLocation = feature.getLocation();
                        if (value.getNum() == simpleLocation.getNum()) {
                            // 接收请求后,马上推送记录
                            Simple rtn = Simple.newBuilder().setName(feature.getName() + "rtn")
                                    .setNum(feature.getLocation().getNum()).build();
                            responseObserver.onNext(rtn);
                        }
                    }
                }

                @Override
                public void onError(Throwable t) {
                    logger.error("bindirectionalStreamRpc cancelled, e={}", t);
                }

                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

启动服务端main方法:

    private static final Logger logger = LoggerFactory.getLogger(HelloStreamServer.class);

    private final int port;
    private final Server server;

    public HelloStreamServer(int port) throws IOException {
        this.port = port;
        this.server = ServerBuilder.forPort(port).addService(new HelloStreamService(HelloUtil.parseFeatures())).build();
    }

    // 启动服务
    public void start() throws IOException {
        server.start();
        logger.info("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                HelloStreamServer.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    // 启动服务
    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon
     * threads.
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws Exception {
        java.util.logging.Logger.getGlobal().setLevel(java.util.logging.Level.OFF);
        HelloStreamServer server = new HelloStreamServer(8980);
        server.start();
        server.blockUntilShutdown();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

2.2 客户端

客户端实现代码:创建客户端stub类
HelloStreamBlockingStub blockingStub:阻塞客户端,支持简单一元服务和流输出调用服务
HelloStreamStub asyncStub:异步客户端,支持所有类型调用

public class HelloStreamClient {
    private static final Logger logger = LoggerFactory.getLogger(HelloStreamClient.class);

    private final ManagedChannel channel;
    private final HelloStreamBlockingStub blockingStub;
    private final HelloStreamStub asyncStub;

    private Random random = new Random();

    public HelloStreamClient(String host, int port) {
        ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true);
        channel = channelBuilder.build();
        // 创建一个阻塞客户端,支持简单一元服务和流输出调用服务
        blockingStub = HelloStreamGrpc.newBlockingStub(channel);
        // 创建一个异步客户端,支持所有类型调用
        asyncStub = HelloStreamGrpc.newStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
    ......
    // 创建Simple对象
    private Simple newSimple(int num) {
        return Simple.newBuilder().setName("simple" + num).setNum(num).build();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 一元RPC(Unary RPCs )客户端实现:
    /**
     * 一元服务调用
     */
    public void simpleRpc(int num) {
        logger.info("request simpleRpc: num={}", num);
        Simple simple = Simple.newBuilder().setName("simpleRpc").setNum(num).build();
        SimpleFeature feature;
        try {
            feature = blockingStub.simpleRpc(simple);
        } catch (StatusRuntimeException e) {
            logger.info("RPC failed: {0}", e.getStatus());
            return;
        }
        logger.info("simpleRpc end called {}", feature);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 服务器流RPC(Server streaming RPCs)客户端实现:
    /**
     * 阻塞服务器流
     */
    public void server2ClientRpc(int num1, int num2) {
        logger.info("request server2ClientRpc num1={}, num2={}", num1, num2);
        Simple simple = Simple.newBuilder().setName("simple" + num1).setNum(num1).build();
        Simple simple2 = Simple.newBuilder().setName("simple" + num2).setNum(num2).build();

        SimpleList simpleList = SimpleList.newBuilder().addSimpleList(simple).addSimpleList(simple2).build();
        Iterator<SimpleFeature> simpleFeatureIter = blockingStub.server2ClientRpc(simpleList);
        for (int i = 1; simpleFeatureIter.hasNext(); i++) {
            SimpleFeature feature = simpleFeatureIter.next();
            logger.info("Result {} : {}", i, feature);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 客户端流RPC(Client streaming RPCs )客户端实现:
    /**
     * 异步客户端流
     * 
     */
    public void client2ServerRpc(int count) throws InterruptedException {
        logger.info("request client2ServerRpc {}", count);
        final CountDownLatch finishLatch = new CountDownLatch(1);

        StreamObserver<SimpleSummary> responseObserver = new StreamObserver<SimpleSummary>() {
            @Override
            public void onNext(SimpleSummary value) {
                // 返回SimpleSummary
                logger.info("client2ServerRpc onNext : {}", value);
            }

            @Override
            public void onError(Throwable t) {
                logger.error("client2ServerRpc error : {}", t);
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                logger.error("client2ServerRpc finish");
                finishLatch.countDown();
            }
        };
        StreamObserver<Simple> requestObserver = asyncStub.client2ServerRpc(responseObserver);
        try {
            for (int i = 0; i < count; i++) {
                logger.info("simple : {}", i);
                Simple simple = Simple.newBuilder().setName("client2ServerRpc" + i).setNum(i).build();
                requestObserver.onNext(simple);
                Thread.sleep(random.nextInt(200) + 50);
            }
        } catch (RuntimeException e) {
            // Cancel RPC
            requestObserver.onError(e);
            throw e;
        }
        // 结束请求
        requestObserver.onCompleted();

        // Receiving happens asynchronously
        if (!finishLatch.await(1, TimeUnit.MINUTES)) {
            logger.error("client2ServerRpc can not finish within 1 minutes");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 双向流RPC(Bidirectional streaming RPCs)客户端实现:
    /**
     * 双向流
     * 
     * @throws InterruptedException
     */
    public void bindirectionalStreamRpc() throws InterruptedException {
        logger.info("request bindirectionalStreamRpc");
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<Simple> requestObserver = asyncStub.bindirectionalStreamRpc(new StreamObserver<Simple>() {

            @Override
            public void onNext(Simple value) {
                logger.info("bindirectionalStreamRpc receive message : {}", value);
            }

            @Override
            public void onError(Throwable t) {
                logger.error("bindirectionalStreamRpc Failed: {0}", Status.fromThrowable(t));
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                logger.info("Finished bindirectionalStreamRpc");
                finishLatch.countDown();
            }
        });

        try {
            Simple[] requests = { newSimple(1), newSimple(2), newSimple(3), newSimple(4) };

            for (Simple request : requests) {
                logger.info("Sending message {}", request);
                requestObserver.onNext(request);
            }
        } catch (RuntimeException e) {
            // Cancel RPC
            requestObserver.onError(e);
            throw e;
        }
        requestObserver.onCompleted();

        if (!finishLatch.await(1, TimeUnit.MINUTES)) {
            logger.error("routeChat can not finish within 1 minutes");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

启动客户端

    public static void main(String[] args) throws InterruptedException {
        HelloStreamClient client = new HelloStreamClient("localhost", 8980);
        try {
            // simple rpc
            client.simpleRpc(1);

            // server2ClientRpc
            client.server2ClientRpc(1, 2);

            // client2ServerRpc
            client.client2ServerRpc(2);

            // bindirectionalStreamRpc
            client.bindirectionalStreamRpc();

        } finally {
            client.shutdown();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3. 代码

详细代码见这里Github

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多