Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.storage.model

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

class ReadonlyLocalFileDocumentSpec extends AnyFlatSpec with Matchers {

private def withTempDoc(
content: String
)(body: (ReadonlyLocalFileDocument, File) => Unit): Unit = {
val tmp = File.createTempFile("readonly-local-", ".txt")
try {
Files.write(tmp.toPath, content.getBytes(StandardCharsets.UTF_8))
body(new ReadonlyLocalFileDocument(tmp.toURI), tmp)
} finally {
tmp.delete()
}
}

"ReadonlyLocalFileDocument" should "expose the backing URI and file" in {
withTempDoc("hello") { (doc, tmp) =>
doc.getURI shouldBe tmp.toURI
doc.asFile() shouldBe new File(tmp.toURI)
doc.asFile().getCanonicalFile shouldBe tmp.getCanonicalFile
}
}

it should "read the file contents through an input stream" in {
withTempDoc("hello world") { (doc, _) =>
val in = doc.asInputStream()
try new String(in.readAllBytes(), "UTF-8") shouldBe "hello world"
finally in.close()
}
}

it should "not support the collection accessors" in {
withTempDoc("x") { (doc, _) =>
intercept[NotImplementedError](doc.getItem(0)).getMessage shouldBe
"getItem is not supported for ReadonlyLocalFileDocument"
intercept[NotImplementedError](doc.get()).getMessage shouldBe
"get is not supported for ReadonlyLocalFileDocument"
intercept[NotImplementedError](doc.getRange(0, 1)).getMessage shouldBe
"getRange is not supported for ReadonlyLocalFileDocument"
intercept[NotImplementedError](doc.getAfter(0)).getMessage shouldBe
"getAfter is not supported for ReadonlyLocalFileDocument"
intercept[NotImplementedError](doc.getCount).getMessage shouldBe
"getCount is not supported for ReadonlyLocalFileDocument"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.storage.model

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.ByteArrayInputStream
import java.net.URI

class VirtualDocumentSpec extends AnyFlatSpec with Matchers {

// A minimal concrete document overriding only the two abstract members
// (getURI, clear); every other method keeps its default throwing body.
private class MinimalDoc extends VirtualDocument[Int] {
override def getURI: URI = new URI("file:///stub/doc")
override def clear(): Unit = ()
}

// one shared instance: every default method throws statelessly, so all assertions can run
// against the same document (in particular the URI and clear checks below)
private val doc: VirtualDocument[Int] = new MinimalDoc

"VirtualDocument" should "return the implemented URI and support clear" in {
doc.getURI shouldBe new URI("file:///stub/doc")
noException should be thrownBy doc.clear()
}

it should "throw NotImplementedError for every unimplemented read accessor" in {
intercept[NotImplementedError](doc.getItem(0)).getMessage shouldBe
"getItem method is not implemented"
intercept[NotImplementedError](doc.get()).getMessage shouldBe "get method is not implemented"
intercept[NotImplementedError](doc.getRange(0, 1)).getMessage shouldBe
"getRange method is not implemented"
intercept[NotImplementedError](doc.getRange(0, 1, columns = Some(Seq("c")))).getMessage shouldBe
"getRange method is not implemented"
intercept[NotImplementedError](doc.getAfter(0)).getMessage shouldBe
"getAfter method is not implemented"
intercept[NotImplementedError](doc.getCount).getMessage shouldBe
"getCount method is not implemented"
}

it should "throw NotImplementedError for every unimplemented write accessor" in {
intercept[NotImplementedError](doc.setItem(0, 5)).getMessage shouldBe
"setItem method is not implemented"
// note: the writer default reports the message "write method is not implemented"
intercept[NotImplementedError](doc.writer("w")).getMessage shouldBe
"write method is not implemented"
intercept[NotImplementedError](doc.append(5)).getMessage shouldBe
"append method is not implemented"
intercept[NotImplementedError](doc.append(Iterator(1, 2))).getMessage shouldBe
"append method is not implemented"
intercept[NotImplementedError](
doc.appendStream(new ByteArrayInputStream(Array[Byte](1)))
).getMessage shouldBe "appendStream method is not implemented"
}

it should "throw NotImplementedError for the unimplemented file/stat accessors" in {
intercept[NotImplementedError](doc.asInputStream()).getMessage shouldBe
"asInputStream method is not implemented"
intercept[NotImplementedError](doc.asFile()).getMessage shouldBe
"asFile method is not implemented"
intercept[NotImplementedError](doc.getTableStatistics).getMessage shouldBe
"getTableStatistics method is not implemented"
intercept[NotImplementedError](doc.getTotalFileSize).getMessage shouldBe
"getTotalFileSize method is not implemented"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.storage.result

import org.apache.texera.amber.core.tuple.AttributeType
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ResultSchemaSpec extends AnyFlatSpec with Matchers {

"ResultSchema.runtimeStatisticsSchema" should "declare the runtime-statistics columns in order" in {
val schema = ResultSchema.runtimeStatisticsSchema
schema.getAttributeNames shouldBe List(
"operatorId",
"time",
"inputTupleCnt",
"inputTupleSize",
"outputTupleCnt",
"outputTupleSize",
"dataProcessingTime",
"controlProcessingTime",
"idleTime",
"numWorkers",
"status"
)
schema.getAttribute("operatorId").getType shouldBe AttributeType.STRING
schema.getAttribute("time").getType shouldBe AttributeType.TIMESTAMP
List(
"inputTupleCnt",
"inputTupleSize",
"outputTupleCnt",
"outputTupleSize",
"dataProcessingTime",
"controlProcessingTime",
"idleTime"
).foreach(name => schema.getAttribute(name).getType shouldBe AttributeType.LONG)
schema.getAttribute("numWorkers").getType shouldBe AttributeType.INTEGER
schema.getAttribute("status").getType shouldBe AttributeType.INTEGER
}

"ResultSchema.consoleMessagesSchema" should "declare a single string message column" in {
val schema = ResultSchema.consoleMessagesSchema
schema.getAttributeNames shouldBe List("message")
schema.getAttribute("message").getType shouldBe AttributeType.STRING
}
}
Loading