dubbo HeartbeatHandler 源码
dubbo HeartbeatHandler 代码
文件路径:/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.exchange.support.header;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate;
import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
public static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
public HeartbeatHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
clearReadTimestamp(channel);
clearWriteTimestamp(channel);
handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
handler.sent(channel, message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(HEARTBEAT_EVENT);
channel.send(res);
if (logger.isDebugEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
return;
}
handler.received(channel, message);
}
private void setReadTimestamp(Channel channel) {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
}
private void setWriteTimestamp(Channel channel) {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
}
private void clearReadTimestamp(Channel channel) {
channel.removeAttribute(KEY_READ_TIMESTAMP);
}
private void clearWriteTimestamp(Channel channel) {
channel.removeAttribute(KEY_WRITE_TIMESTAMP);
}
private boolean isHeartbeatRequest(Object message) {
return message instanceof Request && ((Request) message).isHeartbeat();
}
private boolean isHeartbeatResponse(Object message) {
return message instanceof Response && ((Response) message).isHeartbeat();
}
}
相关信息
相关文章
dubbo HeaderExchangeChannel 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