Refinement for heartbeat logic in sentinel-transport related module (#312)

- Code refinement for heartbeat component in transport related module
- Add remove support of `SentinelConfig`

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
Eric Zhao 2018-12-24 18:03:31 +08:00 committed by GitHub
parent 96dac7a1e0
commit d3bbe290c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 235 additions and 62 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.csp.sentinel.log.LogBase; import com.alibaba.csp.sentinel.log.LogBase;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AppNameUtil; import com.alibaba.csp.sentinel.util.AppNameUtil;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.StringUtil;
/** /**
@ -102,14 +103,24 @@ public class SentinelConfig {
* @return the config value. * @return the config value.
*/ */
public static String getConfig(String key) { public static String getConfig(String key) {
AssertUtil.notNull(key, "key cannot be null");
return props.get(key); return props.get(key);
} }
public static void setConfig(String key, String value) { public static void setConfig(String key, String value) {
AssertUtil.notNull(key, "key cannot be null");
AssertUtil.notNull(value, "value cannot be null");
props.put(key, value); props.put(key, value);
} }
public static String removeConfig(String key) {
AssertUtil.notNull(key, "key cannot be null");
return props.remove(key);
}
public static void setConfigIfAbsent(String key, String value) { public static void setConfigIfAbsent(String key, String value) {
AssertUtil.notNull(key, "key cannot be null");
AssertUtil.notNull(value, "value cannot be null");
String v = props.get(key); String v = props.get(key);
if (v == null) { if (v == null) {
props.put(key, value); props.put(key, value);

View File

@ -20,10 +20,16 @@
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -16,8 +16,8 @@
package com.alibaba.csp.sentinel.transport; package com.alibaba.csp.sentinel.transport;
/** /**
* Heartbeat interface. Sentinel core is responsible for invoking {@link #sendHeartbeat()} * The heartbeat sender which is responsible for sending heartbeat to remote dashboard
* at every {@link #intervalMs()} interval. * periodically per {@code interval}.
* *
* @author leyou * @author leyou
* @author Eric Zhao * @author Eric Zhao
@ -30,14 +30,15 @@ public interface HeartbeatSender {
* at every {@link #intervalMs()} interval. * at every {@link #intervalMs()} interval.
* *
* @return whether heartbeat is successfully send. * @return whether heartbeat is successfully send.
* @throws Exception * @throws Exception if error occurs
*/ */
boolean sendHeartbeat() throws Exception; boolean sendHeartbeat() throws Exception;
/** /**
* Millisecond interval of every {@link #sendHeartbeat()} * Default interval in milliseconds of the sender. It would take effect only when
* the heartbeat interval is not configured in Sentinel config property.
* *
* @return millisecond interval. * @return default interval of the sender in milliseconds
*/ */
long intervalMs(); long intervalMs();
} }

View File

@ -16,6 +16,7 @@
package com.alibaba.csp.sentinel.transport.config; package com.alibaba.csp.sentinel.transport.config;
import com.alibaba.csp.sentinel.config.SentinelConfig; import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.HostNameUtil; import com.alibaba.csp.sentinel.util.HostNameUtil;
import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.StringUtil;
@ -31,9 +32,19 @@ public class TransportConfig {
private static int runtimePort = -1; private static int runtimePort = -1;
/**
* Get heartbeat interval in milliseconds.
*
* @return heartbeat interval in milliseconds if exists, or null if not configured or invalid config
*/
public static Long getHeartbeatIntervalMs() { public static Long getHeartbeatIntervalMs() {
String interval = SentinelConfig.getConfig(HEARTBEAT_INTERVAL_MS); String interval = SentinelConfig.getConfig(HEARTBEAT_INTERVAL_MS);
return interval == null ? null : Long.parseLong(interval); try {
return interval == null ? null : Long.parseLong(interval);
} catch (Exception ex) {
RecordLog.warn("[TransportConfig] Failed to parse heartbeat interval: " + interval);
return null;
}
} }
/** /**

View File

@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.init.InitFunc; import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.transport.HeartbeatSender; import com.alibaba.csp.sentinel.transport.HeartbeatSender;
@ -37,15 +38,12 @@ public class HeartbeatSenderInitFunc implements InitFunc {
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(2, private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(2,
new NamedThreadFactory("sentinel-heartbeat-send-task", true)); new NamedThreadFactory("sentinel-heartbeat-send-task", true));
private boolean validHeartbeatInterval(Long interval) {
return interval != null && interval > 0;
}
@Override @Override
public void init() throws Exception { public void init() {
long heartBeatInterval = -1;
try {
heartBeatInterval = TransportConfig.getHeartbeatIntervalMs();
RecordLog.info("system property heartbeat interval set: " + heartBeatInterval);
} catch (Exception ex) {
RecordLog.info("Parse heartbeat interval failed, use that in code, " + ex.getMessage());
}
ServiceLoader<HeartbeatSender> loader = ServiceLoader.load(HeartbeatSender.class); ServiceLoader<HeartbeatSender> loader = ServiceLoader.load(HeartbeatSender.class);
Iterator<HeartbeatSender> iterator = loader.iterator(); Iterator<HeartbeatSender> iterator = loader.iterator();
if (iterator.hasNext()) { if (iterator.hasNext()) {
@ -53,24 +51,42 @@ public class HeartbeatSenderInitFunc implements InitFunc {
if (iterator.hasNext()) { if (iterator.hasNext()) {
throw new IllegalStateException("Only single heartbeat sender can be scheduled"); throw new IllegalStateException("Only single heartbeat sender can be scheduled");
} else { } else {
long interval = sender.intervalMs(); long interval = retrieveInterval(sender);
if (heartBeatInterval != -1) { setIntervalIfNotExists(interval);
interval = heartBeatInterval; scheduleHeartbeatTask(sender, interval);
}
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
e.printStackTrace();
RecordLog.info("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 10000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
} }
} }
} }
private void setIntervalIfNotExists(long interval) {
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval));
}
long retrieveInterval(/*@NonNull*/ HeartbeatSender sender) {
Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();
if (validHeartbeatInterval(intervalInConfig)) {
RecordLog.info("[HeartbeatSenderInit] Using heartbeat interval in Sentinel config property: " + intervalInConfig);
return intervalInConfig;
} else {
long senderInterval = sender.intervalMs();
RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in config property or invalid, "
+ "using sender default: " + senderInterval);
return senderInterval;
}
}
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
}
} }

