Checkpoint 3

In this project, you'll extend your SQL runtime to support aggregate queries. It is worth 8 points.

Requirements

Grading Rubric

All tests will be run on dedicated hardware equipped with an Intel(R) Core(TM) i5-3210M CPU @ 2.50GHz with a standard 5400 RPM HDD. Queries will have a per-query timeout as listed below. Grading will be based on total runtime for each batch of queries.
8 randomly-generated queries based on the TPC-H benchmark, with a scale factor of 0.1 (100MB) and templates listed below
Under 80 seconds total: 8 of 8 points + leaderboard ranking
Under 150 seconds total: 8 of 8 points
Under 5 minutes total: 4 of 8 points
Under 1 minute per query: 2 of 8 points
Note in particular that these queries make extensive use of aggregates, equi-joins, order-by, and limit clauses, which will all need to be supported.

UnresolvedFunction

When the Spark SQL Parser encounters something that looks like a function, it doesn't try to interpret it directly. Instead, it produces an UnresolvedFunction expression node. You'll need to replace these nodes with resolved expressions.

Like most databases, Spark maintains a "Function Registry", a catalog of all functions and their implementations. All of the "built-in" functions are provided in FunctionRegistry.builtin. Here's a little snippet you can use to replace functions. It doesn't support everything, but will be sufficient for this project.

case UnresolvedFunction(name, arguments, isDistinct, filter, ignoreNulls) =>
  {
    val builder = 
      FunctionRegistry.builtin
        .lookupFunctionBuilder(name)
        .getOrElse {
          throw new RuntimeException(
            s"Unable to resolve function `${name}`"
          )
        }
    builder(arguments) // returns the replacement expression node.
  }

FunctionRegistry.lookupFunctionBuilder returns a 'builder' function. When called on the arguments of the UnresolvedFunction, the builder function returns an expression that implements the function. For example, looking up "regexp_extract" in the registry returns a function that, when called on two string-typed expressions and a literal integer, will return a RegExpExtract object.
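To apply the snippet above to a whole plan, one option is to wrap it in a helper that rewrites every expression of every operator. This is only a sketch: the names ResolveFunctions and resolveFunctions are hypothetical, and it assumes the same Spark version as the snippet above (one where UnresolvedFunction has five fields and lookupFunctionBuilder accepts its name directly).

```scala
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, UnresolvedFunction}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object ResolveFunctions {
  // Hypothetical helper: replaces every UnresolvedFunction found in any
  // operator of `plan` with the expression built by the built-in registry.
  def resolveFunctions(plan: LogicalPlan): LogicalPlan =
    plan.transformAllExpressions {
      case UnresolvedFunction(name, arguments, _, _, _) =>
        val builder =
          FunctionRegistry.builtin
            .lookupFunctionBuilder(name)
            .getOrElse {
              throw new RuntimeException(s"Unable to resolve function `${name}`")
            }
        // The rewrite is top-down and continues into the children of the
        // returned node, so nested calls such as sum(abs(A)) are also replaced.
        builder(arguments)
    }
}
```

Like the original snippet, this ignores isDistinct, filter, and ignoreNulls, which should be acceptable for the queries in this checkpoint.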

Aggregates disguised as Projects

Because the Spark SQL Parser doesn't try to resolve functions, it is incapable of distinguishing between normal functions:

  SELECT regexp_extract(A, "a(b+)a", 1) FROM R
and aggregate functions:
  SELECT sum(A) FROM R
Both of these will parse into a LogicalPlan topped with a Project node.

While not required, you might find it easier to work with the resulting plans if you replace them with actual Aggregate plan nodes. Look for Project nodes whose projectList contains an expression that is an instance of a subclass of AggregateFunction. Note that the aggregate may be nested inside another expression, such as an Alias.
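The rewrite described above could look roughly like the following. It is a sketch under two assumptions: functions have already been resolved (so aggregates show up as AggregateFunction instances rather than UnresolvedFunction nodes), and a Project containing an aggregate has no GROUP BY, which is represented here by an empty grouping list. The names PromoteAggregates and promoteAggregates are hypothetical.

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

object PromoteAggregates {
  // Hypothetical helper: turns "aggregates disguised as Projects" into
  // Aggregate nodes with no grouping expressions (one global group).
  def promoteAggregates(plan: LogicalPlan): LogicalPlan =
    plan.transform {
      case Project(projectList, child)
          // find() searches the whole expression tree, so an aggregate
          // wrapped in an Alias (or in arithmetic) is still detected.
          if projectList.exists(_.find(_.isInstanceOf[AggregateFunction]).isDefined) =>
        Aggregate(Seq.empty, projectList, child)
    }
}
```

Project nodes without aggregate functions are left untouched, so the helper can be run over any plan.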

Aggregate

An Aggregate is a logical plan node with three fields:
groupingExpressions
The GROUP BY attributes. Normally these can be any expression, but for this checkpoint, it will be sufficient to assume that all of these expressions are Attributes.
aggregateExpressions
The SELECT expressions. Normally these can be arbitrary arithmetic over aggregates, but for this checkpoint, it will be sufficient to assume that all of these expressions are either an Alias of an AggregateFunction, or an Attribute that also appears in the groupingExpressions field.
child
The input operator.
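Under the simplifying assumptions listed above, each entry of aggregateExpressions falls into one of two cases, which can be separated with a pattern match. The sketch below is illustrative only: SplitAggregate, OutputColumn, GroupColumn, AggregateColumn, and classify are hypothetical names, not part of Spark.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

object SplitAggregate {
  // Hypothetical description of one output column of an Aggregate.
  sealed trait OutputColumn
  case class GroupColumn(attribute: Attribute) extends OutputColumn
  case class AggregateColumn(alias: Alias, function: AggregateFunction) extends OutputColumn

  def classify(agg: Aggregate): Seq[OutputColumn] =
    agg.aggregateExpressions.map {
      // Case 1: an Alias wrapped directly around an aggregate function.
      case alias @ Alias(function: AggregateFunction, _) =>
        AggregateColumn(alias, function)
      // Case 2: an Attribute that also appears in the GROUP BY list.
      case attribute: Attribute
          if agg.groupingExpressions.exists(_.semanticEquals(attribute)) =>
        GroupColumn(attribute)
      // Anything else is outside the assumptions of this checkpoint.
      case other =>
        throw new UnsupportedOperationException(s"Unsupported SELECT expression: $other")
    }
}
```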

AggregateFunctions

AggregateFunctions are unevaluable, because they don't get evaluated on a single row. Instead, there are several methods on an AggregateFunction that describe how to initialize an accumulator (what Spark calls an AggregationBuffer), how to incorporate input rows into it, and how to extract a final result value from the buffer.

An AggregateFunction can be an instance of either DeclarativeAggregate or ImperativeAggregate.

For the purposes of this checkpoint, you will need to support SUM, COUNT, AVG, MIN, and MAX, all of which are implemented in Spark as DeclarativeAggregates.
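To make the accumulator idea concrete, here is a simplified model in plain Scala that does not use any Catalyst types. It keeps one buffer per group and follows the initialize, update, and extract steps for an average. All names (AccumulatorSketch, AvgBuffer, averageByKey) are hypothetical, and a real implementation would work on InternalRows rather than tuples.

```scala
import scala.collection.mutable

object AccumulatorSketch {
  // Simplified stand-in for an aggregation buffer: an average needs
  // a running sum and a running count.
  final case class AvgBuffer(sum: Double, count: Long)

  // Step 1: the initial buffer, before any row has been seen.
  val initial: AvgBuffer = AvgBuffer(0.0, 0L)

  // Step 2: fold one input value into the buffer.
  def update(buffer: AvgBuffer, value: Double): AvgBuffer =
    AvgBuffer(buffer.sum + value, buffer.count + 1)

  // Step 3: extract the final result; None plays the role of SQL NULL
  // for a buffer that never saw a row.
  def evaluate(buffer: AvgBuffer): Option[Double] =
    if (buffer.count == 0) None else Some(buffer.sum / buffer.count)

  // Group-by: one buffer per distinct key, updated row by row.
  def averageByKey(rows: Seq[(String, Double)]): Map[String, Option[Double]] = {
    val buffers = mutable.LinkedHashMap.empty[String, AvgBuffer]
    for ((key, value) <- rows) {
      buffers(key) = update(buffers.getOrElse(key, initial), value)
    }
    buffers.iterator.map { case (key, buffer) => key -> evaluate(buffer) }.toMap
  }
}
```

For example, AccumulatorSketch.averageByKey(Seq("A" -> 1.0, "A" -> 3.0, "B" -> 5.0)) yields an average of 2.0 for "A" and 5.0 for "B".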

DeclarativeAggregates

The following methods are relevant:

aggBufferAttributes
The "schema" of the aggregate buffer. Note that these are attributes, and their ExprIds here line up with the Attributes used in the expressions below.
initialValues
A sequence of expressions, one for every attribute in aggBufferAttributes. These are the initial values for the buffer.
updateExpressions
A sequence of expressions, one for every attribute in aggBufferAttributes. Evaluate these expressions on an InternalRow that includes both the aggBufferAttributes and the .output of the Aggregate's child LogicalPlan operator.
evaluateExpression
An expression that, if evaluated on an InternalRow storing the aggregation buffer, will return the result of the aggregate function.
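The four methods above can be combined to evaluate a DeclarativeAggregate over a single group of rows. The following is a sketch, not a prescribed implementation: it uses Spark's BindReferences utility to turn attributes into row positions, which is an assumption about how your runtime resolves attributes (you may already have your own mechanism). It also assumes the aggregate's input expressions are fully resolved. The names EvaluateDeclarative and aggregateAll are hypothetical.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, BindReferences, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate

object EvaluateDeclarative {
  // Hypothetical helper: runs `agg` over every row of one group.
  // `childOutput` is the .output of the Aggregate's child operator.
  def aggregateAll(
      agg: DeclarativeAggregate,
      childOutput: Seq[Attribute],
      rows: Iterator[InternalRow]): Any = {
    val bufferSchema: Seq[Attribute] = agg.aggBufferAttributes

    // Update expressions see the buffer columns followed by the input columns.
    val updateSchema = AttributeSeq(bufferSchema ++ childOutput)
    val updates = agg.updateExpressions.map(BindReferences.bindReference(_, updateSchema))

    // The final expression sees only the buffer columns.
    val evaluate = BindReferences.bindReference(agg.evaluateExpression, AttributeSeq(bufferSchema))

    // initialValues: one expression per buffer attribute.
    var buffer: InternalRow = InternalRow.fromSeq(agg.initialValues.map(_.eval()))

    for (row <- rows) {
      // Same column order as updateSchema: buffer first, then the input row.
      val input = new JoinedRow(buffer, row)
      buffer = InternalRow.fromSeq(updates.map(_.eval(input)))
    }
    evaluate.eval(buffer)
  }
}
```

For a GROUP BY query, the same loop would keep one buffer per distinct combination of grouping values, for instance in a hash map keyed on those values.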

Example Queries

TPC-H is a standard database benchmark. The benchmark consists of a dataset generator and 22 standard query templates. This checkpoint uses eight queries based on TPC-H Queries 1, 3, 5, 6, 10, 11, 12, and 14. The dataset generator and template values can be found at the TPC-H website; the generator is run at scaling factor (SF) 0.1. Minor variations in the queries may be made. The queries have been rewritten slightly to make them easier to analyze.

Query 1
SELECT
  LINEITEM.RETURNFLAG,
  LINEITEM.LINESTATUS,
  SUM(LINEITEM.QUANTITY) AS SUM_QTY,
  SUM(LINEITEM.EXTENDEDPRICE) AS SUM_BASE_PRICE, 
  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS SUM_DISC_PRICE, 
  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)*(CAST(1.0 as float)+LINEITEM.TAX)) AS SUM_CHARGE, 
  AVG(LINEITEM.QUANTITY) AS AVG_QTY,
  AVG(LINEITEM.EXTENDEDPRICE) AS AVG_PRICE,
  AVG(LINEITEM.DISCOUNT) AS AVG_DISC,
  COUNT(*) AS COUNT_ORDER
FROM
  LINEITEM
WHERE
  LINEITEM.SHIPDATE <= DATE '1998-10-01'
GROUP BY 
  LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS 
ORDER BY
  LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS
Query 3
SELECT
  LINEITEM.ORDERKEY,
  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS REVENUE, 
  ORDERS.ORDERDATE,
  ORDERS.SHIPPRIORITY
FROM
  CUSTOMER,
  ORDERS,
  LINEITEM 
WHERE
  CUSTOMER.MKTSEGMENT = 'BUILDING' AND CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY 
  AND ORDERS.ORDERDATE < DATE '1995-03-15'
  AND LINEITEM.SHIPDATE > DATE '1995-03-15'
GROUP BY LINEITEM.ORDERKEY, ORDERS.ORDERDATE, ORDERS.SHIPPRIORITY 
ORDER BY REVENUE DESC, ORDERDATE
LIMIT 10
Query 5
SELECT
  NATION.NAME,
  SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE 
FROM
  REGION, NATION, CUSTOMER, ORDERS, LINEITEM, SUPPLIER
WHERE
  CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND LINEITEM.SUPPKEY = SUPPLIER.SUPPKEY
  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY 
  AND SUPPLIER.NATIONKEY = NATION.NATIONKEY
  AND NATION.REGIONKEY = REGION.REGIONKEY
  AND REGION.NAME = 'ASIA'
  AND ORDERS.ORDERDATE >= DATE '1994-01-01'
  AND ORDERS.ORDERDATE < DATE '1995-01-01'
GROUP BY NATION.NAME
ORDER BY REVENUE DESC
Query 6
SELECT
  SUM(LINEITEM.EXTENDEDPRICE*LINEITEM.DISCOUNT) AS REVENUE
FROM LINEITEM
WHERE LINEITEM.SHIPDATE >= DATE '1994-01-01'
  AND LINEITEM.SHIPDATE < DATE '1995-01-01'
  AND LINEITEM.DISCOUNT > CAST(0.05 AS float) AND LINEITEM.DISCOUNT < CAST(0.07 as float)
  AND LINEITEM.QUANTITY < CAST(24 AS float)
Query 10
SELECT 
  CUSTOMER.CUSTKEY, 
  SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE, 
  CUSTOMER.ACCTBAL, 
  NATION.NAME, 
  CUSTOMER.ADDRESS, 
  CUSTOMER.PHONE, 
  CUSTOMER.COMMENT
FROM 
  CUSTOMER, ORDERS, LINEITEM, NATION
WHERE
  CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND ORDERS.ORDERDATE >= DATE '1993-10-01'
  AND ORDERS.ORDERDATE < DATE '1994-01-01'
  AND LINEITEM.RETURNFLAG = 'R'
  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY
GROUP BY 
  CUSTOMER.CUSTKEY, CUSTOMER.ACCTBAL, CUSTOMER.PHONE, NATION.NAME, CUSTOMER.ADDRESS, CUSTOMER.COMMENT
ORDER BY REVENUE ASC
LIMIT 20
Query 11
SELECT PK_V.PARTKEY, 
       PK_V.VALUE
FROM (
  SELECT PS.PARTKEY,
         SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
  FROM PARTSUPP PS,
       SUPPLIER S,
       NATION N
  WHERE PS.SUPPKEY = S.SUPPKEY
    AND S.NATIONKEY = N.NATIONKEY
    AND N.NAME = 'GERMANY'
  GROUP BY PS.PARTKEY 
) PK_V, (
  SELECT SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
  FROM PARTSUPP PS,
       SUPPLIER S,
       NATION N
  WHERE PS.SUPPKEY = S.SUPPKEY
    AND S.NATIONKEY = N.NATIONKEY
    AND N.NAME = 'GERMANY'
) CUTOFF_V
WHERE PK_V.VALUE > (CUTOFF_V.VALUE * CAST(0.0001 AS double) / CAST(100.0 AS double))
ORDER BY PK_V.VALUE DESC
Query 12
SELECT  LINEITEM.SHIPMODE, 
        SUM(CASE WHEN ORDERS.ORDERPRIORITY = '1-URGENT'
                     OR ORDERS.ORDERPRIORITY = '2-HIGH'
                   THEN 1
                   ELSE 0 END) AS HIGH_LINE_COUNT,
        SUM(CASE WHEN ORDERS.ORDERPRIORITY <> '1-URGENT'
                     AND ORDERS.ORDERPRIORITY <> '2-HIGH'
                   THEN 1
                   ELSE 0 END) AS LOW_LINE_COUNT
FROM LINEITEM, ORDERS
WHERE ORDERS.ORDERKEY = LINEITEM.ORDERKEY
  AND (LINEITEM.SHIPMODE='MAIL' OR LINEITEM.SHIPMODE='SHIP')
  AND LINEITEM.COMMITDATE < LINEITEM.RECEIPTDATE
  AND LINEITEM.SHIPDATE < LINEITEM.COMMITDATE
  AND LINEITEM.RECEIPTDATE >= DATE '1994-01-01'
  AND LINEITEM.RECEIPTDATE < DATE '1995-01-01'
GROUP BY LINEITEM.SHIPMODE
ORDER BY LINEITEM.SHIPMODE
Query 14
SELECT
  CAST(100.00 AS double) 
    * PROMO_ONLY 
    / ALL_REVENUE
      AS PROMO_REVENUE
FROM (
  SELECT
    SUM(
      CASE  WHEN PART.TYPE LIKE 'PROMO%'
            THEN LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
            ELSE cast(0 as float)
      END
    ) AS PROMO_ONLY,
    SUM(
      LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
    ) AS ALL_REVENUE
  FROM 
    LINEITEM,
    PART
  WHERE
    LINEITEM.PARTKEY = PART.PARTKEY
    AND LINEITEM.SHIPDATE >= DATE '1995-09-01'
    AND LINEITEM.SHIPDATE < DATE '1995-10-01'
) AGGREGATE

This page last updated 2024-12-09 13:28:25 -0500