俞楓, 張忍, 鄭爽
(國泰君安證券股份有限公司, 上海 200120)
隨著移動互聯網技術的普遍應用, 券商行業開發客戶從傳統的線下營銷推廣模式逐步轉變為線下、線上并行的營銷推廣模式。線上營銷推廣活動具有用戶群體大、時效性強、規則變動大等特點, 保證并提升用戶的參與體驗、減少或消除營銷推廣活動對股票交易業務的影響、減少營銷推廣活動開發周期、變更周期是進行日常性、大規模線上營銷推廣活動需要解決的根本技術問題。
目前券商行業普遍采用的營銷推廣活動后臺框架是恒生統一金融接入系統(UFX)服務節點和Oracle關系型數據庫, 該框架存在以下弊端。
1)當參與營銷推廣活動的用戶數目激增時將過度占用統一金融總線(目前券商業務大部分通過該消息總線接入和傳遞)的消息通道資源,此時股票交易客戶的交易事件和營銷推廣活動參與客戶的參與事件都會出現響應超時現象,影響用戶體驗乃至給客戶造成經濟損失。
2)采用傳統的Oracle關系型數據庫,不同的營銷推廣活動因需要記錄的用戶參與狀態數據范式不同,需要預先定義不同的用戶參與狀態記錄表,并實現與之相對應的數據庫CRUD操作,活動難以實現動態、自由定義,拉長活動的開發、變更周期。采用傳統的Oracle關系型數據庫,需要額外的分庫分表中間件(如Mycat)以水平擴展數據容量,當用戶數目激增時難以快速擴展數據庫節點。
本文采用分布式流處理架構[1],如圖1所示。

圖1 系統架構圖
消息隊列中間件Kafka接收線上營銷推廣活動中各種渠道來源的用戶操作事件,Storm分布式計算框架并行處理用戶操作事件,MongoDB存儲活動規則,記錄用戶的活動參與狀態、用戶的獎勵狀態、事件處理狀態,Zookeeper分布式應用協調服務觸發Storm節點動態加載活動規則。
采用該技術方案具有支持并發用戶數大、事件流并發處理能力強、 營銷推廣活動可以模板化配置并動態生效、 不同活動用戶記錄可以同一個數據表記錄且無須預先定義字段、基本不占用統一金融總線資源、具備消息緩沖能力、無須額外中間件便可分庫分表等特性。
Apache Storm是目前主流的流式分布式計算框架之一,它是由Twitter開發,阿里巴巴發展的開源軟件。Storm可以在工作進程、線程、任務3個層級進行擴展以提升并行計算能力,同時Storm還具有高可用、高容錯特性以及完整的消息處理確認機制[2][3]。
Apache Kafka是目前主流的高吞吐量分布式發布訂閱消息系統之一,他是由LinkedIn開發的開源軟件。Kafka具有輕量級、分布式、可分區、分區多備份等技術特點,具有高可用、高可靠、高吞吐量等性能特點[4][5]。
MongoDB是基于分布式文件系統的非關系型數據庫。MongoDB是基于文檔存儲的,文檔的數據結構非常松散,類似于json數據格式的bjson格式。在使用MongoDB時無須預定義表字段,表里面的每條記錄可以具有不同的數據字段[6][7]。MongoDB分片副本集是高可用、易擴展的分庫分表數據庫方案,提升數據存儲能力及數據操作性能,同時不需要使用額外的數據庫中間件][8][9]。
Apache ZooKeeper是一個分布式的分布式應用程序協調服務,是Hadoop大數據生態圈的基礎設施之一。它可以在分布式計算框架Storm中作為計算節點的配置信息同步協調者[10]。
通過對常見的營銷推廣活動的規則進行分析,歸納總結出簡單營銷推廣活動通用性強的規則,比如以下幾個規則:
1)首次做某事送獎勵,比如首次注冊APP事件、首次上傳頭像事件等。
2)累計做某個動作達到N次即發放獎勵,比如累計簽到N天事件等。
3)每周期做某事,送獎勵,比如每天簽到事件等。
4)某指標在某范圍內送獎勵,比如賬戶資產超過某值事件,充值超過某值事件。
對簡單活動的通用規則抽象是實現營銷推廣活動模板化配置的前提。規則原型的定義含有原型ID、原型名稱、觸發事件列表、具體事件處理類、用戶狀態記錄字段定義信息,如表1所示。
表1中具體事件處理類是與觸發事件列表無關的處理類,比如每周期做某事的處理類、首次做某事的處理類。觸發事件列表包含了可以觸發該原型的用戶操作事件,比如用戶登錄APP。用戶狀態記錄字段主要是記錄用戶發生操作事件后事件的處理結果,比如首次登錄APP事件,將記錄首次登錄APP的時間,送過對應獎勵的標志。實際開發過程中只有具體事件處理類是需要開發的。記錄字段、規則入參、關聯動作等都是通過配置實現。具體事件處理類通過抽象可以高度復用,比如處理首次事件的處理類,可以復用于首次注冊APP、首次設置昵稱、首次上傳頭像等規則原型,極大地減少了代碼開發量、縮短了上線周期。規則原型配置可以在多個營銷推廣活動中復用,比如首次登錄APP事件可以在多個APP促活活動中重復使用。

