国产无码免费,人妻口爆,国产V在线,99中文精品7,国产成人无码AA精品一,制度丝袜诱惑av,久久99免费麻辣视频,蜜臀久久99精品久久久久久酒店
        訂閱
        糾錯
        加入自媒體

        基于Spark的數據分析實踐

        2019-06-19 09:55
        EAWorld
        關注

        三、SparkSQL

        Spark 從 1.3 版本開始原有 SchemaRDD 的基礎上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發者的學習門檻,同時還支持Scala、Java與Python三種語言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數據場景。

        一般的數據處理步驟:讀入數據 -> 對數據進行處理 -> 分析結果  -> 寫入結果

        SparkSQL 結構化數據

        處理結構化數據(如 CSV,JSON,Parquet 等);

        把已經結構化數據抽象成 DataFrame (HiveTable);

        非結構化數據通過 RDD.map.filter 轉換成結構化進行處理;

        按照列式數據庫,只加載非結構化中可結構化的部分列(Hbase,MongoDB);

        處理非結構化數據,不能簡單的用 DataFrame 裝載。而是要用 SparkRDD 把數據讀入,在通過一系列的 Transformer Method 把非結構化的數據加工為結構化,或者過濾到不合法的數據。

        SparkSQL DataFrame

        SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎的分布式數據集,類似于傳統數據庫中的二維表格。DataFrame與RDD的主要區別在于,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫中的 DataFrame 結構,則會對 SparkSQL DataFrame 概念非常熟悉。

        TextFile DataFrame

        import.org.apache.spark.sql._//定義數據的列名稱和類型valdt=StructType(List(id:String,name:String,gender:String,age:Int))
        //導入user_info.csv文件并指定分隔符vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
        //將表結構和數據關聯起來,把讀入的數據user.csv映射成行,構成數據集valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
        //通過SparkSession.createDataFrame()創建表,并且數據表表頭val df= spark.createDataFrame(rowRDD, dt)

        可左右滑動查看代碼

        讀取規則數據文件作為DataFrame

        SparkSession.Builder builder = SparkSession.builder()Builder.setMaster("local").setAppName("TestSparkSQLApp")SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
        # 讀取 JSON 數據,path 可為文件或者目錄valdf=sqlContext.read().json(path);
        # 讀取 HadoopParquet 文件vardf=sqlContext.read().parquet(path);
        # 讀取 HadoopORC 文件vardf=sqlContext.read().orc(path);

        可左右滑動查看代碼

        JSON 文件為每行一個 JSON 對象的文件類型,行尾無須逗號。文件頭也無須[]指定為數組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;

        Parquet文件

        Configurationconfig = new Configuration();ParquetFileReaderreader = ParquetFileReader.open(        HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata");

        可左右滑動查看代碼

        allFiedls 的值就是各字段的名稱和具體的類型,整體是一個json格式進行展示。

        讀取 Hive 表作為 DataFrame

        Spark2 API 推薦通過 SparkSession.Builder 的 Builder 模式創建 SparkContext。 Builder.getOrCreate() 用于創建 SparkSession,SparkSession 是 SparkContext 的封裝。

        在Spark1.6中有兩個核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動態注冊的表,HiveContext 用于處理 Hive 中的表。

        從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執行 Hive 中的表,也可執行內部注冊的表;

        在需要執行 Hive 表時,只需要在 SparkSession.Builder 中開啟 Hive 支持即可(enableHiveSupport())。

        SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();

        可左右滑動查看代碼

        // db 指 Hive 庫中的數據庫名,如果不寫默認為 default

        // tableName 指 hive 庫的數據表名

        sqlContext.sql(“select * from db.tableName”)

        可左右滑動查看代碼

        SparkSQL ThriftServer

        //首先打開 Hive 的 Metastore服務

        hive$bin/hive –-service metastore –p 8093

        可左右滑動查看代碼

        //把 Spark 的相關 jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴 jar

        spark$hadoop fs –put jars/*.jar /lib/spark2

        可左右滑動查看代碼

        // 啟動 spark thriftserver 服務

        spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar

        可左右滑動查看代碼

        當hdfs 上傳了spark 依賴 jar 時,通過spark.yarn.jars 可看到日志 spark 無須每個job 都上傳jar,可節省啟動時間

        19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

        可左右滑動查看代碼

        //通過 spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

        bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop

        可左右滑動查看代碼

        -u 是指定 beeline 的執行驅動地址;

        -n 是指定登陸到 spark Session 上的用戶名稱;

        Beeline 還支持傳入-e 可傳入一行 SQL,

        -e <query>                      query that should be executed

        也可通過 –f 指定一個 SQL File,內部可用逗號分隔的多個 SQL(存儲過程)

        -f <exec file>                  script file that should be executed

        SparkSQL Beeline 的執行效果展示

        SparkSQL ThriftServer

        對于 SparkSQL ThriftServer 服務,每個登陸的用戶都有創建的 SparkSession,并且執行的對個 SQL 會通過時間順序列表展示。

        SparkSQL ThriftServer 服務可用于其他支持的數據庫工具創建查詢,也用于第三方的 BI 工具,如 tableau。

        四、SparkSQL Flow

        SparkSQL Flow 是以 SparkSQL 為基礎,開發的統一的基于 XML 配置化的可執行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個 Flow。下文開始 SparkSQL Flow 的介紹:

        SparkSQL Flow 是基于 SparkSQL 開發的一種基于 XML 配置化的 SQL 數據流轉處理模型。該模型簡化了 SparkSQL 、Spark RDD的開發,并且降低開發了難度,適合了解數據業務但無法駕馭大數據以及 Spark 技術的開發者。

        一個由普元技術部提供的基于 SparkSQL 的開發模型;

        一個可二次定制開發的大數據開發框架,提供了靈活的可擴展 API;

        一個提供了 對文件,數據庫,NoSQL 等統一的數據開發視界語義;

        基于 SQL 的開發語言和 XML 的模板配置,支持 Spark UDF 的擴展管理;

        支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺;

        支持開源、華為、星環等平臺統一認證。

        SparkSQL Flow 適合的場景:

        批量 ETL;

        非實時分析服務;

        SparkSQL Flow XML 概覽

        Properties 內定義一組變量,可用于宏替換;

        Methods 內可注冊 udf 和 udaf 兩種函數;

        Prepare 內可定義前置 SQL,用于執行 source 前的 sql 操作;

        Sources 內定義一個到多個數據表視圖;

        Transformer 內可定義 0 到多個基于 SQL 的數據轉換操作(支持 join);

        Targets 用于定義 1 到多個數據輸出;

        After 可定義 0到多個任務日志;

        如你所見,source 的 type 參數用于區分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數據源加載廣度;并且,根據 type 不同,source 也需要配置不同的參數,如數據庫還需要 driver,url,user和 password 參數。

        Transformer 是基于 source 定的數據視圖可執行的一組轉換 SQL,該 SQL 符合 SparkSQL 的語法(SQL99)。Transform 的 SQL 的執行結果被作為中間表命名為 table_name 指定的值。

        Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

        <上一頁  1  2  3  4  下一頁>  
        聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

        發表評論

        0條評論,0人參與

        請輸入評論內容...

        請輸入評論/評論長度6~500個字

        您提交的評論過于頻繁,請輸入驗證碼繼續

        暫無評論

        暫無評論

          人工智能 獵頭職位 更多
          掃碼關注公眾號
          OFweek人工智能網
          獲取更多精彩內容
          文章糾錯
          x
          *文字標題:
          *糾錯內容:
          聯系郵箱:
          *驗 證 碼:

          粵公網安備 44030502002758號

          主站蜘蛛池模板: 岛国一级| 国产婬妇无码无遮挡A片在线观看| 英山县| 建德市| 精品va在线观看| 三成人免费看| 国产黄色短视频| 天天干天天色综合网| 辰溪县| 亚洲啊v.在线播放| 日韩精品极品视频在线观看免费| 曰批永久免费40分钟免费观看软件 | 国产精品高潮呻吟AV| 济阳县| 精品国模| 蜜臂aV| 丝袜a片| 99在线观看视频| 亚洲AV一卡| 国产成人久久久777777麻豆| 84pao强力打造| 五月丁香影院| 宝清县| 国产传媒淫语对白AV| 久久99嫩草熟妇人妻蜜臀| 德兴市| caopor在线| 本溪市| 抚顺市| 成人无码AV片| A级毛片18以上观看视频免费| 滦平县| 亚洲国产精品成人网站| 尼木县| 井陉县| 中日韩在线| 亚洲成人Av| 欧美3P视频| 91啪在线| 亚欧在线视频| 化隆|