View File

@ -1,25 +1,67 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.transport.config; package com.alibaba.csp.sentinel.transport.config;
import com.alibaba.csp.sentinel.config.SentinelConfig; import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.util.StringUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class TransportConfigTest { public class TransportConfigTest {
@Before
public void setUp() throws Exception {
SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS);
SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_CLIENT_IP);
}
@After
public void tearDown() throws Exception {
SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS);
SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_CLIENT_IP);
}
@Test @Test
public void getClientIp() { public void testGetHeartbeatInterval() {
//config heartbeat client ip long interval = 20000;
System.setProperty(TransportConfig.HEARTBEAT_CLIENT_IP, "10.10.10.10"); assertNull(TransportConfig.getHeartbeatIntervalMs());
// Set valid interval.
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval));
assertEquals(new Long(interval), TransportConfig.getHeartbeatIntervalMs());
// Set invalid interval.
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, "Sentinel");
assertNull(TransportConfig.getHeartbeatIntervalMs());
}
@Test
public void testGetHeartbeatClientIp() {
String clientIp = "10.10.10.10";
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_CLIENT_IP, clientIp);
// Set heartbeat client ip to system property.
String ip = TransportConfig.getHeartbeatClientIp(); String ip = TransportConfig.getHeartbeatClientIp();
assertNotNull(ip); assertNotNull(ip);
assertEquals(ip, "10.10.10.10"); assertEquals(clientIp, ip);
//no heartbeat client ip // Set no heartbeat client ip.
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_CLIENT_IP, ""); SentinelConfig.setConfig(TransportConfig.HEARTBEAT_CLIENT_IP, "");
ip = TransportConfig.getHeartbeatClientIp(); assertTrue(StringUtil.isNotEmpty(TransportConfig.getHeartbeatClientIp()));
assertNotNull(ip);
} }
} }

View File

@ -0,0 +1,63 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.transport.init;
import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.transport.HeartbeatSender;
import com.alibaba.csp.sentinel.transport.config.TransportConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Eric Zhao
*/
public class HeartbeatSenderInitFuncTest {
@Before
public void setUp() throws Exception {
SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS);
}
@After
public void tearDown() throws Exception {
SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS);
}
@Test
public void testRetrieveInterval() {
HeartbeatSender sender = mock(HeartbeatSender.class);
long senderInterval = 5666;
long configInterval = 6777;
when(sender.intervalMs()).thenReturn(senderInterval);
HeartbeatSenderInitFunc func = new HeartbeatSenderInitFunc();
assertEquals(senderInterval, func.retrieveInterval(sender));
// Invalid interval.
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, "-1");
assertEquals(senderInterval, func.retrieveInterval(sender));
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(configInterval));
assertEquals(configInterval, func.retrieveInterval(sender));
}
}

View File

