diff --git a/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java b/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java index bed4ea1c..4afeb0cc 100644 --- a/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java +++ b/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java @@ -15,6 +15,11 @@ */ package com.alibaba.csp.sentinel.dashboard.repository.metric; +import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.csp.sentinel.util.TimeUtil; +import org.springframework.stereotype.Component; + import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -22,14 +27,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity; -import com.alibaba.csp.sentinel.util.StringUtil; - -import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; -import org.springframework.stereotype.Component; - /** * Caches metrics data in a period of time in memory. * @@ -44,54 +44,71 @@ public class InMemoryMetricsRepository implements MetricsRepository resource -> timestamp -> metric} */ - private Map>> allMetrics = new ConcurrentHashMap<>(); + private Map>> allMetrics = new ConcurrentHashMap<>(); + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @Override - public synchronized void save(MetricEntity entity) { + public void save(MetricEntity entity) { if (entity == null || StringUtil.isBlank(entity.getApp())) { return; } - allMetrics.computeIfAbsent(entity.getApp(), e -> new ConcurrentHashMap<>(16)) - .computeIfAbsent(entity.getResource(), e -> new ConcurrentLinkedHashMap.Builder() - .maximumWeightedCapacity(MAX_METRIC_LIVE_TIME_MS).weigher((key, value) -> { - // Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed. - int weight = (int)(System.currentTimeMillis() - key); - // weight must be a number greater than or equal to one - return Math.max(weight, 1); - }).build()).put(entity.getTimestamp().getTime(), entity); + readWriteLock.writeLock().lock(); + try { + allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16)) + .computeIfAbsent(entity.getResource(), e -> new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Entry eldest) { + // Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed. + return eldest.getKey() < TimeUtil.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS; + } + }).put(entity.getTimestamp().getTime(), entity); + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override - public synchronized void saveAll(Iterable metrics) { + public void saveAll(Iterable metrics) { if (metrics == null) { return; } - metrics.forEach(this::save); + readWriteLock.writeLock().lock(); + try { + metrics.forEach(this::save); + } finally { + readWriteLock.writeLock().unlock(); + } } @Override - public synchronized List queryByAppAndResourceBetween(String app, String resource, - long startTime, long endTime) { + public List queryByAppAndResourceBetween(String app, String resource, + long startTime, long endTime) { List results = new ArrayList<>(); if (StringUtil.isBlank(app)) { return results; } - Map> resourceMap = allMetrics.get(app); + Map> resourceMap = allMetrics.get(app); if (resourceMap == null) { return results; } - ConcurrentLinkedHashMap metricsMap = resourceMap.get(resource); + LinkedHashMap metricsMap = resourceMap.get(resource); if (metricsMap == null) { return results; } - for (Entry entry : metricsMap.entrySet()) { - if (entry.getKey() >= startTime && entry.getKey() <= endTime) { - results.add(entry.getValue()); + readWriteLock.readLock().lock(); + try { + for (Entry entry : metricsMap.entrySet()) { + if (entry.getKey() >= startTime && entry.getKey() <= endTime) { + results.add(entry.getValue()); + } } + return results; + } finally { + readWriteLock.readLock().unlock(); } - return results; } @Override @@ -101,44 +118,49 @@ public class InMemoryMetricsRepository implements MetricsRepository timestamp -> metric - Map> resourceMap = allMetrics.get(app); + Map> resourceMap = allMetrics.get(app); if (resourceMap == null) { return results; } final long minTimeMs = System.currentTimeMillis() - 1000 * 60; Map resourceCount = new ConcurrentHashMap<>(32); - for (Entry> resourceMetrics : resourceMap.entrySet()) { - for (Entry metrics : resourceMetrics.getValue().entrySet()) { - if (metrics.getKey() < minTimeMs) { - continue; - } - MetricEntity newEntity = metrics.getValue(); - if (resourceCount.containsKey(resourceMetrics.getKey())) { - MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey()); - oldEntity.addPassQps(newEntity.getPassQps()); - oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps()); - oldEntity.addBlockQps(newEntity.getBlockQps()); - oldEntity.addExceptionQps(newEntity.getExceptionQps()); - oldEntity.addCount(1); - } else { - resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity)); + readWriteLock.readLock().lock(); + try { + for (Entry> resourceMetrics : resourceMap.entrySet()) { + for (Entry metrics : resourceMetrics.getValue().entrySet()) { + if (metrics.getKey() < minTimeMs) { + continue; + } + MetricEntity newEntity = metrics.getValue(); + if (resourceCount.containsKey(resourceMetrics.getKey())) { + MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey()); + oldEntity.addPassQps(newEntity.getPassQps()); + oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps()); + oldEntity.addBlockQps(newEntity.getBlockQps()); + oldEntity.addExceptionQps(newEntity.getExceptionQps()); + oldEntity.addCount(1); + } else { + resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity)); + } } } + // Order by last minute b_qps DESC. + return resourceCount.entrySet() + .stream() + .sorted((o1, o2) -> { + MetricEntity e1 = o1.getValue(); + MetricEntity e2 = o2.getValue(); + int t = e2.getBlockQps().compareTo(e1.getBlockQps()); + if (t != 0) { + return t; + } + return e2.getPassQps().compareTo(e1.getPassQps()); + }) + .map(Entry::getKey) + .collect(Collectors.toList()); + } finally { + readWriteLock.readLock().unlock(); } - // Order by last minute b_qps DESC. - return resourceCount.entrySet() - .stream() - .sorted((o1, o2) -> { - MetricEntity e1 = o1.getValue(); - MetricEntity e2 = o2.getValue(); - int t = e2.getBlockQps().compareTo(e1.getBlockQps()); - if (t != 0) { - return t; - } - return e2.getPassQps().compareTo(e1.getPassQps()); - }) - .map(Entry::getKey) - .collect(Collectors.toList()); } } diff --git a/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java b/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java index c4052526..23ef6cbb 100644 --- a/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java +++ b/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java @@ -16,17 +16,24 @@ package com.alibaba.csp.sentinel.dashboard.repository.metric; import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity; - -import org.assertj.core.util.Lists; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.util.CollectionUtils; +import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.Date; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.*; @@ -37,13 +44,11 @@ import static org.junit.Assert.*; */ public class InMemoryMetricsRepositoryTest { - private static final String DEFAULT_APP = "default"; - private static final String DEFAULT_EXPIRE_APP = "default_expire_app"; - private static final String DEFAULT_RESOURCE = "test"; + private final static String DEFAULT_APP = "defaultApp"; + private final static String DEFAULT_RESOURCE = "defaultResource"; private static final long EXPIRE_TIME = 1000 * 60 * 5L; private InMemoryMetricsRepository inMemoryMetricsRepository; - private ExecutorService executorService; @Before @@ -57,27 +62,50 @@ public class InMemoryMetricsRepositoryTest { executorService.shutdownNow(); } - private void testSave() { - for (int i = 0; i < 1000000; i++) { + + @Test + public void testSave() { + MetricEntity entry = new MetricEntity(); + entry.setApp("testSave"); + entry.setResource("testResource"); + entry.setTimestamp(new Date(System.currentTimeMillis())); + entry.setPassQps(1L); + entry.setExceptionQps(1L); + entry.setBlockQps(0L); + entry.setSuccessQps(1L); + inMemoryMetricsRepository.save(entry); + List resources = inMemoryMetricsRepository.listResourcesOfApp("testSave"); + Assert.assertTrue(resources.size() == 1 && "testResource".equals(resources.get(0))); + } + + + @Test + public void testSaveAll() { + List entities = new ArrayList<>(10000); + for (int i = 0; i < 10000; i++) { MetricEntity entry = new MetricEntity(); - entry.setApp(DEFAULT_APP); - entry.setResource(DEFAULT_RESOURCE); + entry.setApp("testSaveAll"); + entry.setResource("testResource" + i); entry.setTimestamp(new Date(System.currentTimeMillis())); entry.setPassQps(1L); entry.setExceptionQps(1L); entry.setBlockQps(0L); entry.setSuccessQps(1L); - inMemoryMetricsRepository.save(entry); + entities.add(entry); } + inMemoryMetricsRepository.saveAll(entities); + List result = inMemoryMetricsRepository.listResourcesOfApp("testSaveAll"); + Assert.assertTrue(result.size() == entities.size()); } + @Test public void testExpireMetric() { long now = System.currentTimeMillis(); MetricEntity expireEntry = new MetricEntity(); - expireEntry.setApp(DEFAULT_EXPIRE_APP); + expireEntry.setApp(DEFAULT_APP); expireEntry.setResource(DEFAULT_RESOURCE); - expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 10L)); + expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 1L)); expireEntry.setPassQps(1L); expireEntry.setExceptionQps(1L); expireEntry.setBlockQps(0L); @@ -85,7 +113,7 @@ public class InMemoryMetricsRepositoryTest { inMemoryMetricsRepository.save(expireEntry); MetricEntity entry = new MetricEntity(); - entry.setApp(DEFAULT_EXPIRE_APP); + entry.setApp(DEFAULT_APP); entry.setResource(DEFAULT_RESOURCE); entry.setTimestamp(new Date(now)); entry.setPassQps(1L); @@ -95,47 +123,40 @@ public class InMemoryMetricsRepositoryTest { inMemoryMetricsRepository.save(entry); List list = inMemoryMetricsRepository.queryByAppAndResourceBetween( - DEFAULT_EXPIRE_APP, DEFAULT_RESOURCE, now - 2 * EXPIRE_TIME, now + EXPIRE_TIME); + DEFAULT_APP, DEFAULT_RESOURCE, now - EXPIRE_TIME, now); assertFalse(CollectionUtils.isEmpty(list)); assertEquals(1, list.size()); + assertTrue(list.get(0).getTimestamp().getTime() >= now - EXPIRE_TIME && list.get(0).getTimestamp().getTime() <= now); + } + @Test - public void testListResourcesOfApp() { - // prepare basic test data - testSave(); - System.out.println( "[" + System.currentTimeMillis() + "] Basic test data ready in testListResourcesOfApp"); + public void testConcurrentPutAndGet() { - List futures = Lists.newArrayList(); - - // concurrent query resources of app + List futures = new ArrayList<>(10000); final CyclicBarrier cyclicBarrier = new CyclicBarrier(8); + for (int j = 0; j < 10000; j++) { - futures.add( - CompletableFuture.runAsync(() -> { + final int finalJ = j; + futures.add(CompletableFuture.runAsync(() -> { try { cyclicBarrier.await(); - inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP); + if (finalJ % 2 == 0) { + batchSave(); + } else { + inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP); + } + } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } - }, executorService) + + }, executorService) ); } - // batch add metric entity - for (int i = 0; i < 10000; i++) { - MetricEntity entry = new MetricEntity(); - entry.setApp(DEFAULT_APP); - entry.setResource(DEFAULT_RESOURCE); - entry.setTimestamp(new Date(System.currentTimeMillis() - EXPIRE_TIME - 1000L)); - entry.setPassQps(1L); - entry.setExceptionQps(1L); - entry.setBlockQps(0L); - entry.setSuccessQps(1L); - inMemoryMetricsRepository.save(entry); - } CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); @@ -155,4 +176,19 @@ public class InMemoryMetricsRepositoryTest { } } + private void batchSave() { + for (int i = 0; i < 100; i++) { + MetricEntity entry = new MetricEntity(); + entry.setApp(DEFAULT_APP); + entry.setResource(DEFAULT_RESOURCE); + entry.setTimestamp(new Date(System.currentTimeMillis())); + entry.setPassQps(1L); + entry.setExceptionQps(1L); + entry.setBlockQps(0L); + entry.setSuccessQps(1L); + inMemoryMetricsRepository.save(entry); + } + } + + } \ No newline at end of file