分享

netty源码解析1

 印度阿三17 2020-11-30

文章目录

DefaultChannelPipeline

implements ChannelPipeline

  • 构造方法,维护头尾节点,头尾节点组成双向链表。ChannelHandler封装成ChannelHandlerContext,再有ChannelHandlerContext组成链表的元素。
protected DefaultChannelPipeline(Channel channel) {
    // ...
    // 头尾节点
    tail = new TailContext(this);
    head = new HeadContext(this);
    // 双向链表
    head.next = tail;
    tail.prev = head;
}
  • 往尾部添加handler,头尾节点固定,新增的handler的下一个节点为尾节点
private void addLast0(AbstractChannelHandlerContext newCtx) {
    // 在链表添加一个handler,即在head和tail之间构建双向链表
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
  • 写操作,很明显,从尾节点开始往前面的节点写数据
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

AbstractChannelHandlerContext

implements ChannelHandlerContext

  • 向通道写数据
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 如果是同一个线程,那么执行下一个节点的invokeChannelRead方法
        next.invokeChannelRead(m);
    } else {
        // 如果不是同一线程,那么投放到队列去执行
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 找到下一个InboundHandler(从head到tail的顺序找)
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
  • 找到下一个OutboundHandler(从tail到head的顺序)
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

HeadContext

extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler,它是一个context,同时是一个出入站handler,说明不管出入站,都会流经。

  • HeadContext是入站的第一个handler,负责传递入站消息到下一个ChannelInboundHandler
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelInactive();
}
  • HeadContext是出站的最后一个handler,直接往网络发送
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.close(promise);
}

@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.deregister(promise);
}

TailContext

extends AbstractChannelHandlerContext implements ChannelInboundHandler,它是一个context,同时是一个入站handler,也是最后一个入站handler

  • TailContext可用来做资源的释放,假如前面的handler不释放资源,那么它必须把释放资源的操作交给后面的handler,最后由TailContext处理
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. "  
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

ByteToMessageDecoder

extends ChannelInboundHandlerAdapter

  • 读操作
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // 如果消息是一个ByteBuf,那么执行解码操作
        // 创建一个存放解码数据的list集合
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // cumulation也是一个ByteBuf,表示累计的消息
            first = cumulation == null;
            // 如果cumulation为空,则无需累加,初始化为一个空的buffer。否则,说明cumulation有数据,需要累加数据
            cumulation = cumulator.cumulate(ctx.alloc(),
                    first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
            callDecode(ctx, cumulation, out); // 解码操作
        }
        // ...
    } else {
        // 如果消息不是ByteBuf,则传递给下一个handler
        ctx.fireChannelRead(msg);
    }
}
  • 解码逻辑
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            // 读取ByteBuf直到没数据
            int outSize = out.size();

            if (outSize > 0) {
                // 如果有解码结果(List<Object>不为空),那么传递给下一个Handler
                fireChannelRead(ctx, out, outSize); 
                // 清空List<Object>
                out.clear();

                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // 调用该方法进行解码
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass())  
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
  • 解码逻辑
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        decode(ctx, in, out); // 调用decode方法,这个方法需要我们实现
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            fireChannelRead(ctx, out, out.size());
            out.clear();
            handlerRemoved(ctx);
        }
    }
}

MessageToByteEncoder

extends ChannelOutboundHandlerAdapter

  • 写操作
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            // 如果类型正确,强转
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf); // 调用encode方法,需要我们实现
            } finally {
                // 结束,需要释放,为什么需要释放?因为Message转到ByteBuf了,说明已经是出站了,msg没用了,所以需要释放msg了。
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    }
    // ...
}
  • 判断是否是我们想要的类型
public boolean acceptOutboundMessage(Object msg) throws Exception {
    return matcher.match(msg);
}

ChannelFuture

extends Future

状态:完成(可能成功、可能失败、也可能被取消),未完成

  • 添加监听器
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
  • 监听器
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

一般不推荐直接使用future.get,建议使用监听器,完成后通知执行operationComplete(F future)方法,在这个方法里面去获取值

DefaultPromise

extends AbstractFuture implements Promise

public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: "   this);
}
private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        // 使用原子类保障安全
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            // 执行成功后,唤醒监听器
            notifyListeners();
        }
        return true;
    }
    return false;
}
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth   1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}
  • 调用监听器的完成方法
private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by "   l.getClass().getName()   ".operationComplete()", t);
        }
    }
}

AbstractNioChannel

extends AbstractChannel

  • 封装了nio的一些东东
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey; // nio的selectionkey
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
    @Override
    public void run() {
        clearReadPending0();
    }
};

/**
 * The future of the current connection attempt.  If not null, subsequent
 * connection attempts will fail.
 */
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
  • 构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp; // 设置通道关注事件类型为读
    try {
        ch.configureBlocking(false); // 设置为非阻塞
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
  • 注册逻辑
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 0表示所有事件都不感兴趣
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true; // 注册成功
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
  • 开始读
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    // 大爷表示我对读事件很感兴趣 AbstractNioByteChannel()构造方法可以看下,会调用本类的构造方法,把read事件传递过去
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 如果当前不是读事件,那么设置成读事件,位运算
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

AbstractNioByteChannel

  • 写方法,为什么需要对写的循环次数做限制,因为担心写线程假死
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount(); // 循环写的次数,缺省16次
    do {
        Object msg = in.current();
        if (msg == null) {
            // 为空,说明写完了,那么本大爷就对写事件不感兴趣了
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg); // 每次写后,次数-1
    } while (writeSpinCount > 0); // 最多循环16次

    incompleteWrite(writeSpinCount < 0); // 超过16次没有写完的数据,再来写
}
  • 取消写事件
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}
  • 写逻辑
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // 如果是bytebuf类型
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {
            in.remove();
            return 0; // 返回0,说明不扣减,
        }

        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
        // 如果是文件传输类型
        FileRegion region = (FileRegion) msg;
        if (region.transferred() >= region.count()) {
            in.remove();
            return 0;
        }

        long localFlushedAmount = doWriteFileRegion(region);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (region.transferred() >= region.count()) {
                in.remove();
            }
            return 1;
        }
    } else {
        // Should not reach here.
        throw new Error();
    }
    return WRITE_STATUS_SNDBUF_FULL;
}
来源:https://www./content-1-769201.html

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多