小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Pigeon源碼分析(一)-- 服務注冊

 Coder編程 2022-11-20 發(fā)布于北京

跟代碼過程中發(fā)現(xiàn)的問題記錄下

 ServiceFactory是一切的基礎,它是通過靜態(tài)代碼塊初始化的

static {
        try {
            ProviderBootStrap.init();
            String appname = ConfigManagerLoader.getConfigManager().getAppName();
            if (StringUtils.isBlank(appname) || "NULL".equalsIgnoreCase(appname)) {
                throw new RuntimeException("appname is not assigned");
            }
        } catch (Throwable var1) {
            logger.error("error while initializing service factory:", var1);
            System.exit(1);
        }

    }

  實際調(diào)用的是  ProviderBootStrap

ServiceFactory ServiceFactory.addService(ProviderConfig<T> providerConfig) 
public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException {
        if (StringUtils.isBlank(providerConfig.getUrl())) {
            providerConfig.setUrl(getServiceUrl(providerConfig));
        }

        try {
            ServicePublisher.addService(providerConfig);
            ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
            providerConfig.setServerConfig(serverConfig);
            ServicePublisher.publishService(providerConfig, false);
        } catch (RegistryException var2) {
            throw new RpcException("error while adding service:" + providerConfig, var2);
        } catch (Throwable var3) {
            throw new RpcException("error while adding service:" + providerConfig, var3);
        }
    }

  

public static ServerConfig startup(ProviderConfig<?> providerConfig) {
    ServerConfig serverConfig = providerConfig.getServerConfig();
    if (serverConfig == null) {
        throw new IllegalArgumentException("server config is required");
    }
    // 根據(jù)protocol+port判斷當前服務器列表中是否存在相關的服務器實例
    Server server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());
    if (server != null) {
        // 存在直接返回
        server.addService(providerConfig);
        return server.getServerConfig();
    } else {
        // 不存在先創(chuàng)建一個
        synchronized (ProviderBootStrap.class) {
            List<Server> servers = ExtensionLoader.newExtensionList(Server.class);
            for (Server s : servers) {
                if (!s.isStarted()) {
                    if (s.support(serverConfig)) {
                        s.start(serverConfig);
                        // 添加服務
                        s.addService(providerConfig);
                        serversMap.put(s.getProtocol() + serverConfig.getPort(), s);
                        logger.warn("pigeon " + s + "[version:" + VersionUtils.VERSION + "] has been started");
                        break;
                    }
                }
            }
            server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort());
            // 預啟動內(nèi)部的請求處理器核心線程
            if (server != null) {
                server.getRequestProcessor().getRequestProcessThreadPool().prestartAllCoreThreads();
                return server.getServerConfig();
            }
            return null;
        }
    }
}

  

public <T> void addService(ProviderConfig<T> providerConfig) {
        this.requestProcessor.addService(providerConfig);
        this.doAddService(providerConfig);
        List<ServiceChangeListener> listeners = ServiceChangeListenerContainer.getListeners();
        Iterator i$ = listeners.iterator();

        while(i$.hasNext()) {
            ServiceChangeListener listener = (ServiceChangeListener)i$.next();
            listener.notifyServiceAdded(providerConfig);
        }

    }

  RequestThreadPoolProcessor.addService(ProviderConfig<T> providerConfig) 負責創(chuàng)建線程池并緩存起來

public synchronized <T> void addService(ProviderConfig<T> providerConfig) {
        String url = providerConfig.getUrl();
        Map<String, ProviderMethodConfig> methodConfigs = providerConfig.getMethods();
        ServiceMethodCache methodCache = ServiceMethodFactory.getServiceMethodCache(url);
        Set<String> methodNames = methodCache.getMethodMap().keySet();
        if (this.needStandalonePool(providerConfig)) {
            if (this.methodThreadPools == null) {
                this.methodThreadPools = new ConcurrentHashMap();
            }

            if (this.serviceThreadPools == null) {
                this.serviceThreadPools = new ConcurrentHashMap();
            }

            if (providerConfig.getActives() > 0 && CollectionUtils.isEmpty(methodConfigs)) {
                DynamicThreadPool pool = (DynamicThreadPool)this.serviceThreadPools.get(url);
                if (pool == null) {
                    int actives = providerConfig.getActives();
                    int coreSize = (int)((float)actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int)((float)actives / DEFAULT_POOL_RATIO_CORE) : actives;
                    pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-service", coreSize, actives, actives);
                    this.serviceThreadPools.putIfAbsent(url, pool);
                }
            }

            if (!CollectionUtils.isEmpty(methodConfigs)) {
                Iterator i$ = methodNames.iterator();

                while(i$.hasNext()) {
                    String name = (String)i$.next();
                    if (methodConfigs.containsKey(name)) {
                        String key = url + "#" + name;
                        DynamicThreadPool pool = (DynamicThreadPool)this.methodThreadPools.get(key);
                        if (pool == null) {
                            int actives = DEFAULT_POOL_ACTIVES;
                            ProviderMethodConfig methodConfig = (ProviderMethodConfig)methodConfigs.get(name);
                            if (methodConfig != null && methodConfig.getActives() > 0) {
                                actives = methodConfig.getActives();
                            }

                            int coreSize = (int)((float)actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int)((float)actives / DEFAULT_POOL_RATIO_CORE) : actives;
                            pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-method", coreSize, actives, actives);
                            this.methodThreadPools.putIfAbsent(key, pool);
                        }
                    }
                }
            }
        }

    }

