Переглянути джерело

Add a worksheet with Spark DataFrames

theenglishway (time) 2 роки тому
батько
коміт
ba9c041b4f
2 змінених файлів з 62 додано та 1 видалено
  1. 5 1
      build.sbt
  2. 57 0
      src/main/scala/spark.worksheet.sc

+ 5 - 1
build.sbt

@@ -25,10 +25,14 @@ version := "1.0"
 // Want to use a published library in your project?
 // You can define other libraries as dependencies in your build like this:
 
+val sparkVersion = "3.4.1" // single shared version so spark-core and spark-sql stay in lockstep
+
 libraryDependencies := List(
     "com.lihaoyi" %% "requests" % "0.8.0",
     "com.lihaoyi" %% "upickle" % "3.1.0",
-    "com.lihaoyi" %% "os-lib" % "0.9.1"
+    "com.lihaoyi" %% "os-lib" % "0.9.1",
+    "org.apache.spark" %% "spark-core" % sparkVersion, // Spark core runtime
+    "org.apache.spark" %% "spark-sql" % sparkVersion, // DataFrame/SQL API — used by the new worksheet
 )
 
 // Here, `libraryDependencies` is a set of dependencies, and by using `+=`,

+ 57 - 0
src/main/scala/spark.worksheet.sc

@@ -0,0 +1,57 @@
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession
+  .builder()
+  .appName("balldontlie")
+  .config("spark.master", "local") // run in-process on the local JVM; no cluster needed
+  .getOrCreate()
+
+import spark.implicits._ // brings in the $"col" string interpolator used below
+
+def readInput(filePath: String) = { // parse one multi-line JSON document (not JSON Lines) into a DataFrame
+  spark.read.option("multiline", "true").json(filePath)
+}
+
+val games_df = readInput("games.json") // assumes file is in the working directory — TODO confirm path
+val stats_df = readInput("stats.json") // same assumption as above
+games_df.describe().show() // summary stats (count/mean/stddev/min/max) for numeric columns
+
+games_df.printSchema() // inspect the inferred nested schema before flattening it
+
+val games_df_short = games_df
+  .selectExpr(
+    "id as game_id",
+    "home_team.id as home_team_id", // flatten the nested home_team struct
+    "home_team.full_name as home_team_name",
+    "home_team_score"
+  )
+
+games_df_short.show()
+
+val stats_df_short = stats_df.selectExpr(
+  "id",
+  "game.id as game_id", // join key against games_df_short below
+  "player.id as player_id",
+  "concat(player.first_name, ' ', player.last_name) as player_name",
+  "pts",
+  "reb",
+  "ast",
+  "blk"
+)
+stats_df_short.show()
+stats_df_short
+  .groupBy("player_id", "player_name") // total points per player, highest first
+  .sum("pts")
+  .sort($"sum(pts)".desc) // untyped sum() names the output column literally "sum(pts)"
+  .show()
+
+val df_merge = stats_df_short
+  .join(games_df_short, games_df_short("game_id") === stats_df_short("game_id"))
+  .drop(games_df_short("game_id")) // drop the duplicate join key from the games side
+  .orderBy($"pts".desc)
+
+df_merge.show()
+
+df_merge.write.option("header", true).mode("overwrite").csv("output") // writes part files under ./output, replacing any previous run
+
+// spark.stop()