protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
// 读取ByteBuf直到没数据
int outSize = out.size();
if (outSize > 0) {
// 如果有解码结果(List<Object>不为空),那么传递给下一个Handler
fireChannelRead(ctx, out, outSize);
// 清空List<Object>
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 调用该方法进行解码
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass())
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
/**
 * Reports whether the given message is accepted, as decided by {@code matcher}.
 *
 * @param msg the message to test
 * @return the result of {@code matcher.match(msg)}
 * @throws Exception declared by this method; propagates anything the matcher throws
 */
public boolean acceptOutboundMessage(Object msg) throws Exception {
    final boolean accepted = matcher.match(msg);
    return accepted;
}
ChannelFuture
extends Future
状态:完成(可能成功、可能失败、也可能被取消),未完成
添加监听器
/**
 * Adds the given listener to this future.
 *
 * @param listener the listener to add
 * @return a {@code ChannelFuture} (presumably this same future, to allow chaining; confirm
 *         against the Netty API documentation)
 */
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
监听器
/**
 * Callback interface for being notified about the completion of a {@link Future}.
 *
 * @param <F> the concrete {@link Future} type this listener is attached to
 */
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
 * Invoked when the operation associated with the {@link Future} has been completed.
 *
 * @param future the source {@link Future} which called this callback
 * @throws Exception implementations are allowed to throw any exception
 */
void operationComplete(F future) throws Exception;
}
// The java.nio channel wrapped by this Netty channel.
private final SelectableChannel ch;
// Interest-op bits that represent "read" for this channel type.
protected final int readInterestOp;
// The NIO SelectionKey for this channel.
volatile SelectionKey selectionKey;
// Flag indicating that a read is pending.
boolean readPending;
// Runnable wrapper around clearReadPending0().
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
 * The future of the current connection attempt. If not null, subsequent
 * connection attempts will fail.
 */
private ChannelPromise connectPromise;
// NOTE(review): presumably the scheduled task enforcing the connect timeout; confirm against its usages.
private ScheduledFuture<?> connectTimeoutFuture;
// NOTE(review): presumably the remote address passed to the in-flight connect; confirm against its usages.
private SocketAddress requestedRemoteAddress;
构造方法
/**
 * Creates a new instance and switches the underlying channel to non-blocking mode.
 *
 * @param parent         the parent {@code Channel}, forwarded to the superclass constructor
 * @param ch             the underlying {@link SelectableChannel} this instance operates on
 * @param readInterestOp the interest-op bits used to request read notifications
 * @throws ChannelException if the channel cannot be put into non-blocking mode
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;

    try {
        // The channel must be non-blocking.
        ch.configureBlocking(false);
    } catch (IOException configureFailure) {
        // Configuration failed: close the half-initialized channel before reporting the error.
        try {
            ch.close();
        } catch (IOException closeFailure) {
            // The close failure is only logged; the configuration failure is what gets thrown.
            logger.warn("Failed to close a partially initialized socket.", closeFailure);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", configureFailure);
    }
}
注册逻辑
/**
 * Registers the underlying Java channel with the event loop's selector, using an interest set
 * of {@code 0} and this object as the attachment, and stores the resulting key in
 * {@code selectionKey}.
 *
 * <p>If registration fails with a {@link CancelledKeyException}, a {@code selectNow()} is forced
 * once and the registration is retried; a second failure is rethrown.
 *
 * @throws Exception if registration fails
 */
protected void doRegister() throws Exception {
    // Tracks whether selectNow() has already been forced, NOT whether registration succeeded.
    boolean forcedSelect = false;
    while (true) {
        try {
            // Interest set 0: not interested in any event yet.
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException staleKey) {
            if (forcedSelect) {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw staleKey;
            }
            // Force the Selector to select now as the "canceled" SelectionKey may still be
            // cached and not removed because no Select.select(..) operation was called yet.
            eventLoop().selectNow();
            forcedSelect = true;
        }
    }
}
开始读
/**
 * Marks a read as pending and makes sure the read interest bits are set on the selection key.
 * Does nothing when the key is no longer valid.
 *
 * @throws Exception declared by the method signature
 */
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey key = this.selectionKey;
    if (!key.isValid()) {
        return;
    }

    readPending = true;

    final int currentOps = key.interestOps();
    final boolean readAlreadyRequested = (currentOps & readInterestOp) != 0;
    if (!readAlreadyRequested) {
        // Add the read bits while keeping every other interest bit as it was.
        key.interestOps(currentOps | readInterestOp);
    }
}
AbstractNioByteChannel
写方法:为什么要限制循环写的次数?因为如果数据一直写不完,循环会长时间占用当前线程,导致该线程上的其他任务得不到执行(看起来像写线程假死)
/**
 * Writes pending messages from the outbound buffer, bounded by the configured write spin count.
 *
 * <p>Each pass subtracts the value returned by {@code doWriteInternal} from the remaining budget.
 * When the buffer is drained, write interest is cleared and the method returns; otherwise, once
 * the budget is no longer positive, {@code incompleteWrite} is called with a flag telling whether
 * the budget went below zero.
 *
 * @param in the outbound buffer holding the messages to write
 * @throws Exception if writing fails
 */
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int remainingSpins = config().getWriteSpinCount();
    for (;;) {
        final Object current = in.current();
        if (current == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        remainingSpins -= doWriteInternal(in, current);
        if (remainingSpins <= 0) {
            break;
        }
    }

    incompleteWrite(remainingSpins < 0);
}
取消写事件
/**
 * Removes {@link SelectionKey#OP_WRITE} from the selection key's interest set, if it is set.
 * Does nothing when the key is no longer valid.
 */
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (key.isValid()) {
        final int currentOps = key.interestOps();
        final boolean writeRequested = (currentOps & SelectionKey.OP_WRITE) != 0;
        if (writeRequested) {
            key.interestOps(currentOps & ~SelectionKey.OP_WRITE);
        }
    }
}
写逻辑
/**
 * Attempts to write a single message and reports how much of the write budget it used.
 *
 * @param in  the outbound buffer the message belongs to; progress and removal are reported to it
 * @param msg the message to write; must be a {@code ByteBuf} or a {@code FileRegion}
 * @return {@code 0} if the message had nothing left to write and was removed, {@code 1} if some
 *         data was written, or {@code WRITE_STATUS_SNDBUF_FULL} if the write attempt
 *         transferred nothing
 * @throws Exception if the underlying write fails
 * @throws Error     if {@code msg} is neither a {@code ByteBuf} nor a {@code FileRegion}
 */
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        final ByteBuf payload = (ByteBuf) msg;
        if (payload.isReadable()) {
            final int written = doWriteBytes(payload);
            if (written <= 0) {
                return WRITE_STATUS_SNDBUF_FULL;
            }
            in.progress(written);
            if (!payload.isReadable()) {
                // Fully written: drop it from the outbound buffer.
                in.remove();
            }
            return 1;
        }
        // Nothing to write: remove it and do not charge the write budget.
        in.remove();
        return 0;
    }

    if (msg instanceof FileRegion) {
        final FileRegion file = (FileRegion) msg;
        if (file.transferred() < file.count()) {
            final long written = doWriteFileRegion(file);
            if (written <= 0) {
                return WRITE_STATUS_SNDBUF_FULL;
            }
            in.progress(written);
            if (file.transferred() >= file.count()) {
                // Fully transferred: drop it from the outbound buffer.
                in.remove();
            }
            return 1;
        }
        // Already fully transferred: remove it and do not charge the write budget.
        in.remove();
        return 0;
    }

    // Should not reach here.
    throw new Error();
}