/* * 说明责任链数量已经超出最大允许数量,后面将没有规则会被检查 * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ if (chain == null) { returnnew CtEntry(resourceWrapper, null, context); }
// 创建当前条目 Entry e = new CtEntry(resourceWrapper, chain, context); try { // 触发责任链(从第一个开始执行到最后一个责任链节点,主要有创建节点、统计指标、验证各种规则...) chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { // 被阻塞后退出当前条目,并统计指标 e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
public ProcessorSlotChain build(){ // 创建调用链对象 ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// Note: the instances of ProcessorSlot should be different, since they are not stateless. // 通过SPI发现并加载并排序所有的调用链节点 List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } // 按顺序依次将调用链节点添加都最后一个,并关联下一个节点 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); }
publicstatic <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz){ try { // @1 // Not use SERVICE_LOADER_MAP, to make sure the instances loaded are different. ServiceLoader<T> serviceLoader = ServiceLoaderUtil.getServiceLoader(clazz);
List<SpiOrderWrapper<T>> orderWrappers = new ArrayList<>(); for ( T spi : serviceLoader ) { // @2 int order = SpiOrderResolver.resolveOrder(spi); // @3 // Since SPI is lazy initialized in ServiceLoader, we use online sort algorithm here. SpiOrderResolver.insertSorted(orderWrappers, spi, order); RecordLog.debug("[SpiLoader] Found {} SPI: {} with order {}", clazz.getSimpleName(), spi.getClass().getCanonicalName(), order); } List<T> list = new ArrayList<>(orderWrappers.size()); // @4 for (int i = 0; i < orderWrappers.size(); i++) { list.add(orderWrappers.get(i).spi); } return list; } catch (Throwable t) { RecordLog.error("[SpiLoader] ERROR: loadPrototypeInstanceListSorted failed", t); t.printStackTrace(); returnnew ArrayList<>(); } }
/** * SampleCountProperty.SAMPLE_COUNT = 2 * IntervalProperty.INTERVAL = 1000 * Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans * by given {@code sampleCount}. */ privatetransientvolatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
/** * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds, * meaning each bucket per second, in this way we can get accurate statistics of each second. */ privatetransient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
 * Returns the bucket for the present moment by delegating to
 * {@link #currentWindow(long)} with {@code TimeUtil.currentTimeMillis()}.
 *
 * @return whatever {@link #currentWindow(long)} returns for the current timestamp
 */
public WindowWrap<T> currentWindow() {
    long now = TimeUtil.currentTimeMillis();
    return currentWindow(now);
}
public WindowWrap<T> currentWindow(long timeMillis){ if (timeMillis < 0) { returnnull; }
// private int calculateTimeIdx(long timeMillis) { // long timeId = timeMillis / windowLengthInMs; // // Calculate current index so we can map the timestamp to the leap array. // return (int)(timeId % array.length()); // } int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis);
/* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { return window; } else { Thread.yield(); } } elseif (windowStart == old.windowStart()) { return old; } elseif (windowStart > old.windowStart()) { if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } elseif (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. returnnew WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); }
try { // Do some checking. fireEntry(context, resourceWrapper, node, count, prioritized, args); ...省略部分代码 } catch (Throwable e) { // Unexpected error, set error to current entry. context.getCurEntry().setError(e);
// This should not happen. node.increaseExceptionQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseExceptionQps(count); }
if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseExceptionQps(count); } throw e; }
publicstaticvoidtrace(Throwable e, int count){ traceContext(e, count, ContextUtil.getContext()); } publicstaticvoidtraceContext(Throwable e, int count, Context context){ if (!shouldTrace(e)) { return; }
if (context.getCurEntry().getError() == null) { // Calculate response time (max RT is statisticMaxRt from SentinelConfig). long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime(); int maxStatisticRt = SentinelConfig.statisticMaxRt(); if (rt > maxStatisticRt) { rt = maxStatisticRt; }
// Record response time and success count. node.addRtAndSuccess(rt, count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count); }
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().decreaseThreadNum(); }
if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.addRtAndSuccess(rt, count); Constants.ENTRY_NODE.decreaseThreadNum(); } } else { // Error may happen. }
publiclongsum(){ long sum = base; Cell[] as = cells; if (as != null) { int n = as.length; for (int i = 0; i < n; ++i) { Cell a = as[i]; if (a != null) { sum += a.value; } } } return sum; }