Apache Spark 学習1_2

サンプルソースの解説していこうと思います。

以下、RDDを結合してファイルへ出力するまでです。


// csvファイルからRDD作成するファンクションの作成

def createSaleRDD(csvFile : String) = { val logRDD = sc.textFile(csvFile) logRDD.map 

{ record => val splitRecord = record.split(",") val productId = splitRecord(2) 

val numOfSoId = splitRecord(3).toInt (productId,numOfSoId) 

} }

// csvファイルを読み込み

val salesOctRDD = createSaleRDD("sales-october.csv")

// csvファイルを読み込み

val salesNovRDD = createSaleRDD("sales-november.csv")

import org.apache.spark.rdd.RDD

// 50以上のRDDを作成するファンクション

def createOver50SoldRDD(rdd : RDD[(String, Int)]) = { rdd.reduceByKey(_ + _).filter(_._2 >= 50) }

// 50以上のデータを作成しRRDへ変換

val octOver50SoldRDD = createOver50SoldRDD(salesOctRDD) 

// 50以上のデータを作成しRRDへ変換

val novOver50SoldRDD = createOver50SoldRDD(salesNovRDD) 

// 結合

val bothOver50SoldRDD = octOver50SoldRDD.join(novOver50SoldRDD) bothOver50SoldRDD.collect.foreach(println) 

// RDDへ変更

val over50SoldAndAmountRDD = bothOver50SoldRDD.map { case (productId , (octAmount, novAmount)) => (productId, octAmount + novAmount) }

over50SoldAndAmountRDD.collect.foreach(println) scala.collection.mutable.HashMap

// 必要なクラスをインポート

import java.io.{BufferedReader,InputStreamReader} 

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem,Path}

import collection.mutable._ 

// HashMapを宣言

val productsMap = new HashMap[String,(String,Int)] 

val hadoopConf = new Configuration 

val fileSystem = FileSystem.get(hadoopConf) 

// csv読み込み

val inputStream = fileSystem.open(new Path("products.csv"))

var line = productsCSVReader.readLine while (line != null)

val splitLine = line.split(",")

// product IDを設定

val productId = splitLine(0)

// プロダクト名を設定

val productName = splitLine(1) 

// 金額を設定

val unitPrice = splitLine(2).toInt 

productsMap(productId) = (productName,unitPrice)

line = productsCSVReader.readLine } 

productsCSVReader.close() 

val broadcastedMap = sc.broadcast(productsMap) 

val resultRDD = over50SoldAndAmountRDD.map

case (productId,amount) => 

val productsMap = broadcastedMap.value

val (productName,unitPrice) = productsMap(productId) 

(productName,amount,amount * unitPrice)

}

resultRDD.collect.foreach(println) 

// 結果を出力

resultRDD.saveAsTextFile("oct-nov-over-50-sold")

管理者オススメのアプリ

簡単に日々のスケジュールを管理できる無料のphoneアプリです。
スケジュール、シフト管理、TODO,お小遣い管理、メモなどいろいろ使用できるアプリ。
メニューの並び替えも自由!


すごい手帳を使ってみる

アクセス数: 無料カウンター




トラックバック(0)

トラックバックURL: http://smartwolf.sakura.ne.jp/Blog/mt-tb.cgi/214

コメントする

ウェブページ

Powered by Movable Type 5.2.7