Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 161 additions & 12 deletions src/handlers/http/query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ const LOG_CONTEXT_ANCHORED_DUPLICATE: &str = "first";
#[serde(rename_all = "camelCase")]
pub struct LogContextRequest {
pub dataset: String,
pub context_window: String,
pub context_window: Option<String>,
pub context_start_time: Option<String>,
pub context_end_time: Option<String>,
pub p_timestamp: String,
pub log: Option<String>,
pub body: Option<String>,
Expand Down Expand Up @@ -145,8 +147,13 @@ pub async fn query_context(
let page_size = normalize_log_context_page_size(context_request.page_size)?;
span.record("page_size", page_size);
let anchor_timestamp = parse_log_context_timestamp(&context_request.p_timestamp)?;
let (context_start_time, context_end_time) =
log_context_bounds(anchor_timestamp, &context_request.context_window)?;
let (context_start_time, context_end_time) = resolve_log_context_bounds(
anchor_timestamp,
context_request.context_window.as_deref(),
context_request.context_start_time.as_deref(),
context_request.context_end_time.as_deref(),
)?;
validate_log_context_anchor_in_bounds(anchor_timestamp, context_start_time, context_end_time)?;
debug!(
page_size,
anchor_timestamp = %anchor_timestamp,
Expand Down Expand Up @@ -290,17 +297,23 @@ fn normalize_log_context_page_size(page_size: Option<u64>) -> Result<u64, QueryE
}

fn parse_log_context_timestamp(raw: &str) -> Result<DateTime<Utc>, QueryError> {
parse_log_context_time_field(raw, "pTimestamp")
}

fn parse_log_context_time_field(raw: &str, field_name: &str) -> Result<DateTime<Utc>, QueryError> {
let raw = raw.trim();
let timestamp = DateTime::parse_from_rfc3339(raw)
.map(|timestamp| timestamp.with_timezone(&Utc))
.or_else(|rfc3339_err| {
parse_log_context_naive_utc_timestamp(raw)
.map(|timestamp| DateTime::from_naive_utc_and_offset(timestamp, Utc))
.map_err(|_| QueryError::CustomError(format!("Invalid pTimestamp: {rfc3339_err}")))
.map_err(|_| {
QueryError::CustomError(format!("Invalid {field_name}: {rfc3339_err}"))
})
})?;

DateTime::from_timestamp_millis(timestamp.timestamp_millis()).ok_or_else(|| {
QueryError::CustomError("pTimestamp is outside the supported range".to_string())
QueryError::CustomError(format!("{field_name} is outside the supported range"))
})
}

Expand All @@ -309,7 +322,33 @@ fn parse_log_context_naive_utc_timestamp(raw: &str) -> Result<NaiveDateTime, chr
.or_else(|_| NaiveDateTime::parse_from_str(raw, "%Y-%m-%d %H:%M:%S%.f"))
}

fn log_context_bounds(
fn resolve_log_context_bounds(
anchor_timestamp: DateTime<Utc>,
context_window: Option<&str>,
context_start_time: Option<&str>,
context_end_time: Option<&str>,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
match (context_window, context_start_time, context_end_time) {
(Some(_), Some(_), _) | (Some(_), _, Some(_)) => Err(QueryError::CustomError(
"Request must include either contextWindow or contextStartTime/contextEndTime, not both"
.to_string(),
)),
(Some(context_window), None, None) => {
log_context_window_bounds(anchor_timestamp, context_window)
}
(None, Some(context_start_time), Some(context_end_time)) => {
log_context_explicit_bounds(context_start_time, context_end_time)
}
(None, Some(_), None) | (None, None, Some(_)) => Err(QueryError::CustomError(
"contextStartTime and contextEndTime must be provided together".to_string(),
)),
(None, None, None) => Err(QueryError::CustomError(
"Request must include either contextWindow or contextStartTime/contextEndTime".to_string(),
)),
}
}

fn log_context_window_bounds(
anchor_timestamp: DateTime<Utc>,
context_window: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
Expand All @@ -333,6 +372,37 @@ fn log_context_bounds(
Ok((start, end))
}

fn log_context_explicit_bounds(
context_start_time: &str,
context_end_time: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
let start = parse_log_context_time_field(context_start_time, "contextStartTime")?;
let end = parse_log_context_time_field(context_end_time, "contextEndTime")?;

if start >= end {
return Err(QueryError::CustomError(
"contextStartTime must be before contextEndTime".to_string(),
));
}

Ok((start, end))
}

fn validate_log_context_anchor_in_bounds(
anchor_timestamp: DateTime<Utc>,
context_start_time: DateTime<Utc>,
context_end_time: DateTime<Utc>,
) -> Result<(), QueryError> {
if anchor_timestamp >= context_start_time && anchor_timestamp < context_end_time {
return Ok(());
}

Err(QueryError::CustomError(
"pTimestamp must be greater than or equal to contextStartTime and less than contextEndTime"
.to_string(),
))
}

fn normalize_log_context_match_fields(
log: &Option<String>,
body: &Option<String>,
Expand Down Expand Up @@ -942,7 +1012,7 @@ fn log_context_cursor_from_record(
}

fn format_log_context_api_time(timestamp: DateTime<Utc>) -> String {
timestamp.to_rfc3339_opts(SecondsFormat::Secs, true)
timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
}

fn timestamp_sql_literal(timestamp: DateTime<Utc>) -> String {
Expand Down Expand Up @@ -1023,16 +1093,93 @@ mod tests {
}

#[test]
fn log_context_bounds_apply_window_and_truncate_to_minute() {
let (start, end) = log_context_bounds(anchor_timestamp(), "1m").unwrap();
fn log_context_explicit_bounds_accept_start_and_end_times() {
let (start, end) =
log_context_explicit_bounds("2026-06-17T10:14:00Z", "2026-06-17T10:16:00Z").unwrap();
assert_eq!(format_log_context_api_time(start), "2026-06-17T10:14:00Z");
assert_eq!(format_log_context_api_time(end), "2026-06-17T10:16:00Z");

let (start, end) = log_context_bounds(anchor_timestamp(), "5s").unwrap();
let (start, end) =
log_context_explicit_bounds("2026-06-17T10:15:42.100Z", "2026-06-17T10:15:42.900Z")
.unwrap();
assert_eq!(
format_log_context_api_time(start),
"2026-06-17T10:15:42.100Z"
);
assert_eq!(format_log_context_api_time(end), "2026-06-17T10:15:42.900Z");

assert!(
log_context_explicit_bounds("2026-06-17T10:16:00Z", "2026-06-17T10:16:00Z").is_err()
);
assert!(
log_context_explicit_bounds("2026-06-17T10:17:00Z", "2026-06-17T10:16:00Z").is_err()
);
}

#[test]
fn log_context_window_bounds_apply_window_and_truncate_to_minute() {
let (start, end) = log_context_window_bounds(anchor_timestamp(), "1m").unwrap();
assert_eq!(format_log_context_api_time(start), "2026-06-17T10:14:00Z");
assert_eq!(format_log_context_api_time(end), "2026-06-17T10:16:00Z");

let (start, end) = log_context_window_bounds(anchor_timestamp(), "5s").unwrap();
assert_eq!(format_log_context_api_time(start), "2026-06-17T10:15:00Z");
assert_eq!(format_log_context_api_time(end), "2026-06-17T10:16:00Z");
}

#[test]
fn log_context_bounds_resolver_accepts_one_mode_only() {
let anchor = anchor_timestamp();
assert!(resolve_log_context_bounds(anchor, Some("1m"), None, None).is_ok());
assert!(
resolve_log_context_bounds(
anchor,
None,
Some("2026-06-17T10:14:00Z"),
Some("2026-06-17T10:16:00Z"),
)
.is_ok()
);
assert!(
resolve_log_context_bounds(anchor, Some("1m"), Some("2026-06-17T10:14:00Z"), None,)
.is_err()
);
assert!(
resolve_log_context_bounds(anchor, None, Some("2026-06-17T10:14:00Z"), None).is_err()
);
assert!(resolve_log_context_bounds(anchor, None, None, None).is_err());
}

#[test]
fn log_context_anchor_must_be_inside_context_bounds() {
let (start, end) =
log_context_explicit_bounds("2026-06-17T10:14:00Z", "2026-06-17T10:16:00Z").unwrap();

validate_log_context_anchor_in_bounds(
parse_log_context_timestamp("2026-06-17T10:14:00Z").unwrap(),
start,
end,
)
.unwrap();
validate_log_context_anchor_in_bounds(anchor_timestamp(), start, end).unwrap();
assert!(
validate_log_context_anchor_in_bounds(
parse_log_context_timestamp("2026-06-17T10:13:59.999Z").unwrap(),
start,
end,
)
.is_err()
);
assert!(
validate_log_context_anchor_in_bounds(
parse_log_context_timestamp("2026-06-17T10:16:00Z").unwrap(),
start,
end,
)
.is_err()
);
}

#[test]
fn log_context_match_fields_accept_exactly_one_anchor_field() {
let schema = schema_with(&[DEFAULT_TIMESTAMP_KEY, "body", "log", "message"]);
Expand Down Expand Up @@ -1130,7 +1277,8 @@ mod tests {
#[test]
fn log_context_anchor_count_sql_only_counts_anchor_duplicates() {
let anchor = anchor_timestamp();
let (start, end) = log_context_bounds(anchor, "1m").unwrap();
let (start, end) =
log_context_explicit_bounds("2026-06-17T10:14:00Z", "2026-06-17T10:16:00Z").unwrap();
let match_fields = vec![LogContextMatchField {
name: "message".to_string(),
value: "alpha".to_string(),
Expand All @@ -1150,7 +1298,8 @@ mod tests {
#[test]
fn log_context_cursor_sql_builds_previous_and_next_pages() {
let anchor = anchor_timestamp();
let (start, end) = log_context_bounds(anchor, "1m").unwrap();
let (start, end) =
log_context_explicit_bounds("2026-06-17T10:14:00Z", "2026-06-17T10:16:00Z").unwrap();
let match_fields = vec![LogContextMatchField {
name: "message".to_string(),
value: "alpha".to_string(),
Expand Down
Loading