diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalOpExecSpec.scala index 97c60129833..aceab33f9d9 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalOpExecSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalOpExecSpec.scala @@ -498,4 +498,39 @@ class IntervalOpExecSpec extends AnyFlatSpec with BeforeAndAfter { ) } + it should "join timestamps across every interval unit" in { + val base = Timestamp.valueOf("2020-01-01 00:00:00") + Seq( + TimeIntervalType.YEAR, + TimeIntervalType.MONTH, + TimeIntervalType.HOUR, + TimeIntervalType.MINUTE, + TimeIntervalType.SECOND + ).foreach { unit => + val desc = new IntervalJoinOpDesc("point", "range", 3L, true, true, unit) + val exec = new IntervalJoinOpExec(objectMapper.writeValueAsString(desc)) + exec.open() + try { + assert(exec.processTuple(timeStampTuple("range", 1, base), right).isEmpty) + val out = exec.processTuple(timeStampTuple("point", 1, base), left).toList + assert(out.size == 1, s"expected a match for unit $unit") + assert(out.head.getFields.length == 4) + } finally exec.close() + } + } + + it should "reject a join key whose type does not support interval comparison" in { + val desc = new IntervalJoinOpDesc("point", "range", 3L, true, true, TimeIntervalType.DAY) + val exec = new IntervalJoinOpExec(objectMapper.writeValueAsString(desc)) + exec.open() + assert( + exec.processTuple(newTuple[String]("range", 1, "a", AttributeType.STRING), right).isEmpty + ) + val ex = intercept[org.apache.texera.amber.core.WorkflowRuntimeException] { + exec.processTuple(newTuple[String]("point", 1, "a", AttributeType.STRING), left).toList + } + assert(ex.getMessage == "The data type can not support comparison: string") + exec.close() + } + } diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpExecSpec.scala new file mode 100644 index 00000000000..2aa143f2948 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpExecSpec.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.sleep + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec + +class SleepOpExecSpec extends AnyFlatSpec { + + private val schema: Schema = Schema().add(new Attribute("v", AttributeType.INTEGER)) + + private def tuple(v: Int): Tuple = + Tuple.builder(schema).add(new Attribute("v", AttributeType.INTEGER), Integer.valueOf(v)).build() + + // SleepOpDesc is a LogicalOp: serialize a real instance so the polymorphic operatorType + // discriminator is present (a hand-written JSON string would fail to deserialize). + private def descString(sleepTime: Int): String = { + val desc = new SleepOpDesc() + desc.sleepTime = sleepTime + objectMapper.writeValueAsString(desc) + } + + "SleepOpExec" should "construct from a serialized SleepOpDesc" in { + val exec = new SleepOpExec(descString(0)) + assert(exec != null) + } + + "SleepOpExec.processTuple" should "return the input tuple unchanged" in { + // sleepTime = 0 -> Thread.sleep(0), instant + val exec = new SleepOpExec(descString(0)) + assert(exec.processTuple(tuple(7), 0).toList == List(tuple(7))) + } + + it should "emit exactly one tuple per input" in { + val exec = new SleepOpExec(descString(0)) + val emitted = (0 until 5).flatMap(i => exec.processTuple(tuple(i), 0).toList) + assert(emitted == (0 until 5).map(tuple).toList) + } +}