Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -498,4 +498,38 @@ class IntervalOpExecSpec extends AnyFlatSpec with BeforeAndAfter {
)
}

it should "join timestamps across every interval unit" in {
val base = Timestamp.valueOf("2020-01-01 00:00:00")
Seq(
TimeIntervalType.YEAR,
TimeIntervalType.MONTH,
TimeIntervalType.HOUR,
TimeIntervalType.MINUTE,
TimeIntervalType.SECOND
).foreach { unit =>
val desc = new IntervalJoinOpDesc("point", "range", 3L, true, true, unit)
val exec = new IntervalJoinOpExec(objectMapper.writeValueAsString(desc))
exec.open()
assert(exec.processTuple(timeStampTuple("range", 1, base), right).isEmpty)
val out = exec.processTuple(timeStampTuple("point", 1, base), left).toList
assert(out.size == 1, s"expected a match for unit $unit")
assert(out.head.getFields.length == 4)
exec.close()
Comment thread
aglinxinyuan marked this conversation as resolved.
Outdated
}
}

it should "reject a join key whose type does not support interval comparison" in {
val desc = new IntervalJoinOpDesc("point", "range", 3L, true, true, TimeIntervalType.DAY)
val exec = new IntervalJoinOpExec(objectMapper.writeValueAsString(desc))
exec.open()
assert(
exec.processTuple(newTuple[String]("range", 1, "a", AttributeType.STRING), right).isEmpty
)
val ex = intercept[org.apache.texera.amber.core.WorkflowRuntimeException] {
exec.processTuple(newTuple[String]("point", 1, "a", AttributeType.STRING), left).toList
}
assert(ex.getMessage == "The data type can not support comparison: string")
exec.close()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.sleep

import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.scalatest.flatspec.AnyFlatSpec

class SleepOpExecSpec extends AnyFlatSpec {

private val schema: Schema = Schema().add(new Attribute("v", AttributeType.INTEGER))

private def tuple(v: Int): Tuple =
Tuple.builder(schema).add(new Attribute("v", AttributeType.INTEGER), Integer.valueOf(v)).build()

// SleepOpDesc is a LogicalOp: serialize a real instance so the polymorphic operatorType
// discriminator is present (a hand-written JSON string would fail to deserialize).
private def descString(sleepTime: Int): String = {
val desc = new SleepOpDesc()
desc.sleepTime = sleepTime
objectMapper.writeValueAsString(desc)
}

"SleepOpExec" should "construct from a serialized SleepOpDesc" in {
val exec = new SleepOpExec(descString(0))
assert(exec != null)
}

"SleepOpExec.processTuple" should "return the input tuple unchanged" in {
// sleepTime = 0 -> Thread.sleep(0), instant
val exec = new SleepOpExec(descString(0))
val emitted = exec.processTuple(tuple(7), 0).toList
assert(emitted.map(_.asInstanceOf[Tuple]) == List(tuple(7)))
}
Comment thread
aglinxinyuan marked this conversation as resolved.
Outdated

it should "emit exactly one tuple per input" in {
val exec = new SleepOpExec(descString(0))
val emitted = (0 until 5).flatMap(i => exec.processTuple(tuple(i), 0).toList)
assert(emitted.map(_.asInstanceOf[Tuple]) == (0 until 5).map(tuple).toList)
Comment thread
aglinxinyuan marked this conversation as resolved.
Outdated
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.source.cache

import org.apache.texera.amber.core.storage.VFSURIFactory
import org.apache.texera.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
PhysicalOpIdentity,
WorkflowIdentity
}
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.net.URI

/**
* Unit tests for the in-memory validation in [[CacheSourceOpExec]]'s constructor. The
* result-storage read path requires a live Iceberg catalog and is covered by the amber
* integration suite, not here.
*/
class CacheSourceOpExecSpec extends AnyFlatSpec with Matchers {

private val globalPortId =
GlobalPortIdentity(
PhysicalOpIdentity(OperatorIdentity("opA"), "main"),
PortIdentity(0),
input = true
)

"CacheSourceOpExec" should "reject a storage URI whose resource type is not RESULT" in {
val stateUri =
VFSURIFactory.stateURI(
VFSURIFactory.createPortBaseURI(WorkflowIdentity(7L), ExecutionIdentity(11L), globalPortId)
)
val ex = intercept[RuntimeException](new CacheSourceOpExec(stateUri))
ex.getMessage shouldBe "The storage URI must point to a result storage"
}

it should "reject a non-vfs storage URI" in {
intercept[IllegalArgumentException](
new CacheSourceOpExec(new URI("http:///wid/1/eid/1/result"))
)
}
}
Loading