表1 規則原型表
選擇營銷推廣活動所需要的規則原型,快速構建活動處理規則,比如APP簽到活動,可以選擇每次簽到規則原型、累計簽到規則原型、連續簽到規則原型,設置每個實例規則的獎勵條件(如累計簽到多少天)和獎勵類型、數目,設置活動開始、結束日期、面向的客戶范圍等必要信息就能產生一個活動完整的規則配置。規則具體處理類、規則原型高度復用可以減少上線新營銷活動所需的開發測試時間,同時活動規則參數、活動規則獎勵支持動態自由配置。
本系統的組件如圖1所示,用戶參與事件流轉如圖2所示。

圖2 用戶事件流轉流程圖
MongoDB主要作用是存儲規則原型定義、活動規則定義、事件處理狀態、用戶的活動參與狀態、用戶獎勵數據及發放狀態等。如果采用傳統的關系型數據庫則每次創建新活動時要創建不同的用戶參與狀態記錄表,因為每個活動需要記錄的活動參與狀態數據字段是不一樣的,比如簽到活動和新開戶用戶活動,一個活動需要記錄的是簽到狀態,一個需要記錄的新開戶狀態,兩者在表結構上并不兼容。采用MongoDB的優點是MongoDB數據表中的字段無須預先定義,同一個表里可以容納數據字段完全不同的記錄,通過活動ID和用戶ID作為MongoDB用戶狀態表的索引和公共字段,可以快速檢索到每個活動下每個用戶的參與狀態,而用戶的參與狀態記錄字段通過規則配置已經確定,無須再額外定義。為了滿足大量線上用戶同時參與活動的需求,MongoDB集群采用分片副本集群如圖3所示。

圖3 MongoDB分片副本集
分片類似于傳統關系型數據庫的分庫分表,可以提升數據容量,副本集通過數據多節點備份保證數據高可用,并可主從讀寫分離提升讀寫性能。MongoS節點主要負責數據路由,MongoC節點主要負責維持數據分片信息,MongoD節點主要負責存儲數據,客戶端Client通過訪問MongoS從MongoC中獲取數據所在數據副本集并從對應的MongoD數據副本集中獲取、存儲數據。以活動ID、用戶ID作為數據分片依據,將用戶參與狀態、用戶獎勵數據分散到各數據副本集中,可以控制每個數據副本集的數據量,提高系統的數據容量、數據讀寫速度,以滿足互聯網化的應用需求。
Kafka消息通道主要作用是采用異步消息發送模式將營銷推廣活動的用戶操作事件快速從統一金融總線剝離,減少營銷活動對統一金融總線資源的占用,使其更專注于證券交易核心業務。利用Kafka的高吞吐量、高可用特性以滿足互聯網環境下大量用戶同時產生的操作事件消息的及時投遞。對各種觸發事件采用統一的Kafka通道和消息格式可以減少各種操作事件消息的對接時間,便于各類型消息的統一校驗和備份。如圖4所示。