@ -15,6 +15,9 @@
*/ */
package com.alibaba.csp.sentinel.transport.heartbeat; package com.alibaba.csp.sentinel.transport.heartbeat;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.csp.sentinel.Constants; import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.transport.config.TransportConfig; import com.alibaba.csp.sentinel.transport.config.TransportConfig;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
@ -23,6 +26,7 @@ import com.alibaba.csp.sentinel.util.HostNameUtil;
import com.alibaba.csp.sentinel.transport.HeartbeatSender; import com.alibaba.csp.sentinel.transport.HeartbeatSender;
import com.alibaba.csp.sentinel.util.PidUtil; import com.alibaba.csp.sentinel.util.PidUtil;
import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.function.Tuple2;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
@ -51,24 +55,48 @@ public class HttpHeartbeatSender implements HeartbeatSender {
public HttpHeartbeatSender() { public HttpHeartbeatSender() {
this.client = HttpClients.createDefault(); this.client = HttpClients.createDefault();
String consoleServer = TransportConfig.getConsoleServer(); List<Tuple2<String, Integer>> dashboardList = parseDashboardList();
if (StringUtil.isEmpty(consoleServer)) { if (dashboardList == null || dashboardList.isEmpty()) {
RecordLog.info("[Heartbeat] Console server address is not configured!"); RecordLog.info("[NettyHttpHeartbeatSender] No dashboard available");
} else { } else {
String consoleHost = consoleServer; consoleHost = dashboardList.get(0).r1;
int consolePort = 80; consolePort = dashboardList.get(0).r2;
if (consoleServer.contains(",")) { RecordLog.info("[NettyHttpHeartbeatSender] Dashboard address parsed: <" + consoleHost + ':' + consolePort + ">");
consoleHost = consoleServer.split(",")[0];
}
if (consoleHost.contains(":")) {
String[] strs = consoleServer.split(":");
consoleHost = strs[0];
consolePort = Integer.parseInt(strs[1]);
}
this.consoleHost = consoleHost;
this.consolePort = consolePort;
} }
}
private List<Tuple2<String, Integer>> parseDashboardList() {
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
try {
String ipsStr = TransportConfig.getConsoleServer();
if (StringUtil.isBlank(ipsStr)) {
RecordLog.warn("[NettyHttpHeartbeatSender] Dashboard server address is not configured");
return list;
}
for (String ipPortStr : ipsStr.split(",")) {
if (ipPortStr.trim().length() == 0) {
continue;
}
ipPortStr = ipPortStr.trim();
if (ipPortStr.startsWith("http://")) {
ipPortStr = ipPortStr.substring(7);
}
if (ipPortStr.startsWith(":")) {
continue;
}
String[] ipPort = ipPortStr.trim().split(":");
int port = 8080;
if (ipPort.length > 1) {
port = Integer.parseInt(ipPort[1].trim());
}
list.add(Tuple2.of(ipPort[0].trim(), port));
}
} catch (Exception ex) {
RecordLog.warn("[NettyHttpHeartbeatSender] Parse dashboard list failed, current address list: " + list, ex);
ex.printStackTrace();
}
return list;
} }
@Override @Override
@ -76,8 +104,6 @@ public class HttpHeartbeatSender implements HeartbeatSender {
if (StringUtil.isEmpty(consoleHost)) { if (StringUtil.isEmpty(consoleHost)) {
return false; return false;
} }
RecordLog.info(String.format("[Heartbeat] Sending heartbeat to %s:%d", consoleHost, consolePort));
URIBuilder uriBuilder = new URIBuilder(); URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setScheme("http").setHost(consoleHost).setPort(consolePort) uriBuilder.setScheme("http").setHost(consoleHost).setPort(consolePort)
.setPath("/registry/machine") .setPath("/registry/machine")

View File

@ -19,7 +19,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.transport.HeartbeatSender; import com.alibaba.csp.sentinel.transport.HeartbeatSender;
import com.alibaba.csp.sentinel.transport.config.TransportConfig; import com.alibaba.csp.sentinel.transport.config.TransportConfig;
@ -54,9 +53,6 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender {
List<InetSocketAddress> newAddrs = getDefaultConsoleIps(); List<InetSocketAddress> newAddrs = getDefaultConsoleIps();
RecordLog.info("[SimpleHttpHeartbeatSender] Default console address list retrieved: " + newAddrs); RecordLog.info("[SimpleHttpHeartbeatSender] Default console address list retrieved: " + newAddrs);
this.addressList = newAddrs; this.addressList = newAddrs;
// Set interval config.
String interval = System.getProperty(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(DEFAULT_INTERVAL));
SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, interval);
} }
@Override @Override
@ -78,7 +74,7 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender {
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {
RecordLog.info("Failed to send heart beat to " + addr + " : ", e); RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
} }
return false; return false;
} }
@ -104,6 +100,7 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender {
try { try {
String ipsStr = TransportConfig.getConsoleServer(); String ipsStr = TransportConfig.getConsoleServer();
if (StringUtil.isEmpty(ipsStr)) { if (StringUtil.isEmpty(ipsStr)) {
RecordLog.warn("[NettyHttpHeartbeatSender] Dashboard server address not configured");
return newAddrs; return newAddrs;
} }
@ -122,7 +119,7 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender {
newAddrs.add(new InetSocketAddress(ipPort[0].trim(), port)); newAddrs.add(new InetSocketAddress(ipPort[0].trim(), port));
} }
} catch (Exception ex) { } catch (Exception ex) {
RecordLog.info("[SimpleHeartbeatSender] Parse console list failed, current address list: " + newAddrs, ex); RecordLog.warn("[SimpleHeartbeatSender] Parse dashboard list failed, current address list: " + newAddrs, ex);
ex.printStackTrace(); ex.printStackTrace();
} }
return newAddrs; return newAddrs;