Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,18 @@ class AggregateOpDesc extends LogicalOp {
val inputSchema = inputSchemas(operatorInfo.inputPorts.head.id)
val outputSchema = Schema(
groupByKeys.map(key => inputSchema.getAttribute(key)) ++
localAggregations.map(agg =>
agg.getAggregationAttribute(inputSchema.getAttribute(agg.attribute).getType)
)
localAggregations.map { agg =>
// Only COUNT with an empty attribute (COUNT(*)) skips the column lookup:
// its result type is INTEGER regardless. Every other function resolves
// the input attribute (failing fast if it is missing/invalid).
val attrType =
if (
agg.aggFunction == AggregationFunction.COUNT &&
(agg.attribute == null || agg.attribute.trim.isEmpty)
) null
else inputSchema.getAttribute(agg.attribute).getType
agg.getAggregationAttribute(attrType)
Comment thread
tanishqgandhi1908 marked this conversation as resolved.
}
Comment thread
tanishqgandhi1908 marked this conversation as resolved.
)
Map(PortIdentity(internal = true) -> outputSchema)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,18 @@ class AggregateOpExec(descString: String) extends OperatorExecutor {

// Initialize distributedAggregations if it's not yet initialized
if (distributedAggregations == null) {
distributedAggregations = desc.aggregations.map(agg =>
agg.getAggFunc(tuple.getSchema.getAttribute(agg.attribute).getType)
)
distributedAggregations = desc.aggregations.map { agg =>
// Only COUNT with an empty attribute (COUNT(*)) skips the column lookup; its
// result does not depend on any input attribute. Every other function resolves
// the input attribute (failing fast if it is missing/invalid).
val attrType =
if (
agg.aggFunction == AggregationFunction.COUNT &&
(agg.attribute == null || agg.attribute.trim.isEmpty)
) null
else tuple.getSchema.getAttribute(agg.attribute).getType
agg.getAggFunc(attrType)
Comment thread
tanishqgandhi1908 marked this conversation as resolved.
}
Comment thread
tanishqgandhi1908 marked this conversation as resolved.
}

// Construct the group key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,23 @@ case class AveragePartialObj(sum: Double, count: Double) extends Serializable {}
}
]
}
}
},
"allOf": [
{
"if": {
"properties": {
"aggFunction": { "const": "count" }
}
},
"then": {},
"else": {
"required": ["attribute"],
"properties": {
"attribute": { "minLength": 1 }
}
}
Comment thread
tanishqgandhi1908 marked this conversation as resolved.
}
]
}
""")
class AggregationOperation {
Expand All @@ -64,8 +80,8 @@ class AggregationOperation {
@JsonPropertyDescription("sum, count, average, min, max, or concat")
var aggFunction: AggregationFunction = _

@JsonProperty(value = "attribute", required = true)
@JsonPropertyDescription("column to calculate average value")
@JsonProperty(value = "attribute")
@JsonPropertyDescription("column to aggregate on")
Comment thread
tanishqgandhi1908 marked this conversation as resolved.
@AutofillAttributeName
var attribute: String = _

Expand Down Expand Up @@ -106,6 +122,7 @@ class AggregationOperation {
@JsonIgnore
def getFinal: AggregationOperation = {
val newAggFunc = aggFunction match {
// COUNT emits partial counts locally; the global stage sums them.
case AggregationFunction.COUNT => AggregationFunction.SUM
case a: AggregationFunction => a
}
Expand Down Expand Up @@ -139,15 +156,13 @@ class AggregationOperation {
}

private def countAgg(): DistributedAggregation[Integer] = {
// An empty attribute means COUNT(*): count every row. Otherwise count only the
// rows whose attribute value is non-null (COUNT(column)).
val countAllRows = attribute == null || attribute.trim.isEmpty
new DistributedAggregation[Integer](
() => 0,
(partial, tuple) => {
val inc =
if (attribute == null) 1
else if (tuple.getField(attribute) != null) 1
else 0
partial + inc
},
(partial, tuple) =>
partial + (if (countAllRows || tuple.getField(attribute) != null) 1 else 0),
(partial1, partial2) => partial1 + partial2,
partial => partial
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,22 @@ class AggregateOpDescSpec extends AnyFlatSpec with Matchers {
.getExternalOutputSchemas(Map(PortIdentity() -> input)) shouldBe
Map(PortIdentity() -> Schema().add("avg", AttributeType.DOUBLE))
}

it should "type a COUNT(*) (empty attribute) result as INTEGER without looking up an input column" in {
  // COUNT(*) is expressed as COUNT with a blank attribute, so the output schema has to be
  // derived without resolving any column from the input schema.
  val inputSchema = Schema().add("v", AttributeType.LONG)
  val countStar = aggOp(AggregationFunction.COUNT, "", "row_count")

  val actual = descWith(List.empty, countStar)
    .getExternalOutputSchemas(Map(PortIdentity() -> inputSchema))
  val expected = Map(PortIdentity() -> Schema().add("row_count", AttributeType.INTEGER))

  actual shouldBe expected
}

it should "fail fast for a non-COUNT function with an empty attribute (only COUNT allows it)" in {
  // A blank attribute is accepted for COUNT alone. Any other function (SUM here) has to
  // resolve its column, so schema propagation must throw instead of emitting a
  // null-typed output attribute.
  val inputSchemas = Map(PortIdentity() -> Schema().add("v", AttributeType.LONG))
  assertThrows[Exception] {
    val blankSum = aggOp(AggregationFunction.SUM, "", "total")
    descWith(List.empty, blankSum).getExternalOutputSchemas(inputSchemas)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ class AggregateOpSpec extends AnyFunSuite {
assert(attr.getType == AttributeType.INTEGER)
}

test("getAggregationAttribute maps COUNT result to INTEGER even with a null input type") {
  // With COUNT(*) (blank attribute) there is no input column, so the caller hands in a
  // null attribute type; the resulting attribute must still come out as INTEGER.
  val countStar = makeAggregationOp(AggregationFunction.COUNT, "", "row_count")
  val resultAttr = countStar.getAggregationAttribute(null)

  assert(resultAttr.getName == "row_count")
  assert(resultAttr.getType == AttributeType.INTEGER)
}

test("getAggregationAttribute maps CONCAT result type to STRING") {
val operation = makeAggregationOp(AggregationFunction.CONCAT, "tag", "all_tags")
val attr = operation.getAggregationAttribute(AttributeType.INTEGER)
Expand Down Expand Up @@ -142,7 +152,27 @@ class AggregateOpSpec extends AnyFunSuite {
assert(math.abs(result - 4.0) < 1e-6)
}

test("COUNT aggregation with attribute == null counts all rows") {
test("COUNT with an empty attribute (COUNT(*)) counts all rows regardless of nulls") {
  // The GUI submits "" when no column is chosen; that blank attribute denotes COUNT(*).
  val schema = makeSchema("points" -> AttributeType.INTEGER)
  // The middle row holds a null value and must still be counted.
  val rows = List(
    makeTuple(schema, 10),
    makeTuple(schema, null),
    makeTuple(schema, 20)
  )

  val countStar = makeAggregationOp(AggregationFunction.COUNT, "", "row_count")
  val agg = countStar.getAggFunc(AttributeType.INTEGER)

  // Feed the rows through the aggregation in order, starting from its initial partial.
  val partial = rows.foldLeft(agg.init())((acc, row) => agg.iterate(acc, row))

  val counted = agg.finalAgg(partial).asInstanceOf[Number].intValue()
  assert(counted == 3)
}

test("COUNT with a null attribute also counts all rows") {
// A null attribute is treated the same as empty (COUNT(*)).
val schema = makeSchema("points" -> AttributeType.INTEGER)
val tuple1 = makeTuple(schema, 10)
val tuple2 = makeTuple(schema, null)
Expand Down Expand Up @@ -535,4 +565,31 @@ class AggregateOpSpec extends AnyFunSuite {
assert(totalRevenue == 350)
assert(rowCount == 3)
}

test("AggregateOpExec computes COUNT(*) over every row (including nulls) end-to-end") {
  // Columns: region (not aggregated) and revenue (null in one row). A blank attribute
  // denotes COUNT(*), so the executor has to bypass the input-column lookup and still
  // report all 3 rows.
  val schema = makeSchema(
    "region" -> AttributeType.STRING,
    "revenue" -> AttributeType.INTEGER
  )
  val rows = List(
    makeTuple(schema, "west", 100),
    makeTuple(schema, "east", null),
    makeTuple(schema, "west", 50)
  )

  val desc = new AggregateOpDesc()
  desc.aggregations = List(makeAggregationOp(AggregationFunction.COUNT, "", "row_count"))
  desc.groupByKeys = List() // no group-by keys: a single global aggregation

  val exec = new AggregateOpExec(objectMapper.writeValueAsString(desc))
  exec.open()
  rows.foreach(row => exec.processTuple(row, 0))

  val results = exec.onFinish(0).toList
  assert(results.size == 1)
  assert(results.head.getFields(0).asInstanceOf[Number].intValue() == 3)
}
}
6 changes: 4 additions & 2 deletions docs/reference/operators/data-cleaning/aggregate/aggregate.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ tags: [data-cleaning, aggregate]
|----------|-------------|------|---------|-------------|
| Aggregations | ✓ | List<Aggregation> | - | Multiple aggregation functions (min: 1,<br>aggregations cannot be empty) |
| ↳ Aggregate Func | ✓ | sum, count, average, min, max, concat | - | Sum, count, average, min, max, or concat |
| ↳ Attribute | ✓ | String | - | Column to calculate average value |
| ↳ Result Attribute | ✓ | String | - | Column name of average result |
| ↳ Attribute | ✓ (optional for `count`) | String | - | Column to aggregate on. Required for every function except `count`: leave it empty with `count` to count all rows (`COUNT(*)`), or pick a column to count its non-null values |
| ↳ Result Attribute | ✓ | String | - | Column name of the aggregation result |
| Group By Keys | | List | - | Group by columns |

> **Counting rows**: with the `count` function, leave **Attribute** empty to count every row (`COUNT(*)`, including rows with nulls), or choose a column to count only that column's non-null values.

### Output Ports

| Port | Mode |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ import { map, switchMap, take } from "rxjs/operators";

Quill.register("modules/cursors", QuillCursors);

// The Aggregate "count" function. With an empty attribute it means COUNT(*) (all rows);
// with a column it counts that column's non-null values. It is the only function whose
// attribute is optional.
export const AGGREGATE_COUNT = "count";
Comment thread
tanishqgandhi1908 marked this conversation as resolved.

/**
* Property Editor uses JSON Schema to automatically generate the form from the JSON Schema of an operator.
* For example, the JSON Schema of Sentiment Analysis could be:
Expand Down Expand Up @@ -545,6 +550,16 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On
mappedField.type = "datasetversionselector";
}

// Aggregate: the attribute is optional for `count` (an empty attribute means COUNT(*),
// counting all rows) and required for every other function. Show the required marker
// (red *) accordingly, based on the sibling aggFunction within the same row.
if (this.currentOperatorSchema?.operatorType === "Aggregate" && mappedField.key === "attribute") {
mappedField.expressions = {
...mappedField.expressions,
"props.required": (field: FormlyFieldConfig) => field.parent?.model?.aggFunction !== AGGREGATE_COUNT,
};
}
Comment thread
tanishqgandhi1908 marked this conversation as resolved.

if (this.currentOperatorSchema?.operatorType === "FileScanOp" && mappedField.key === "outputFileName") {
mappedField.expressions = {
...mappedField.expressions,
Expand Down
Loading