{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "// Resolve the Spark SQL dependency; `::` selects the artifact built for the kernel's Scala version.\n",
    "import $ivy.`org.apache.spark::spark-sql:2.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "import org.apache.spark.sql._\n",
    "\n",
    "// Switch off every log4j logger under the \"org\" namespace.\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "// Session on a local master; `local[*]` runs one worker thread per logical core.\n",
    "val spark = NotebookSparkSession\n",
    "  .builder()\n",
    "  .master(\"local[*]\")\n",
    "  .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Declared as a `def`, so each use reads the context from `spark` again.\n",
    "def sc = spark.sparkContext\n",
    "\n",
    "// The integers 1 to 100000000, split into 100 partitions.\n",
    "val rdd = sc.parallelize(1 to 100000000, numSlices = 100)\n",
    "\n",
    "// Add one to every element, then total the results.\n",
    "val n = rdd.map(x => x + 1).sum()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// One RDD element per line of the file.\n",
    "val lines = sc.textFile(\"crime.csv\")\n",
    "\n",
    "// Length of each individual line.\n",
    "val lineLengths = lines.map(_.length)\n",
    "\n",
    "// Sum of all per-line lengths.\n",
    "val totalLength = lineLengths.reduce(_ + _)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "// Re-declares `lines` and `totalLength`, this time with the RDD marked for caching\n",
    "// so that both computations below can reuse it.\n",
    "val lines = sc.textFile(\"crime.csv\").cache()\n",
    "\n",
    "// Total length across all lines.\n",
    "val totalLength = lines.map(_.length).reduce(_ + _)\n",
    "\n",
    "// Occurrence count per distinct line; only 10 of the (line, count) pairs are fetched.\n",
    "val counts = lines\n",
    "    .map(line => (line, 1))\n",
    "    .reduceByKey(_ + _)\n",
    "    .take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "// Load the CSV as a DataFrame: the first row supplies the column names and\n",
    "// the column types are inferred from the data.\n",
    "val crimeFacts = spark.read\n",
    "    .options(Map(\"header\" -> \"true\", \"inferSchema\" -> \"true\"))\n",
    "    .csv(\"crime.csv\")\n",
    "\n",
    "// Preview the first rows.\n",
    "crimeFacts.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Print the column names and their types.\n",
    "crimeFacts.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Expose the DataFrame to SQL queries under the name `crimes`.\n",
    "crimeFacts.createOrReplaceTempView(viewName = \"crimes\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Run a SQL query against `crimes` and print the result.\n",
    "spark\n",
    "    .sql(\"select INCIDENT_NUMBER, DISTRICT from crimes limit 10\")\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "import spark.implicits._\n",
    "\n",
    "// Typed representation of one `crimeFacts` row, used by `as[Crime]` below.\n",
    "// NOTE(review): Int/Double fields are primitives - confirm the matching columns never hold nulls.\n",
    "case class Crime(\n",
    "    INCIDENT_NUMBER: String,\n",
    "    OFFENSE_CODE: Int,\n",
    "    OFFENSE_CODE_GROUP: String,\n",
    "    OFFENSE_DESCRIPTION: String,\n",
    "    DISTRICT: String,\n",
    "    REPORTING_AREA: String,\n",
    "    SHOOTING: String,\n",
    "    OCCURRED_ON_DATE: String,\n",
    "    YEAR: Int,\n",
    "    MONTH: Int,\n",
    "    DAY_OF_WEEK: String,\n",
    "    HOUR: Int,\n",
    "    UCR_PART: String,\n",
    "    STREET: String,\n",
    "    Lat: Double,\n",
    "    Long: Double,\n",
    "    Location: String\n",
    ")\n",
    "\n",
    "// Keep only the records whose offense description is exactly \"VANDALISM\".\n",
    "crimeFacts\n",
    "    .as[Crime]\n",
    "    .filter(crime => crime.OFFENSE_DESCRIPTION == \"VANDALISM\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Offense-code table, read with a header row and inferred column types.\n",
    "val offenseCodes = spark.read\n",
    "    .option(\"header\", \"true\")\n",
    "    .option(\"inferSchema\", \"true\")\n",
    "    .csv(\"offense_codes.csv\")\n",
    "\n",
    "offenseCodes.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import spark.implicits._\n",
    "\n",
    "// Join each crime to its offense-code row, keep names starting with \"ROBBERY\",\n",
    "// count rows per name and print them with the largest count first (untruncated).\n",
    "crimeFacts\n",
    "    .join(offenseCodes, $\"CODE\" === $\"OFFENSE_CODE\")\n",
    "    .where($\"NAME\".startsWith(\"ROBBERY\"))\n",
    "    .groupBy(\"NAME\")\n",
    "    .count()\n",
    "    .sort($\"count\".desc)\n",
    "    .show(false)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
