Time-to-Live (TTL) for Row State
SDF introduces Time-to-Live (TTL) for row state values, a crucial feature for managing data hygiene and optimizing resource usage in your stateful transformations. With TTL, you can configure your state to automatically purge rows that haven't been updated within a specified time period.
Why Use TTL?
TTL is useful for:
- Data Hygiene: Automatically clean up stale or irrelevant data that no longer needs to be retained.
- Resource Management: Reduce memory and storage footprint by ensuring only active data resides in your state. This is especially beneficial for high-volume, real-time data streams.
- Compliance: Meet data retention policies by ensuring data is automatically removed after a set period.
- Performance: By keeping your state smaller and more focused on current data, you can potentially improve query performance.
How it Works
When you configure TTL for a keyed-state row, SDF monitors the _updated_at
timestamp for each row. If a row is not updated within the defined TTL duration, it is automatically purged from the state. This process is seamless and managed by SDF in the background.
Configuring TTL: An Example
Let's look at an example of how to enable TTL for a keyed-state in your SDF application manifest. In this scenario, we're performing a word count and want to ensure that word counts expire if a word hasn't been seen for a certain period.
Below is a snippet demonstrating a word count application where TTL is applied to the count-per-word state, it can be monitored by using the SDF SQL CLI interface:
# Perform word count on sentences in non windowed transformation
apiVersion: 0.6.0
meta:
name: wordcount-with-ttl
version: 0.1.0
namespace: my-org
# default config
config:
converter: json
consumer:
default_starting_offset:
value: 0
position: Beginning
topics:
sentence:
name: sentence
schema:
value:
type: string
converter: raw
services:
word-count-processing:
sources:
- type: topic
id: sentence
states:
count-per-word:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
occurrences:
type: u32
ttl: 60s
transforms:
- operator: flat-map
run: |
fn split_sequence(sentence: String) -> Result<Vec<String>> {
Ok(sentence.split_whitespace().map(|word| word.chars().filter(|c| c.is_alphanumeric()).collect::<String>()).filter(|word| !word.is_empty()).collect())
}
partition:
assign-key:
run: |
fn assign_key_word(word: String) -> Result<String> {
Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
}
update-state:
run: |
fn count_word(word: String) -> Result<()> {
let mut state = count_per_word();
state.occurrences += 1;
state.update()?;
Ok(())
}
Understanding the TTL Configuration
In the example above, the TTL is configured within the states section, specifically for count-per-word:
states:
count-per-word:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
occurrences:
type: u32
ttl: 60s # <-- This is the TTL configuration
In this example, TTL is set as 60s
. 60s specifies a duration of 60 seconds.
If a row in count-per-word is not updated for 60 seconds, it will be automatically removed.
Important Considerations
-
_updated_at Column: TTL relies on the _updated_at column, which is automatically added to all DataFrames. When you
update()
a row in your state, this timestamp is automatically refreshed. -
Data Semantics: Carefully consider the implications of data expiring. Ensure that purging old data aligns with your application's requirements.
-
Choosing a TTL: The optimal TTL duration depends on your specific use case. A very short TTL might lead to data being removed too quickly, while a very long TTL might not provide the intended resource savings.
-
TTL is a powerful feature for maintaining efficient and relevant state in your SDF applications. Experiment with different durations to find what works best for your data streams!