国产无码免费,人妻口爆,国产V在线,99中文精品7,国产成人无码AA精品一,制度丝袜诱惑av,久久99免费麻辣视频,蜜臀久久99精品久久久久久酒店
        訂閱
        糾錯
        加入自媒體

        Flink未來將與 Pulsar集成提供大規模的彈性數據處理

        未來整合

        Pulsar可以以不同的方式與Apache Flink集成。一些潛在的集成包括使用流式連接器為流式工作負載提供支持,并使用批量源連接器支持批量工作負載。Pulsar還提供對schema 的本地支持,可以與Flink集成并提供對數據的結構化訪問,例如使用Flink SQL作為在Pulsar中查詢數據的方式。最后,集成這些技術的另一種方法可能包括使用Pulsar作為Flink的狀態后端。由于Pulsar具有分層架構(Streams和Segmented Streams,由Apache Bookkeeper提供支持),因此將Pulsar用作存儲層并存儲Flink狀態變得很自然。

        從體系結構的角度來看,我們可以想象兩個框架之間的集成,它使用Apache Pulsar作為統一的數據層視圖,Apache Flink作為統一的計算和數據處理框架和API。

        現有集成

        兩個框架之間的集成正在進行中,開發人員已經可以通過多種方式將Pulsar與Flink結合使用。例如,Pulsar可用作Flink DataStream應用程序中的流媒體源和流式接收器。開發人員可以將Pulsar中的數據提取到Flink作業中,該作業可以計算和處理實時數據,然后將數據作為流式接收器發送回Pulsar主題。這樣的例子如下所示:

        // create and configure Pulsar consumer

        PulsarSourceBuilder<String>builder = PulsarSourceBuilder

        .builder(new SimpleStringSchema())

        .serviceUrl(serviceUrl)

        .topic(inputTopic)

        .subscriptionName(subscription);

        SourceFunction<String> src = builder.build();

        // ingest DataStream with Pulsar consumer

        DataStream<String> words = env.addSource(src);

        // perform computation on DataStream (here a simple WordCount)

        DataStream<WordWithCount> wc = words

        .flatMap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {

        collector.collect(new WordWithCount(word, 1));

        })

        .returns(WordWithCount.class)

        .keyBy("word")

        .timeWindow(Time.seconds(5))

        .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->

        new WordWithCount(c1.word, c1.count + c2.count));

        // emit result via Pulsar producer

        wc.addSink(new FlinkPulsarProducer<>(

        serviceUrl,

        outputTopic,

        new AuthenticationDisabled(),

        wordWithCount -> wordWithCount.toString().getBytes(UTF_8),

        wordWithCount -> wordWithCount.word)

        );

        開發人員可以利用的兩個框架之間的另一個集成包括將Pulsar用作Flink SQL或Table API查詢的流式源和流式表接收器,如下例所示:

        // obtain a DataStream with words

        DataStream<String> words = ...

        // register DataStream as Table "words" with two attributes ("word", "ts").

        //   "ts" is an event-time timestamp.

        tableEnvironment.registerDataStream("words", words, "word, ts.rowtime");

        // create a TableSink that produces to Pulsar

        TableSink sink = new PulsarJsonTableSink(

        serviceUrl,

        outputTopic,

        new AuthenticationDisabled(),

        ROUTING_KEY);

        // register Pulsar TableSink as table "wc"

        tableEnvironment.registerTableSink(

        "wc",

        sink.configure(

        new String[]{"word", "cnt"},

        new TypeInformation[]{Types.STRING, Types.LONG}));

        // count words per 5 seconds and write result to table "wc"

        tableEnvironment.sqlUpdate(

        "INSERT INTO wc " +

        "SELECT word, COUNT(*) AS cnt " +

        "FROM words " +

        "GROUP BY word, TUMBLE(ts, INTERVAL '5' SECOND)");

        最后,Flink將批量工作負載與Pulsar集成為批處理接收器,其中所有結果在Apache Flink完成靜態數據集中的計算后被推送到Pulsar。這樣的例子如下所示:

        // obtain DataSet from arbitrary computation

        DataSet<WordWithCount> wc = ...

        // create PulsarOutputFormat instance

        OutputFormat pulsarOutputFormat = new PulsarOutputFormat(

        serviceUrl,

        topic,

        new AuthenticationDisabled(),

        wordWithCount -> wordWithCount.toString().getBytes());

        // write DataSet to Pulsar

        wc.output(pulsarOutputFormat);

        結論

        Pulsar和Flink都對應用程序的數據和計算級別如何以批量作為特殊情況流“流式傳輸”方式分享了類似的觀點。通過Pulsar的Segmented Streams方法和Flink在一個框架下統一批處理和流處理工作負載的步驟,有許多方法將這兩種技術集成在一起,以提供大規模的彈性數據處理。

        <上一頁  1  2  
        聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

        發表評論

        0條評論,0人參與

        請輸入評論內容...

        請輸入評論/評論長度6~500個字

        您提交的評論過于頻繁,請輸入驗證碼繼續

        暫無評論

        暫無評論

          人工智能 獵頭職位 更多
          掃碼關注公眾號
          OFweek人工智能網
          獲取更多精彩內容
          文章糾錯
          x
          *文字標題:
          *糾錯內容:
          聯系郵箱:
          *驗 證 碼:

          粵公網安備 44030502002758號

          主站蜘蛛池模板: 安福县| 婷婷有码| 国产免费制服丝袜调教视频| 久久精品夜色噜噜亚洲A∨| 含山县| 中文字幕在线高清| 大香蕉一区二区三区| 中文字幕少妇人妻| 色色午夜天| 国产丝袜在线播放| 51精品视频| 国产家庭乱伦| 中文字幕在线高清| 亚洲看片| 欧美精品在线视频| 亚洲成人在线网址| 油尖旺区| 看亚洲一级黄色片啪啪啪| AV在线影院| 宾川县| 平原县| 黑人巨大超大另类videos| 制服丝袜亚洲在线| 91成人社区| 国产超碰在线| 伊人久久av| 无码少妇人妻| 18禁久久久久久久| 国产精品电影久久| 亚洲黄色成人网站| 狼友福利网| 精品3p| 聊城市| 男人天堂2019| 故城县| 亚洲熟女VS国产对比| 建始县| 99国产在线| 人妻少妇精品| 超碰福利导航| 人妻中文第二页|