圖4 Kafka分布式消息隊列
Kafka Producer 異步發送消息到Kafka 集群,Kafka Consumer(Storm Spout)從Kafka消費消息。Producer和Consumer互不干擾,Consumer未能及時消費消息并不會阻塞Producer發送消息,Kafka會將未消費的消息存儲到文件系統,Consumer按自己的消費能力從Kafka拉取消息,所以Kafka在本系統中還起到訪問壓力緩沖的功能。目前對接的事件包含注冊、開戶、登錄、簽到、邀請、點擊頁面、賬戶操作等事件。
Storm流式分布式計算框架分為三級處理節點如圖5,第1級 KafkaSpout按分區從Kafka獲取數據。第 2級 MsgParseBolt進行Kafka消息格式校驗、并轉換為特定數據格式。第 3級 MsgProcessBolt匹配活動ID和觸發事件,找出對應的處理規則,判斷是否滿足規則,記錄事件處理狀態和用戶參與狀態并決定是否產生獎勵流水。MsgParseBolt轉換消息格式后會將屬于同一個用戶的操作事件發送到同一個ProcessBolt 進行串行處理,這樣可以減少同一用戶數據并發操作時的鎖等待時間,同時也可避免少量客戶通過未知方式攻擊系統時造成系統完全不可用的可能性,如圖5客戶U1、U2的操作事件EA、EB達到MsgProcessBolt時同一個客戶的事件由同一個MsgProcessBolt處理。MsgProcessBolt 處理完用戶操作事件會發確認消息給Spout,當Spout未確認處理的用戶操作事件超過一定數目時,Spout會停止從Kafka拉取數據,避免Storm內部各節點消息堆積造成內存占用過多等問題。
Storm的三級處理節點可以按照需要獨立靈活配置所需節點數目,可以在進程、線程、任務等多個層級進行配置,可根據用戶流量快速進行系統擴容以滿足互聯網環境下大量用戶參與活動的需要。
Zookeeper主要作用是協調Storm分布式計算節點間的數據同步。活動配置信息是Storm拓撲啟動時加載到內存的,當有活動配置變更時,通過Zookeeper告知Storm各節點重新加載活動配置,保證不用停止Storm計算拓撲也可以重新加載規則并保證各計算節點活動配置一致。如圖5所示。

圖5 分布式計算框架Storm
對Kafka性能測試結果,如表2所示。
可以發現采用異步模式能提升Kafka的消息接收能力,本文采用了該種模式,各種來源的用戶操作事件統一成Kafka消息格式并異步發送到多分區的Kafka消息隊列上由Storm統一消費并處理。采用多分區是為了提升Storm Spout節點的并發處理能力,Storm Spout的數目和Kafka分區的數目保持一致時能使Spout的性能最優,即不會有Storm Spout空閑也不會有Storm Spout要處理多個分區,每個Storm Spout專注處理一個分區。消息隊列3個分區可以由3個Storm Spout并發處理消息,增加Storm第1級的并發處理能力。

表2 Kafka性能測試
MongoDB性能測試結果,如表3所示。

表3 MongoDB數據庫性能測試
通過上述測試可以發現分片副本集INSERT,UPDATE性能上優于普通副本集,因此本文采用的時MongoDB分片副本集。
測試不同Storm 節點下的Storm的消息處理能力。如表4所示。

表4 Storm性能測試
目前而言,本系統的峰值壓力每秒1000個用戶操作事件,通過測試可以發現目前系統架構能夠承受該峰值壓力。
傳統架構和分布式流計算架構的性能對比(一個用戶操作事件如圖2觸發1次消息處理狀態查詢、1次用戶狀態查詢、1次用戶狀態修改、1次消息處理狀態修改,1次獎勵庫存查詢,1次獎勵發放流水新增共6個數據庫操作),如表5所示。

表5 架構性能測試
可以得出結論分布式流計算架構對總線通道的占用時間遠小于傳統的金融總線服務節點+Oracle架構的占用時間,極大地減少了營銷推廣活動對核心交易業務產生影響的可能性。在事件處理性能上,分布式流計算架構每秒處理用戶操作事件數顯著高于傳統架構,且當用戶操作事件數高于分布式流計算架構處理能力時,Kafka可以起到消息緩沖作用,操作事件并不會堆積在金融總線隊列上,而使用金融總線服務節點是沒有緩沖機制的,超過處理能力將導致事件堆積在金融總線隊列上,可能影響正常交易。
本文通過歸納總結一般營銷推廣活動常見的用戶事件類型,演化出通用性強的活動規則原型,并基于MongoDB數據表無模式的特點形成可以動態配置無須預定義用戶參與狀態記錄表結構的活動規則。通過使用Kafka異步多分區模式承接營銷活動中大量用戶同時產生的操作事件并起到緩沖作用,通過使用分布式Storm計算框架和MongoDB 分片副本集提升實時流數據處理能力,并可根據需要快速水平擴展計算能力,通過Zookeeper協調Storm分布式計算節點進行活動配置同步,實現活動配置的動態設置和動態生效并保證各節點活動配置一致,經性能測試本系統能夠承受目前券商公司線上營銷推廣活動的用戶并發操作壓力。