Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -188,37 +188,39 @@ void scopedResponseCarriesGlossaryNameAndIdPerNode(TestNamespace ns) throws Exce
Glossary glossary = GlossaryTestFactory.createWithName(ns, "labeled");
GlossaryTerm term = GlossaryTermTestFactory.createWithName(ns, glossary, "t1");

// The term node and its `group`/`glossaryId` projection land in RDF separately, so wait for the
// full projection — not just node presence — before asserting. Otherwise the node can already
// be
// present while `group` is still null on a slower run, which is exactly the flake this guards.
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(
() ->
assertTrue(
nodeIds(fetchGlossaryGraph(glossary.getId())).contains(term.getId()),
"Term should be projected to RDF before assertion"));

JsonNode scoped = fetchGlossaryGraph(glossary.getId());
JsonNode termNode = null;
for (JsonNode node : scoped.get("nodes")) {
JsonNode idNode = node.get("id");
if (idNode != null && term.getId().toString().equals(idNode.asText())) {
termNode = node;
break;
}
}
assertNotNull(termNode, "Scoped response should include the created term");
() -> {
JsonNode scoped = fetchGlossaryGraph(glossary.getId());
JsonNode termNode = null;
for (JsonNode node : scoped.get("nodes")) {
JsonNode idNode = node.get("id");
if (idNode != null && term.getId().toString().equals(idNode.asText())) {
termNode = node;
break;
}
}
assertNotNull(termNode, "Scoped response should include the created term");

JsonNode groupNode = termNode.get("group");
assertNotNull(
groupNode, "Term node should carry a `group` field with the parent glossary's name");
assertEquals(
glossary.getName(),
groupNode.asText(),
"Group label should match the parent glossary's name");
JsonNode groupNode = termNode.get("group");
assertNotNull(
groupNode,
"Term node should carry a `group` field with the parent glossary's name");
assertEquals(
glossary.getName(),
groupNode.asText(),
"Group label should match the parent glossary's name");

JsonNode glossaryIdNode = termNode.get("glossaryId");
assertNotNull(glossaryIdNode, "Term node should carry the parent glossary's id");
assertEquals(glossary.getId().toString(), glossaryIdNode.asText());
JsonNode glossaryIdNode = termNode.get("glossaryId");
assertNotNull(glossaryIdNode, "Term node should carry the parent glossary's id");
assertEquals(glossary.getId().toString(), glossaryIdNode.asText());
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
import static org.openmetadata.csv.CsvUtil.addTermRelations;
import static org.openmetadata.service.Entity.GLOSSARY;
import static org.openmetadata.service.Entity.GLOSSARY_TERM;
import static org.openmetadata.service.search.SearchClient.GLOSSARY_TERM_SEARCH_INDEX;
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchConstants.TAGS_FQN;
import static org.openmetadata.service.util.EntityUtil.compareTagLabel;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -566,46 +566,38 @@ private String reviewerOwnerReferencesToRecord(List<EntityReference> owners) {
}

private void updateAssetIndexes(Glossary original, Glossary updated) {
// Update ES indexes of entity tagged with the glossary term and its children terms to reflect
// its latest value.
GlossaryTermRepository repository =
(GlossaryTermRepository) Entity.getEntityRepository(GLOSSARY_TERM);
Set<String> targetFQNHashesFromDb =
new HashSet<>(
daoCollection
.tagUsageDAO()
.getTargetFQNHashForTagPrefix(updated.getFullyQualifiedName()));
List<GlossaryTerm> childTerms = getAllTerms(updated);

for (GlossaryTerm child : childTerms) {
targetFQNHashesFromDb.addAll( // for each child term find the targetFQNHashes of assets
daoCollection.tagUsageDAO().getTargetFQNHashForTag(child.getFullyQualifiedName()));
}

// List of entity references tagged with the glossary term
Map<String, EntityReference> targetFQNFromES =
repository.getGlossaryUsageFromES(
original.getFullyQualifiedName(), targetFQNHashesFromDb.size(), false);
List<EntityReference> childrenTerms =
searchRepository.getEntitiesContainingFQNFromES(
original.getFullyQualifiedName(),
getTermCount(updated),
GLOSSARY_TERM_SEARCH_INDEX); // get old value of children term from ES
for (EntityReference child : childrenTerms) {
targetFQNFromES.putAll( // List of entity references tagged with the children term
repository.getGlossaryUsageFromES(
child.getFullyQualifiedName(), targetFQNHashesFromDb.size(), false));
searchRepository.updateEntity(child); // update es index of child term
searchRepository.getSearchClient().reindexAcrossIndices("tags.tagFQN", child);
}

searchRepository.updateEntityIndex(original); // update es index of child term
searchRepository
.getSearchClient()
.reindexAcrossIndices("fullyQualifiedName", original.getEntityReference());
searchRepository
.getSearchClient()
.reindexAcrossIndices("glossary.name", original.getEntityReference());
String oldFqn = original.getFullyQualifiedName();
String newFqn = updated.getFullyQualifiedName();

// Re-index the glossary and all nested child terms from the renamed DB rows so each doc's own
// FQN and the glossary/parent denorm reflect the new name. Drained on the request thread
// post-commit = read-your-write, unlike the previous fire-and-forget reindexAcrossIndices that
// raced the commit and left child terms stale. Rebuilding from the authoritative rows is
// boundary-safe — getAllTerms matches on fixed-width fqnHash segments, where an ES
// prefix-rewrite over the raw FQN would also hit sibling glossaries sharing a name prefix
// (e.g. "Finance" vs "FinanceReports") and can't refresh glossary.name/fullyQualifiedName at
// all. The child terms go out as one bulk request, not N individual ES round-trips.
searchRepository.updateEntity(updated.getEntityReference());
searchRepository.deferIfFlushScopeActive(
() ->
searchRepository.updateEntitiesByReference(
getAllTerms(updated).stream().map(GlossaryTerm::getEntityReference).toList()),
"updateEntitiesByReference",
updated.getId().toString(),
newFqn,
GLOSSARY_TERM);

// Rewrite tags.tagFQN on every asset tagged with this glossary's terms in one synchronous
// prefix update-by-query (refresh=true) — the same in-line mechanism GlossaryTerm rename uses.
searchRepository.deferIfFlushScopeActive(
() ->
searchRepository
.getSearchClient()
.updateGlossaryTermByFqnPrefix(GLOBAL_SEARCH_ALIAS, oldFqn, newFqn, TAGS_FQN),
"updateGlossaryTermByFqnPrefix",
null,
newFqn,
GLOSSARY);
}

private void updateEntityLinksOnGlossaryRename(String oldFqn, String newFqn, Glossary updated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
Expand Down Expand Up @@ -194,15 +195,28 @@ private Map<UUID, EntityReference> batchFetchServices(List<MlModel> mlModels) {

for (CollectionDAO.EntityRelationshipObject record : records) {
UUID mlModelId = UUID.fromString(record.getToId());
EntityReference serviceRef =
Entity.getEntityReferenceById(
Entity.MLMODEL_SERVICE, UUID.fromString(record.getFromId()), NON_DELETED);
serviceMap.put(mlModelId, serviceRef);
EntityReference serviceRef = resolveServiceRefLeniently(UUID.fromString(record.getFromId()));
if (serviceRef != null) {
serviceMap.put(mlModelId, serviceRef);
}
}

return serviceMap;
}

private EntityReference resolveServiceRefLeniently(UUID serviceId) {
EntityReference serviceRef = null;
try {
serviceRef = Entity.getEntityReferenceById(Entity.MLMODEL_SERVICE, serviceId, NON_DELETED);
} catch (EntityNotFoundException e) {
// The parent service can be hard-deleted concurrently (e.g. a sibling test's cascade delete)
// between the relationship lookup above and this resolution. The ml model row is mid-cascade
// and about to be removed, so tolerate the missing service rather than failing the read.
LOG.debug("MlModel service {} not found (concurrent delete); skipping", serviceId);
}
return serviceRef;
}

@Override
public void clearFields(MlModel mlModel, Fields fields) {
mlModel.setDashboard(fields.contains("dashboard") ? mlModel.getDashboard() : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1555,6 +1555,31 @@ public void updateEntity(EntityReference entityReference) {
updateEntityIndex(entity);
}

/**
* Re-read each referenced entity with the same bounded field set {@link
* #updateEntity(EntityReference)} uses, then push one bulk index update. Use this in place of a
* per-entity {@code updateEntity} loop when a cascade (e.g. a glossary rename) must re-index many
* siblings: it keeps the rebuilt-from-DB correctness but collapses N individual ES round-trips
* into a single bulk request. Callers inside a transaction should wrap the call in {@link
* #deferIfFlushScopeActive} so the re-reads see committed rows.
*/
public void updateEntitiesByReference(List<EntityReference> references) {
if (nullOrEmpty(references)) {
return;
}
List<EntityInterface> entities = new ArrayList<>(references.size());
for (EntityReference reference : references) {
EntityRepository<?> entityRepository = Entity.getEntityRepository(reference.getType());
String fields = String.join(",", searchIndexFactory.getReindexFieldsFor(reference.getType()));
EntityInterface entity =
entityRepository.get(
null, reference.getId(), entityRepository.getOnlySupportedFields(fields));
entity.setChangeDescription(null);
entities.add(entity);
}
updateEntitiesIndex(entities);
Comment thread
gitar-bot[bot] marked this conversation as resolved.
}

/**
* Bulk update multiple entities in the search index. This is much more efficient than calling
* updateEntity() for each entity individually.
Expand Down
Loading