二 服務的發(fā)布

  

1 調(diào)用鏈路

ServiceFactory.addService(ProviderConfig<T> providerConfig)

-> ServicePublisher.addService(providerConfig); 這里調(diào)用主要是緩存服務信息

-> RequestThreadPoolProcessor.addService(ProviderConfig<T> providerConfig) 不知道怎么就走到這里了,以后還得多跟代碼

-> AbstractServer.doAddService

AbstractServer的實現(xiàn)類默認的是NettyServer

-> ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);

-> ServicePublisher.publishService

String url, String registryUrl, int port, String group

url = http://service./cargo-detail/DriverVisitDetailFacade_1.0.0

registryUrl = @HTTP@http://service./cargo-detail/DriverVisitDetailFacade_1.0.0

port = 4080

group = hfd33

2 寫zk節(jié)點

RegistryManager.getInstance().registerSupportNewProtocol(serverAddress, registryUrl, false);

serverAddress = 10.190.20.66:4080

registryUrl = @HTTP@http://service./cargo-detail/DriverVisitDetailFacade_1.0.0

通過跟代碼解析出來的zk地址是 10.13.65.186:2181,10.13.65.187:2181,10.13.65.188:2181

setSupportNewProtocol的意義是是否支持新協(xié)議,什么是新協(xié)議呢,比如thrift,false沒關系的

CuratorRegistry.setSupportNewProtocol
public void setSupportNewProtocol(String serviceAddress, String serviceName, boolean support) throws RegistryException {
      try {
          String protocolPath = Utils.getProtocolPath(serviceAddress);//       /DP/PROTOCOL/10.190.20.66:4080
          Stat stat = new Stat();
           
          String info = this.client.get(protocolPath, stat);
          // zk中的一個節(jié)點 {"@HTTP@http://service./cargo-detail/DriverVisitDetailFacade_1.0.0":false,"@HTTP@http://service./cargo-detail/cargoDetailFacade_1.0.0":false,"@HTTP@http://service./cargo-detail/cargoStatusFacade_1.0.0":false}
          if (info != null) {
              Map<String, Boolean> infoMap = Utils.getProtocolInfoMap(info);
              infoMap.put(serviceName, support);
              // infoMap的內(nèi)容
              @HTTP@http://service./cargo-detail/DriverVisitDetailFacade_1.0.0 -> {Boolean@17173} false
@HTTP@http://service./cargo-detail/cargoDetailFacade_1.0.0 -> {Boolean@17173} false
@HTTP@http://service./cargo-detail/cargoStatusFacade_1.0.0 -> {Boolean@17173} false 不知道為啥 這三個都是false是不是因為這個導致了 服務不對外提供呢
              this.client.set(protocolPath, Utils.getProtocolInfo(infoMap), stat.getVersion());
              //這個就是寫zk
          } else {
              Map<String, Boolean> infoMap = ImmutableMap.of(serviceName, support);
              this.client.create(protocolPath, Utils.getProtocolInfo(infoMap));
          }

上面的是protocol在zk上的格式,還有另一個更重要的zk節(jié)點

registerPersistentNode

該方法不止會寫一個zk節(jié)點,分別如下

1 this.client.set(weightPath, "" + weight);

/DP/WEIGHT/10.190.20.66:4080 1

2 this.client.create(servicePath, serviceAddress);

/DP/SERVER/@HTTP@http:^^service.^cargo-detail^DriverVisitDetailFacade_1.0.0/hfd33

10.190.20.66:4080

3

public void setServerApp(String serverAddress, String app) {
  String path = Utils.getAppPath(serverAddress);// /DP/APP/10.190.20.66:4080
  if (StringUtils.isNotBlank(app)) {
      try {
          this.client.set(path, app);// app = cargo-detail
      } catch (Throwable var5) {
          logger.error("failed to set app of " + serverAddress + " to " + app);
      }
  }
?
}

4 RegistryManager.getInstance().setServerVersion(serverAddress, "2.7.8"); // serverAddress = 10.190.20.66:4080

最后寫入zk的path是 /DP/VERSION/10.190.20.66:4080

 

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多