国产无码免费,人妻口爆,国产V在线,99中文精品7,国产成人无码AA精品一,制度丝袜诱惑av,久久99免费麻辣视频,蜜臀久久99精品久久久久久酒店
        訂閱
        糾錯
        加入自媒體

        詳解Flink CEP的概念及功能

        2021-04-16 11:37
        園陌
        關注

        我們在看直播的時候,不管對于主播還是用戶來說,非常重要的一項就是彈幕文化。為了增加直播趣味性和互動性, 各大網絡直播平臺紛紛采用彈窗彈幕作為用戶實時交流的方式,內容豐富且形式多樣的彈幕數據中隱含著復雜的用戶屬性與用戶行為, 研究并理解在線直播平臺用戶具有彈幕內容審核與監控、輿論熱點預測、個性化摘要標注等多方面的應用價值。

        本文不分析彈幕數據的應用價值,只通過彈幕內容審核與監控案例來了解下Flink CEP的概念及功能。

        在用戶發彈幕時,直播平臺主要實時監控識別兩類彈幕內容:一類是發布不友善彈幕的用戶 ;一類是刷屏的用戶。

        我們先記住上述需要實時監控識別的兩類用戶,接下來介紹Flink CEP的API,然后使用CEP解決上述問題。

        Flink CEPFlink CEP 是什么

        Flink CEP是一個基于Flink的復雜事件處理庫,可以從多個數據流中發現復雜事件,識別有意義的事件(例如機會或者威脅),并盡快的做出響應,而不是需要等待幾天或則幾個月相當長的時間,才發現問題。

        Flink CEP API

        CEP API的核心是Pattern(模式) API,它允許你快速定義復雜的事件模式。每個模式包含多個階段(stage)或者我們也可稱為狀態(state)。從一個狀態切換到另一個狀態,用戶可以指定條件,這些條件可以作用在鄰近的事件或獨立事件上。

        介紹API之前先來理解幾個概念:

        1. 模式與模式序列

        簡單模式稱為模式,將最終在數據流中進行搜索匹配的復雜模式序列稱為模式序列,每個復雜模式序列是由多個簡單模式組成。

        匹配是一系列輸入事件,這些事件通過一系列有效的模式轉換,能夠訪問復雜模式圖的所有模式。

        每個模式必須具有唯一的名稱,我們可以使用模式名稱來標識該模式匹配到的事件。

        2. 單個模式

        一個模式既可以是單例的,也可以是循環的。單例模式接受單個事件,循環模式可以接受多個事件。

        3. 模式示例:

        有如下模式:a b+ c?d

        其中a,b,c,d這些字母代表的是模式,+代表循環,b+就是循環模式;?代表可選,c?就是可選模式;

        所以上述模式的意思就是:a后面可以跟一個或多個b,后面再可選的跟c,最后跟d。

        其中a、c? 、d是單例模式,b+是循環模式。

        一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉換為循環模式。

        每個模式可以帶有一個或多個條件,這些條件是基于事件接收進行定義的。或者說,每個模式通過一個或多個條件來匹配和接收事件。

        了解完上述概念后,接下來介紹下案例中需要用到的幾個CEP API:

        案例中用到的CEP API:

        Begin:定義一個起始模式狀態

        用法:start = Pattern.<Event>begin("start");

        Next:附加一個新的模式狀態。匹配事件必須直接接續上一個匹配事件

        用法:next = start.next("next");

        Where:定義當前模式狀態的過濾條件。僅當事件通過過濾器時,它才能與狀態匹配

        用法:patternState.where(_.message == "TMD");

        Within: 定義事件序列與模式匹配的最大時間間隔。如果未完成的事件序列超過此時間,則將其丟棄

        用法:patternState.within(Time.seconds(10));

        Times:一個給定類型的事件出現了指定次數

        用法:patternState.times(5);

        API 先介紹以上這幾個,接下來我們解決下文章開頭提到的案例:

        監測用戶彈幕行為案例

        案例一:監測惡意用戶

        規則:用戶如果在10s內,同時輸入 TMD 超過5次,就認為用戶為惡意攻擊,識別出該用戶。

        使用 Flink CEP 檢測惡意用戶:

        import org.apache.flink.api.scala._
        import org.apache.flink.cep.PatternSelectFunction
        import org.apache.flink.cep.scala.{CEP, PatternStream}
        import org.apache.flink.cep.scala.pattern.Pattern
        import org.apache.flink.streaming.api.TimeCharacteristic
        import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
        import org.apache.flink.streaming.api.windowing.time.Time
        object BarrageBehavior01 {
         case class  LoginEvent(userId:String, message:String, timestamp:Long){
           override def toString: String = userId
         }
         def main(args: Array[String]): Unit = {
           val env = StreamExecutionEnvironment.getExecutionEnvironment
           // 使用IngestionTime作為EventTime
           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
           // 用于觀察測試數據處理順序
           env.setParallelism(1)
           // 模擬數據源
           val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
             List(
               LoginEvent("1", "TMD", 1618498576),
               LoginEvent("1", "TMD", 1618498577),
               LoginEvent("1", "TMD", 1618498579),
               LoginEvent("1", "TMD", 1618498582),
               LoginEvent("2", "TMD", 1618498583),
               LoginEvent("1", "TMD", 1618498585)
             )
           ).assignAscendingTimestamps(_.timestamp * 1000)
           //定義模式
           val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
             .where(_.message == "TMD")
             .times(5)
             .within(Time.seconds(10))
           //匹配模式
           val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
           import scala.collection.Map
           val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
             val first = pattern.getOrElse("begin", null).iterator.next()
             (first.userId, first.timestamp)
           })
           //惡意用戶,實際處理可將按用戶進行禁言等處理,為簡化此處僅打印出該用戶

        1  2  下一頁>  
        聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

        發表評論

        0條評論,0人參與

        請輸入評論內容...

        請輸入評論/評論長度6~500個字

        您提交的評論過于頻繁,請輸入驗證碼繼續

        暫無評論

        暫無評論

          人工智能 獵頭職位 更多
          掃碼關注公眾號
          OFweek人工智能網
          獲取更多精彩內容
          文章糾錯
          x
          *文字標題:
          *糾錯內容:
          聯系郵箱:
          *驗 證 碼:

          粵公網安備 44030502002758號

          主站蜘蛛池模板: 美女网站免费| 国产成人久久久777777麻豆| 红河县| 91纯肉无码动漫在线观看| 峨山| 中文字幕A片免费观看| 克什克腾旗| 秋霞一区| 婷婷激情综合| 国产又色| 无码中文人妻| 德令哈市| 青娱乐午夜| 少妇视频网站| 夜夜夜爽| 国产成人精品亚洲男人的天堂| 内射自拍| 亚洲天堂中文字幕| 桃江县| 九九国产在线观看| 女人天堂久久| 平顶山市| 欧美精| 小明福利社| 无码欧洲| 桃花岛av| 丝袜精品字幕| 97国产视频| 九色91| 国产精品18| 精品?国产区一区二| 思思热在线播放| 午夜性福利| 熟女老骚91PORN九色| 永久黄片| 亚洲熟女字幕| 中文字幕高清在线观看| 91热视频| 91探花视频在线观看| 亚洲另类图| 开江县|