国产无码免费,人妻口爆,国产V在线,99中文精品7,国产成人无码AA精品一,制度丝袜诱惑av,久久99免费麻辣视频,蜜臀久久99精品久久久久久酒店
        訂閱
        糾錯
        加入自媒體

        一文詳解Flink知識體系

        2021-09-13 09:58
        園陌
        關注

        4) Flink 關聯 Hive 分區表

        Flink 1.12 支持了 Hive 最新的分區作為時態表的功能,可以通過 SQL 的方式直接關聯 Hive 分區表的最新分區,并且會自動監聽最新的 Hive 分區,當監控到新的分區后,會自動地做維表數據的全量替換。通過這種方式,用戶無需編寫 DataStream 程序即可完成 Kafka 流實時關聯最新的 Hive 分區實現數據打寬。

        具體用法:

        在 Sql Client 中注冊 HiveCatalog:

        vim conf/sql-client-defaults.yaml
        catalogs:
         - name: hive_catalog
           type: hive
           hive-conf-dir: /disk0/soft/hive-conf/ #該目錄需要包hive-site.xml文件

        創建 Kafka 表


        CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
           master Row

        Flink 事實表與 Hive 最新分區數據關聯

        dim_extend_shop_info 是 Hive 中已存在的表,所以我們用 table hint 動態地開啟維表參數。


        CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
        SELECT * FROM  
        (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,  
            ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
           from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
              JOIN hive_catalog.flink_db.dim_extend_shop_info  
         + OPTIONS('streaming-source.enable'='true',  
            'streaming-source.partition.include' = 'latest',  
            'streaming-source.monitor-interval' = '1 h',
            'streaming-source.partition-order' = 'partition-name')
           FOR SYSTEM_TIME AS OF t1.proctime AS t2 --時態表  
           ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
           where groupID in (202042)) t  where t.rn = 1

        參數解釋:

        streaming-source.enable 開啟流式讀取 Hive 數據。

        streaming-source.partition.include 有以下兩個值:

        latest 屬性: 只讀取最新分區數據。all: 讀取全量分區數據 ,默認值為 all,表示讀所有分區,latest 只能用在 temporal join 中,用于讀取最新分區作為維表,不能直接讀取最新分區數據。

        streaming-source.monitor-interval 監聽新分區生成的時間、不宜過短 、最短是1 個小時,因為目前的實現是每個 task 都會查詢 metastore,高頻的查可能會對metastore 產生過大的壓力。需要注意的是,1.12.1 放開了這個限制,但仍建議按照實際業務不要配個太短的 interval。

        streaming-source.partition-order 分區策略,主要有以下 3 種,其中最為推薦的是 partition-name:

        partition-name 使用默認分區名稱順序加載最新分區create-time 使用分區文件創建時間順序partition-time 使用分區時間順序六、Flink 狀態管理

        我們前面寫的 wordcount 的例子,沒有包含狀態管理。如果一個task在處理過程中掛掉了,那么它在內存中的狀態都會丟失,所有的數據都需要重新計算。從容錯和消息處理的語義上(at least once, exactly once),Flink引入了state和checkpoint。

        因此可以說flink因為引入了state和checkpoint所以才支持的exactly once

        首先區分一下兩個概念:

        state:

        state一般指一個具體的task/operator的狀態:

        state數據默認保存在java的堆內存中,TaskManage節點的內存中。

        operator表示一些算子在運行的過程中會產生的一些中間結果。

        checkpoint:

        checkpoint可以理解為checkpoint是把state數據定時持久化存儲了,則表示了一個Flink Job在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。

        注意:task(subTask)是Flink中執行的基本單位。operator指算子(transformation)

        State可以被記錄,在失敗的情況下數據還可以恢復。

        Flink中有兩種基本類型的State:

        Keyed State

        Operator State

        Keyed State和Operator State,可以以兩種形式存在:

        原始狀態(raw state)

        托管狀態(managed state)

        托管狀態是由Flink框架管理的狀態。

        我們說operator算子保存了數據的中間結果,中間結果保存在什么類型中,如果我們這里是托管狀態,則由flink框架自行管理

        原始狀態由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。

        通常在DataStream上的狀態推薦使用托管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。

        1. State-Keyed State

        基于KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解為分區過的Operator State。

        保存state的數據結構:

        ValueState:即類型為T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值。

        ListState:即key上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable來遍歷狀態值。

        ReducingState:這種狀態通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最后合并到一個單一的狀態值。

        MapState

        需要注意的是,以上所述的State對象,僅僅用于與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲系統中。相當于我們只是持有了這個狀態的句柄。

        1. ValueState

        使用ValueState保存中間結果對下面數據進行分組求和。

        開發步驟:

        1. 獲取流處理執行環境
         2. 加載數據源
         3. 數據分組
         4. 數據轉換,定義ValueState,保存中間結果
         5. 數據打印
         6. 觸發執行

        ValueState:測試數據源:

        List(
          (1L, 4L),
          (2L, 3L),
          (3L, 1L),
          (1L, 2L),
          (3L, 2L),
          (1L, 2L),
          (2L, 2L),
          (2L, 9L)
        )

        示例代碼:

        import org.apache.flink.api.common.functions.RichFlatMapFunction
        import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
        import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.streaming.api.scala._
        import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
        import org.apache.flink.util.Collector
        object TestKeyedState {
         class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
           *
            * ValueState狀態句柄. 第一個值為count,第二個值為sum。
           
           private var sum: ValueState[(Long, Long)] = _
           override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
             // 獲取當前狀態值
             val tmpCurrentSum: (Long, Long) = sum.value
             // 狀態默認值
             val currentSum = if (tmpCurrentSum != null) {
               tmpCurrentSum
             } else {
               (0L, 0L)
             }
             // 更新
             val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
             // 更新狀態值
             sum.update(newSum)
             // 如果count >=3 清空狀態值,重新計算
             if (newSum._1 >= 3) {
               out.collect((input._1, newSum._2 / newSum._1))
               sum.clear()
             }
           }
           override def open(parameters: Configuration): Unit = {
             sum = getRuntimeContext.getState(
               new ValueStateDescriptor[(Long, Long)]("average", // 狀態名稱
                 TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 狀態類型
             )
           }
         }  
         def main(args: Array[String]): Unit = {
           //初始化執行環境
           val env = StreamExecutionEnvironment.getExecutionEnvironment
           //構建數據源
           val inputStream: DataStream[(Long, Long)] = env.fromCollection(
             List(
               (1L, 4L),
               (2L, 3L),
               (3L, 1L),
               (1L, 2L),
               (3L, 2L),
               (1L, 2L),
               (2L, 2L),
               (2L, 9L))
           )
           //執行數據處理
           inputStream.keyBy(0)
             .flatMap(new CountWithKeyedState)
             .setParallelism(1)
             .print
           //運行任務
           env.execute
         }
        }  
        2. MapState

        使用MapState保存中間結果對下面數據進行分組求和:

        1. 獲取流處理執行環境
         2. 加載數據源
         3. 數據分組
         4. 數據轉換,定義MapState,保存中間結果
         5. 數據打印
         6. 觸發執行

        MapState:測試數據源:

        List(
          ("java", 1),
          ("python", 3),
          ("java", 2),
          ("scala", 2),
          ("python", 1),
          ("java", 1),
          ("scala", 2)
        )  

        示例代碼:

        object MapState {
         def main(args: Array[String]): Unit = {
           val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
           env.setParallelism(1)
           *
             * 使用MapState保存中間結果對下面數據進行分組求和
             * 1.獲取流處理執行環境
             * 2.加載數據源
             * 3.數據分組
             * 4.數據轉換,定義MapState,保存中間結果
             * 5.數據打印
             * 6.觸發執行
             
           val source: DataStream[(String, Int)] = env.fromCollection(List(
             ("java", 1),
             ("python", 3),
             ("java", 2),
             ("scala", 2),
             ("python", 1),
             ("java", 1),
             ("scala", 2)))
         
           source.keyBy(0)
             .map(new RichMapFunction[(String, Int), (String, Int)] {
               var mste: MapState[String, Int] = _
               override def open(parameters: Configuration): Unit = {
                 val msState = new MapStateDescriptor[String, Int]("ms",
                   TypeInformation.of(new TypeHint[(String)] {}),
                   TypeInformation.of(new TypeHint[(Int)] {}))
                 mste = getRuntimeContext.getMapState(msState)
               }
               override def map(value: (String, Int)): (String, Int) = {
                 val i: Int = mste.get(value._1)
                 mste.put(value._1, value._2 + i)
                 (value._1, value._2 + i)
               }
             }).print()
           env.execute()
         }
        }  
        2. State-Operator State

        與Key無關的State,與Operator綁定的state,整個operator只對應一個state。

        保存state的數據結構:

        ListState

        舉例來說,Flink中的 Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。

        步驟:

        獲取執行環境

        設置檢查點機制:路徑,重啟策略

        自定義數據源

        需要繼承并行數據源和CheckpointedFunction設置listState,通過上下文對象context獲取數據處理,保留offset制作快照

        數據打印

        觸發執行

        示例代碼:

        import java.util
        import org.apache.flink.api.common.restartstrategy.RestartStrategies
        import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
        import org.apache.flink.api.common.time.Time
        import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
        import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
        import org.apache.flink.runtime.state.filesystem.FsStateBackend
        import org.apache.flink.streaming.api.CheckpointingMode
        import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
        import org.apache.flink.streaming.api.environment.CheckpointConfig
        import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        import org.apache.flink.streaming.api.scala._
        object ListOperate {
         def main(args: Array[String]): Unit = {
           val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
           env.setParallelism(1)
           env.enableCheckpointing(5000)
           env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
           env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
           env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
           env.getCheckpointConfig.setCheckpointTimeout(60000)
           env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
           env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
           //重啟策略
           env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
           //模擬kakfa偏移量
           env.addSource(new MyRichParrelSourceFun)
             .print()
           env.execute()
         }
        }
        class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
         with CheckpointedFunction {
         var listState: ListState[Long] = _
         var offset: Long = 0L
         //任務運行
         override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
           val iterState: util.Iterator[Long] = listState.get().iterator()
           while (iterState.hasNext) {
             offset = iterState.next()
           }
           while (true) {
             offset += 1
             ctx.collect("offset:"+offset)
             Thread.sleep(1000)
             if(offset > 10){
               1/0
             }
           }
         }
         //取消任務
         override def cancel(): Unit = ???
         //制作快照
         override def snapshotState(context: FunctionSnapshotContext): Unit = {
           listState.clear()
           listState.add(offset)
         }
         //初始化狀態
         override def initializeState(context: FunctionInitializationContext): Unit = {
           listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
             "listState", TypeInformation.of(new TypeHint[Long] {})
           ))
         }
        }
        3. Broadcast State

        Broadcast State 是 Flink 1.5 引入的新特性。在開發過程中,如果遇到需要下發/廣播配置、規則等低吞吐事件流到下游所有 task 時,就可以使用 Broadcast State 特性。下游的 task 接收這些配置、規則并保存為 BroadcastState, 將這些配置應用到另一個數據流的計算中 。

        1) API介紹

        通常,我們首先會創建一個Keyed或Non-Keyed的Data Stream,然后再創建一個Broadcasted Stream,最后通過Data Stream來連接(調用connect方法)到Broadcasted Stream上,這樣實現將Broadcast State廣播到Data Stream下游的每個Task中。

        如果Data Stream是Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時需要使用KeyedBroadcastProcessFunction來實現,下面是KeyedBroadcastProcessFunction的API,代碼如下所示:

        public abstract class KeyedBroadcastProcessFunction

        上面泛型中的各個參數的含義,說明如下:

        KS:表示Flink程序從最上游的Source Operator開始構建Stream,當調用keyBy時所依賴的Key的類型;IN1:表示非Broadcast的Data Stream中的數據記錄的類型;IN2:表示Broadcast Stream中的數據記錄的類型;OUT:表示經過KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法處理后輸出結果數據記錄的類型。

        如果Data Stream是Non-Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時需要使用BroadcastProcessFunction來實現,下面是BroadcastProcessFunction的API,代碼如下所示:

        public abstract class BroadcastProcessFunction

        上面泛型中的各個參數的含義,與前面KeyedBroadcastProcessFunction的泛型類型中的后3個含義相同,只是沒有調用keyBy操作對原始Stream進行分區操作,就不需要KS泛型參數。

        注意事項:

        Broadcast State 是Map類型,即K-V類型。

        Broadcast State 只有在廣播一側的方法中processBroadcastElement可以修改;在非廣播一側方法中processElement只讀。

        Broadcast State在運行時保存在內存中。

        2) 場景舉例

        動態更新計算規則: 如事件流需要根據最新的規則進行計算,則可將規則作為廣播狀態廣播到下游Task中。

        實時增加額外字段: 如事件流需要實時增加用戶的基礎信息,則可將用戶的基礎信息作為廣播狀態廣播到下游Task中。

        七、Flink的容錯1. Checkpoint介紹

        checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現故障時,能夠將整個應用流圖的狀態恢復到故障之前的某一狀態,保 證應用流圖狀態的一致性。Flink的checkpoint機制原理來自“Chandy-Lamport algorithm”算法。

        每個需要checkpoint的應用在啟動時,Flink的JobManager為其創建一個 CheckpointCoordinator(檢查點協調器),CheckpointCoordinator全權負責本應用的快照制作。

        CheckpointCoordinator(檢查點協調器) 周期性的向該流應用的所有source算子發送 barrier(屏障)。

        當某個source算子收到一個barrier時,便暫停數據處理過程,然后將自己的當前狀態制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自己快照制作情況,同時向自身所有下游算子廣播該barrier,恢復數據處理

        下游算子收到barrier之后,會暫停自己的數據處理過程,然后將自身的相關狀態制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自身快照情況,同時向自身所有下游算子廣播該barrier,恢復數據處理。

        每個算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。

        當CheckpointCoordinator收到所有算子的報告之后,認為該周期的快照制作成功; 否則,如果在規定的時間內沒有收到所有算子的報告,則認為本周期快照制作失敗。

        如果一個算子有兩個輸入源,則暫時阻塞先收到barrier的輸入源,等到第二個輸入源相 同編號的barrier到來時,再制作自身快照并向下游廣播該barrier。具體如下圖所示:

        假設算子C有A和B兩個輸入源

        在第i個快照周期中,由于某些原因(如處理時延、網絡時延等)輸入源A發出的 barrier 先到來,這時算子C暫時將輸入源A的輸入通道阻塞,僅收輸入源B的數據。

        當輸入源B發出的barrier到來時,算子C制作自身快照并向 CheckpointCoordinator 報告自身的快照制作情況,然后將兩個barrier合并為一個,向下游所有的算子廣播。

        當由于某些原因出現故障時,CheckpointCoordinator通知流圖上所有算子統一恢復到某個周期的checkpoint狀態,然后恢復數據流處理。分布式checkpoint機制保證了數據僅被處理一次(Exactly Once)。

        2. 持久化存儲1) MemStateBackend

        該持久化存儲主要將快照數據保存到JobManager的內存中,僅適合作為測試以及快照的數據量非常小時使用,并不推薦用作大規模商業部署。

        MemoryStateBackend 的局限性:

        默認情況下,每個狀態的大小限制為 5 MB。可以在MemoryStateBackend的構造函數中增加此值。

        無論配置的最大狀態大小如何,狀態都不能大于akka幀的大小(請參閱配置)。

        聚合狀態必須適合 JobManager 內存。

        建議MemoryStateBackend 用于:

        本地開發和調試。

        狀態很少的作業,例如僅包含一次記錄功能的作業(Map,FlatMap,Filter,...),kafka的消費者需要很少的狀態。

        2) FsStateBackend

        該持久化存儲主要將快照數據保存到文件系統中,目前支持的文件系統主要是 HDFS和本地文件。如果使用HDFS,則初始化FsStateBackend時,需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。在分布式情況下,不推薦使用本地文件。如果某 個算子在節點A上失敗,在節點B上恢復,使用本地文件時,在B上無法讀取節點 A上的數據,導致狀態恢復失敗。

        建議FsStateBackend:

        具有大狀態,長窗口,大鍵 / 值狀態的作業。

        所有高可用性設置。

        3) RocksDBStateBackend

        RocksDBStatBackend介于本地文件和HDFS之間,平時使用RocksDB的功能,將數 據持久化到本地文件中,當制作快照時,將本地數據制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用戶特別指明,只需在初始化時傳入HDFS 或本地路徑即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

        如果用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態以ListState的形式保存在StatBackend中,如果一個key值中有多個value值,則RocksDB讀取該種ListState非常緩慢,影響性能。用戶可以根據應用的具體情況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。

        4) 語法val env = StreamExecutionEnvironment.getExecutionEnvironment()
        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000)
        // advanced options:
        // 設置checkpoint的執行模式,最多執行一次或者至少執行一次
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // 設置checkpoint的超時時間
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        // 如果在只做快照過程中出現錯誤,是否讓整體任務失敗:true是  false不是
        env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
        //設置同一時間有多少 個checkpoint可以同時執行
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        5) 修改State Backend的兩種方式

        第一種:單任務調整

        修改當前任務代碼

        env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

        或者new MemoryStateBackend()

        或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】

        第二種:全局調整

        修改flink-conf.yaml

        state.backend: filesystem

        state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

        注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

        6) Checkpoint的高級選項

        默認checkpoint功能是disabled的,想要使用的時候需要先啟用checkpoint開啟之后,默認的checkPointMode是Exactly-once

        //配置一秒鐘開啟一個checkpoint
        env.enableCheckpointing(1000)
        //指定checkpoint的執行模式
        //兩種可選:
        //CheckpointingMode.EXACTLY_ONCE:默認值
        //CheckpointingMode.AT_LEAST_ONCE
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        一般情況下選擇CheckpointingMode.EXACTLY_ONCE,除非場景要求極低的延遲(幾毫秒)
        注意:如果需要保證EXACTLY_ONCE,source和sink要求必須同時保證EXACTLY_ONCE
        //如果程序被cancle,保留以前做的checkpoint
        env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        默認情況下,檢查點不被保留,僅用于在故障中恢復作業,可以啟用外部持久化檢查點,同時指定保留策略:
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業取消時保留檢查點,注意,在這種情況下,您必須在取消后手動清理檢查點狀態
        ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當作業在被cancel時,刪除檢查點,檢查點僅在作業失敗時可用
        //設置checkpoint超時時間
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        //Checkpointing的超時時間,超時時間內沒有完成則被終止
        //Checkpointing最小時間間隔,用于指定上一個checkpoint完成之后
        //最小等多久可以觸發另一個checkpoint,當指定這個參數時,maxConcurrentCheckpoints的值為1
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
        //設置同一個時間是否可以有多個checkpoint執行
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        指定運行中的checkpoint最多可以有多少個
        env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
        用于指定在checkpoint發生異常的時候,是否應該fail該task,默認是true,如果設置為false,則task會拒絕checkpoint然后繼續運行
        2. Flink的重啟策略

        Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟。集群可以通過默認的重啟策略來重啟,這個默認的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時候指定了重啟策略,這個重啟策略就會覆蓋掉集群的默認重啟策略。

        1) 概覽

        默認的重啟策略是通過Flink的 flink-conf.yaml 來指定的,這個配置參數 restart-strategy 定義了哪種策略會被采用。如果checkpoint未啟動,就會采用 no restart 策略,如果啟動了checkpoint機制,但是未指定重啟策略的話,就會采用 fixed-delay 策略,重試 Integer.MAX_VALUE 次。請參考下面的可用重啟策略來了解哪些值是支持的。

        每個重啟策略都有自己的參數來控制它的行為,這些值也可以在配置文件中設置,每個重啟策略的描述都包含著各自的配置值信息。

        重啟策略重啟策略值Fixed delayfixed-delayFailure ratefailure-rateNo restartNone

        除了定義一個默認的重啟策略之外,你還可以為每一個Job指定它自己的重啟策略,這個重啟策略可以在 ExecutionEnvironment 中調用 setRestartStrategy() 方法來程序化地調用,注意這種方式同樣適用于 StreamExecutionEnvironment。

        下面的例子展示了如何為Job設置一個固定延遲重啟策略,一旦有失敗,系統就會嘗試每10秒重啟一次,重啟3次。

        val env = ExecutionEnvironment.getExecutionEnvironment()
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
         3, // 重啟次數
         Time.of(10, TimeUnit.SECONDS) // 延遲時間間隔
        ))
        2) 固定延遲重啟策略(Fixed Delay Restart Strategy)

        固定延遲重啟策略會嘗試一個給定的次數來重啟Job,如果超過了最大的重啟次數,Job最終將失敗。在連續的兩次重啟嘗試之間,重啟策略會等待一個固定的時間。

        重啟策略可以配置flink-conf.yaml的下面配置參數來啟用,作為默認的重啟策略:

        restart-strategy: fixed-delay
        配置參數描述默認值restart-strategy.fixed-delay.attempts在Job最終宣告失敗之前,Flink嘗試執行的次數1,如果啟用checkpoint的話是Integer.MAX_VALUErestart-strategy.fixed-delay.delay延遲重啟意味著一個執行失敗之后,并不會立即重啟,而是要等待一段時間。akka.ask.timeout,如果啟用checkpoint的話是1s

        例子:

        restart-strategy.fixed-delay.attempts: 3
        restart-strategy.fixed-delay.delay: 10 s

        固定延遲重啟也可以在程序中設置:

        val env = ExecutionEnvironment.getExecutionEnvironment()
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
         3, // 重啟次數
         Time.of(10, TimeUnit.SECONDS) // 重啟時間間隔
        ))
        3) 失敗率重啟策略

        失敗率重啟策略在Job失敗后會重啟,但是超過失敗率后,Job會最終被認定失敗。在兩個連續的重啟嘗試之間,重啟策略會等待一個固定的時間。

        <上一頁  1  2  3  4  下一頁>  余下全文
        聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

        發表評論

        0條評論,0人參與

        請輸入評論內容...

        請輸入評論/評論長度6~500個字

        您提交的評論過于頻繁,請輸入驗證碼繼續

        暫無評論

        暫無評論

          掃碼關注公眾號
          OFweek人工智能網
          獲取更多精彩內容
          文章糾錯
          x
          *文字標題:
          *糾錯內容:
          聯系郵箱:
          *驗 證 碼:

          粵公網安備 44030502002758號

          主站蜘蛛池模板: 国产九九在线视频| 亚洲V色| 北流市| 泰来县| 日本精品视频| 大城县| 亚洲福利| 日韩成人无码| 亭亭五月丁香| 丝袜精品字幕| 亚洲天堂在线播放| 五月综合视频| 91人人操| 中文字幕精品人妻在线| 深水埗区| av动态| 91成人在线播放| 休宁县| 国产网曝门| 1024国产基地| A片入口| 日日艹| 亚洲精品乱码久久久久久蜜桃91| 欧美性交无码| 伊人AV在线| 夜色福利导航| 偷拍99| 日本va欧美ⅴa欧美Va精品 | 99啪啪| 中文字幕日产乱码中| 超碰在线成人| 久久久久久AV| 福利二区| 亚洲成人av| 日韩无码影院| 性欧美成人18| 91碰碰| jizz免费| 黄平县| 亚洲熟女豪乳视频| 宜城市|