国产无码免费,人妻口爆,国产V在线,99中文精品7,国产成人无码AA精品一,制度丝袜诱惑av,久久99免费麻辣视频,蜜臀久久99精品久久久久久酒店
        訂閱
        糾錯
        加入自媒體

        詳解Flink CEP的概念及功能

        2021-04-16 11:37
        園陌
        關注

           result.print("惡意用戶>>>")
           env.execute("BarrageBehavior01")
         }

        實例二:監(jiān)測刷屏用戶

        規(guī)則:用戶如果在10s內,同時連續(xù)輸入同樣一句話超過5次,就認為是惡意刷屏。

        使用 Flink CEP檢測刷屏用戶

        object BarrageBehavior02 {
         case class Message(userId: String, ip: String, msg: String)
         def main(args: Array[String]): Unit = {
           //初始化運行環(huán)境
           val env = StreamExecutionEnvironment.getExecutionEnvironment
           //設置并行度
           env.setParallelism(1)
           // 模擬數據源
           val loginEventStream: DataStream[Message] = env.fromCollection(
             List(
               Message("1", "192.168.0.1", "beijing"),
               Message("1", "192.168.0.2", "beijing"),
               Message("1", "192.168.0.3", "beijing"),
               Message("1", "192.168.0.4", "beijing"),
               Message("2", "192.168.10.10", "shanghai"),
               Message("3", "192.168.10.10", "beijing"),
               Message("3", "192.168.10.11", "beijing"),
               Message("4", "192.168.10.10", "beijing"),
               Message("5", "192.168.10.11", "shanghai"),
               Message("4", "192.168.10.12", "beijing"),
               Message("5", "192.168.10.13", "shanghai"),
               Message("5", "192.168.10.14", "shanghai"),
               Message("5", "192.168.10.15", "beijing"),
               Message("6", "192.168.10.16", "beijing"),
               Message("6", "192.168.10.17", "beijing"),
               Message("6", "192.168.10.18", "beijing"),
               Message("5", "192.168.10.18", "shanghai"),
               Message("6", "192.168.10.19", "beijing"),
               Message("6", "192.168.10.19", "beijing"),
               Message("5", "192.168.10.18", "shanghai")
             )
           )
           //定義模式
           val loginbeijingPattern = Pattern.begin[Message]("start")
             .where(_.msg != null) //一條登錄失敗
             .times(5).optional  //將滿足五次的數據配對打印
             .within(Time.seconds(10))
           //進行分組匹配
           val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)
           //查找符合規(guī)則的數據
           val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => {
             var loginEventList: Option[Iterable[Message]] = null
             loginEventList = pattern.get("start") match {
               case Some(value) => {
                 if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) {
                   Some(value)
                 } else {
                   None
                 }
               }
             }
             loginEventList
           })
           //打印測試
           loginbeijingResult.filter(x=>x。絅one).map(x=>{
             x match {
               case Some(value)=> value
             }
           }).print()
           env.execute("BarrageBehavior02)
         }

        Flink CEP API

        除了案例中介紹的幾個API外,我們在介紹下其他的常用API:

        1. 條件 API

        為了讓傳入事件被模式所接受,給模式指定傳入事件必須滿足的條件,這些條件由事件本身的屬性或者前面匹配過的事件的屬性統(tǒng)計量等來設定。比如,事件的某個值大于5,或者大于先前接受事件的某個值的平均值。

        可以使用pattern.where()、pattern.or()、pattern.until()方法來指定條件。條件既可以是迭代條件IterativeConditions,也可以是簡單條件SimpleConditions。

        FlinkCEP支持事件之間的三種臨近條件:

        next():嚴格的滿足條件

        示例:模式為begin("first").where(_.name='a').next("second").where(.name='b')當且僅當數據為a,b時,模式才會被命中。如果數據為a,c,b,由于a的后面跟了c,所以a會被直接丟棄,模式不會命中。

        followedBy():松散的滿足條件

        示例:模式為begin("first").where(_.name='a').followedBy("second").where(.name='b')當且僅當數據為a,b或者為a,c,b,模式均被命中,中間的c會被忽略掉。

        followedByAny():非確定的松散滿足條件

        示例:模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b')當且僅當數據為a,c,b,b時,對于followedBy模式而言命中的為{a,b},對于followedByAny而言會有兩次命中{a,b},{a,b}。

        2. 量詞 API

        還記得我們在上面講解模式概念時說過的一句話:一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉換為循環(huán)模式。這里的量詞就是指的量詞API。

        以下這幾個量詞API,可以將模式指定為循環(huán)模式:

        pattern.oneOrMore():一個給定的事件有一次或多次出現,例如上面提到的b+。

        pattern.times(#ofTimes):一個給定類型的事件出現了指定次數,例如4次。

        pattern.times(#fromTimes, #toTimes):一個給定類型的事件出現的次數在指定次數范圍內,例如2~4次。

        可以使用pattern.greedy()方法將模式變成循環(huán)模式,但是不能讓一組模式都變成循環(huán)模式。greedy:就是盡可能的重復。

        使用pattern.optional()方法將循環(huán)模式變成可選的,即可以是循環(huán)模式也可以是單個模式。

        3. 匹配后的跳過策略

        所謂的匹配跳過策略,是對多個成功匹配的模式進行篩選。也就是說如果多個匹配成功,可能我不需要這么多,按照匹配策略,過濾下就可以。

        Flink中有五種跳過策略:

        NO_SKIP: 不過濾,所有可能的匹配都會被發(fā)出。

        SKIP_TO_NEXT: 丟棄與開始匹配到的事件相同的事件,發(fā)出開始匹配到的事件,即直接跳到下一個模式匹配到的事件,以此類推。

        SKIP_PAST_LAST_EVENT: 丟棄匹配開始后但結束之前匹配到的事件。

        SKIP_TO_FIRST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的第一個事件之前匹配到的事件。

        SKIP_TO_LAST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的最后一個事件之前匹配到的事件。

        怎么理解上述策略,我們以NO_SKIP和SKIP_PAST_LAST_EVENT為例講解下:

        在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b')中,我們輸入數據:a,a,a,a,b ,如果是NO_SKIP策略,即不過濾策略,模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即丟棄匹配開始后但結束之前匹配到的事件,模式匹配到的是:{a,a,a,a,b}。

        Flink CEP 的使用場景

        除上述案例場景外,Flink CEP 還廣泛用于網絡欺詐,故障檢測,風險規(guī)避,智能營銷等領域。

        1. 實時反作弊和風控

        對于電商來說,羊毛黨是必不可少的,國內拼多多曾爆出 100 元的無門檻券隨便領,當晚被人褥幾百億,對于這種情況肯定是沒有做好及時的風控。另外還有就是商家上架商品時通過頻繁修改商品的名稱和濫用標題來提高搜索關鍵字的排名、批量注冊一批機器賬號快速刷單來提高商品的銷售量等作弊行為,各種各樣的作弊手法也是需要不斷的去制定規(guī)則去匹配這種行為。

        2. 實時營銷

        分析用戶在手機 APP 的實時行為,統(tǒng)計用戶的活動周期,通過為用戶畫像來給用戶進行推薦。比如用戶在登錄 APP 后 1 分鐘內只瀏覽了商品沒有下單;用戶在瀏覽一個商品后,3 分鐘內又去查看其他同類的商品,進行比價行為;用戶商品下單后 1 分鐘內是否支付了該訂單。如果這些數據都可以很好的利用起來,那么就可以給用戶推薦瀏覽過的類似商品,這樣可以大大提高購買率。

        3. 實時網絡攻擊檢測

        當下互聯網安全形勢仍然嚴峻,網絡攻擊屢見不鮮且花樣眾多,這里我們以 DDOS(分布式拒絕服務攻擊)產生的流入流量來作為遭受攻擊的判斷依據。對網絡遭受的潛在攻擊進行實時檢測并給出預警,云服務廠商的多個數據中心會定時向監(jiān)控中心上報其瞬時流量,如果流量在預設的正常范圍內則認為是正,F象,不做任何操作;如果某數據中心在 10 秒內連續(xù) 5 次上報的流量超過正常范圍的閾值,則觸發(fā)一條警告的事件;如果某數據中心 30 秒內連續(xù)出現 30 次上報的流量超過正常范圍的閾值,則觸發(fā)嚴重的告警。

        Flink CEP 的原理簡單介紹

        Apache Flink在實現CEP時借鑒了Efficient Pattern Matching over Event Streams論文中NFA的模型,在這篇論文中,還提到了一些優(yōu)化,我們在這里先跳過,只說下NFA的概念。

        在這篇論文中,提到了NFA,也就是Non-determined Finite Automaton,叫做不確定的有限狀態(tài)機,指的是狀態(tài)有限,但是每個狀態(tài)可能被轉換成多個狀態(tài)(不確定)。

        非確定有限自動狀態(tài)機:

        先介紹兩個概念:

        狀態(tài):狀態(tài)分為三類,起始狀態(tài)、中間狀態(tài)和最終狀態(tài)。

        轉換:take/ignore/proceed都是轉換的名稱。

        在NFA匹配規(guī)則里,本質上是一個狀態(tài)轉換的過程。三種轉換的含義如下所示:

        Take: 主要是條件的判斷,當過來一條數據進行判斷,一旦滿足條件,獲取當前元素,放入到結果集中,然后將當前狀態(tài)轉移到下一個的狀態(tài)。

        Proceed:當前的狀態(tài)可以不依賴任何的事件轉移到下一個狀態(tài),比如說透傳的意思。

        Ignore:當一條數據到來的時候,可以忽略這個消息事件,當前的狀態(tài)保持不變,相當于自己到自己的一個狀態(tài)。

        NFA的特點:在NFA中,給定當前狀態(tài),可能有多個下一個狀態(tài)。可以隨機選擇下一個狀態(tài),也可以并行(同時)選擇下一個狀態(tài)。輸入符號可以為空。

        規(guī)則引擎

        規(guī)則引擎:將業(yè)務決策從應用程序代碼中分離出來,并使用預定義的語義模塊編寫業(yè)務決策。接受數據輸入,解釋業(yè)務規(guī)則,并根據業(yè)務規(guī)則做出業(yè)務決策。
        使用規(guī)則引擎可以通過降低實現復雜業(yè)務邏輯的組件的復雜性,降低應用程序的維護和可擴展性成本。

        1. Drools

        Drools 是一款使用 Java 編寫的開源規(guī)則引擎,通常用來解決業(yè)務代碼與業(yè)務規(guī)則的分離,它內置的 Drools Fusion 模塊也提供 CEP 的功能。

        優(yōu)勢:

        功能較為完善,具有如系統(tǒng)監(jiān)控、操作平臺等功能。規(guī)則支持動態(tài)更新。

        劣勢:

        以內存實現時間窗功能,無法支持較長跨度的時間窗。無法有效支持定時觸達(如用戶在瀏覽發(fā)生一段時間后觸達條件判斷)。2. Aviator

        Aviator 是一個高性能、輕量級的 Java 語言實現的表達式求值引擎,主要用于各種表達式的動態(tài)求值。

        優(yōu)勢:

        支持大部分運算操作符。支持函數調用和自定義函數。支持正則表達式匹配。支持傳入變量并且性能優(yōu)秀。

        劣勢:

        沒有 if else、do while 等語句,沒有賦值語句,沒有位運算符。3. EasyRules

        EasyRules 集成了 MVEL 和 SpEL 表達式的一款輕量級規(guī)則引擎。

        優(yōu)勢:

        輕量級框架,學習成本低; POJO。為定義業(yè)務引擎提供有用的抽象和簡便的應用。支持從簡單的規(guī)則組建成復雜規(guī)則。4. Esper

        Esper 設計目標為 CEP 的輕量級解決方案,可以方便的嵌入服務中,提供 CEP 功能。

        優(yōu)勢:

        輕量級可嵌入開發(fā),常用的 CEP 功能簡單好用。EPL 語法與 SQL 類似,學習成本較低。

        劣勢:

        單機全內存方案,需要整合其他分布式和存儲。以內存實現時間窗功能,無法支持較長跨度的時間窗。無法有效支持定時觸達(如用戶在瀏覽發(fā)生一段時間后觸達條件判斷)。5. Flink CEP

        Flink 是一個流式系統(tǒng),具有高吞吐低延遲的特點,Flink CEP 是一套極具通用性、易于使用的實時流式事件處理方案。

        優(yōu)勢:

        繼承了 Flink 高吞吐的特點。事件支持存儲到外部,可以支持較長跨度的時間窗?梢灾С侄〞r觸達(用 followedBy + PartternTimeoutFunction 實現)。


        <上一頁  1  2  
        聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

        發(fā)表評論

        0條評論,0人參與

        請輸入評論內容...

        請輸入評論/評論長度6~500個字

        您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

        暫無評論

        暫無評論

          人工智能 獵頭職位 更多
          掃碼關注公眾號
          OFweek人工智能網
          獲取更多精彩內容
          文章糾錯
          x
          *文字標題:
          *糾錯內容:
          聯系郵箱:
          *驗 證 碼:

          粵公網安備 44030502002758號

          主站蜘蛛池模板: xxxx欧美| AV不卡在线观看| 滦平县| 亚洲黄色片| 蕲春县| 国产女同疯狂摩擦奶6| 乌鲁木齐县| 丝袜无码| 国产中文在线| 1024免费看| 人妻少妇精品久久久久久0000 | 亚洲资源在线| 日韩色区| 中文区av无码中文字幕dⅴd| 在线观看视频91| 国产成人一区二区三区A片免费| 国产在线网址| 亚洲中文字幕日韩| 安岳县| 丰满熟妇高潮一二三区| 亚洲爽图| 普兰县| 成人亚洲视频| 青娱乐av| 乃东县| mdapptv免费下载| 夜夜国自一区| 漳浦县| xxx综合网| 亚洲最大成人| 铜鼓县| 7777精品伊人久久久大香价格| 999无码精品亚洲精品日韩人妻无码| 宁都县| 肉色网站| 日本噜噜影院| 久久熟女| 亚洲黄色成人网站| 成人网站国产| 99国产视频| 人妻人人爽|