import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum
// Spark entry point: a local-mode session for exploring the balldontlie dataset.
val spark = SparkSession.builder()
  .appName("balldontlie")
  .config("spark.master", "local")
  .getOrCreate()

// Brings $"col" column syntax and implicit encoders into scope for the rest of the script.
import spark.implicits._
/** Reads a multiline JSON file into a DataFrame.
  *
  * The explicit `DataFrame` return type (previously inferred) documents the
  * contract of this public helper, per Scala convention.
  *
  * @param filePath path to the JSON input file
  * @return the parsed DataFrame
  */
def readInput(filePath: String): DataFrame =
  spark.read.option("multiline", "true").json(filePath)
// Load the raw API dumps. The two reads are independent of each other.
val stats_df = readInput("stats.json")
val games_df = readInput("games.json")

// Quick sanity checks on the games data: summary statistics and inferred schema.
games_df.describe().show()
games_df.printSchema()
// Narrow the games data down to what the later join needs: the game id plus
// the home team's identity and score (nested struct fields are flattened here).
val games_df_short = {
  val projection = Seq(
    "id as game_id",
    "home_team.id as home_team_id",
    "home_team.full_name as home_team_name",
    "home_team_score"
  )
  games_df.selectExpr(projection: _*)
}
games_df_short.show()
// Per-player box-score lines: flatten the nested game/player ids and build a
// single display name from the player's first and last names.
val stats_df_short = {
  val projection = Seq(
    "id",
    "game.id as game_id",
    "player.id as player_id",
    "concat(player.first_name, ' ', player.last_name) as player_name",
    "pts",
    "reb",
    "ast",
    "blk"
  )
  stats_df.selectExpr(projection: _*)
}
stats_df_short.show()
// Scoring leaders: total points per player, highest first.
// Fix: sorting previously depended on the auto-generated "sum(pts)" column
// name produced by .sum("pts") — a fragile, stringly-typed coupling. Using
// agg(sum(...).as(...)) names the aggregate explicitly and sorts the alias.
stats_df_short
  .groupBy("player_id", "player_name")
  .agg(sum("pts").as("total_pts"))
  .sort($"total_pts".desc)
  .show()
// Enrich each stat line with its game's home-team columns. The join produces
// two game_id columns; the one from games_df_short is dropped so only a
// single copy remains in the output.
val joinOnGameId = games_df_short("game_id") === stats_df_short("game_id")
val df_merge = stats_df_short
  .join(games_df_short, joinOnGameId)
  .drop(games_df_short("game_id"))
  .orderBy($"pts".desc)
df_merge.show()

// Persist the merged result as headered CSV, replacing any previous run's output.
df_merge.write.option("header", true).mode("overwrite").csv("output")
// spark.stop()
|