<dfn id="hx5t3"><strike id="hx5t3"><em id="hx5t3"></em></strike></dfn>

    <thead id="hx5t3"></thead><nobr id="hx5t3"><font id="hx5t3"><rp id="hx5t3"></rp></font></nobr>

    <listing id="hx5t3"></listing>

    <var id="hx5t3"></var>
    <big id="hx5t3"></big>

      
      

      <output id="hx5t3"><ruby id="hx5t3"></ruby></output>
      <menuitem id="hx5t3"><dfn id="hx5t3"></dfn></menuitem>

      <big id="hx5t3"></big>

        賈順名

        賈順名 查看完整檔案

        北京編輯  |  填寫畢業院校快手  |  資深研發工程師 編輯 github.com/jiasm 編輯
        編輯

        個人動態

        賈順名 發布了文章 · 2020-12-15

        ioredis源碼閱讀[0]

        最近因為工作需要,要去搞一個 Node.js 端的 Redis Client 組件出來,暫時選擇通過 ioredis 來作為 fork 對象。
        因為之前有遇到過 Redis 在使用 twemproxy 時會一直出現無法連接服務器的問題,詳情見 issues:https://github.com/luin/ioredis/issues/573
        所以會修改源碼修改這一問題,不過在修改完成之后跑單元測試發現,事情沒有那么簡單,并不只是 info -> ping 這樣,所以只好去熟悉源碼,然后針對性地調整一下邏輯。

        <!--more-->

        ioredis 項目結構

        從項目中看,源碼都在 lib 文件夾下,是一個純粹的 TS 項目。
        lib 目錄下的文件主要是一些通用能力的提供,比如 command、pipeline以及數據的傳輸等。

        .
        ├── DataHandler.ts                    # 數據處理
        ├── ScanStream.ts
        ├── SubscriptionSet.ts
        ├── autoPipelining.ts
        ├── cluster                           # Redis Cluster 模式的實現
        │?? ├── ClusterOptions.ts
        │?? ├── ClusterSubscriber.ts
        │?? ├── ConnectionPool.ts
        │?? ├── DelayQueue.ts
        │?? ├── index.ts
        │?? └── util.ts
        ├── command.ts                        # 命令的具體實現
        ├── commander.ts                      # command 的調度方
        ├── connectors                        # 網絡連接相關
        │?? ├── AbstractConnector.ts
        │?? ├── SentinelConnector
        │?? ├── StandaloneConnector.ts
        │?? └── index.ts
        ├── errors                            # 異常信息相關
        │?? ├── ClusterAllFailedError.ts
        │?? ├── MaxRetriesPerRequestError.ts
        │?? └── index.ts
        ├── index.ts                          # 入口文件
        ├── pipeline.ts                       # 管道邏輯
        ├── promiseContainer.ts               # Promise 的一個封裝
        ├── Redis                             # `Redis 實例的實現`
        │?? ├── RedisOptions.ts
        │?? ├── event_handler.ts
        │?? └── index.ts
        ├── script.ts
        ├── transaction.ts
        ├── types.ts
        └── utils                             # 一些工具函數的實現
            ├── debug.ts
            ├── index.ts
            └── lodash.ts

        而下分的兩個文件夾,rediscluster 都是具體的 redisclient 實現,cluster 是對應的 cluster 集群化實現。
        所以在看 README 的時候我們會發現有兩種實例可以使用,https://www.npmjs.com/package/ioredis

        new Redis
        new Redis.Cluster

        我們先從最普通的 Redis 開始看,本篇筆記主要是針對 Redis,結合著 README 一步步捋邏輯。

        const `Redis` = require("ioredis");
        const `Redis` = `new Redis`();
        
        redis.set("foo", "bar");
        redis.get("foo", function (err, result) {
          if (err) {
            console.error(err);
          } else {
            console.log(result);
          }
        });

        最基礎的使用順序,首先實例化一個 Redis 對象,然后調用 Redis 對應的命令,如果對 Redis 命令不熟悉可以看一下這個網站:https://redis.io/commands#

        入口代碼位于 redis/index.ts,雖說 ioredis 用了 TS,但是構造函數的實現依然使用的是很古老的 ES5 方式,分別繼承了 EventEmitterCommander 兩個類,第一個是 events 的,第二個則是 ioredis 自己提供的一個類,就在 commander.ts 文件中實現。

        Redis 實例化

        Redis 主要做的事情就是:

        • 建立并維護與 Redis Server 的網絡連接
        • 健康檢查
        • 維護隊列在異常情況下保證請求不丟,可重試

        看回到 Redis 會看到針對 this.connector 的一個賦值,拋開自定義的 ConnectorSentinels 只看最后一項最普通的 StandaloneConnector,這里就是用來建立與 Redis Server 的連接的。
        翻看 lib/connectors/StandaloneConnector.ts 的文件會發現,最終調用的是 net.createConnection,這個其實也能和咱們在上邊提到的 RESP 所對應上,就是用的最基本的 Redis 通訊協議來完成操作的。

        各項參數初始化完畢后,則會調用 connect 來與 Redis Server 建立真正的連接。

        net 模塊的 createConnection 只能建立網絡連接,并不能保證是我們預期的 Redis 服務。
        通過 connect 拿到的 stream 對象其實就是 socket clienthttps://github.com/luin/ioredis/blob/master/lib/redis/index.ts#L321
        connect 方法中主要就是去建立與 Redis Server 的鏈接,在建立連接以后,我們會調用 event_handler.connectHandler方法。
        這里主要做了兩件事:

        1. 去嘗試 check Redis Server 的狀態,也就是我們最開始提到的遇到的那個坑了,我們可以通過 Redis.prototype._readyCheck 方法看到具體的實現, ioredis 采用 info 命令作為探針,但是這個在 twemproxy 集群模式下就會產生一些問題,因為該模式會禁用一些命令,其中就包括 info,那么這就會導致 Redis Client 始終認為服務是不可用的。
        2. 添加了針對 socket clientdata 事件監聽,這里是用于后續接受返回數據的,主要邏輯在 DataHandler.ts,后邊會提到。
        readyCheck 的邏輯存在于 redis/index.ts 和 redis/event_handler.ts 文件中
        Redis.prototype._readyCheck = function (callback) {
          const _this = this;
          this.info(function (err, res) {
            if (err) {
              return callback(err);
            }
            if (typeof res !== "string") {
              return callback(null, res);
            }
        
            const info: { [key: string]: any } = {};
        
            const lines = res.split("\r\n");
            for (let i = 0; i < lines.length; ++i) {
              const [fieldName, ...fieldValueParts] = lines[i].split(":");
              const fieldValue = fieldValueParts.join(":");
              if (fieldValue) {
                info[fieldName] = fieldValue;
              }
            }
        
            if (!info.loading || info.loading === "0") {
              callback(null, info);
            } else {
              const loadingEtaMs = (info.loading_eta_seconds || 1) * 1000;
              const retryTime =
                _this.options.maxLoadingRetryTime &&
                _this.options.maxLoadingRetryTime < loadingEtaMs
                  ? _this.options.maxLoadingRetryTime
                  : loadingEtaMs;
              debug("Redis server still loading, trying again in " + retryTime + "ms");
              setTimeout(function () {
                _this._readyCheck(callback);
              }, retryTime);
            }
          });
        };

        在檢測 Redis 可用以后則會觸發 callback,該 callback 還會去檢查 offlineQueue 是否有值,可以理解為是 Redis 可用之前調用命令的那些記錄, ioredis 并不會直接報錯告訴你說連接未建立,而是暫存在自己的一個隊列中,等到可用后按照順序發出去。
        Redis 在實例化的過程中主要也就是做了這些事情,接下來我們就要看 Redis 命令發出以后,具體執行的邏輯了。

        Commander

        Commander 的作用就是實現了各種 Redis Client 的命令,通過 https://www.npmjs.com/package/redis-commands 遍歷得到的。
        同時會針對 ClientReady 狀態進行處理,在 Ready 之前會做一些暫存命令之類的操作。
        比較像是一個抽象類,因為 RedisRedis Cluster 都會繼承并覆蓋一些 API 來完成工作。

        commands.forEach(function (commandName) {
          Commander.prototype[commandName] = generateFunction(commandName, "utf8");
          Commander.prototype[commandName + "Buffer"] = generateFunction(
            commandName,
            null
          );
        });
        
        function generateFunction(_encoding: string);
        function generateFunction(_commandName: string | void, _encoding: string);
        function generateFunction(_commandName?: string, _encoding?: string) {
          if (typeof _encoding === "undefined") {
            _encoding = _commandName;
            _commandName = null;
          }
        
          return function (...args) {
            const commandName = _commandName || args.shift();
            let callback = args[args.length - 1];
        
            if (typeof callback === "function") {
              args.pop();
            } else {
              callback = undefined;
            }
        
            const options = {
              errorStack: this.options.showFriendlyErrorStack
                ? new Error().stack
                : undefined,
              keyPrefix: this.options.keyPrefix,
              replyEncoding: _encoding,
            };
        
            if (this.options.dropBufferSupport && !_encoding) {
              return asCallback(
                PromiseContainer.get().reject(new Error(DROP_BUFFER_SUPPORT_ERROR)),
                callback
              );
            }
        
            // No auto pipeline, use regular command sending
            if (!shouldUseAutoPipelining(this, commandName)) {
              return this.sendCommand(
                new Command(commandName, args, options, callback)
              );
            }
        
            // Create a new pipeline and make sure it's scheduled
            return executeWithAutoPipelining(this, commandName, args, callback);
          };
        }

        在實現所有命令的同時還實現了一批 Buffer 后綴的 API,他們主要的區別我們可以通過 generateFunction 函數的實現來看到,被傳入到了 Command 實例中。
        Command 對象則是具體的命令實現,所以我們還需要先去看一下 Command。

        Command

        Command 負責的事情,主要是參數的處理、返回值的處理,生成命令傳輸的實際值以及 callback 的觸發。

        實例化

        Command 的實例化過程中,除去一些屬性的賦值,還調用了一個 initPromise 方法,在內部生成了一個 Promise 對象。
        其中有兩處比較重要的處理,一個是關于參數的轉換,還有一個是返回值的處理。

        private initPromise() {
          const Promise = getPromise();
          const promise = new Promise((resolve, reject) => {
            if (!this.transformed) {
              this.transformed = true;
              const transformer = Command._transformer.argument[this.name];
              if (transformer) {
                this.args = transformer(this.args);
              }
              this.stringifyArguments();
            }
        
            this.resolve = this._convertValue(resolve);
            if (this.errorStack) {
              this.reject = (err) => {
                reject(optimizeErrorStack(err, this.errorStack, __dirname));
              };
            } else {
              this.reject = reject;
            }
          });
        
          this.promise = asCallback(promise, this.callback);
        }

        參數、返回值特殊處理

        如果檢索 Command.ts 文件,會發現 Command._transformer.argument 通過 setArgumentTransformer 方法進行設置。
        然后再觀察代碼中有用到 setArgumentTransformer 的是少數幾個 hset 命令,以及 mset 命令。

        Command.setArgumentTransformer("hmset", function (args) {
          if (args.length === 2) {
            if (typeof Map !== "undefined" && args[1] instanceof Map) {
              return [args[0]].concat(convertMapToArray(args[1]));
            }
            if (typeof args[1] === "object" && args[1] !== null) {
              return [args[0]].concat(convertObjectToArray(args[1]));
            }
          }
          return args;
        });

        如果大家使用過 Redishash set 操作,應該都會知道,操作多個鍵值的方式是通過追加參數完成的:

        > HMSET key field value [field value ...]

        這樣在 JS 中使用也需要將一個數組傳遞進去,由用戶自己維護數組的 key value,這樣一個順序的操作方式,必然是沒有寫 JS 習慣的 Object 傳參要舒服的,所以 ioredis 提供一個參數轉換的邏輯,用來將 Object 轉換為一維數組:

        export function convertObjectToArray(obj) {
          const result = [];
          const keys = Object.keys(obj);
        
          for (let i = 0, l = keys.length; i < l; i++) {
            result.push(keys[i], obj[keys[i]]);
          }
          return result;
        }
        
        export function convertMapToArray<K, V>(map: Map<K, V>): Array<K | V> {
          const result = [];
          let pos = 0;
          map.forEach(function (value, key) {
            result[pos] = key;
            result[pos + 1] = value;
            pos += 2;
          });
          return result;
        }

        如果仔細看 Command._transformer 會發現還有一個 reply 屬性值,這里的邏輯主要在 _convertValue 中有所體現,大致就是在接收到返回值以后,會先調用我們傳入的自定義函數用來處理返回值。
        目前翻代碼用到的唯一一處是 hgetall 的處理邏輯,hmgethgetallRedis 中都是返回一個數組的數據,而 ioredis 將數組按照 kv 的格式拼接為一個 Object 方便用戶操作。

        Command.setReplyTransformer("hgetall", function (result) {
          if (Array.isArray(result)) {
            const obj = {};
            for (let i = 0; i < result.length; i += 2) {
              obj[result[i]] = result[i + 1];
            }
            return obj;
          }
          return result;
        });

        設置 key 前綴

        如果看 Command 實例化的過程中,還會發現有 _iterateKeys 這樣的一個函數調用,該函數具有兩個作用:

        1. 提取參數中所有的 key
        2. 可選的將 key 添加一個前綴(prefix)

        函數內部使用了 redis-commands 的兩個 API, existsgetKeyIndexes,用來獲取參數數組中所有的 key 的下標。
        因為這個函數做了兩件事,所以在第一次看到構造函數的用法時,再看函數具體的實現,會對最后返回的 this.keys 很疑惑,但是當看到 Command 還提供了一個 getKeys API 就能夠明白是怎樣的邏輯了。

        如果設置了 keyPrefix ,則會觸發 _iterateKeys 用來調整 key 名,并存儲到 keys 中用于返回值。
        當調用 getKeys 時,如果沒有設置 keyPrefix ,則會用默認的空處理函數來執行同樣的邏輯,就是獲取所有的 key,然后返回出去;如果之前已經設置過 keyPrefix 那么就會直接返回 this.keys 不再重復執行邏輯。

        image

        // 構造函數內邏輯
        if (options.keyPrefix) {
          this._iterateKeys((key) => options.keyPrefix + key);
        }
        
        // 另一處調用的位置
        public getKeys(): Array<string | Buffer> {
          return this._iterateKeys();
        }
        
        private _iterateKeys(
          transform: Function = (key) => key
        ): Array<string | Buffer> {
          if (typeof this.keys === "undefined") {
            this.keys = [];
            if (commands.exists(this.name)) {
              const keyIndexes = commands.getKeyIndexes(this.name, this.args);
              for (const index of keyIndexes) {
                this.args[index] = transform(this.args[index]);
                this.keys.push(this.args[index] as string | Buffer);
              }
            }
          }
          return this.keys;
        }

        發送命令數據的生成

        大家使用 Redis 應該更多的是通過代碼中的 Client 調用各種命令來做,偶爾會通過 redis-cli 直接命令行操作。
        但其實 Redis 使用了一個叫做 RESP (REdis Serialization Protocol) 的協議來進行傳輸。
        如果本機有 Redis 的話,我們在本地可以很簡單的進行演示。

        > echo -e '*1\r\n$4\r\nPING\r\n' | nc 127.0.0.1 6379
        +PONG

        我們會得到一個 +PONG 字符串。這樣的一個交互其實才是絕大多數 ClientRedis Server 交互時所使用的格式。

        P.S. RESP 有提供人類可讀的版本進行交互,但是性能相對要低一些。

        舉例說明如果我們要執行一個 set 和一個 get 應該怎樣去寫這個命令:

        # 開頭代表注釋
        
        # SET hello world
        # 參數個數
        *3
        # 該行命令值的長度(set 命令)
        $3
        # 命令對應的值(set 命令)
        SET
        # 該行命令值的長度(具體的 key: hello)
        $5
        # 命令對應的值(具體的 key: hello)
        hello
        # 該行命令值的長度(value 的長度)
        $5
        # 命令對應的值(value 本體)
        world
        
        # GET hello
        # 參數個數
        *2
        # 該行命令值的長度(get 命令)
        $3
        # 命令對應的值(get 命令)
        GET
        # 該行命令值的長度(具體的 key: hello)
        $5
        # 命令對應的值(具體的 key: hello)
        hello

        set 的返回值沒什么意外,就是一個 +OK,而 get 的返回值則有兩行,第一行 $5 表示返回值的長度,第二行才是真正的返回值 world。
        所以如果去看 Command 的 toWritable 函數就是實現了這樣的邏輯,因為比較長所以就不貼了:https://github.com/luin/ioredis/blob/master/lib/command.ts#L269

        Command 主要實現的就是這些邏輯,我們在 Commander 的視線中可以看到所有命令調用的末尾都會執行 this.sendCommand, 具體的調度就是在 Redis、Redis Cluster 等具體的實現中做的了。所以我們可以回到 Redis 去看下實現邏輯。

        Redis 發送命令

        sendCommand 的實現中,會進行 Redis 狀態的檢查,如果是 wait 或者 end 之類的,會進行對應的處理。
        然后我們會去檢查當前是否是一個可以發送命令的狀態:

        let writable =
            this.status === "ready" ||
            (!stream &&
              this.status === "connect" &&
              commands.exists(command.name) &&
              commands.hasFlag(command.name, "loading"));
          if (!this.stream) {
            writable = false;
          } else if (!this.stream.writable) {
            writable = false;
          } else if (this.stream._writableState && this.stream._writableState.ended) {
            writable = false;
          }

        代碼還算比較清晰,這里也要提到一點,我們在處理 info 命令的問題是,使用 ping 命令來代替 info,最初就卡在了這里,后續 debug 發現, ping 命令并不具備 loading 這一 flag 特性,所以 ping 命令都被放到了 offlineQueue 中,針對這一情況,我們將 ping 添加一個額外的判斷邏輯,確保 write 的值為真。

        接下來如果 write 為真,那么我們就會使用 stream 也就是前邊建立的 socket 連接來發送我們真實的命令了,這時候就是調用的 write 并將 Command#toWritable 的返回值作為數據傳進去,也就是之前提到的基于 RESP 格式的序列化。
        同時會將一些信息放到 commandQueue 中,它和 offlineQueue 都是同一個類型的實例,后邊會提到具體的作用。

        this.commandQueue.push({
          command: command,  // Command 實例
          stream: stream,    // socket client(其實并沒有地方會用到它,不知道為什么要傳過去)
          select: this.condition.select, // 這個也是沒有被用到
        });
        另一個開源的模塊, denque: https://www.npmjs.com/package...

        如果 write 為假,那么命令就會被放到 offlineQueue 中。

        結束邏輯后會把 command.promise 進行返回,我們在 Command 實例化過程中可以看到,其實是實例化了一個 Promise 對象,并把 resolvereject 做了一次引用,后邊在數據返回時會用到。
        當我們命令已經發送完畢后,那么下一步就是等數據返回了,這里就要說到前邊在介紹 React 實例化后 connect 所調用的 DataHandler 實例所做的事情了。

        DataHandler

        DataHandler 是一個比較另類的類的寫法,因為使用時就直接 new 了但并沒有接收返回值。
        在構造函數中,就做了兩件事,一個是實例化了一個 RedisParser 對象,另一個就是監聽了 redis.stream.on('data') 事件,也就是我們在實例化 Redis 時傳遞過來的 socket client,在 data 事件觸發時調用 RedisParser.execute 來完成解析。
        RedisParser 是另一個開源模塊了,有興趣的小伙伴可以看這里:https://www.npmjs.com/package/redis-parser
        目前可以認為在調用 execute 方法后會調用實例化時傳入的 return Reply 就可以了,這是一個解析后的 response,我們會拿到這個 response 之后會從 commandQueue 中依次取出之前傳入的對象。
        取出的方式是按照隊列的方式來取的,通過 shift,每次取出隊列中的第一個元素。
        然后調用元素中 command 屬性的 resolve 方法,也就是我們在調用各種 Redis 命令時傳入的 callback 了。

        這里需要補充一些 Redis 相關的知識,我們從整個邏輯鏈路可以看到,大致是這樣的:

        1. 用戶執行命令
        2. Redis 實例化 Command 并放入隊列
        3. 接收到數據響應后解析數據,并獲取隊列中第一個元素,調用對應的 callback

        同時間可能會有很多 Redis 請求被發出去,但是再接收到數據后并不需要去判斷這次響應對應的是哪一個 command,因為 Redis 本身也是一個單進程的工作模式,命令的處理也會按照接收數據的先后順序來處理,因為本身 ioredis 用的也是同一個 socket 連接,所以也不會存在說命令發送到遠端的先后順序會發生變化。
        所以我們就可以很放心的通過最簡單的方式, push + shift 來處理數據了。

        這也是為什么一些大 key 的操作會導致整個 Redis 服務響應變慢了。(在不做分片之類的處理情況下)

        小結

        到此為止,普通模式下的 Redis Client 整體邏輯我們已經梳理完了,從創建到發送命令到接收返回值。
        后邊會針對 Redis Cluster 再輸出一篇筆記,一起來看一下在 Cluster 模式下又會有什么不一樣的處理邏輯。

        參考資料

        查看原文

        贊 1 收藏 1 評論 0

        賈順名 發布了文章 · 2020-12-01

        grpc-node 源碼閱讀筆記[0]

        簡單介紹 gRPC

        貼一張掛在官網的圖片:https://grpc.io/docs/what-is-...

        image

        可以理解 gRPC 是 RPC(遠程過程調用)框架的一種實現,關于 RPC 的介紹因為并不是本次的主題,所以放個鏈接來幫助大家理解:https://www.zhihu.com/questio...

        我所理解 RPC 整個執行的過程就是 Client 調用方法 -> 序列化請求參數 -> 傳輸數據 -> 反序列化請求參數 -> Server 處理請求 -> 序列化返回數據 -> 傳輸數據 -> Client 接收到方法返回值:

        image

        其主要邏輯會集中在 數據的序列化/反序列化 以及 數據的傳輸上,而這兩項 gRPC 分別選用了 Protocol BuffersHTTP2 來作為默認選項。

        gRPC 在 Node.js 的實現

        gRPC 在 Node.js 的實現上一共有兩個官方版本,一個是基于 c++ addon 的版本,另一個是純 JS 實現的版本。

        gRPC 在 Node.js 中相關的模塊

        除了上邊提到的兩個 gRPC 的實現,在 Node.js 中還存在一些其他的模塊用來輔助使用 gRPC。

        • grpc-tools 這個是每個語言都會用的,用來根據 proto 文件生成對應,插件提供了 Node.js 語言的實現
        • proto-loader 用來動態加載 proto 文件,不需要使用 grpc_tools 提前生成代碼(性能比上邊的方式稍差)

        這次筆記主要是針對 grpc-node 方式的實現,在 c++ addon 模塊的實現下,并不是一個 gRPC 的完整實現,做的事情更多的是一個銜接的工作,通過 JS、c++ 兩層封裝將 c++ 版本的 gRPC 能力暴露出來供用戶使用。

        之所以選擇它是因為覺得邏輯會較 grpc-js 清晰一些,更適合理解 gRPC 整體的運行邏輯。

        在項目倉庫中,兩個目錄下是我們需要關注的:

        • src(JS 代碼)
        • ext(c++ 代碼)

        ext 中的代碼主要用于調用 c++ 版本 gRPC 的接口,并通過 NAN 提供 c++ addon 模塊。
        src 中的代碼則是調用了 ext 編譯后的模塊,并進行一層應用上的封裝。
        而作為使用 gRPC 的用戶就是引用的 src 下的文件了。

        我們先通過官方的 hello world 示例來說明我們是如何使用 gRPC 的,因為 gRPC 默認的數據序列化方式采用的 protobuf,所以首先我們需要有一個 proto 文件,然后通過 gRPC 提供的文件來生成對應的代碼,生成出來的文件包含了 proto 中所定義的 service、method、message 等各種結構的定義,并能夠讓我們用比較熟悉的方式去使用。

        示例中的 proto 文件:

        package helloworld;
        
        // The greeting service definition.
        service Greeter {
          // Sends a greeting
          rpc SayHello (HelloRequest) returns (HelloReply) {}
        }
        
        // The request message containing the user's name.
        message HelloRequest {
          string name = 1;
        }
        
        // The response message containing the greetings
        message HelloReply {
          string message = 1;
        }

        grpc_tools 是用來生成 proto 對應代碼的,這個命令行工具提供了多種語言的生成版本。
        在 Node 中,會生成兩個文件,一般命名規則為 xxx_pb.js、xxx_grpc_pb.js,xxx_pb.js 是 proto 中各種 service、method 以及 message 的結構描述及如何使用的接口定義,而 xxx_grpc_pb.js 主要則是針對 xxx_pb.js 的一個整合,按照 proto 文件中定義的結構生成對應的代碼,在用戶使用的時候,使用前者多半用于構造消息結構,使用后者則是方法的調用。

        生成后的關鍵代碼(XXX_grpc_pb.js):

        const grpc = require('@grpc/grpc');
        const helloworld_pb = require('./helloworld_pb.js');
        
        function serialize_helloworld_HelloReply(arg) {
          if (!(arg instanceof helloworld_pb.HelloReply)) {
            throw new Error('Expected argument of type helloworld.HelloReply');
          }
          return Buffer.from(arg.serializeBinary());
        }
        
        function deserialize_helloworld_HelloReply(buffer_arg) {
          return helloworld_pb.HelloReply.deserializeBinary(new Uint8Array(buffer_arg));
        }
        
        function serialize_helloworld_HelloRequest(arg) {
          if (!(arg instanceof helloworld_pb.HelloRequest)) {
            throw new Error('Expected argument of type helloworld.HelloRequest');
          }
          return Buffer.from(arg.serializeBinary());
        }
        
        function deserialize_helloworld_HelloRequest(buffer_arg) {
          return helloworld_pb.HelloRequest.deserializeBinary(new Uint8Array(buffer_arg));
        }
        
        
        // The greeting service definition.
        const GreeterService = exports.GreeterService = {
          // Sends a greeting
        sayHello: {
            path: '/helloworld.Greeter/SayHello',
            requestStream: false,
            responseStream: false,
            requestType: helloworld_pb.HelloRequest,
            responseType: helloworld_pb.HelloReply,
            requestSerialize: serialize_helloworld_HelloRequest,
            requestDeserialize: deserialize_helloworld_HelloRequest,
            responseSerialize: serialize_helloworld_HelloReply,
            responseDeserialize: deserialize_helloworld_HelloReply,
          },
        };
        
        exports.GreeterClient = grpc.makeGenericClientConstructor(GreeterService);

        最終導出的 sayHello 就是我們在 proto 文件中定義的 SayHello 方法,所以我們在作為 Client 的時候使用,就是很簡單的調用 sayHello 就行了:

        const messages = require('./helloworld_pb');
        const services = require('./helloworld_grpc_pb');
        const grpc = require('grpc');
        
        const client = new services.GreeterClient(
          target,
          grpc.credentials.createInsecure()
        );
        
        const request = new messages.HelloRequest();
        
        request.setName('Niko');
        
        client.sayHello(request, function(err, response) {
          console.log('Greeting:', response.getMessage());
        });

        其實真實寫的代碼也就上邊的幾行,實例化了一個 Client,實例化一個 Message 并構建數據,然后通過 client 調用對應的 method 傳入 message,就完成了一個 gRPC 請求的發送。
        在這個過程中,我們直接可見的用到了 grpc-nodecredentials 以及 makeGenericClientConstructor,我們就拿這兩個作為入口,首先從 makeGenericClientConstructor 來說。

        源碼分析

        makeGenericClientConstructor

        在翻看 index.js 文件中可以發現, makeGenericClientConstructor 其實是 client.makeClientConstructor 的一個別名,所以我們需要去查看 src/client.js 中對應函數的定義,就像函數名一樣,它是用來生成一個 Client 的構造函數的,這個構造函數就是我們在上邊示例中的 GreeterClient。
        源碼所在位置: https://github.com/grpc/grpc-...

        當對照著 xxx_grpc_pb.js 與源碼來看時,會發現調用函數只傳入了一個參數,而函數定義卻存在三個參數,這個其實是歷史原因導致的,我們可以直接忽略后邊的兩個參數。

        精簡后的源碼:

        exports.makeClientConstructor = function(methods) {
          function ServiceClient(address, credentials, options) {
            Client.call(this, address, credentials, options);
          }
        
          util.inherits(ServiceClient, Client);
          ServiceClient.prototype.$method_definitions = methods;
          ServiceClient.prototype.$method_names = {};
        
          Object.keys(methods).forEach(name => {
            const attrs = methods[name];
            if (name.indexOf('$') === 0) {
              throw new Error('Method names cannot start with $');
            }
            var method_type = common.getMethodType(attrs);
            var method_func = function() {
              return requester_funcs[method_type].apply(this,
                [ attrs.path, attrs.requestSerialize, attrs.responseDeserialize ]
                .concat([].slice.call(arguments))
              );
            };
            
            ServiceClient.prototype[name] = method_func;
            ServiceClient.prototype.$method_names[attrs.path] = name;
            // Associate all provided attributes with the method
            Object.assign(ServiceClient.prototype[name], attrs);
            if (attrs.originalName) {
              ServiceClient.prototype[attrs.originalName] =
                ServiceClient.prototype[name];
            }
          });
        
          ServiceClient.service = methods;
        
          return ServiceClient;
        };

        methods 參數就是我們上邊文件中生成的對象,包括服務地址、是否使用 stream、以及 請求/返回值 的類型及對應的序列化/反序列化 方式。

        大致的邏輯就是創建一個繼承自 Client 的子類,然后遍歷我們整個 service 來看里邊有多少個 method,并根據 method 不同的傳輸類型來區分使用不同的函數進行數據的傳輸,最后以 method 為 key 放到 Client 子類的原型鏈上。

        common.getMethodType 就是用來區分 method 究竟是什么類型的請求的,目前 gRPC 一共分了四種類型,雙向 Stream、兩個單向 Stream,以及 Unary 模式:

        exports.getMethodType = function(method_definition) {
          if (method_definition.requestStream) {
            if (method_definition.responseStream) {
              return constants.methodTypes.BIDI_STREAMING;
            } else {
              return constants.methodTypes.CLIENT_STREAMING;
            }
          } else {
            if (method_definition.responseStream) {
              return constants.methodTypes.SERVER_STREAMING;
            } else {
              return constants.methodTypes.UNARY;
            }
          }
        };

        在最后幾行有一處判斷 originalName 是否存在的操作,這個是在 proto-loader 中存在的一個邏輯,將 methodName 轉換成純小寫放了進去,單純看注釋的話,這并不是一個長期的解決方案: https://github.com/grpc/grpc-...

        P.S. proto-loader 是 JS 里邊一種動態加載 proto 文件的方式,性能比通過 grpc_tools 預生成代碼的方式要低一些。

        所有的請求方式,都被放在了一個叫做 requester_funcs 的對象中,源碼中的定義是這樣的:

        var requester_funcs = {
          [methodTypes.UNARY]: Client.prototype.makeUnaryRequest,
          [methodTypes.CLIENT_STREAMING]: Client.prototype.makeClientStreamRequest,
          [methodTypes.SERVER_STREAMING]: Client.prototype.makeServerStreamRequest,
          [methodTypes.BIDI_STREAMING]: Client.prototype.makeBidiStreamRequest
        };

        從這里就可以看出,其實是和我們 getMethodType 所對應的四種處理方式。

        最終,將繼承自 Client 的子類返回,完成了整個函數的執行。

        Client

        首先我們需要看看繼承的 Client 構造函數究竟做了什么事情。
        拋開參數類型的檢查,首先是針對攔截器的處理,我們可以通過兩種方式來實現攔截器,一個是提供攔截器的具體函數,這個在所有 method 觸發時都會執行,還有一個可以通過傳入 interceptor_provider 來實現動態的生成攔截器,函數會在初始化 Client 的時候觸發,并要求返回一個新的 interceptor 對象用于執行攔截器的邏輯。

        interceptor 的用法

        // interceptors 用法
        const interceptor = function(options, nextCall) {
          console.log('trigger')
          return new InterceptingCall(nextCall(options));
        }
        const client = new services.GreeterClient(
          target,
          grpc.credentials.createInsecure(),
          {
            interceptors: [interceptor]
          }
        );
        
        // interceptor_providers 用法
        const interceptor = function(options, nextCall) {
          console.log('trigger')
          return new InterceptingCall(nextCall(options));
        }
        
        const interceptorProvider = (methodDefinition) => {
          console.log('call interceptorProvider', methodDefinition)
          return interceptor
        }
        
        const client = new services.GreeterClient(
          target,
          grpc.credentials.createInsecure(),
          {
            interceptor_providers: [interceptorProvider]
          }
        );
        P.S. 需要注意的是,如果傳入 interceptor_providers,則會在兩個地方觸發調用,一個是實例化 Client 的時候,還有一個是在 method 真實調用的時候,每次調用都會觸發,所以如果要復用 interceptor,最好在函數之外構建出函數體

        但是這樣的攔截器其實是沒有太多意義的,我們不能夠針對 metadata、message 來做自己的修改,如果我們觀察 InterceptingCall 的具體函數簽名,會發現它支持兩個參數的傳入。

        function InterceptingCall(next_call, requester) {
          this.next_call = next_call;
          this.requester = requester;
        }

        上邊示例只介紹了第一個參數,這個參數預期接受一個對象,對象會提供多個方法,我們可以通過console.log(nextCall(options).constructor.prototype)來查看都有哪些,例如 sendMessage、start 之類的。
        而觀察這些函數的實現,會發現他們都調用了一個 _callNext。

        InterceptingCall.prototype.sendMessage = function(message) {
          this._callNext('sendMessage', [message]);
        };
        
        InterceptingCall.prototype.halfClose = function() {
          this._callNext('halfClose');
        };
        
        InterceptingCall.prototype.cancel = function() {
          this._callNext('cancel');
        };
        
        InterceptingCall.prototype._callNext = function(method_name, args, next) {
          var args_array = args || [];
          var next_call = next ? next : this._getNextCall(method_name);
          if (this.requester && this.requester[method_name]) {
            // Avoid using expensive `apply` calls
            var num_args = args_array.length;
            switch (num_args) {
              case 0:
                return this.requester[method_name](next_call);
              case 1:
                return this.requester[method_name](args_array[0], next_call);
              case 2:
                return this.requester[method_name](args_array[0], args_array[1],
                                                   next_call);
            }
          } else {
            if (next_call === emptyNext) {
              throw new Error('Interceptor call chain terminated unexpectedly');
            }
            return next_call(args_array[0], args_array[1]);
          }
        };

        _callNext 方法中,我們就可以找到 requester 參數究竟是有什么用了,如果 requester 也有實現對應的 method_name,那么就會先執行 requester 的方法,隨后將 next_call 對應的方法作為調用 requester 方法的最后一個參數傳入。
        在 grpc-node 中,攔截器的執行順序與傳入順序有關,是一個隊列,先傳入的攔截器先執行,如果傳入了第二個參數,則先執行第二個參數對應的方法,后執行第一個參數對應的方法。

        所以如果我們想做一些額外的事情,比如說針對 metadata 添加一個我們想要的字段,那么就可以這么來寫攔截器:

        var interceptor = function(options, nextCall) {
          return new InterceptingCall(nextCall(options), {
            start: function(metadata, listener, next) {
              next(metadata, {
                onReceiveMetadata: function (metadata, next) {
                  metadata.set('xxx', 'xxx')
                  next(metadata);
                },
              });
             },
          });
        };
        稍微特殊的地方是,start函數的next參數被調用時傳入的第二個參數并不是一個InterceptingCall的實例,而是一個InterceptingListener的實例,兩者都有_callNext的實現,只不過所提供的方法不完全一樣罷了。

        Channel 的創建

        接下來的代碼邏輯主要是用于創建 Channel,可以通過傳遞不同的參數來覆蓋 Channel,也可以用默認的 Channel,這個 Channel 對應的 gRPC 中其實就是做數據傳輸的那一個模塊,可以理解為 HTTP2 最終是在這里使用的。
        一般很少會去覆蓋默認的 Channel,所以我們直接去看 grpc-node 里邊的 Channel 是如何實現的。

        Channel 是 c++ 代碼實現的,代碼的位置: https://github.com/grpc/grpc-...

        如果有同學嘗試過混用 grpc-nodegrpc-js,那么你一定有看到過這個報錯:Channel's second argument (credentials) must be a ChannelCredentials
        原因就在于 Channel 實例化過程中會進行檢查我們創建 Channel 傳入的 credential 是否是繼承自 grpc 中的 ChannelCredentials 類。
        grpc-nodegrpc-js 用的是兩個不同的類,所以混用的話可能會出現這個問題。

        然后就是根據傳入的 credential 的不同來判斷是否要使用加密,而一般常用的 grpc.credentials.createInsecure() 其實就是不走加密的意思了,我們可以在 https://github.com/grpc/grpc-...https://github.com/grpc/grpc-... 來看到對應的邏輯。

        后邊就是調用 c++ 版本的 grpc 來構建對應的 Channel 了,如果有老鐵看過 c++ 版本是如何創建 grpc Client 的,那么這些代碼就比較熟悉了: https://github.com/grpc/grpc/...
        grpc-node 中也是調用的同樣的 API 來創建的。

        makeUnaryRequest

        Client 被創建出來后,我們會調用 Client 上的方法(也就是發請求了),這時候就會觸發到上邊提到的 requester_funcs 其中的一個,我們先從最簡單的 Unary 來說,這種 Client/Server 都是 Unary 請求方式時會觸發的函數。
        我們通過上邊 method_func 中調用方式可以確定傳遞了什么參數進去,有幾個固定的參數 path、request 序列化方式,以及 response 的反序列化方式。
        后邊的參數就是由調用時傳入的動態參數了,這些可以在 makeUnaryRequest 函數定義中看到,分別是 argument(也就是 request body)、metadata(可以理解為 header,一些元數據)、options 是一個可選的參數(自定義的攔截器是放在這里的),可以用于覆蓋 method 的一些描述信息,以及最后的 callback 就是我們接收到 response 后應該做的操作了。

        整個函數的實現,按長度來說,有一半都是在處理參數,而剩下的部分則做了兩件事,一個是實例化了 ClientUnaryCall 對象,另一個則是處理攔截器相關的邏輯,并啟動攔截器來發送整個請求。
        makeUnaryRequest 函數中涉及到攔截器的部分有這么幾塊 resolveInterceptorProviders、getLastListenergetInterceptingCall。

        ClientUnaryCall

        先來看 ClientUnaryCall 做了什么事情,在源碼中有這樣的一個代碼塊,是使用該對象的場景:

        function ClientUnaryCall(call) {
          EventEmitter.call(this);
          this.call = call;
        }
        
        var callProperties = {
          argument: argument,
          metadata: metadata,
          call: new ClientUnaryCall(),
          channel: this.$channel,
          methodDefinition: method_definition,
          callOptions: options,
          callback: callback
        };
        
        // 以及后續與攔截器產生了一些關聯
        var emitter = callProperties.call;
        // 這行代碼很詭異,看起來是可以在實例化的時候傳入的,卻選擇了在這里覆蓋屬性值
        emitter.call = intercepting_call;
        
        var last_listener = client_interceptors.getLastListener(
          methodDefinition,
          emitter,
          callProperties.callback
        );

        關于 ClientUnaryCall 的定義也非常簡單,其實是一個繼承自 EventEmitter 的子類,增加了一個 call 屬性的定義,以及兩個方法封裝調用了 call 屬性對應的一些方法。

        強烈懷疑 這部分代碼是后期有過調整,因為 ClientUnaryCall 構造函數的實現中是可以接受一個參數作為 call 屬性的賦值的,然而在代碼應用中選擇了后續覆蓋 call 屬性,而非直接在實例化的時候傳入進去

        resolveInterceptorProviders

        resolveInterceptorProviders 是用來處理用戶傳入的攔截器的,這個函數在 Client 的整個生命周期會有兩處調用,一個是在上邊 Client 實例化的過程中會觸發一次,再有就是每次 method 被調用之前,會重新觸發該函數。
        resolveInterceptorProviders 的邏輯很簡單,就是遍歷我們傳入的 interceptor_provider 并將對應 method 的信息描述傳入并執行,得到 provider 返回的 interceptor 用作攔截器。
        Client 實例化過程中是會遍歷所有的 method 來執行,而在具體的 method 觸發時則只觸發當前 method 相關的 provider 邏輯。

        getLastListener

        getLastListener 按照注釋中的描述,是為了獲得一個最后會觸發的監聽者,源碼大致是這樣的:
        https://github.com/grpc/grpc-...

        var listenerGenerators = {
          [methodTypes.UNARY]: _getUnaryListener,
          [methodTypes.CLIENT_STREAMING]: _getClientStreamingListener,
          [methodTypes.SERVER_STREAMING]: _getServerStreamingListener,
          [methodTypes.BIDI_STREAMING]: _getBidiStreamingListener
        };
        
        function getLastListener(method_definition, emitter, callback) {
          if (emitter instanceof Function) {
            callback = emitter;
            callback = function() {};
          }
          if (!(callback instanceof Function)) {
            callback = function() {};
          }
          if (!((emitter instanceof EventEmitter) &&
               (callback instanceof Function))) {
            throw new Error('Argument mismatch in getLastListener');
          }
          var method_type = common.getMethodType(method_definition);
          var generator = listenerGenerators[method_type];
          return generator(method_definition, emitter, callback);
        }

        同樣也使用了一個枚舉來區分不同的方法類型來調用不同的函數來生成對應的 listener。

        比如這里用到的 getUnaryListener,是這樣的一個邏輯:

        function _getUnaryListener(method_definition, emitter, callback) {
          var resultMessage;
          return {
            onReceiveMetadata: function (metadata) {
              emitter.emit('metadata', metadata);
            },
            onReceiveMessage: function (message) {
              resultMessage = message;
            },
            onReceiveStatus: function (status) {
              if (status.code !== constants.status.OK) {
                var error = common.createStatusError(status);
                callback(error);
              } else {
                callback(null, resultMessage);
              }
              emitter.emit('status', status);
            }
          };
        }

        代碼也算比較清晰,在不同的階段會觸發不同的事件,然后再真正返回結果以后,觸發 callback 來告知用戶請求響應。
        也就是我們在示例中調用 sayHello 時傳入的 callback 被調用的地方了。

        getInterceptingCall

        getInterceptingCall 函數的調用會返回一個實例,通過操作該實例我們可以控制請求的開始、數據的發送以及請求的結束。
        我們上邊 getLastListener 返回的對象觸發的時機也是會在這里可以找到的。

        從源碼上來看會涉及到這么幾個函數:

        var interceptorGenerators = {
          [methodTypes.UNARY]: _getUnaryInterceptor,
          [methodTypes.CLIENT_STREAMING]: _getClientStreamingInterceptor,
          [methodTypes.SERVER_STREAMING]: _getServerStreamingInterceptor,
          [methodTypes.BIDI_STREAMING]: _getBidiStreamingInterceptor
        };
        
        function getInterceptingCall(method_definition, options,
                                     interceptors, channel, responder) {
          var last_interceptor = _getLastInterceptor(method_definition, channel,
                                                    responder);
          var all_interceptors = interceptors.concat(last_interceptor);
          return _buildChain(all_interceptors, options);
        }
        
        function _getLastInterceptor(method_definition, channel, responder) {
          var callback = (responder instanceof Function) ? responder : function() {};
          var emitter = (responder instanceof EventEmitter) ? responder :
                                                              new EventEmitter();
          var method_type = common.getMethodType(method_definition);
          var generator = interceptorGenerators[method_type];
          return generator(method_definition, channel, emitter, callback);
        }
        
        function _buildChain(interceptors, options) {
          var next = function(interceptors) {
            if (interceptors.length === 0) {
              return function (options) {};
            }
            var head_interceptor = interceptors[0];
            var rest_interceptors = interceptors.slice(1);
            return function (options) {
              return head_interceptor(options, next(rest_interceptors));
            };
          };
          var chain = next(interceptors)(options);
          return new InterceptingCall(chain);
        }
        _getUnaryInterceptor 由于篇幅較長,直接貼 GitHub 鏈接了:https://github.com/grpc/grpc-...

        大致的邏輯就是我們通過 method_definition、channel 等參數來獲取到一個 interceptor,并將其拼接到原有的 interceptor 后邊,作為最后執行的攔截器, _buildChain 函數比較簡單,就是實現了一個鏈式調用的函數,用來按順序執行攔截器。

        關于 interceptor 如何使用可以看我們介紹 interceptor 用法時寫的 demo

        主要的邏輯實際上在 _getUnaryInterceptor 中,我們會創建一個功能全面的 interceptor,函數會返回一個匿名函數,就是我們在上邊代碼中看到的調用 generator 的地方了,而在匿名函數的開頭部門,我們就調用了 getCall 來獲取一個 call 對象,這個 call 對象就是我們與 gRPC 服務器之間的通道了,請求最終是由 call 對象負責發送的。

        getCall 中實際上調用了 channel 對象的 createCall 方法,這部分的邏輯也是在 c++ 中做的了,包含數據的發送之類的邏輯。

        這是我們回到 makeUnaryRequest 函數,再看函數結束的地方調用的那三個方法,第一個 start,將我們的 metadata(可以理解為 header) 發送了過去,然后將真實的信息發送了過去,最后調用關閉方法。

        我們可以在 _getUnaryInterceptor 中的 start、sendMessage 以及 halfClose 函數中都有調用 _startBatchIfReady 函數,而這個方法實際上就是調用的 channel 上的 startBatch 方法,再根據調用鏈查找,最終會看到處理邏輯在這里:https://github.com/grpc/grpc/...
        opType 與 代碼中 switch-case 中的對應關系在這里: https://github.com/grpc/grpc-...

        首先在 start 里邊主要是發送了 metadata,并且嘗試接受服務端返回過來的 metadata,并在回調中觸發我們傳入的 listeneronReceiveMetadata 方法。
        然后檢查 response 的狀態是否正確,并觸發 listeneronReceiveStatus 方法。

        接下來是調用 sendMessage 方法,在這里我們將消息體進行序列化,并發送,在回調中就會去調用我們傳入的 callback。

        最后在 halfClose 方法中其實就是發送一個指令來設置請求的結束。

        整個的流程細化以后大概是這個樣子的:

        image

        小結

        上邊整體的記錄就是關于 Client 這一側是如何實現的了。
        主要涉及到 Client 的構建、發送請求時做的事情、攔截器的作用。
        而更深入的一些邏輯其實是在 c++ 版本的 gRPC 庫里所實現,所以本次筆記并沒有過多的涉及。

        文章涉及到的部分示例代碼倉庫地址:https://github.com/Jiasm/grpc...
        查看原文

        贊 1 收藏 1 評論 0

        賈順名 回答了問題 · 2020-10-23

        解決Nodejs 有沒有什么辦法可以快速比較文件夾是否發生變化,以及變化的文件?

        遞歸判斷文件獲取修改時間吧。
        https://nodejs.org/dist/lates...

        關注 2 回答 2

        賈順名 發布了文章 · 2020-08-29

        PM2 源碼分析

        近期有需求需要了解 PM2 一些功能的實現方式,所以趁勢看了一下 PM2 的源碼,也算是用了這么多年的 PM2,第一次進入內部進行一些探索。
        PM2 是一個 基于 node.js 的進程管理工具,本身 node.js 是一個單進程的語言,但是 PM2 可以實現多進程的運行及管理(當然還是基于 node 的 API),還提供程序系統信息的展示,包括 內存、CPU 等數據。

        PM2 的核心功能概覽

        源碼位置
        官方網站

        PM2 的功能、插件非常的豐富,但比較核心的功能其實不多:

        1. 多進程管理
        2. 系統信息監控
        3. 日志管理

        其他的一些功能就都是基于 PM2 之上的輔助功能了。

        項目結構

        PM2 的項目結構算是比較簡潔的了,主要的源碼都在 lib 目錄下, God 目錄為核心功能多進程管理的實現,以及 API 目錄則是提供了各種能力,包括 日志管理、面板查看系統信息以及各種輔助功能,最后就是 Sysinfo 目錄下關于如何采集系統信息的實現了。

        # 刪除了多個不相干的文件、文件夾
        lib
        ├── API     # 日志管理、GUI 等輔助功能
        ├── God     # 多進程管理邏輯實現位置
        └── Sysinfo # 系統信息采集

        幾個比較關鍵的文件作用:

        • Daemon.js

          • 守護進程的主要邏輯實現,包括 rpc server,以及各種守護進程的能力
        • God.js

          • 業務進程的包裹層,負責與守護進程建立連接,以及注入一些操作,我們編寫的代碼最終是由這里執行的
        • Client.js

          • 執行 PM2 命令的主要邏輯實現,包括與守護進程建立 rpc 連接,以及各種請求守護進程的操作
        • API.js

          • 各種功能性的實現,包括啟動、關閉項目、展示列表、展示系統信息等操作,會調用 Client 的各種函數
        • binaries/CLI.js

          • 執行 pm2 命令時候觸發的入口文件

        守護進程與 Client 進程通訊方式

        看源碼后會知道,PM2 與 Client 進程(也就是我們 pm2 start XXX 時對應的進程),是通過 RPC 進行通訊的,這樣就能保證所有的 Client 進程可以與守護進程進行通訊,上報一些信息,以及從守護進程層面執行一些操作。

        PM2 啟動程序的方式

        PM2 并不是簡單的使用 node XXX 來啟動我們的程序,就像前邊所提到了守護進程與 Client 進程的通訊方式,Client 進程會將啟動業務進程所需要的配置,通過 rpc 傳遞給守護進程,由守護進程去啟動程序。
        這樣,在 PM2 start 命令執行完成以后業務進程也在后臺運行起來了,然后等到我們后續想再針對業務進程進行一些操作的時候,就可以通過列表查看對應的 pid、name 來進行對應的操作,同樣是通過 Client 觸發 rpc 請求到守護進程,實現邏輯。

        當然,我們其實很少會有單獨啟動守護進程的操作,守護進程的啟動其實被寫在了 Client 啟動的邏輯中,在 Client 啟動的時候會檢查是否有存活的守護進程,如果沒有的話,會嘗試啟動一個新的守護進程用于后續的使用。
        具體方式就是通過 spawn + detached: true 來實現的,創建一個單獨的進程,這樣即便是我們的 Client 作為父進程退出了,守護進程依然是可以獨立運行在后臺的。

        P.S. 在使用 PM2 的時候應該有時也會看到有些這樣的輸出,這個其實就是 Client 運行時監測到守護進程還沒有啟動,主動啟動了守護進程:

        > [PM2] Spawning PM2 daemon with pm2_home=/Users/jiashunming/.pm2
        > [PM2] PM2 Successfully daemonized

        image

        多進程管理

        一般使用 PM2 實現多進程管理主要的目的是為了能夠讓我們的 node 程序可以運行在多核 CPU 上,比如四核機器,我們就希望能夠存在四個進程在運行,以便更高效的支持服務。
        在進程管理上,PM2 提供了一個大家經常會用到的參數: exec_mode,它的取值只有兩個,clusterfork,fork 是一個比較常規的模式,相當于就是執行了多次的 node XXX.js。
        但是這樣去運行 node 程序就會有一個問題,如果是一個 HTTP 服務的話,很容易就會出現端口沖突的問題:

        const http = require('http')
        
        http.createServer(() => {}).listen(8000)

        比如我們有這樣的一個 PM2 配置文件,那么執行的時候你就會發現,報錯了,提示端口沖突:

        module.exports = {
          apps: [
            {
              // 設置啟動實例個數
              "instances": 2,
              // 設置運行模式
              "exec_mode": "fork",
              // 入口文件
              "script": "./test-create-server.js"
            }
          ]
        }

        這是因為在 PM2 的實現中, fork 模式下就是簡單的通過 spawn 執行入口文件罷了。

        實現位置:lib/God/ForkMode.js

        而當我們把 exec_mode 改為 cluster 之后,你會發現程序可以正常運行了,并不會出現端口占用的錯誤。
        這是因為 PM2 使用了 node 官方提供的 cluster 模塊來運行程序。

        cluster 是一個 master-slave 模型的運行方式(_最近 ms 這個說法貌似變得不政治正確了。。_),首先需要有一個 master 進程來負責創建一些工作進程,或者叫做 worker 吧。
        然后在 worker 進程中執行 createServer 監聽對應的端口號即可。

        const http = require('http')
        const cluster = require('cluster')
        
        if (cluster.isMaster) {
          let limit = 2
          while (limit--) {
            cluster.fork()
          }
        } else {
          http.createServer((req, res) => {
            res.write(String(process.pid))
            res.end()
          }).listen(8000)
        }

        詳情可以參考 node.js 中 TCP 模塊關于 listen 的實現:lib/net.js
        在內部實現邏輯大致為, master 進程負責監聽端口號,并通過 round_robin 算法來進行請求的分發,master 進程與 worker 進程之間會通過基于 EventEmitter 的消息進行通訊。

        具體的邏輯實現都在這里 lib/internal/cluster 因為是 node 的邏輯,并不是 PM2 的邏輯,所以就不太多說了。

        然后回到 PM2 關于 cluster 的實現,其實是設置了 N 多的默認參數,然后添加了一些與進程之間的 ipc 通訊邏輯,在進程啟動成功、出現異常等特殊情況時,進行對應的操作。
        因為前邊也提到了,PM2 是由守護進程維護管理所有的業務進程的,所以守護進程會維護與所有服務的連接。
        process 對象是繼承自 EventEmitter 的,所以我們只是監聽了一些特定的事件,包括 uncaughtException、unhandledRejection 等。
        在進程重啟的實現方式中,就是由子進程監聽到異常事件,向守護進程發送異常日志的信息,然后發送 disconnect 表示進程即將退出,最后觸發自身的 exit 函數終止掉進程。
        同時守護進程在接收到消息以后,也會重新創建新的進程,從而完成了進程自動重啟的邏輯。

        實現業務進程的主要邏輯在 lib/ProcessContainer 中,它是我們實際代碼執行的載體。

        系統信息監控

        系統信息監控這塊,在看源碼之前以為是用什么 addon 來做的,或者是某些黑科技。
        但是真的循著源碼看下去,發現了就是用了 pidusage 這個包來做的- -
        只關心 unix 系統的話,內部實際上就是ps -p XXX這么一個簡單的命令。

        至于在使用 pm2 monit、pm2 ls --watch 命令時,實際上就是定時器在循環調用上述的獲取系統信息方法了。

        具體實現邏輯:
        getMonitorData
        dashboard
        list

        后邊就是如何使用基于終端的 UI 庫展現數據的邏輯了。

        日志管理

        日志在 PM2 中的實現分了兩塊。
        一個是業務進程的日志、還有一個是 PM2 守護進程自身的日志。

        守護進程的日志實現方式是通過 hack 了 console 相關 API 實現的,在原有的輸出邏輯基礎上添加了一個基于 axon 的消息傳遞,是一個 pub/sub 模型的,主要是用于 Client 獲得日志,例如 pm2 attach、pm2 dashboard 等命令。
        業務進程的日志實現方式則是通過覆蓋了 process.stdout、process.stderr 對象上的方法(console API 基于它實現),在接收到日志以后會寫入文件,同時調用 process.send 將日志進行轉發,而守護進程監聽對應的數據,也會使用上述守護進程創建的 socket 服務將日志數據進行轉發,這樣業務進程與守護進程就有了統一的可以獲取的位置,通過 Client 就可以建立 socket 連接來實現日志的輸出了。

        hack console 的位置:lib/Utility.js
        hack stdout/stderr write 的位置:lib/Utility.js
        創建文件可寫流用于子進程寫入文件:lib/Utility.js
        子進程接收到輸出后寫入文件并發送消息到守護進程:lib/ProcessContainer.js
        守護進程監聽子進程消息并轉發:lib/God/ClusterMode.js
        守護進程將事件通過 socket 廣播:lib/Daemon.js
        Client 讀取并展示日志:lib/API/Extra.js

        image

        查看日志的流程中有一個小細節,就是業務日志, PM2 會先去讀取文件最后的幾行進行展示,然后才是依據 socket 服務返回的數據進行刷新終端展示數據。

        后記

        PM2 比較核心的也就是這幾塊了,因為通過 Client 可以與守護進程進行交互,而守護進程與業務進程之間也存在著聯系,可以執行一些操作。
        所以我們就可以很方便的對業務進程進行管理,剩下的邏輯基本就是基于這之上的一些輔助功能,以及還有就是 UI 展示上的邏輯處理了。

        PM2 是一個純 JavaScript 編寫的工具,在第一次看的時候還是會覺得略顯復雜,到處繞來繞去的比較暈,我推薦的一個閱讀源碼的方式是,通過找一些入口文件來下手,可以采用 調試 or 加日志的方式,一步步的來看代碼的執行順序。
        最終就會有一個較為清晰的概念。

        查看原文

        贊 4 收藏 3 評論 0

        賈順名 回答了問題 · 2020-08-26

        解決nodejs request的post報錯

        https://developers.weixin.qq.com/doc/offiaccount/Account_Management/Generating_a_Parametric_QR_Code.html

        你要用 JSON 結構的 body 就不要寫 QR_STR_SCENE,仔細看看文檔。

        關注 2 回答 2

        賈順名 回答了問題 · 2020-08-20

        解決nodejs從a文件調b文件方法,b文件方法調a內方法失???

        不要這么互相調用就好了,互相調用的時候其中一個模塊還沒有初始化好,可以嘗試抽出來函數,將調用流程改成單向的。

        關注 2 回答 2

        賈順名 發布了文章 · 2020-08-08

        如何寫一個簡單的node.js c++擴展

        node 是由 c++ 編寫的,核心的 node 模塊也都是由 c++ 代碼來實現,所以同樣 node 也開放了讓使用者編寫 c++ 擴展來實現一些操作的窗口。
        如果大家對于 require 函數的描述還有印象的話,就會記得如果不寫文件后綴,它是有一個特定的匹配規則的:
        LOAD_AS_FILE(X)
        1. If X is a file, load X as its file extension format. STOP
        2. If X.js is a file, load X.js as JavaScript text. STOP
        3. If X.json is a file, parse X.json to a JavaScript Object. STOP
        4. If X.node is a file, load X.node as binary addon. STOP

        可以看到,最后會匹配一個 .node,而后邊的描述也表示該后綴的文件為一個二進制的資源。
        而這個 .node 文件一般就會是我們所編譯好的 c++ 擴展了。

        為什么要寫 c++ 擴展

        可以簡單理解為,如果想基于 node 寫一些代碼,做一些事情,那么有這么幾種選擇:

        1. 寫一段 JS 代碼,然后 require 執行
        2. 寫一段 c++ 代碼,編譯后 require 執行
        3. 打開 node 源碼,把你想要的代碼寫進去,然后重新編譯

        日常的開發其實只用第一項就夠了,我們用自己熟悉的語言,寫一段熟悉的代碼,然后發布在 NPM 之類的平臺上,其他有相同需求的人就可以下載我們上傳的包,然后在TA的項目中使用。
        但有的時候可能純粹寫 JS 滿足不了我們的需求,也許是工期趕不上,也許是執行效率不讓人滿意,也有可能是語言限制。
        所以我們會采用直接編寫一些 c++ 代碼,來創建一個 c++ 擴展讓 node 來加載并執行。
        況且如果已經有了 c++ 版本的輪子,我們通過擴展的方式來調用執行而不是自己從頭實現一套,也是避免重復造輪子的方法。

        一個簡單的例子,如果大家接觸過 webpack 并且用過 sass 的話,那么在安裝的過程中很可能會遇到各種各樣的報錯問題,也許會看到 gyp 的關鍵字,其實原因就是 sass 內部有使用一些 c++ 擴展來輔助完成一些操作,而 gyp 就是用來編譯 c++ 擴展的一種工具。

        image

        https://github.com/sass/node-sass

        當然,上邊也提到了還有第三種操作方法,我們可以直接魔改 node 源碼,但是如果你只是想要寫一些原生 JS 實現起來沒有那么美好的模塊,那么是沒有必要去魔改源碼的,畢竟改完了以后還要編譯,如果其他人需要用你的邏輯,還需要安裝你所編譯好的特殊版本。
        這樣的操作時很不易于傳播的,大家不會想使用 sass 就需要安裝一個 sass 版本的 node 吧。
        就像為了看星戰還要專門下載一個優酷- -。

        簡單總結一下,寫 c++ 的擴展大概有這么幾個好處:

        1. 可以復用 node 的模塊管理機制
        2. 有比 JS 更高效的執行效率
        3. 有更多的 c++ 版本的輪子可以拿來用

        怎么去寫一個簡單的擴展

        node 從問世到現在已經走過了 11 年,通過早期的資料、博客等各種信息渠道可以看到之前開發一個 c++ 擴展并不是很容易,但經過了這么些年迭代,各種大佬們的努力,我們再去編寫一個 c++ 擴展已經是比較輕松的事情了。
        這里直入正題,放出今天比較關鍵的一個工具:node-addon-api module
        以及這里是官方提供的各種簡單 demo 來讓大家熟悉這是一個什么樣的工具: node-addon-examples

        需要注意的一點是, demo 目錄下會分為三個子目錄,在 readme 中也有寫,分別是三種不同的 c++ 擴展的寫法(基于不同的工具)。
        我們本次介紹的是在 node-addon-api 目錄下的,算是三種里邊最為易用的一種了。

        首先是我們比較熟悉的 package.json 文件,我們需要依賴兩個組件來完成開發,分別是 bindingsnode-addon-api。

        然后我們還需要簡單了解一下 gyp 的用法,因為編譯一個 c++ 擴展需要用到它。
        就像 helloworld 示例中的 binding.gyp 文件示例:

        {
          "targets": [
            {
              // 導出的文件名
              "target_name": "hello",
              // 編譯標識的定義 禁用異常機制(注意感嘆號表示排除過濾)
              "cflags!": [ "-fno-exceptions" ],
              // c++ 編譯標識的定義 禁用異常機制(注意感嘆號表示排除過濾,也就是 c++ 編譯器會去除該標識)
              "cflags_cc!": [ "-fno-exceptions" ],
              // 源碼入口文件
              "sources": [ "hello.cc" ],
              // 源碼包含的目錄
              "include_dirs": [
                // 這里表示一段 shell 的運行,用來獲取 node-addon-api 的一些參數,有興趣的老鐵可以自行 node -p "require('node-addon-api').include" 來看效果
                "<!@(node -p \"require('node-addon-api').include\")"
              ],
              // 環境變量的定義
              'defines': [ 'NAPI_DISABLE_CPP_EXCEPTIONS' ],
            }
          ]
        }

        gyp 的語法挺多的,這次并不是單獨針對 gyp 的一次記錄,所以就不過多的介紹。

        從最簡單的數字相加來實現

        然后我們來實現一個簡單的創建一個函數,讓兩個參數相加,并返回結果。

        源碼位置:https://github.com/Jiasm/node...

        我們需要這樣的一個 binding.gyp 文件:

        {
          "targets": [
            {
              "target_name": "add",
              "cflags!": [ "-fno-exceptions" ],
              "cflags_cc!": [ "-fno-exceptions" ],
              "sources": [ "add.cc" ],
              "include_dirs": [
                "<!@(node -p \"require('node-addon-api').include\")"
              ],
              'defines': [ 'NAPI_DISABLE_CPP_EXCEPTIONS' ],
            }
          ]
        }

        然后我們在項目根目錄創建 package.json 文件,并安裝 bindingsnode-addon-api 兩個依賴。

        接下來就是去編寫我們的 c++ 代碼了:

        #include <napi.h>
        
        // 定義 Add 函數
        Napi::Value Add(const Napi::CallbackInfo& info) {
          Napi::Env env = info.Env();
        
          // 接收第一個參數
          double arg0 = info[0].As<Napi::Number>().DoubleValue();
          // 接收第二個參數
          double arg1 = info[1].As<Napi::Number>().DoubleValue();
          // 將兩個參數相加并返回
          Napi::Number num = Napi::Number::New(env, arg0 + arg1);
        
          return num;
        }
        
        // 入口函數,用于注冊我們的函數、對象等等
        Napi::Object Init(Napi::Env env, Napi::Object exports) {
          // 將一個名為 add 的函數掛載到 exports 上
          exports.Set(Napi::String::New(env, "add"), Napi::Function::New(env, Add));
          return exports;
        }
        
        // 固定的宏使用
        NODE_API_MODULE(addon, Init)

        在 c++ 代碼完成以后就是需要用到 node-gyp 的時候了,建議全局安裝 node-gyp,避免一個項目中出現多個 node_modules 目錄的時候使用 npx 會出現一些不可預料的問題:

        > npm i -g node-gyp
        # 生成構建文件
        > node-gyp configure
        # 構建
        > node-gyp build

        這時候你會發現項目目錄下已經生成了一個名為 add.node 的文件,就是我們在 binding.gyp 里邊的 target_name 所設置的值了。
        最后我們就是要寫一段 JS 代碼來調用所生成的 .node 文件了:

        const { add } = require('bindings')('add.node')
        
        console.log(add(1, 2))     // 3
        console.log(add(0.1, 0.2)) // 熟悉的 0.3XXXXX

        實現一個函數柯里化

        接下來我們來整點好玩的,實現一個前端的高頻考題,如何實現一個函數柯里化,定義如下:

        add(1)(2)(3) // => 6
        add(1, 2, 3) // => 6
        源碼位置:https://github.com/Jiasm/node...

        我們會用到的一些技術點:

        • 如何在 c++ 函數中返回一個函數供 JS 調用
        • 如何讓返回值既支持函數調用又支持取值操作
        • 如何處理非固定數量的參數(其實這個很簡單了,從上邊也能看出來,本身就是一個數組)

        不再贅述 binding.gyp 與 package.json 的配置,我們直接上 c++ 代碼:

        #include <napi.h>
        
        // 用來覆蓋 valueOf 實現的函數
        Napi::Value GetValue(const Napi::CallbackInfo& info) {
          Napi::Env env = info.Env();
        
          // 獲取我們在創建 valueOf 函數的時候傳入的 result
          double* storageData = reinterpret_cast<double*>(info.Data());
        
          // 避免空指針情況
          if (storageData == NULL) {
            return Napi::Number::New(env, 0);
          } else {
            return Napi::Number::New(env, *storageData);
          }
        
        }
        
        Napi::Function CurryAdd(const Napi::CallbackInfo& info) {
          Napi::Env env = info.Env();
        
          // 獲取我們下邊在創建 curryAdd 函數的時候傳入的 result
          double* storageData = reinterpret_cast<double*>(info.Data());
        
          double* result = new double;
        
          // 遍歷傳入的所有參數
          long len, index;
          for (len = info.Length(), index = 0; index < len; index++) {
            double arg = info[index].As<Napi::Number>().DoubleValue();
        
            *result += arg;
          }
        
          // 用于多次的計算
          if (storageData != NULL) {
            *result += *storageData;
          }
        
          // 創建一個新的函數用于函數的返回值
          Napi::Function fn = Napi::Function::New(env, CurryAdd, "curryAdd", result);
        
          // 篡改 valueOf 方法,用于輸出結果
          fn.Set("valueOf", Napi::Function::New(env, GetValue, "valueOf", result));
        
          return fn;
        }
        
        Napi::Object Init(Napi::Env env, Napi::Object exports) {
          Napi::Function fn = Napi::Function::New(env, CurryAdd, "curryAdd");
        
          exports.Set(Napi::String::New(env, "curryAdd"), fn);
        
          return exports;
        }
        
        NODE_API_MODULE(curryadd, Init)

        編譯完成以后,再寫一段簡單的 JS 代碼來調用驗證結果即可:

        const { curryAdd } = require('bindings')('curry-add');
        
        const fn = curryAdd(1, 2, 3);
        const fn2 = fn(4);
        
        console.log(fn.valueOf())     // => 6
        console.log(fn2.valueOf())    // => 10
        console.log(fn2(5).valueOf()) // => 15

        然后可以講一下上邊列出來的三個技術點是如何解決的:

        • 如何在 c++ 函數中返回一個函數供 JS 調用

          • 通過 Napi::Function::New 創建新的函數,并將計算結果存入函數可以獲取到的地方供下次使用
        • 如何讓返回值既支持函數調用又支持取值操作

          • 通過 fn.Set 篡改 valueOf 函數并返回結果
        • 如何處理非固定數量的參數(其實這個很簡單了,從上邊也能看出來,本身就是一個數組)

          • 通過拿到 infoLength 來遍歷獲取

        與 JS 進行對比

        當然,就例如柯里化之類的函數,拿JS來實現的話會非常簡單,配合 reduce 函數基本上五行以內就可以寫出來。
        那我們折騰這么多究竟是為了什么呢?
        這就要回到開頭所說的優勢了: 執行效率

        采用冒泡排序來對比

        為了證明效率的差異,我們選擇用一個排序算法來驗證,采用了最簡單易懂的冒泡排序來做,首先是 JS 版本的:

        源碼位置:https://github.com/Jiasm/node...
        function bubble (arr) {
          for (let i = 0, len = arr.length; i < len; i++) {
            for (let j = i + 1; j < len; j++) {
              if (arr[i] < arr[j]) {
                [arr[i], arr[j]] = [arr[j], arr[i]]
              }
            }
          }
        
          return arr
        }
        
        bubble([7, 2, 1, 5, 3, 4])

        然后是我們的 c++ 版本,因為是一個 JS 的擴展,所以會涉及到數據類型轉換的問題,大致代碼如下:

        #include <napi.h>
        
        void bubbleSort(double* arr, int len) {
          double temp;
          int i, j;
          for (i = 0; i < len; i++) {
            for (j = i + 1; j < len; j++) {
              if (*(arr + i) < *(arr + j)) {
                temp = *(arr + i);
                *(arr + i) = *(arr + j);
                *(arr + j) = temp;
              }
            }
          }
        }
        
        Napi::Value Add(const Napi::CallbackInfo& info) {
          Napi::Env env = info.Env();
        
          Napi::Array array = info[0].As<Napi::Array>();
        
        
          int len = array.Length(), i;
        
          // 返回值
          Napi::Array arr = Napi::Array::New(env, len);
        
          double* list = new double[len];
        
          // 將 Array 轉換為 c++ 可方便使用的 double 數組
          for (i = 0; i < len; i++) {
            Napi::Value i_v = array[i];
        
            list[i] = i_v.ToNumber().DoubleValue();
          }
        
          // 執行排序
          bubbleSort(list, len);
        
          // 將 double 數組轉換為要傳遞給 JS 的數據類型
          for (i = 0; i < len; i++) {
            arr[i] = Napi::Number::New(env, list[i]);
          }
        
          return arr;
        }
        
        Napi::Object Init(Napi::Env env, Napi::Object exports) {
          exports.Set(Napi::String::New(env, "bubble"), Napi::Function::New(env, Add));
          return exports;
        }
        
        NODE_API_MODULE(bubble, Init)

        然后我們通過一個隨機生成的數組來對比耗時:

        const { bubble } = require('bindings')('bubble.node')
        
        const arr = Array.from(new Array(1e3), () => Math.random() * 1e6 | 0)
        
        console.time('c++')
        const a = bubble(arr)
        console.timeEnd('c++')
        
        function bubbleJS (arr) {
          for (let i = 0, len = arr.length; i < len; i++) {
            for (let j = i + 1; j < len; j++) {
              if (arr[i] < arr[j]) {
                [arr[i], arr[j]] = [arr[j], arr[i]]
              }
            }
          }
        
          return arr
        }
        
        console.time('js')
        bubbleJS(arr)
        console.timeEnd('js')

        1,000 數據量的時候耗時差距大概在 6 倍左右,在 10,000 數據量的時候耗時差距大概在 3 倍左右。
        也是簡單的證實了在相同算法情況下 c++ 效率確實是會比 JS 高一些。

        當然了,也通過上邊的 bubble sort 可以來證實另一個觀點: 有更多的 c++ 版本的輪子可以拿來用
        就比如上邊的 bubbleSort 函數,可能就是一個其他的加密算法實現、SDK 封裝,如果沒有 node 版本,而我們要使用就需要參考它的邏輯重新實現一遍,但如果采用 c++ 擴展的方式,完全可以基于原有的 c++ 函數進行一次簡單的封裝就擁有了一個 node 版本的 函數/SDK。

        后記

        上邊的一些內容就是如何使用 node-addon-api 來快速開發一個 c++ 擴展,以及如何使用 node-gyp 進行編譯,還有最后的如何使用 JS 調用 c++ 擴展。
        在開發 node 程序的過程中,如果能夠適當的利用 c++ 的能力是會對項目有很大的幫助的,在一些比較關鍵的地方,亦或者 node 弱項的地方,使用更鋒利的 c++ 來幫助我們解決問題。
        不要讓編程語言限制了你的想象力

        參考資料

        查看原文

        贊 15 收藏 9 評論 2

        賈順名 回答了問題 · 2020-07-28

        解決js如何將二維對象數組生成一維對象數據

        一個簡單的方法:

        大致思路就是使用一個 Map(就用 Object 也可以) 來中轉數據。

        Map 的 key 采用 date 的值,然后 value 中存儲 A組 B組 等各種字段所構成的一個 Object。

        遍歷所有的數據,填充到對應的 Map item 中,最后遍歷整個 Map,拼接成你想要的這種數據即可。

        Map 的大致結構如下:

        {
          [date1]: {
            [A組]: XXX,
            [B組]: XXX,
            ...
          },
          [date2]: {
            [A組]: XXX,
            [B組]: XXX,
            ...
          },
          ...
        }

        關注 2 回答 2

        賈順名 回答了問題 · 2019-10-19

        TypeScript 類型疑問

        function urlParams(data: Record<string, string | string[]>) {
          let urlParams = new URLSearchParams();
          for (let key in data) {
            urlParams.append(key, data[key]);
          }
          return urlParams.toString();
        }

        關注 3 回答 3

        賈順名 發布了文章 · 2019-05-30

        GitLab CI/CD 在 Node.js 項目中的實踐

        近期在按照業務劃分項目時,我們組被分了好多的項目過來,大量的是基于 Node.js 的,也是我們組持續在使用的語言。

        現有流程中的一些問題

        在維護多個項目的時候,會暴露出一些問題:

        1. 如何有效的使用 測試用例
        2. 如何有效的使用 ESLint
        3. 部署上線還能再快一些嗎

          1. 使用了 TypeScript 以后帶來的額外成本

        測試用例

        首先是測試用例,最初我們設計在了 git hooks 里邊,在執行 git commit 之前會進行檢查,在本地運行測試用例。
        這會帶來一個時間上的問題,如果是日常開發,這么操作還是沒什么問題的,但如果是線上 bug 修復,執行測試用例的時間依據項目大小可能會持續幾分鐘。
        而為了修復 bug,可能會采用 commit 的時候添加 -n 選項來跳過 hooks ,在修復 bug 時這么做無可厚非,但是即使大家在日常開發中都采用commit -n 的方式來跳過繁瑣的測試過程,這個也是沒有辦法管控的,畢竟是在本地做的這個校驗,是否遵循這個規則,全靠大家自覺。

        所以一段時間后發現,通過這種方式執行測試用例來規避一些風險的作用可能并不是很有效。

        ESLint

        然后就是 ESLint,我們團隊基于airbnbESLint 規則自定義了一套更符合團隊習慣的規則,我們會在編輯器中引入插件用來幫助高亮一些錯誤,以及進行一些自動格式化的操作。
        同時我們也在 git hooks 中添加了對應的處理,也是在 git commit 的時候進行檢查,如果不符合規范則不允許提交。
        不過這個與測試用例是相同的問題:

        1. 編輯器是否安裝 ESLint 插件無從得知,即使安裝插件、是否人肉忽略錯誤提示也無從得知。
        2. git hooks 可以被繞過

        部署上線的方式

        之前團隊的部署上線是使用shipit周邊套件進行部署的。
        部署環境強依賴本地,因為需要在本地建立倉庫的臨時目錄,并經過多次ssh XXX "command"的方式完成 部署 + 上線 的操作。
        shipit提供了一個有效的回滾方案,就是在部署后的路徑添加多個歷史部署版本的記錄,回滾時將當前運行的項目目錄指向之前的某個版本即可。_不過有一點兒坑的是,很難去選擇我要回滾到那個節點,以及保存歷史記錄需要占用額外的磁盤空間_
        不過正因為如此,shipit在部署多臺服務器時會遇到一些令人不太舒服的地方。

        如果是多臺新增的服務器,那么可以通過在shipit配置文件中傳入多個目標服務器地址來進行批量部署。
        但是假設某天需要上線一些小流量(比如四臺機器中的一臺),因為前邊提到的shipit回滾策略,這會導致單臺機器與其他三臺機器的歷史版本時間戳不一致(因為這幾臺機器不是同一時間上線的)
        提到了這個時間戳就另外提一嘴,這個時間戳的生成是基于執行上線操作的那臺機器的本地時間,之前有遇到過同事在本地測試代碼,將時間調整為了幾天前的時間,后時間沒有改回正確的時間時進行了一次部署操作,代碼出現問題后卻發現回滾失敗了,原因是該同事部署的版本時間戳太小,shipit 找不到之前的版本(shipit 可以設置保留歷史版本的數量,當時最早的一次時間戳也是大于本次出問題的時間戳的)

        也就是說,哪怕有一次進行過小流量上線,那么以后就用不了批量上線的功能了 (沒有去仔細研究shipit官方文檔,不知道會不會有類似--force之類的忽略歷史版本的操作)

        基于上述的情況,我們的部署上線耗時變為了: (__機器數量__)X(__基于本地網速的倉庫克隆、多次 ssh 操作的耗時總和__)。 P.S. 為了保證倉庫的有效性,每次執行 shipit 部署,它都會刪除之前的副本,重新克隆

        尤其是服務端項目,有時緊急的 bug 修復可能是在非工作時間,這意味著可能當時你所處的網絡環境并不是很穩定。
        我曾經晚上接到過同事的微信,讓我幫他上線項目,他家的 Wi-Fi 是某博士的,下載項目依賴的時候出了些問題。
        還有過使用移動設備開熱點的方式進行上線操作,有一次非前后分離的項目上線后,直接就收到了聯通的短信:「您本月流量已超出XXX」(當時還在用合約套餐,一月就800M流量)。

        TypeScript

        在去年下半年開始,我們團隊就一直在推動 TypeScript 的應用,因為在大型項目中,擁有明確類型的 TypeScript 顯然維護性會更高一些。
        但是大家都知道的, TypeScript 最終需要編譯轉換為 JavaScript(也有 tsc 那種的不生成 JS 文件,直接運行,不過這個更多的是在本地開發時使用,線上代碼的運行我們還是希望變量越少越好)。

        所以之前的上線流程還需要額外的增加一步,編譯 TS。
        而且因為shipit是在本地克隆的倉庫并完成部署的,所以這就意味著我們必須要把生成后的 JS 文件也放入到倉庫中,最直觀的,從倉庫的概覽上看著就很丑(50% TS、50% JS),同時這進一步增加了上線的成本。

        總結來說,現有的部署上線流程過于依賴本地環境,因為每個人的環境不同,這相當于給部署流程增加了很多不可控因素。

        如何解決這些問題

        上邊我們所遇到的一些問題,其實可以分為兩塊:

        1. 有效的約束代碼質量
        2. 快速的部署上線

        所以我們就開始尋找解決方案,因為我們的源碼是使用自建的 GitLab 倉庫來進行管理的,首先就找到了 GitLab CI/CD。
        在研究了一番文檔以后發現,它能夠很好的解決我們現在遇到的這些問題。

        要使用 GitLab CI/CD 是非常簡單的,只需要額外的使用一臺服務器安裝 gitlab-runner,并將要使用 CI/CD 的項目注冊到該服務上就可以了。
        GitLab 官方文檔中有非常詳細的安裝注冊流程:

        install | runner
        register | runner
        group register | repo 注冊 Group 項目時的一些操作

        上邊的注冊選擇的是注冊 group ,也就是整個 GitLab 某個分組下所有的項目。
        主要目的是因為我們這邊項目數量太多,單個注冊太過繁瑣(還要登錄到 runner 服務器去執行命令才能夠注冊)

        安裝時需要注意的地方

        官網的流程已經很詳細了,不過還是有一些地方可以做一些小提示,避免踩坑
        sudo gitlab-runner install --user=gitlab-runner --working-directory=/home/gitlab-runner

        這是 Linux 版本的安裝命令,安裝需要 root (管理員) 權限,后邊跟的兩個參數:

        • --userCI/CD 執行 job (后續所有的流程都是基于 job 的)時所使用的用戶名
        • --working-directoryCI/CD 執行時的根目錄路徑 個人的踩坑經驗是將目錄設置為一個空間大的磁盤上,因為 CI/CD 會生成大量的文件,尤其是如果使用 CI/CD 進行編譯 TS 文件并且將其生成后的 JS 文件緩存;這樣的操作會導致 innode 不足產生一些問題
        --user 的意思就是 CI/CD 執行使用該用戶進行執行,所以如果要編寫腳本之類的,建議在該用戶登錄的狀態下編寫,避免出現無權限執行 sudo su gitlab-runner

        注冊時需要注意的地方

        在按照官網的流程執行時,我們的 tag 是留空的,暫時沒有找到什么用途。。
        以及 executor 這個比較重要了,因為我們是從手動部署上線還是往這邊靠攏的,所以穩妥的方式是一步步來,也就是說我們選擇的是 shell ,最常規的一種執行方式,對項目的影響也是比較小的(官網示例給的是 docker

        .gitlab-ci.yml 配置文件

        上邊的環境已經全部裝好了,接下來就是需要讓 CI/CD 真正的跑起來
        runner 以哪種方式運行,就靠這個配置文件來描述了,按照約定需要將文件放置到 repo 倉庫的根路徑下。
        當該文件存在于倉庫中,執行 git push 命令后就會自動按照配置文件中所描述的動作進行執行了。

        上邊的兩個鏈接里邊信息非常完整,包含各種可以配置的選項。

        一般來講,配置文件的結構是這樣的:

        stages:
          - stage1
          - stage2
          - stage3
        
        job 1:
          stage: stage1
          script: echo job1
        
        job 2:
          stage: stage2
          script: echo job2
        
        job 3:
          stage: stage2
          script:
            - echo job3-1
            - echo job3-2
        
        job 4:
          stage: stage3
          script: echo job4

        stages 用來聲明有效的可被執行的 stage,按照聲明的順序執行。
        下邊的那些 job XXX 名字不重要,這個名字是在 GitLab CI/CD Pipeline 界面上展示時使用的,重要的是那個 stage 屬性,他用來指定當前的這一塊 job 隸屬于哪個 stage。
        script 則是具體執行的腳本內容,如果要執行多行命令,就像job 3那種寫法就好了。

        如果我們將上述的 stage、job 之類的換成我們項目中的一些操作install_dependencies、test、eslint之類的,然后將script字段中的值換成類似npx eslint之類的,當你把這個文件推送到遠端服務器后,你的項目就已經開始自動運行這些腳本了。
        并且可以在Pipelines界面看到每一步執行的狀態。

        P.S. 默認情況下,上一個 stage 沒有執行完時不會執行下一個 stage 的,不過也可以通過額外的配置來修改:
        allow failure
        when

        設置僅在特定的情況下觸發 CI/CD

        上邊的配置文件存在一個問題,因為在配置文件中并沒有指定哪些分支的提交會觸發 CI/CD 流程,所以默認的所有分支上的提交都會觸發,這必然不是我們想要的結果。
        CI/CD 的執行會占用系統的資源,如果因為一些開發分支的執行影響到了主干分支的執行,這是一件得不償失的事情。

        所以我們需要限定哪些分支才會觸發這些流程,也就是要用到了配置中的 only 屬性。

        使用only可以用來設置哪些情況才會觸發 CI/CD,一般我們這邊常用的就是用來指定分支,這個是要寫在具體的 job 上的,也就是大致是這樣的操作:

        具體的配置文檔
        job 1:
          stage: stage1
          script: echo job1
          only:
            - master
            - dev

        單個的配置是可以這樣寫的,不過如果 job 的數量變多,這么寫就意味著我們需要在配置文件中大量的重復這幾行代碼,也不是一個很好看的事情。
        所以這里可能會用到一個yaml的語法:

        這是一步可選的操作,只是想在配置文件中減少一些重復代碼的出現
        .access_branch_template: &access_branch
          only:
            - master
            - dev
        
        job 1:
          <<: *access_branch
          stage: stage1
          script: echo job1
        
        job 2:
          <<: *access_branch
          stage: stage2
          script: echo job2

        一個類似模版繼承的操作,官方文檔中也沒有提到,這個只是一個減少冗余代碼的方式,可有可無。

        緩存必要的文件

        因為默認情況下,CI/CD在執行每一步(job)時都會清理一下當前的工作目錄,保證工作目錄是干凈的、不包含一些之前任務留下的數據、文件。
        不過這在我們的 Node.js 項目中就會帶來一個問題。
        因為我們的 ESLint、單元測試 都是基于 node_modules 下邊的各種依賴來執行的。
        而目前的情況就相當于我們每一步都需要執行npm install,這顯然是一個不必要的浪費。

        所以就提到了另一個配置文件中的選項:cache

        用來指定某些文件、文件夾是需要被緩存的,而不能清除:

        cache:
          key: ${CI_BUILD_REF_NAME}
          paths:
            - node_modules/

        大致是這樣的一個操作,CI_BUILD_REF_NAME是一個 CI/CD 提供的環境變量,該變量的內容為執行 CI/CD 時所使用的分支名,通過這種方式讓兩個分支之間的緩存互不影響。

        部署項目

        如果基于上邊的一些配置,我們將 單元測試、ESLint 對應的腳本放進去,他就已經能夠完成我們想要的結果了,如果某一步執行出錯,那么任務就會停在那里不會繼續向后執行。
        不過目前來看,后邊已經沒有多余的任務供我們執行了,所以是時候將 部署 這一步操作接過來了。

        部署的話,我們目前選擇的是通過 rsync 來進行同步多臺服務器上的數據,一個比較簡單高效的部署方式。

        P.S. 部署需要額外的做一件事情,就是建立從gitlab runner所在機器gitlab-runner用戶到目標部署服務器對應用戶下的機器信任關系。
        有 N 多種方法可以實現,最簡單的就是在runner機器上執行 ssh-copy-id 將公鑰寫入到目標機器。
        或者可以像我一樣,提前將 runner 機器的公鑰拿出來,需要與機器建立信任關系時就將這個字符串寫入到目標機器的配置文件中。
        類似這樣的操作:ssh 10.0.0.1 "echo \"XXX\" >> ~/.ssh/authorized_keys"

        大致的配置如下:

        variables:
          DEPLOY_TO: /home/XXX/repo # 要部署的目標服務器項目路徑
        deploy:
          stage: deploy
          script:
            - rsync -e "ssh -o StrictHostKeyChecking=no" -arc --exclude-from="./exclude.list" --delete . 10.0.0.1:$DEPLOY_TO
            - ssh 10.0.0.1 "cd $DEPLOY_TO; npm i --only=production"
            - ssh 10.0.0.1 "pm2 start $DEPLOY_TO/pm2/$CI_ENVIRONMENT_NAME.json;"
        同時用到的還有variables,用來提出一些變量,在下邊使用。

        ssh 10.0.0.1 "pm2 start $DEPLOY_TO/pm2/$CI_ENVIRONMENT_NAME.json;",這行腳本的用途就是重啟服務了,我們使用pm2來管理進程,默認的約定項目路徑下的pm2文件夾存放著個個環境啟動時所需的參數。

        當然了,目前我們在用的沒有這么簡單,下邊會統一提到

        并且在部署的這一步,我們會有一些額外的處理

        這是比較重要的一點,因為我們可能會更想要對上線的時機有主動權,所以 deploy 的任務并不是自動執行的,我們會將其修改為手動操作還會觸發,這用到了另一個配置參數:

        deploy:
          stage: deploy
          script: XXX
          when: manual  # 設置該任務只能通過手動觸發的方式運行

        當然了,如果不需要,這個移除就好了,比如說我們在測試環境就沒有配置這個選項,僅在線上環境使用了這樣的操作

        更方便的管理 CI/CD 流程

        如果按照上述的配置文件進行編寫,實際上已經有了一個可用的、包含完整流程的 CI/CD 操作了。

        不過它的維護性并不是很高,尤其是如果 CI/CD 被應用在多個項目中,想做出某項改動則意味著所有的項目都需要重新修改配置文件并上傳到倉庫中才能生效。

        所以我們選擇了一個更靈活的方式,最終我們的 CI/CD 配置文件是大致這樣子的(省略了部分不相干的配置):

        variables:
          SCRIPTS_STORAGE: /home/gitlab-runner/runner-scripts
          DEPLOY_TO: /home/XXX/repo # 要部署的目標服務器項目路徑
        
        stages:
          - install
          - test
          - build
          - deploy_development
          - deploy_production
        
        install_dependencies:
          stage: install
          script: bash $SCRIPTS_STORAGE/install.sh
        
        unit_test:
          stage: test
          script: bash $SCRIPTS_STORAGE/test.sh
        
        eslint:
          stage: test
          script: bash $SCRIPTS_STORAGE/eslint.sh
        
        # 編譯 TS 文件
        build:
          stage: build
          script: bash $SCRIPTS_STORAGE/build.sh
        
        deploy_development:
          stage: deploy_development
          script: bash $SCRIPTS_STORAGE/deploy.sh 10.0.0.1
          only: dev     # 單獨指定生效分支
        
        deploy_production:
          stage: deploy_production
          script: bash $SCRIPTS_STORAGE/deploy.sh 10.0.0.2
          only: master  # 單獨指定生效分支

        我們將每一步 CI/CD 所需要執行的腳本都放到了 runner 那臺服務器上,在配置文件中只是執行了那個腳本文件。
        這樣當我們有什么策略上的調整,比如說 ESLint 規則的變更、部署方式之類的。
        這些都完全與項目之間進行解耦,后續的操作基本都不會讓正在使用 CI/CD 的項目重新修改才能夠支持(部分需要新增環境變量的導入之類的確實需要項目的支持)。

        接入釘釘通知

        實際上,當 CI/CD 執行成功或者失敗,我們可以在 Pipeline 頁面中看到,也可以設置一些郵件通知,但這些都不是時效性很強的。
        鑒于我們目前在使用釘釘進行工作溝通,所以就研究了一波釘釘機器人。
        發現有支持 GitLab 機器人,不過功能并不適用,只能處理一些 issues 之類的, CI/CD 的一些通知是缺失的,所以只好自己基于釘釘的消息模版實現一下了。

        因為上邊我們已經將各個步驟的操作封裝了起來,所以這個修改對同事們是無感知的,我們只需要修改對應的腳本文件,添加釘釘的相關操作即可完成,封裝了一個簡單的函數:

        function sendDingText() {
          local text="$1"
        
          curl -X POST "$DINGTALK_HOOKS_URL" \
          -H 'Content-Type: application/json' \
          -d '{
            "msgtype": "text",
            "text": {
                "content": "'"$text"'"
            }
          }'
        }
        
        # 具體發送時傳入的參數
        sendDingText "proj: $CI_PROJECT_NAME[$CI_JOB_NAME]\nenv: $CI_ENVIRONMENT_NAME\ndeploy success\n$CI_PIPELINE_URL\ncreated by: $GITLAB_USER_NAME\nmessage: $CI_COMMIT_MESSAGE"
        
        # 某些 case 失敗的情況下 是否需要更多的信息就看自己自定義咯
        sendDingText "error: $CI_PROJECT_NAME[$CI_JOB_NAME]\nenv: $CI_ENVIRONMENT_NAME"

        上述用到的環境變量,除了DINGTALK_HOOKS_URL是我們自定義的機器人通知地址以外,其他的變量都是有 GitLab runenr所提供的。

        各種變量可以從這里找到:predefined variables

        回滾處理

        聊完了正常的流程,那么也該提一下出問題時候的操作了。
        人非圣賢孰能無過,很有可能某次上線一些沒有考慮到的地方就會導致服務出現異常,這時候首要任務就是讓用戶還可以照常訪問,所以我們會選擇回滾到上一個有效的版本去。
        在項目中的 Pipeline 頁面 或者 Enviroment 頁面(這個需要在配置文件中某些 job 中手動添加這個屬性,一般會寫在 deploy 的那一步去),可以在頁面上選擇想要回滾的節點,然后重新執行 CI/CD 任務,即可完成回滾。

        不過這在 TypeScript 項目中會有一些問題,因為我們回滾一般來講是重新執行上一個版本 CI/CD 中的 deploy 任務,在 TS 項目中,我們在 runner 中緩存了 TS 轉換 JS 之后的 dist 文件夾,并且部署的時候也是直接將該文件夾推送到服務器的(TS項目的源碼就沒有再往服務器上推過了)。

        而如果我們直接點擊 retry 就會帶來一個問題,因為我們的 dist 文件夾是緩存的,而 deploy 并不會管這種事兒,他只會把對應的要推送的文件發送到服務器上,并重啟服務。

        而實際上 dist 還是最后一次(也就是出錯的那次)編譯出來的 JS 文件,所以解決這個問題有兩種方法:

        1. deploy 之前執行一下 build
        2. deploy 的時候進行判斷

        第一個方案肯定是不可行的,因為嚴重依賴于操作上線的人是否知道有這個流程。
        所以我們主要是通過第二種方案來解決這個問題。

        我們需要讓腳本在執行的時候知道,dist 文件夾里邊的內容是不是自己想要的。
        所以就需要有一個 __標識__,而做這個標識最簡單有效唾手可得的就是,git commit id。
        每一個 commit 都會有一個唯一的標識符號,而且我們的 CI/CD 執行也是依靠于新代碼的提交(也就意味著一定有 commit)。
        所以我們在 build 環節將當前的commit id也緩存了下來:

        git rev-parse --short HEAD > git_version

        同時在 deploy 腳本中添加額外的判斷邏輯:

        currentVersion=`git rev-parse --short HEAD`
        tagVersion=`touch git_version; cat git_version`
        
        if [ "$currentVersion" = "$tagVersion" ]
        then
            echo "git version match"
        else
            echo "git version not match, rebuild dist"
            bash ~/runner-scripts/build.sh  # 額外的執行 build 腳本
        fi

        這樣一來,就避免了回滾時還是部署了錯誤代碼的風險。

        關于為什么不將 build 這一步操作與 deploy 合并的原因是這樣的:
        因為我們會有很多臺機器,同時 job 會寫很多個,類似 deploy_1、deploy_2、deploy_all,如果我們將 build 的這一步放到 deploy
        那就意味著我們每次 deploy,即使是一次部署,但因為我們選擇一臺臺機器單獨操作,它也會重新生成多次,這也會帶來額外的時間成本

        hot fix 的處理

        CI/CD 運行了一段時間后,我們發現偶爾解決線上 bug 還是會比較慢,因為我們提交代碼后要等待完整的 CI/CD 流程走完。
        所以在研究后我們決定,針對某些特定情況hot fix,我們需要跳過ESLint、單元測試這些流程,快速的修復代碼并完成上線。

        CI/CD 提供了針對某些 Tag 可以進行不同的操作,不過我并不想這么搞了,原因有兩點:

        1. 這需要修改配置文件(所有項目)
        2. 這需要開發人員熟悉對應的規則(打 Tag

        所以我們采用了另一種取巧的方式來實現,因為我們的分支都是只接收Merge Request那種方式上線的,所以他們的commit title實際上是固定的:Merge branch 'XXX'。
        同時 CI/CD 會有環境變量告訴我們當前執行 CI/CDcommit message。
        我們通過匹配這個字符串來檢查是否符合某種規則來決定是否跳過這些job

        function checkHotFix() {
          local count=`echo $CI_COMMIT_TITLE | grep -E "^Merge branch '(hot)?fix/\w+" | wc -l`
        
          if [ $count -eq 0 ]
          then
            return 0
          else
            return 1
          fi
        }
        
        # 使用方法
        
        checkHotFix
        
        if [ $? -eq 0 ]
        then
          echo "start eslint"
          npx eslint --ext .js,.ts .
        else
          # 跳過該步驟
          echo "match hotfix, ignore eslint"
        fi

        這樣能夠保證如果我們的分支名為 hotfix/XXX 或者 fix/XXX 在進行代碼合并時, CI/CD 會跳過多余的代碼檢查,直接進行部署上線。 沒有跳過安裝依賴的那一步,因為 TS 編譯還是需要這些工具的

        小結

        目前團隊已經有超過一半的項目接入了 CI/CD 流程,為了方便同事接入(主要是編輯 .gitlab-ci.yml 文件),我們還提供了一個腳手架用于快速生成配置文件(包括自動建立機器之間的信任關系)。

        相較之前,部署的速度明顯的有提升,并且不再對本地網絡有各種依賴,只要是能夠將代碼 push 到遠程倉庫中,后續的事情就和自己沒有什么關系了,并且可以方便的進行小流量上線(部署單臺驗證有效性)。

        以及在回滾方面則是更靈活了一些,可在多個版本之間快速切換,并且通過界面的方式,操作起來也更加直觀。

        最終可以說,如果沒有 CI/CD,實際上開發模式也是可以忍受的,不過當使用了 CI/CD 以后,再去使用之前的部署方式,則會明顯的感覺到不舒適。(沒有對比,就沒有傷害??)

        完整的流程描述

        1. 安裝依賴
        2. 代碼質量檢查

          1. ESLint 檢查

            1. 檢查是否為 hotfix 分支,如果是則跳過本流程
          2. 單元測試

            1. 檢查是否為 hotfix 分支,如果是則跳過本流程
        3. 編譯 TS 文件
        4. 部署、上線

          1. 判斷當前緩存 dist 目錄是否為有效的文件夾,如果不是則重新執行第三步編譯 TS 文件
          2. 上線完畢后發送釘釘通知

        后續要做的

        接入 CI/CD 只是第一步,將部署上線流程統一后,可以更方便的做一些其他的事情。
        比如說在程序上線后可以驗證一下接口的有效性,如果發現有錯誤則自動回滾版本,重新部署。
        或者說接入 docker, 這些調整在一定程度上對項目維護者都是透明的。

        參考資料

        查看原文

        贊 14 收藏 8 評論 0

        認證與成就

        • 獲得 1230 次點贊
        • 獲得 14 枚徽章 獲得 1 枚金徽章, 獲得 2 枚銀徽章, 獲得 11 枚銅徽章

        擅長技能
        編輯

        開源項目 & 著作
        編輯

        • Router

          自己造的輪子,基于hash來做的一個單頁控制插件。 項目readme有詳細介紹。

        • Typescript-example

          一個基于`typescript`; 使用`sequelize-typescript`和`routing-controller`; 利用大量裝飾器實現的Node服務開發示例。

        注冊于 2015-10-28
        個人主頁被 5.4k 人瀏覽

        一本到在线是免费观看_亚洲2020天天堂在线观看_国产欧美亚洲精品第一页_最好看的2018中文字幕