Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,68 @@
import java.io.Serializable;
import java.util.Arrays;

/**
* Serializable, typed view over a Flink {@link ParameterTool} that exposes a Dagger job's
* configuration.
*
* <p>It wraps the parameters supplied at job submission and provides typed accessors (string,
* string array, integer, boolean, long) with optional defaults, so the rest of the codebase can
* read configuration keys without dealing with parsing. Being {@link Serializable}, the
* configuration can be captured by operators and shipped with the Flink job graph.
*/
public class Configuration implements Serializable {
/** The underlying Flink parameter source backing all lookups. */
private final ParameterTool param;

/**
* Wraps the given Flink parameters.
*
* @param param the parameter source holding the job's configuration key/value pairs
*/
public Configuration(ParameterTool param) {
this.param = param;
}

/**
* Returns the underlying Flink parameter source.
*
* @return the wrapped {@link ParameterTool}
*/
public ParameterTool getParam() {
return param;
}

/**
* Returns the value of a configuration key as a string.
*
* @param configKey the configuration key to look up
* @return the configured value, or {@code null} if the key is absent
*/
public String getString(String configKey) {
return param.get(configKey);
}

/**
* Returns the value of a configuration key as a string, or a default when absent.
*
* @param configKey the configuration key to look up
* @param defaultValue the value to return when the key is not present
* @return the configured value, or {@code defaultValue} if the key is absent
*/
public String getString(String configKey, String defaultValue) {
return param.get(configKey, defaultValue);
}

/**
* Returns the value of a configuration key split into a trimmed string array.
*
* <p>The raw value is split on commas with each element trimmed. When the key is missing or its
* value is blank, the supplied default array is returned instead.
*
* @param configKey the configuration key to look up
* @param defaultValue the array to return when the key is absent or blank
* @return the parsed comma-separated values, or {@code defaultValue} if none are present
*/
public String[] getStringArray(String configKey, String[] defaultValue) {
String value = param.get(configKey);
if (value == null || value.trim().isEmpty()) {
Expand All @@ -33,14 +76,35 @@ public String[] getStringArray(String configKey, String[] defaultValue) {
return Arrays.stream(value.split(",")).map(String::trim).toArray(String[]::new);
}

/**
* Returns the value of a configuration key as an integer, or a default when absent.
*
* @param configKey the configuration key to look up
* @param defaultValue the value to return when the key is not present
* @return the configured integer, or {@code defaultValue} if the key is absent
*/
public Integer getInteger(String configKey, Integer defaultValue) {
return param.getInt(configKey, defaultValue);
}

/**
* Returns the value of a configuration key as a boolean, or a default when absent.
*
* @param configKey the configuration key to look up
* @param defaultValue the value to return when the key is not present
* @return the configured boolean, or {@code defaultValue} if the key is absent
*/
public Boolean getBoolean(String configKey, Boolean defaultValue) {
return param.getBoolean(configKey, defaultValue);
}

/**
* Returns the value of a configuration key as a long, or a default when absent.
*
* @param configKey the configuration key to look up
* @param defaultValue the value to return when the key is not present
* @return the configured long, or {@code defaultValue} if the key is absent
*/
public Long getLong(String configKey, Long defaultValue) {
return param.getLong(configKey, defaultValue);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,67 @@
package com.gotocompany.dagger.common.core;

/**
* Centralized constants shared across Dagger's common module.
*
* <p>Holds the configuration keys (and their default values) for the Stencil schema-registry
* client, a handful of telemetry/UDF identifiers, stream input-schema configuration keys, and the
* internal column names Dagger appends to every deserialized {@link org.apache.flink.types.Row}.
* This is a constant holder and is not intended to be instantiated.
*/
public class Constants {
/** Configuration key toggling use of the remote Stencil schema registry. */
public static final String SCHEMA_REGISTRY_STENCIL_ENABLE_KEY = "SCHEMA_REGISTRY_STENCIL_ENABLE";
/** Default for {@link #SCHEMA_REGISTRY_STENCIL_ENABLE_KEY}: remote Stencil disabled. */
public static final boolean SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT = false;
/** Configuration key holding the comma-separated Stencil registry URLs. */
public static final String SCHEMA_REGISTRY_STENCIL_URLS_KEY = "SCHEMA_REGISTRY_STENCIL_URLS";
/** Default for {@link #SCHEMA_REGISTRY_STENCIL_URLS_KEY}: no URLs configured. */
public static final String SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT = "";
/** Configuration key for the Stencil descriptor fetch timeout, in milliseconds. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS = "SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS";
/** Default Stencil fetch timeout: {@code 10000} ms (10 seconds). */
public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT = 10000;
/** Configuration key for the comma-separated {@code name:value} HTTP headers sent to Stencil. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS";
/** Default for {@link #SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY}: no headers. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_DEFAULT = "";
/** Configuration key toggling automatic refresh of the Stencil descriptor cache. */
public static final String SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH_KEY = "SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH";
/** Default for {@link #SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH_KEY}: auto-refresh disabled. */
public static final boolean SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH_DEFAULT = false;
/** Configuration key for the Stencil descriptor cache time-to-live, in milliseconds. */
public static final String SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS_KEY = "SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS";
/** Default Stencil cache TTL: {@code 900000} ms (15 minutes). */
public static final Long SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS_DEFAULT = 900000L;
/** Configuration key selecting the Stencil schema refresh strategy. */
public static final String SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY_KEY = "SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY";
/** Default Stencil refresh strategy: {@code "LONG_POLLING"}. */
public static final String SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY_DEFAULT = "LONG_POLLING";
/** Configuration key for the minimum back-off between Stencil fetch retries, in milliseconds. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS";
/** Default minimum Stencil fetch back-off: {@code 60000} ms (1 minute). */
public static final Long SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS_DEFAULT = 60000L;
/** Configuration key for the number of times a failed Stencil fetch is retried. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES";
/** Default number of Stencil fetch retries: {@code 4}. */
public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES_DEFAULT = 4;

/** Metric group key under which user-defined function (UDF) telemetry is reported. */
public static final String UDF_TELEMETRY_GROUP_KEY = "udf";
/** Aspect/field name used when publishing gauge metric values. */
public static final String GAUGE_ASPECT_NAME = "value";

/** Length of the sliding time window used when aggregating/reporting metrics. */
public static final long SLIDING_TIME_WINDOW = 10;
/** Stream configuration key naming the protobuf class for an input stream's schema. */
public static final String STREAM_INPUT_SCHEMA_PROTO_CLASS = "INPUT_SCHEMA_PROTO_CLASS";
/** Stream configuration key naming the table/alias for an input stream. */
public static final String STREAM_INPUT_SCHEMA_TABLE = "INPUT_SCHEMA_TABLE";
/** Configuration key holding the definition of the job's input streams. */
public static final String INPUT_STREAMS = "STREAMS";

/** Name of the internal boolean column Dagger appends to flag whether a record parsed successfully. */
public static final String INTERNAL_VALIDATION_FIELD_KEY = "__internal_validation_field__";
/** Default name of the event-time (rowtime) attribute column appended to every row. */
public static final String ROWTIME = "rowtime";
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,25 @@
* It initializes with StreamExecutionEnvironment, StreamTableEnvironment and Configuration.
*/
public class DaggerContext {
/**
* Logger used to record lifecycle events of the {@link DaggerContext} singleton.
*/
private static final Logger LOGGER = LoggerFactory.getLogger(DaggerContext.class.getName());
/**
* The lazily-created, {@code volatile} singleton instance shared across the Dagger job.
*/
private static volatile DaggerContext daggerContext = null;
/**
* The Flink {@link StreamExecutionEnvironment} that backs the streaming job.
*/
private final StreamExecutionEnvironment executionEnvironment;
/**
* The Flink {@link StreamTableEnvironment} used to evaluate the Table/SQL pipeline.
*/
private final StreamTableEnvironment tableEnvironment;
/**
* The user-supplied {@link Configuration} that parameterizes the Dagger job.
*/
private final Configuration configuration;

/**
Expand Down Expand Up @@ -55,14 +70,29 @@ public static synchronized DaggerContext init(Configuration configuration) {
return daggerContext;
}

/**
* Returns the Flink {@link StreamExecutionEnvironment} held by this context.
*
* @return the stream execution environment
*/
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}

/**
* Returns the Flink {@link StreamTableEnvironment} held by this context.
*
* @return the stream table environment
*/
public StreamTableEnvironment getTableEnvironment() {
return tableEnvironment;
}

/**
* Returns the {@link Configuration} that was used to initialize this context.
*
* @return the configuration
*/
public Configuration getConfiguration() {
return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,42 @@
import java.util.Map;


/**
* Serializable cache of protobuf field positions, used to keep field indices stable when the
* Stencil schema is refreshed at runtime.
*
* <p>On construction it walks a protobuf {@link Descriptors.Descriptor} recursively (descending into
* nested {@code MESSAGE} fields) and records, for every field, its original declared index keyed by
* fully-qualified name, plus the field count (arity) of every message type keyed by fully-qualified
* name. The protobuf and Parquet deserializers consult this cache so that rows are built against the
* field layout captured at startup rather than against a possibly-reordered refreshed descriptor.
*/
public class FieldDescriptorCache implements Serializable {
/** Maps each field's fully-qualified name to its original declared index within its message. */
private final Map<String, Integer> fieldDescriptorIndexMap = new HashMap<>();
/** Maps each message type's fully-qualified name to its original field count (arity). */
private final Map<String, Integer> protoDescriptorArityMap = new HashMap<>();

/**
* Builds a cache by recursively indexing the given descriptor and all nested message types.
*
* @param descriptor the root protobuf descriptor to index
*/
public FieldDescriptorCache(Descriptors.Descriptor descriptor) {

cacheFieldDescriptorMap(descriptor);
}

/**
* Recursively records field indices and message arities for the given descriptor.
*
* <p>Returns early if this message type has already been cached, which both avoids repeated work
* and guards against recursive or self-referential schemas. For every field it stores the
* field's original index, and for nested {@code MESSAGE} fields it recurses into the referenced
* message type.
*
* @param descriptor the protobuf descriptor whose fields should be cached
*/
public void cacheFieldDescriptorMap(Descriptors.Descriptor descriptor) {

if (protoDescriptorArityMap.containsKey(descriptor.getFullName())) {
Expand All @@ -37,18 +64,38 @@ public void cacheFieldDescriptorMap(Descriptors.Descriptor descriptor) {
}
}

/**
* Returns the cached original index of a protobuf field.
*
* @param fieldDescriptor the field whose original index is requested
* @return the field's original declared index captured when it was cached
* @throws IllegalArgumentException if the field is not present in the cache
*/
public int getOriginalFieldIndex(Descriptors.FieldDescriptor fieldDescriptor) {
if (!fieldDescriptorIndexMap.containsKey(fieldDescriptor.getFullName())) {
throw new IllegalArgumentException("The Field Descriptor " + fieldDescriptor.getFullName() + " was not found in the cache");
}
return fieldDescriptorIndexMap.get(fieldDescriptor.getFullName());
}

/**
* Indicates whether a field (by fully-qualified name) is present in the cache.
*
* @param fieldName the fully-qualified field name to look up
* @return {@code true} if the field has been cached, {@code false} otherwise
*/
public boolean containsField(String fieldName) {

return fieldDescriptorIndexMap.containsKey(fieldName);
}

/**
* Returns the cached original field count (arity) of a protobuf message type.
*
* @param descriptor the message descriptor whose original field count is requested
* @return the number of fields the message declared when it was cached
* @throws IllegalArgumentException if the descriptor is not present in the cache
*/
public int getOriginalFieldCount(Descriptors.Descriptor descriptor) {
if (!protoDescriptorArityMap.containsKey(descriptor.getFullName())) {
throw new IllegalArgumentException("The Proto Descriptor " + descriptor.getFullName() + " was not found in the cache");
Expand Down
Loading
Loading